Skip to content
Snippets Groups Projects
Commit cdff38c1 authored by Jano Hendriks's avatar Jano Hendriks
Browse files

Add batch submit job

parent 4a1a9360
No related branches found
No related tags found
No related merge requests found
package batch
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/batch"
"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/utils"
"io"
"time"
)
const (
binaryPath = "/root/home/workers" // The worker binary path in the docker container
)
type BatchJobQueue string
type BatchJobMessageType string
// Queue names are the same as the batch Job queue names in aws
const (
BatchJobQueueLow BatchJobQueue = "batch-job-queue-low"
BatchJobQueueMedium BatchJobQueue = "batch-job-queue-medium"
BatchJobQueueHigh BatchJobQueue = "batch-job-queue-high"
)
const (
BatchJobMessageTypeS3 BatchJobMessageType = "s3"
)
func SubmitJob(name *string, job any, fullJobDefinition string, fullJobQueue string, messagesBucketName string, isDebug bool) error {
if isDebug {
go func() {
resty.New().R().
SetBody(job).
Post("http://127.0.0.1:3000/batch/")
}()
time.Sleep(time.Second * 1)
return nil
}
options := session.Options{
Config: aws.Config{
Region: utils.ValueToPointer("af-south-1"),
},
}
sess, err := session.NewSessionWithOptions(options)
batchClient := batch.New(sess)
if name == nil {
id := uuid.New()
jobID := "job" + id.String()
name = utils.ValueToPointer(jobID)
}
err = uploadMessageToS3(*name, job, messagesBucketName, isDebug)
if err != nil {
return err
}
command := []*string{
utils.ValueToPointer(binaryPath),
name,
}
environmentOverwrite := []*batch.KeyValuePair{{
Name: utils.ValueToPointer("BATCH_MESSAGE_TYPE"),
Value: utils.ValueToPointer(string(BatchJobMessageTypeS3)),
},
}
input := &batch.SubmitJobInput{
JobDefinition: utils.ValueToPointer(fullJobDefinition),
JobName: name,
JobQueue: utils.ValueToPointer(fullJobQueue),
ContainerOverrides: &batch.ContainerOverrides{
Command: command,
Environment: environmentOverwrite,
},
}
_, err = batchClient.SubmitJob(input)
if err != nil {
return err
}
return nil
}
func uploadMessageToS3(name string, job any, bucketName string, isDebug bool) error {
jobBytes, err := json.Marshal(job)
if err != nil {
return err
}
// Upload message
_, err = s3.GetSession(isDebug).UploadWithSettingsRevised(jobBytes, bucketName, s3.S3UploadSettings{
FileName: name,
})
if err != nil {
return err
}
return nil
}
func RetrieveMessageFromS3(filename string, messagesBucketName string, isDebug bool) ([]byte, error) {
// get the file contents
rawObject, err := s3.GetSession(isDebug).GetObject(messagesBucketName, filename, isDebug)
if err != nil {
return []byte{}, err
}
defer rawObject.Body.Close()
// Read the file, unzip the data and unmarshall the json
var bodyBytes []byte
bodyBytes, err = io.ReadAll(rawObject.Body)
if err != nil {
logs.ErrorWithMsg("Could not read file", err)
return bodyBytes, err
}
return bodyBytes, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment