Skip to content
Snippets Groups Projects
Select Git revision
  • d9bbff7bbf4416ee9aca9d64ec57195aef7a3767
  • dev default protected
  • prod protected
  • 1.0.58
  • 1.0.57
  • 1.0.52
  • 1.0.56
  • 1.0.51
  • 1.0.50
  • 1.0.33
  • 1.0.32
  • 1.0.31
  • 1.0.30
  • 1.0.29
  • 1.0.28
  • 1.0.27
  • 1.0.26
  • 1.0.25
  • 1.0.24
  • 1.0.23
  • 1.0.22
  • 1.0.21
  • 1.0.20
23 results

ConfigChangeObserver.php

Blame
  • sqs.go 2.32 KiB
    package handler_utils
    
    import (
    	"encoding/json"
    	"github.com/aws/aws-lambda-go/events"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logs"
    	"gitlab.com/uafrica/go-utils/s3"
    	"gitlab.com/uafrica/go-utils/sqs"
    	"reflect"
    )
    
    // ValidateSQSEndpoints walks over every entry in endpoints and turns it into a handler.
    // A blank message type or a nil value is rejected, as is any value that NewSQSHandler
    // refuses. The map is updated in place: each value is replaced by the handler that
    // NewSQSHandler returned, and the same map is returned on success. On the first invalid
    // entry a nil map and an error are returned; entries visited before it have already
    // been replaced.
    func ValidateSQSEndpoints(endpoints map[string]interface{}) (map[string]interface{}, error) {
    	for name, fn := range endpoints {
    		switch {
    		case name == "":
    			return nil, errors.Errorf("blank messageType")
    		case fn == nil:
    			return nil, errors.Errorf("nil handler on %s", name)
    		}
    
    		built, buildErr := NewSQSHandler(fn)
    		if buildErr != nil {
    			return nil, errors.Wrapf(buildErr, "%v has invalid handler %T", name, fn)
    		}
    
    		// Store the built handler under the same key so it can be called directly later.
    		endpoints[name] = built
    		logs.Info("%s: OK (request: %v)\n", name, built.RecordType)
    	}
    	return endpoints, nil
    }
    
    // GetRecord decodes the JSON payload of an SQS message into a new value of recordType.
    //
    // When the message carries the sqs.SQSMessageOnS3Key attribute with the string value
    // "true", message.Body is handed to sqs.RetrieveMessageFromS3 together with s3Session
    // and bucket, and the bytes it returns are decoded. In every other case (attribute
    // absent, nil string value, or any value other than "true") message.Body itself is
    // decoded, so a message is never returned undecoded.
    //
    // If a pointer to the decoded value implements IValidator, Validate is called and a
    // failure is returned as an error. On success the decoded value itself (not a pointer
    // to it) is returned. A nil recordType yields an error.
    func GetRecord(s3Session *s3.SessionWithHelpers, bucket string, message events.SQSMessage, recordType reflect.Type) (interface{}, error) {
    	if recordType == nil {
    		// reflect.New panics on a nil type; report it as an error instead.
    		return nil, errors.Errorf("nil recordType")
    	}
    
    	recordValuePtr := reflect.New(recordType)
    
    	// The payload lives on S3 only when the attribute is present AND explicitly "true".
    	storedOnS3 := false
    	if messageAttribute, ok := message.MessageAttributes[sqs.SQSMessageOnS3Key]; ok {
    		storedOnS3 = messageAttribute.StringValue != nil && *messageAttribute.StringValue == "true"
    	}
    
    	var payload []byte
    	if storedOnS3 {
    		messageBytes, err := sqs.RetrieveMessageFromS3(s3Session, bucket, message.Body)
    		if err != nil {
    			return nil, errors.Wrapf(err, "failed to get sqs message body from s3")
    		}
    		payload = messageBytes
    	} else {
    		// The payload is contained in the message body itself.
    		payload = []byte(message.Body)
    	}
    
    	if err := json.Unmarshal(payload, recordValuePtr.Interface()); err != nil {
    		return nil, errors.Wrapf(err, "failed to JSON decode message body")
    	}
    
    	// Validation is optional: only types whose pointer implements IValidator are checked.
    	if validator, ok := recordValuePtr.Interface().(IValidator); ok {
    		if err := validator.Validate(); err != nil {
    			return nil, errors.Wrapf(err, "invalid message body")
    		}
    	}
    
    	return recordValuePtr.Elem().Interface(), nil
    }
    
    type IValidator interface {
    	Validate() error