diff --git a/batch/batch.go b/batch/batch.go new file mode 100644 index 0000000000000000000000000000000000000000..4cc9c3033c6e64da866ac9b361b668c50888c38e --- /dev/null +++ b/batch/batch.go @@ -0,0 +1,126 @@ +package batch + +import ( + "encoding/json" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/batch" + "github.com/go-resty/resty/v2" + "github.com/google/uuid" + "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs" + "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3" + "gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/utils" + "io" + "time" +) + +const ( + binaryPath = "/root/home/workers" // The worker binary path in the docker container +) + +type BatchJobQueue string +type BatchJobMessageType string + +// Queue names are the same as the batch Job queue names in aws +const ( + BatchJobQueueLow BatchJobQueue = "batch-job-queue-low" + BatchJobQueueMedium BatchJobQueue = "batch-job-queue-medium" + BatchJobQueueHigh BatchJobQueue = "batch-job-queue-high" +) + +const ( + BatchJobMessageTypeS3 BatchJobMessageType = "s3" +) + +func SubmitJob(name *string, job any, fullJobDefinition string, fullJobQueue string, messagesBucketName string, isDebug bool) error { + if isDebug { + go func() { + resty.New().R(). + SetBody(job). + Post("http://127.0.0.1:3000/batch/") + }() + time.Sleep(time.Second * 1) + return nil + } + + options := session.Options{ + Config: aws.Config{ + Region: utils.ValueToPointer("af-south-1"), + }, + } + sess, err := session.NewSessionWithOptions(options) + + batchClient := batch.New(sess) + + if name == nil { + id := uuid.New() + jobID := "job" + id.String() + name = utils.ValueToPointer(jobID) + } + + err = uploadMessageToS3(*name, job, messagesBucketName, isDebug) + if err != nil { + return err + } + + command := []*string{ + utils.ValueToPointer(binaryPath), + name, + } + + environmentOverwrite := []*batch.KeyValuePair{{ + Name: utils.ValueToPointer("BATCH_MESSAGE_TYPE"), + Value: utils.ValueToPointer(string(BatchJobMessageTypeS3)), + }, + } + + input := &batch.SubmitJobInput{ + JobDefinition: utils.ValueToPointer(fullJobDefinition), + JobName: name, + JobQueue: utils.ValueToPointer(fullJobQueue), + ContainerOverrides: &batch.ContainerOverrides{ + Command: command, + Environment: environmentOverwrite, + }, + } + _, err = batchClient.SubmitJob(input) + if err != nil { + return err + } + return nil +} + +func uploadMessageToS3(name string, job any, bucketName string, isDebug bool) error { + jobBytes, err := json.Marshal(job) + if err != nil { + return err + } + + // Upload message + _, err = s3.GetSession(isDebug).UploadWithSettingsRevised(jobBytes, bucketName, s3.S3UploadSettings{ + FileName: name, + }) + if err != nil { + return err + } + + return nil +} + +func RetrieveMessageFromS3(filename string, messagesBucketName string, isDebug bool) ([]byte, error) { + // get the file contents + rawObject, err := s3.GetSession(isDebug).GetObject(messagesBucketName, filename, isDebug) + if err != nil { + return []byte{}, err + } + defer rawObject.Body.Close() + + // Read the file, unzip the data and unmarshall the json + var bodyBytes []byte + bodyBytes, err = io.ReadAll(rawObject.Body) + if err != nil { + logs.ErrorWithMsg("Could not read file", err) + return bodyBytes, err + } + return bodyBytes, nil +}