Skip to content
Snippets Groups Projects
Select Git revision
  • e46524732b4de6943c26965f0a75fb213c513b68
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

sqs.go

Blame
  • sqs.go 2.32 KiB
    package handler_utils
    
    import (
    	"encoding/json"
    	"github.com/aws/aws-lambda-go/events"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logs"
    	"gitlab.com/uafrica/go-utils/s3"
    	"gitlab.com/uafrica/go-utils/sqs"
    	"reflect"
    )
    
    // ValidateSQSEndpoints walks every entry of endpoints (keyed by message type)
    // and converts its handler function into the value returned by NewSQSHandler.
    //
    // An empty message type, a nil handler, or a handler rejected by NewSQSHandler
    // aborts the walk with an error. The map is modified in place, so entries that
    // were visited before a failure have already been replaced. On success the
    // same map is returned.
    func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interface{}, error) {
    	for name, fn := range endpoints {
    		// Reject obviously broken registrations before trying to build a handler.
    		switch {
    		case name == "":
    			return nil, errors.Errorf("blank messageType")
    		case fn == nil:
    			return nil, errors.Errorf("nil handler on %s", name)
    		}

    		sqsHandler, buildErr := NewSQSHandler(fn)
    		if buildErr != nil {
    			return nil, errors.Wrapf(buildErr, "%v has invalid handler %T", name, fn)
    		}

    		// Store the prepared handler under the same key so it can be called directly later.
    		endpoints[name] = sqsHandler
    		logs.Info("%s: OK (request: %v)\n", name, sqsHandler.RecordType)
    	}

    	return endpoints, nil
    }
    
    // GetRecord decodes the JSON payload of an SQS message into a freshly
    // allocated value of recordType and returns that value (not a pointer to it).
    //
    // The payload is fetched with sqs.RetrieveMessageFromS3 (using s3Session,
    // bucket and message.Body) only when the message carries the
    // sqs.SQSMessageOnS3Key attribute with the string value "true". In every
    // other case - attribute absent, nil string value, or any other value - the
    // message body itself is decoded, so a record is never returned undecoded.
    //
    // If a pointer to the decoded record implements IValidator, its Validate
    // method is called and a failure is returned as a wrapped error.
    //
    // An error is returned when recordType is nil, when the S3 retrieval fails,
    // when the payload is not valid JSON for recordType, or when validation fails.
    func GetRecord(s3Session *s3.SessionWithHelpers, bucket string, message events.SQSMessage, recordType reflect.Type) (interface{}, error) {
    	// reflect.New panics on a nil type; report it as an ordinary error instead.
    	if recordType == nil {
    		return nil, errors.Errorf("nil recordType")
    	}

    	recordValuePtr := reflect.New(recordType)

    	// Decide once whether the body must be retrieved from S3. Only an explicit
    	// "true" counts; the mere presence of the attribute must not skip decoding.
    	storedOnS3 := false
    	if messageAttribute, ok := message.MessageAttributes[sqs.SQSMessageOnS3Key]; ok {
    		storedOnS3 = messageAttribute.StringValue != nil && *messageAttribute.StringValue == "true"
    	}

    	var payload []byte
    	if storedOnS3 {
    		messageBytes, err := sqs.RetrieveMessageFromS3(s3Session, bucket, message.Body)
    		if err != nil {
    			return nil, errors.Wrapf(err, "failed to get sqs message body from s3")
    		}
    		payload = messageBytes
    	} else {
    		// Message was small enough, it is contained in the message body
    		payload = []byte(message.Body)
    	}

    	if err := json.Unmarshal(payload, recordValuePtr.Interface()); err != nil {
    		return nil, errors.Wrapf(err, "failed to JSON decode message body")
    	}

    	// Validation is looked up on the pointer, so both pointer- and
    	// value-receiver Validate methods are found.
    	if validator, ok := recordValuePtr.Interface().(IValidator); ok {
    		if err := validator.Validate(); err != nil {
    			return nil, errors.Wrapf(err, "invalid message body")
    		}
    	}

    	return recordValuePtr.Elem().Interface(), nil
    }
    
    type IValidator interface {
    	Validate() error