Skip to content
Snippets Groups Projects
Select Git revision
  • 809fb16724b7ad8b5cd32eab0d7b5d9bbf3a9464
  • main default protected
  • trading_hours
  • refactor_trading_hours
  • audit_cleaning_cater_for_non_struct_fields
  • remove-info-logs
  • sl-refactor
  • 18-use-scan-for-param-values
  • 17-order-search-results
  • 4-simplify-framework-2
  • 1-http-error
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
  • v1.278.0
31 results

sqs.go

Blame
  • sqs.go 2.27 KiB
    package handler_utils
    
    import (
    	"encoding/json"
    	"github.com/aws/aws-lambda-go/events"
    	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
    	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/s3"
    	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/sqs"
    	"reflect"
    )
    
    // ValidateSQSEndpoints walks every messageType -> handler entry in endpoints and
    // converts each handler into the value returned by NewSQSHandler, so that it can
    // be invoked directly later on.
    //
    // The supplied map is updated in place and is also the map that is returned.
    // An error is returned (with a nil map) as soon as an entry has a blank
    // messageType, a nil handler, or a handler that NewSQSHandler rejects. Entries
    // visited before the failing one have already been replaced in the map.
    func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interface{}, error) {
    	for name, fn := range endpoints {
    		// Reject obviously broken registrations before trying to wrap them.
    		switch {
    		case name == "":
    			return nil, errors.Errorf("blank messageType")
    		case fn == nil:
    			return nil, errors.Errorf("nil handler on %s", name)
    		}

    		wrapped, wrapErr := NewSQSHandler(fn)
    		if wrapErr != nil {
    			return nil, errors.Wrapf(wrapErr, "%v has invalid handler %T", name, fn)
    		}

    		// Overwriting an existing key while ranging over the map is safe in Go.
    		endpoints[name] = wrapped
    	}
    	return endpoints, nil
    }
    
    // GetRecord decodes the JSON payload of an SQS message into a freshly allocated
    // value of recordType and returns that value (not a pointer to it).
    //
    // When the message attribute sqs.SQSMessageOnS3Key is present with the string
    // value "true", the payload is fetched through sqs.RetrieveMessageFromS3 using
    // s3Client, bucket and message.Body (presumably a reference to the stored
    // object - confirm against the sqs package). Otherwise message.Body itself is
    // the JSON payload.
    //
    // If a pointer to the decoded record implements IValidator, Validate is called
    // and a validation failure is returned as an error.
    func GetRecord(s3Client *s3.ClientWithHelpers, bucket string, message events.SQSMessage, recordType reflect.Type) (interface{}, error) {
    	target := reflect.New(recordType)
    	dest := target.Interface()

    	// Work out where the JSON bytes live: on S3 or inline in the message body.
    	attr, found := message.MessageAttributes[sqs.SQSMessageOnS3Key]
    	storedOnS3 := found && attr.StringValue != nil && *attr.StringValue == "true"

    	var payload []byte
    	if storedOnS3 {
    		fetched, fetchErr := sqs.RetrieveMessageFromS3(s3Client, bucket, message.Body)
    		if fetchErr != nil {
    			return nil, errors.Wrapf(fetchErr, "failed to get sqs message body from s3")
    		}
    		payload = fetched
    	} else {
    		// Message was small enough, it is contained in the message body
    		payload = []byte(message.Body)
    	}

    	if decodeErr := json.Unmarshal(payload, dest); decodeErr != nil {
    		return nil, errors.Wrapf(decodeErr, "failed to JSON decode message body")
    	}

    	// Validation is optional: only records whose pointer type implements
    	// IValidator are checked.
    	if checker, implements := dest.(IValidator); implements {
    		if invalid := checker.Validate(); invalid != nil {
    			return nil, errors.Wrapf(invalid, "invalid message body")
    		}
    	}

    	return target.Elem().Interface(), nil
    }
    
    // IValidator is an optional interface for record types decoded by GetRecord.
    // When a pointer to the decoded record implements it, GetRecord calls Validate
    // after JSON decoding and rejects the message if a non-nil error is returned.
    type IValidator interface {
    	// Validate reports whether the receiver is acceptable; a non-nil error
    	// marks it as invalid.
    	Validate() error
    }