diff --git a/api/api.go b/api/api.go index 382bcafffbdf72ea40dd8ee0152d035aa34f134d..79dd9eae37f2a584f2f9f4cadaa4574adefa7fc2 100644 --- a/api/api.go +++ b/api/api.go @@ -5,12 +5,16 @@ import ( "net/http" "os" "regexp" + "sync" "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" + queues_mem "gitlab.com/uafrica/go-utils/queues/mem" + queues_sqs "gitlab.com/uafrica/go-utils/queues/sqs" "gitlab.com/uafrica/go-utils/service" ) @@ -50,8 +54,10 @@ type Api struct { //options: localPort int //==0 for lambda, >0 for http.ListenAndServe + eventHandlers map[string]interface{} crashReporter ICrashReporter cors ICORS + producer queues.IProducer dbConn service.IDatabaseConnector checks []check auditor IAuditor @@ -102,6 +108,11 @@ func (api Api) WithCORS(cors ICORS) Api { return api } +func (api Api) WithProducer(producer queues.IProducer) Api { + api.producer = producer + return api +} + type ICORS interface { CORS() map[string]string //return CORS headers } @@ -117,10 +128,11 @@ type ICrashReporter interface { Catch(ctx Context) //Report(method string, path string, crash interface{}) } -func (api Api) WithLocalPort(localPortPtr *int) Api { +func (api Api) WithLocalPort(localPortPtr *int, eventHandlers map[string]interface{}) Api { if localPortPtr != nil && *localPortPtr > 0 { api.localPort = *localPortPtr } + api.eventHandlers = eventHandlers return api } @@ -128,11 +140,42 @@ func (api Api) WithLocalPort(localPortPtr *int) Api { func (api Api) Run() { //decide local of lambda if api.localPort > 0 { + //running locally with standard HTTP server + + if api.eventHandlers != nil { + //when running locally - we want to send and process SQS events locally using channels + //here we create a SQS chan and start listening to it + //again: this is quick hack... will make this part of framework once it works well + api.Debugf("Creating local queue consumer/producer...") + memConsumer := queues_mem.NewConsumer(api.eventHandlers) + api.producer = queues_mem.NewProducer(memConsumer) + + sqsEventChan := make(chan events.SQSEvent) + sqsWaitGroup := sync.WaitGroup{} + sqsWaitGroup.Add(1) + go func() { + for event := range sqsEventChan { + logger.Debug("NOT YET PROCESSING SQS Event: %+v", event) + } + sqsWaitGroup.Done() + }() + + //when we terminate, close the sqs chan and wait for it to complete processing + defer func() { + close(sqsEventChan) + sqsWaitGroup.Wait() + }() + } else { + //use SQS for events + api.producer = queues_sqs.NewProducer(api.requestIDHeaderKey) + } + err := http.ListenAndServe(fmt.Sprintf(":%d", api.localPort), api) //calls app.ServeHTTP() which calls app.Handler() if err != nil { panic(err) } } else { + api.producer = queues_sqs.NewProducer(api.requestIDHeaderKey) lambda.Start(api.Handler) //calls app.Handler directly } } diff --git a/api/context.go b/api/context.go index 37f73d23c2866dff81a298957eeade30c6e099fa..ec961f0e43ded2b0629d785900922002048f4370 100644 --- a/api/context.go +++ b/api/context.go @@ -13,25 +13,45 @@ import ( "github.com/uptrace/bun" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) type IContext interface { context.Context logger.ILogger + queues.IProducer StartTime() time.Time MillisecondsSinceStart() int64 - //DB() *bun.DB + CheckValues(checkName string) interface{} + CheckValue(checkName, valueName string) interface{} } type Context struct { service.Context + queues.IProducer Request events.APIGatewayProxyRequest RequestID string ValuesFromChecks map[string]map[string]interface{} //also in context.Value(), but cannot retrieve iteratively from there for logging... DB *bun.DB } +func (ctx Context) CheckValues(checkName string) interface{} { + if cv, ok := ctx.ValuesFromChecks[checkName]; ok { + return cv + } + return nil +} + +func (ctx Context) CheckValue(checkName, valueName string) interface{} { + if cv, ok := ctx.ValuesFromChecks[checkName]; ok { + if v, ok := cv[valueName]; ok { + return v + } + } + return nil +} + // func (ctx Context) Audit(org, new interface{}, eventType types.AuditEventType) { // //call old function for now - should become part of context ONLY // audit.SaveAuditEvent(org, new, ctx.Claims, eventType, &ctx.RequestID) @@ -214,10 +234,6 @@ func (ctx Context) GetRequestBody(requestStructType reflect.Type) (interface{}, return requestStructValuePtr.Elem().Interface(), nil } -// func (ctx Context) DB() *bun.DB { -// return ctx.DB -// } - type IValidator interface { Validate() error } diff --git a/api/handler.go b/api/handler.go index 7340bd1ff756de4f1025564ddb7b76911e11ce1f..8e4e785684117135fe33f1a6b3c980d4b21f7365 100644 --- a/api/handler.go +++ b/api/handler.go @@ -26,7 +26,7 @@ func NewHandler(fnc interface{}) (handler, error) { //arg[0] must implement interface lambda_helpers.Context if _, ok := reflect.New(fncType.In(0)).Interface().(IContext); !ok { - return h, errors.Errorf("first arg %v does not implement lambda_helpers.IContext", fncType.In(0)) + return h, errors.Errorf("first arg %v does not implement api.IContext", fncType.In(0)) } //arg[1] must be a struct for params. It may be an empty struct, but diff --git a/api/lambda.go b/api/lambda.go index 3299faceb54e7ef204b3faf6dc71b9d5ec85a7d4..ec476a322cb87ead528a7df272f9c55ae4fd01d8 100644 --- a/api/lambda.go +++ b/api/lambda.go @@ -40,6 +40,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat "env": api.env, "request_id": requestID, }), + IProducer: api.producer, Request: apiGatewayProxyReq, RequestID: requestID, ValuesFromChecks: map[string]map[string]interface{}{}, @@ -60,7 +61,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat defer func() { ctx.LogAPIRequestAndResponse(res, err) if err != nil { - if withCause, ok := err.(errors.ErrorWithCause); ok { + if withCause, ok := err.(errors.ErrorWithCause); ok && withCause.Code() != 0 { res.StatusCode = withCause.Code() } errorMessage := fmt.Sprintf("%c", err) @@ -78,16 +79,27 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat ) }() - //do checks before proceed + if api.dbConn != nil { + ctx.DB, err = api.dbConn.Connect() + if err != nil { + err = errors.Wrapf(err, "failed to connect to db") + return + } + } + + //do checks before proceed (may use db connection) //(typical maintenance mode, rate limits, authentication/claims, ...) for _, check := range api.checks { var checkValues map[string]interface{} checkValues, err = check.checker.Check(apiGatewayProxyReq) - for n, v := range checkValues { - ctx.Context = ctx.Context.WithValue(check.name+"_"+n, v) - } + //for n, v := range checkValues { + //ctx.Context = ctx.Context.WithValue(check.name+"_"+n, v) + //ctx.Debugf("Defined value[%s]=(%T)%v", check.name+"_"+n, v, v) + //} ctx.ValuesFromChecks[check.name] = checkValues + ctx.Debugf("Defined ValuesFromChecks[%s]=(%T)%v", check.name, checkValues, checkValues) if err != nil { + err = errors.Wrapf(err, "check(%s) failed", check.name) return } } @@ -101,14 +113,6 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat rand.Seed(time.Now().Unix()) - if api.dbConn != nil { - ctx.DB, err = api.dbConn.Connect() - if err != nil { - err = errors.Wrapf(err, "failed to connect to db") - return - } - } - ctx.Debugf("HTTP %s %s ...\n", apiGatewayProxyReq.HTTPMethod, apiGatewayProxyReq.Resource) ctx.WithFields(map[string]interface{}{ "http_method": ctx.Request.HTTPMethod, diff --git a/api/local.go b/api/local.go index a88f1e8fdcfd791d522000ceabc0aa94aa7699a5..fb4b14293d35cdf90bf868b288c74b2a0da1373c 100644 --- a/api/local.go +++ b/api/local.go @@ -26,9 +26,7 @@ func (api Api) ServeHTTP(httpRes http.ResponseWriter, httpReq *http.Request) { MultiValueQueryStringParameters: map[string][]string{}, PathParameters: nil, // map[string]string `json:"pathParameters"` StageVariables: nil, // map[string]string `json:"stageVariables"` - //RequestContext: events.APIGatewayProxyRequestContext{}, // APIGatewayProxyRequestContext `json:"requestContext"` - //Body: bodyString, // string `json:"body"` - IsBase64Encoded: false, + IsBase64Encoded: false, } //copy significant headers @@ -56,7 +54,7 @@ func (api Api) ServeHTTP(httpRes http.ResponseWriter, httpReq *http.Request) { } req.RequestContext = events.APIGatewayProxyRequestContext{ - AccountID: "", // string `json:"accountId"` + AccountID: "", //string `json:"accountId"` ResourceID: "", //string `json:"resourceId"` OperationName: "", //string `json:"operationName,omitempty"` Stage: "", //string `json:"stage"` diff --git a/go.mod b/go.mod index aa20d27efb15b77d12ba2a5c16acb7fdaeb4ce09..38c85f6088f9244cb90b22f9e8be46008ab7f44a 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/go-pg/pg/v10 v10.10.5 github.com/pkg/errors v0.9.1 github.com/thoas/go-funk v0.9.1 - golang.org/x/text v0.3.3 + golang.org/x/text v0.3.6 ) require ( @@ -21,6 +21,14 @@ require ( github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b // indirect - golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect + golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect mellium.im/sasl v0.2.1 // indirect ) + +require ( + github.com/aws/aws-sdk-go v1.40.50 // indirect + github.com/fatih/color v1.13.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/mattn/go-colorable v0.1.9 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect +) diff --git a/go.sum b/go.sum index f70e5aba5321fa5a5c04024ac9986c45ba41c56b..70dc620d48f589101277bab909c1f17b6c599823 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aws/aws-lambda-go v1.26.0 h1:6ujqBpYF7tdZcBvPIccs98SpeGfrt/UOVEiexfNIdHA= github.com/aws/aws-lambda-go v1.26.0/go.mod h1:jJmlefzPfGnckuHdXX7/80O3BvUUi12XOkbv4w9SGLU= +github.com/aws/aws-sdk-go v1.40.50 h1:QP4NC9EZWBszbNo2UbG6bbObMtN35kCFb4h0r08q884= +github.com/aws/aws-sdk-go v1.40.50/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -11,6 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -41,10 +45,18 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= @@ -106,6 +118,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -117,19 +130,25 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -163,6 +182,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/logger/global.go b/logger/global.go index e584038c5070908c8c182fab348d792e6e3648d0..f65afc41deb7a35be4195ce740d381874fc64b99 100644 --- a/logger/global.go +++ b/logger/global.go @@ -4,6 +4,8 @@ import ( "fmt" "os" + "github.com/fatih/color" + "gitlab.com/uafrica/go-utils/errors" ) @@ -14,6 +16,8 @@ func init() { level: LevelDebug, writer: os.Stderr, data: map[string]interface{}{}, + fg: color.FgWhite, + bg: color.BgBlack, } // InitLogs(nil, nil) } diff --git a/logger/logger.go b/logger/logger.go index 353d6d1bf71e8311ba52b06df1a895fd24fd35f7..8347fc56399c102740390e07446c965415e998fc 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -31,6 +31,71 @@ type Logger struct { level Level writer io.Writer data map[string]interface{} + + fg color.Attribute + bg color.Attribute +} + +func (l Logger) WithForeground(fg color.Attribute) Logger { + l.fg = fg + return l +} + +func (l Logger) WithBackground(bg color.Attribute) Logger { + l.bg = bg + return l +} + +type textColor struct { + fg color.Attribute + bg color.Attribute +} + +var ( + fgColors = []color.Attribute{ + color.FgWhite, + color.FgRed, + color.FgGreen, + color.FgYellow, + color.FgBlue, + color.FgMagenta, + color.FgCyan, + color.FgBlack, + } + bgColors = []color.Attribute{ + color.BgBlack, + color.BgWhite, + color.BgRed, + color.BgGreen, + color.BgYellow, + color.BgBlue, + color.BgMagenta, + color.BgCyan, + } + nextFg = 0 + nextBg = 0 +) + +func (l Logger) NextColor() Logger { + l.fg = 0 + l.bg = 0 + for l.fg == l.bg { + l.bg = bgColors[nextBg] + l.fg = fgColors[nextFg] + incColor() + } + return l +} + +func incColor() { + nextFg++ + if nextFg >= len(fgColors) { + nextFg = 0 + nextBg++ + if nextBg >= len(bgColors) { + nextBg++ + } + } } func (l Logger) WithFields(data map[string]interface{}) Logger { @@ -38,6 +103,8 @@ func (l Logger) WithFields(data map[string]interface{}) Logger { level: l.level, writer: l.writer, data: map[string]interface{}{}, + fg: l.fg, + bg: l.bg, } for n, v := range l.data { newLogger.data[n] = v @@ -132,7 +199,8 @@ func (l Logger) log(level Level, skip int, msg string) { } cyan(buffer, fmt.Sprintf(" %-40.40s| ", source)) - buffer.Write([]byte(entry.Message)) + base := color.New(l.fg, l.bg).FprintfFunc() + base(buffer, entry.Message) //buffer.Write([]byte(entry.Message)) if len(entry.Data) > 0 { jsonData, _ := json.Marshal(entry.Data) diff --git a/queues/consumer.go b/queues/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..1b5fae22e53b7e5f3bad7dfc041b3a1a8110c953 --- /dev/null +++ b/queues/consumer.go @@ -0,0 +1,8 @@ +package queues + +import "gitlab.com/uafrica/go-utils/service" + +type IConsumer interface { + WithDb(dbConn service.IDatabaseConnector) IConsumer + Run() +} diff --git a/queues/context.go b/queues/context.go new file mode 100644 index 0000000000000000000000000000000000000000..335bb870f959d97d9cb2180b3377b8bf77d1a29c --- /dev/null +++ b/queues/context.go @@ -0,0 +1,50 @@ +package queues + +import ( + "context" + "encoding/json" + "reflect" + "time" + + "github.com/uptrace/bun" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/service" +) + +type IContext interface { + context.Context + logger.ILogger + IProducer + StartTime() time.Time + MillisecondsSinceStart() int64 + //DB() *bun.DB +} + +type Context struct { + service.Context + IProducer //todo: would be nice to have a method to requeue same event with delay and incrementing attempt nr + Event Event + RequestID string + DB *bun.DB +} + +func (ctx Context) GetRecord(recordType reflect.Type) (interface{}, error) { + recordValuePtr := reflect.New(recordType) + err := json.Unmarshal([]byte(ctx.Event.BodyJSON), recordValuePtr.Interface()) + if err != nil { + return nil, errors.Wrapf(err, "failed to JSON decode message body") + } + + if validator, ok := recordValuePtr.Interface().(IValidator); ok { + if err := validator.Validate(); err != nil { + return nil, errors.Wrapf(err, "invalid message body") + } + } + + return recordValuePtr.Elem().Interface(), nil +} + +type IValidator interface { + Validate() error +} diff --git a/queues/event.go b/queues/event.go new file mode 100644 index 0000000000000000000000000000000000000000..6f4e0f960798c8004ead9eb06b77ddb421001675 --- /dev/null +++ b/queues/event.go @@ -0,0 +1,103 @@ +package queues + +import ( + "encoding/json" + "fmt" + "time" + + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" +) + +type IProducerLogger interface { + IProducer + logger.ILogger +} + +func NewEvent(producer IProducerLogger, queueName string) Event { + if producer == nil { + panic(errors.Errorf("NewEvent(producer=nil)")) + } + + return Event{ + producer: producer, + QueueName: queueName, + TypeName: "", + DueTime: time.Now(), + RequestIDValue: "", + ParamValues: map[string]string{}, + BodyJSON: "", + } +} + +type Event struct { + producer IProducerLogger + MessageID string //assigned by implementation (AWS/mem/..) + QueueName string //queue determine sequencing, items in same queue are delivered one-after-the-other, other queues may deliver concurrent to this queue + TypeName string //type determines which handler processes the event + DueTime time.Time //do not process before this time + RequestIDValue string //service request-id that sends the event - for tracing + ParamValues map[string]string //parameters + BodyJSON string //expecting a JSON string +} + +func (event Event) Format(f fmt.State, c rune) { + f.Write([]byte(fmt.Sprintf("{queue:%s,type:%s,due:%s,request-id:%s,params:%v,bodyJSON:%20.20s...,msg-id:%s}", + event.QueueName, + event.TypeName, + event.DueTime.Format("2006-01-02 15:04:05"), + event.RequestIDValue, + event.ParamValues, + event.BodyJSON, + event.MessageID, + ))) +} + +func (event Event) Delay(dur time.Duration) Event { + if dur >= 0 { + event.DueTime = time.Now().Add(dur) + } + return event +} + +func (event Event) Type(typeName string) Event { + if typeName != "" { + event.TypeName = typeName + } + return event +} + +func (event Event) RequestID(requestID string) Event { + if requestID != "" { + event.RequestIDValue = requestID + } + return event +} + +func (event Event) Params(params map[string]string) Event { + for n, v := range params { + event.ParamValues[n] = v + } + return event +} + +func (event Event) Send(value interface{}) (string, error) { + event.producer.Debugf("Queue(%s) Send SQS Event: %v", + event.QueueName, + event) + if value != nil { + jsonBody, err := json.Marshal(value) + if err != nil { + return "", errors.Wrapf(err, "failed to JSON encode event body") + } + event.BodyJSON = string(jsonBody) + } + if event.producer == nil { + return "", errors.Errorf("Send with producer==nil") + } + if msgID, err := event.producer.Send(event); err != nil { + return "", errors.Wrapf(err, "failed to send event") + } else { + return msgID, nil + } +} diff --git a/queues/handler.go b/queues/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..aa29674f912e4c9204c356874c429ad2fd4ad3bb --- /dev/null +++ b/queues/handler.go @@ -0,0 +1,51 @@ +package queues + +import ( + "reflect" + + "gitlab.com/uafrica/go-utils/errors" +) + +type Handler struct { + RecordType reflect.Type + FuncValue reflect.Value +} + +func NewHandler(fnc interface{}) (Handler, error) { + h := Handler{} + + fncType := reflect.TypeOf(fnc) + if fncType.NumIn() != 2 { + return h, errors.Errorf("takes %d args instead of (Context, Record)", fncType.NumIn()) + } + if fncType.NumOut() != 1 { + return h, errors.Errorf("returns %d results instead of (error)", fncType.NumOut()) + } + + //arg[0] must implement interface sqs.IContext + if _, ok := reflect.New(fncType.In(0)).Interface().(IContext); !ok { + return h, errors.Errorf("first arg %v does not implement sqs.IContext", fncType.In(0)) + } + + //arg[1] must be a struct for the message record body. It may be an empty struct, but + //all public fields require a json tag which we will use to math the URL param name + if err := validateStructType(fncType.In(1)); err != nil { + return h, errors.Errorf("second arg %v is not valid record struct type", fncType.In(1)) + } + h.RecordType = fncType.In(1) + + //result must be error + if _, ok := reflect.New(fncType.Out(0)).Interface().(*error); !ok { + return h, errors.Errorf("result %v is not error type", fncType.Out(0)) + } + + h.FuncValue = reflect.ValueOf(fnc) + return h, nil +} + +func validateStructType(t reflect.Type) error { + if t.Kind() != reflect.Struct { + return errors.Errorf("%v is %v, not a struct", t, t.Kind()) + } + return nil +} diff --git a/queues/mem/README.md b/queues/mem/README.md new file mode 100644 index 0000000000000000000000000000000000000000..58ae44f94dd076f2b25588382010a94e62e54d34 --- /dev/null +++ b/queues/mem/README.md @@ -0,0 +1,3 @@ +# Memory Queues + +This is an in-memory implementation of go-utils/queues for use in local development and testing only. \ No newline at end of file diff --git a/queues/mem/consumer.go b/queues/mem/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..7727ff0d2d76da9bdfb2c72957a1993998c33463 --- /dev/null +++ b/queues/mem/consumer.go @@ -0,0 +1,148 @@ +package mem + +import ( + "context" + "fmt" + "math/rand" + "reflect" + "sync" + "time" + + "github.com/google/uuid" + "github.com/uptrace/bun" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/service" +) + +func NewConsumer(routes map[string]interface{}) *Consumer { + router, err := queues.NewRouter(routes) + if err != nil { + panic(fmt.Sprintf("cannot create router: %+v", err)) + } + return &Consumer{ + ILogger: logger.New().WithFields(map[string]interface{}{"env": "dev"}).NextColor(), + router: router, + queues: map[string]*queue{}, + } +} + +type Consumer struct { + sync.Mutex + logger.ILogger + router queues.Router + dbConn service.IDatabaseConnector + queues map[string]*queue +} + +func (consumer *Consumer) WithDb(dbConn service.IDatabaseConnector) queues.IConsumer { + consumer.dbConn = dbConn + return consumer +} + +func (consumer *Consumer) Queue(name string) (*queue, error) { + consumer.Lock() + defer consumer.Unlock() + q, ok := consumer.queues[name] + if !ok { + q = &queue{ + consumer: consumer, + name: name, + ch: make(chan queues.Event), + } + go q.run() + consumer.queues[name] = q + } + return q, nil +} + +//do not call this - when using local producer, the consumer is automatically running +//for each queue you send to, and processing from q.run() +func (consumer *Consumer) Run() { + panic(errors.Errorf("DO NOT RUN LOCAL CONSUMER")) +} + +type queue struct { + consumer *Consumer + name string + ch chan queues.Event +} + +func (q *queue) run() { + for event := range q.ch { + //todo: create context with logger + rand.Seed(time.Now().Unix()) + + //report handler crashes + //defer sqs.crashReporter.Catch(ctx) + + var db *bun.DB + if q.consumer.dbConn != nil { + var err error + db, err = q.consumer.dbConn.Connect() + if err != nil { + q.consumer.Errorf("failed to connect to db: %+v", err) + continue + } + } + + baseCtx := context.Background() + ctx := queues.Context{ + Context: service.NewContext(baseCtx, map[string]interface{}{ + "env": "dev", + "request_id": event.RequestID, + "message_type": event.TypeName, + }), + IProducer: q, //todo: q can only send back into this queue... may need to send to other queues! + Event: event, + RequestID: event.RequestIDValue, + DB: db, + } + + ctx.WithFields(map[string]interface{}{ + "params": event.ParamValues, + "body": event.BodyJSON, + }).Infof("Queue(%s) Recv SQS Event: %v", q.name, event) + + //routing on messageType + sqsHandler, err := q.consumer.router.Route(event.TypeName) + if err != nil { + ctx.Errorf("Unhandled event type(%v): %v", event.TypeName, err) + continue + } + handler, ok := sqsHandler.(queues.Handler) + if !ok { + ctx.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler) + continue + } + + args := []reflect.Value{ + reflect.ValueOf(ctx), + } + + //allocate, populate and validate request struct + var recordStruct interface{} + recordStruct, err = ctx.GetRecord(handler.RecordType) + if err != nil { + ctx.Errorf("invalid message: %+v", err) + continue + } + + ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + args = append(args, reflect.ValueOf(recordStruct)) + + results := handler.FuncValue.Call(args) + if len(results) > 0 && !results[0].IsNil() { + ctx.Errorf("handler failed: %+v", results[0].Interface().(error)) + } else { + ctx.Debugf("handler success") + } + } //for each event from chan +} //queue.run() + +func (q *queue) Send(event queues.Event) (msgID string, err error) { + event.MessageID = uuid.New().String() + q.ch <- event + return event.MessageID, nil +} diff --git a/queues/mem/producer.go b/queues/mem/producer.go new file mode 100644 index 0000000000000000000000000000000000000000..0f90a67df5941ec8078cf495bc9284d9400f4b93 --- /dev/null +++ b/queues/mem/producer.go @@ -0,0 +1,34 @@ +package mem + +import ( + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/queues" +) + +//can only produce locally if also consuming local +func NewProducer(consumer *Consumer) queues.IProducer { + if consumer == nil { + panic(errors.Errorf("cannot product locally without consumer")) + } + return &producer{ + consumer: consumer, + } +} + +type producer struct { + consumer *Consumer +} + +func (producer *producer) Send(event queues.Event) (string, error) { + q, err := producer.consumer.Queue(event.QueueName) + if err != nil { + return "", errors.Wrapf(err, "failed to get/create queue(%s)", event.QueueName) + } + + msgID, err := q.Send(event) + if err != nil { + return "", errors.Wrapf(err, "failed to send to queue(%s)", event.QueueName) + } + + return msgID, nil +} diff --git a/queues/producer.go b/queues/producer.go new file mode 100644 index 0000000000000000000000000000000000000000..1f9b09f71df3bfa88c999aec8b44452ef74c7b2a --- /dev/null +++ b/queues/producer.go @@ -0,0 +1,5 @@ +package queues + +type IProducer interface { + Send(event Event) (msgID string, err error) +} diff --git a/queues/router.go b/queues/router.go new file mode 100644 index 0000000000000000000000000000000000000000..f2357365b0e2b7a16d10bfd8ab741c91f3b22ba7 --- /dev/null +++ b/queues/router.go @@ -0,0 +1,76 @@ +package queues + +import ( + "fmt" + + "github.com/aws/aws-lambda-go/events" + "gitlab.com/uafrica/go-utils/errors" +) + +type Router struct { + endpoints map[string]interface{} +} + +func (r Router) Endpoints() map[string]interface{} { + return r.endpoints +} + +func (r Router) Route(messageType string) (interface{}, error) { + if handler, ok := r.endpoints[messageType]; !ok { + return nil, errors.Errorf("%s not found", messageType) + } else { + return handler, nil + } +} + +//check that all endpoints are correctly defined using one of the supported handler types +//return updated endpoints with additional information +func NewRouter(endpoints map[string]interface{}) (Router, error) { + countLegacyEvent := 0 + countLegacyMessage := 0 + countHandler := 0 + for messageType, handlerFunc := range endpoints { + if messageType == "" { + return Router{}, errors.Errorf("blank messageType") + } + if messageType == "/sqs-docs" { + return Router{}, errors.Errorf("%s may not be a defined endpoint - it is reserved", messageType) + } + if handlerFunc == nil { + return Router{}, errors.Errorf("nil handler on %s", messageType) + } + + if _, ok := handlerFunc.(func(event events.SQSEvent) error); ok { + //ok - leave as is - we support this legacyHandler (typical in shiplogic) + fmt.Printf("%30.30s: OK (legacy event handler)\n", messageType) + countLegacyEvent++ + } else { + handler, err := NewHandler(handlerFunc) + if err != nil { + return Router{}, errors.Wrapf(err, "%30.30s has invalid handler %T", messageType, handlerFunc) + } + + //replace the endpoint value so we can quickly call this handler + endpoints[messageType] = handler + fmt.Printf("%30.30s: OK (record: %v)\n", messageType, handler.RecordType) + countHandler++ + } + } + fmt.Printf("Checked %d legacy event and %d legacy message and %d new handlers\n", countLegacyEvent, countLegacyMessage, countHandler) + + //add reserved endpoint to generate documentation + r := Router{ + endpoints: endpoints, + } + + // { + // docsHandler, err := NewHandler(GETApiDocs(r)) //endpoints)) + // if err != nil { + // return Router{}, errors.Wrapf(err, "failed to define handler for docs") + // } + // endpoints["/api-docs"] = map[string]interface{}{ + // "GET": docsHandler, + // } + // } + return r, nil +} diff --git a/queues/sqs/README.md b/queues/sqs/README.md new file mode 100644 index 0000000000000000000000000000000000000000..d35d0cee82094e2ad1f257d0d52020fcebc8e936 --- /dev/null +++ b/queues/sqs/README.md @@ -0,0 +1,3 @@ +# AWS SQS Queues + +This is an implementation of go-utils/queues using AWS SQS. \ No newline at end of file diff --git a/queues/sqs/consumer.go b/queues/sqs/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..753a87510e00886b48787aec50a673f078edc157 --- /dev/null +++ b/queues/sqs/consumer.go @@ -0,0 +1,174 @@ +package sqs + +import ( + "context" + "fmt" + "math/rand" + "os" + "reflect" + "time" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/uptrace/bun" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/service" +) + +func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.IConsumer { + env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod + if env == "" { + env = "dev" + } + router, err := queues.NewRouter(routes) + if err != nil { + panic(fmt.Sprintf("cannot create router: %+v", err)) + } + + //legacy message type - when running SQS instance for one type of messages only + //when defined, make sure handler exists for this type + sqsMessageType := os.Getenv("SQS_MESSAGE_TYPE") + if sqsMessageType != "" { + if _, err := router.Route(sqsMessageType); err != nil { + panic(errors.Errorf("No route defined for SQS_MESSAGE_TYPE=\"%s\"", sqsMessageType)) + } + } + + return consumer{ + ILogger: logger.New().WithFields(map[string]interface{}{"env": env}), + env: env, + router: router, + requestIDHeaderKey: requestIDHeaderKey, + ConstantMessageType: sqsMessageType, + producer: NewProducer(requestIDHeaderKey), + } +} + +type consumer struct { + logger.ILogger //for logging outside of context + env string + router queues.Router + requestIDHeaderKey string + ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE") + dbConn service.IDatabaseConnector + producer queues.IProducer +} + +func (consumer consumer) WithDb(dbConn service.IDatabaseConnector) queues.IConsumer { + consumer.dbConn = dbConn + return consumer +} + +func (consumer consumer) Run() { + lambda.Start(consumer.Handler) +} + +func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error { + //todo: create context with logger + rand.Seed(time.Now().Unix()) + + //report handler crashes + //defer sqs.crashReporter.Catch(ctx) + + var db *bun.DB + if consumer.dbConn != nil { + var err error + db, err = consumer.dbConn.Connect() + if err != nil { + return errors.Wrapf(err, "failed to connect to db") + } + } + + if consumer.ConstantMessageType != "" { + //legacy mode for fixed message type as used in shiplogic + //where the whole instance is started for a specific SQS_MESSAGE_TYPE defined in environment + handler, err := consumer.router.Route(consumer.ConstantMessageType) + if err != nil { + return errors.Wrapf(err, "messageType=%s not handled", consumer.ConstantMessageType) //checked on startup - should never get here!!! + } + + if msgHandler, ok := handler.(func(events.SQSEvent) error); !ok { + return errors.Wrapf(err, "SQS_MESSAGE_TYPE=%s: handler signature %T not supported", consumer.ConstantMessageType, handler) + } else { + return msgHandler(lambdaEvent) + } + } else { + //support different message types - obtained from the individual event records + //process all message records in this event: + for messageIndex, message := range lambdaEvent.Records { + //get request-id for this message record + requestID := "" + if requestIDAttr, ok := message.MessageAttributes[consumer.requestIDHeaderKey]; ok { + requestID = *requestIDAttr.StringValue + } + + messageType := "" + if messageTypeAttr, ok := message.MessageAttributes["type"]; !ok || messageTypeAttr.StringValue == nil { + consumer.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required + continue + } else { + messageType = *messageTypeAttr.StringValue + } + + ctx := queues.Context{ + Context: service.NewContext(baseCtx, map[string]interface{}{ + "env": consumer.env, + "request_id": requestID, + "message_type": messageType, + }), + IProducer: consumer.producer, //needed so handler can queue other events or requeue this event + Event: queues.Event{ + //producer: nil, + MessageID: message.MessageId, + QueueName: "N/A", //not sure how to get queue name from lambda Event... would be good to log it, may be in os.Getenv(???)? + TypeName: messageType, + DueTime: time.Now(), + RequestIDValue: requestID, + BodyJSON: message.Body, + }, + RequestID: requestID, + DB: db, + } + + ctx.WithFields(map[string]interface{}{ + "message_index": messageIndex, + "message": message, + }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event.QueueName, ctx.Event) + + //routing on messageType + sqsHandler, err := consumer.router.Route(messageType) + if err != nil { + ctx.Errorf("Unhandled sqs messageType(%v): %v", messageType, err) + continue + } + handler, ok := sqsHandler.(queues.Handler) + if !ok { + ctx.Errorf("messageType(%v) unsupported signature: %T", messageType, sqsHandler) + continue + } + + args := []reflect.Value{ + reflect.ValueOf(ctx), + } + + //allocate, populate and validate request struct + var recordStruct interface{} + recordStruct, err = ctx.GetRecord(handler.RecordType) + if err != nil { + ctx.Errorf("invalid message: %+v", err) + continue + } + + ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + args = append(args, reflect.ValueOf(recordStruct)) + + results := handler.FuncValue.Call(args) + if len(results) > 0 && !results[0].IsNil() { + ctx.Errorf("handler failed: %+v", results[0].Interface().(error)) + } + } + } + return nil +} diff --git a/queues/sqs/producer.go b/queues/sqs/producer.go new file mode 100644 index 0000000000000000000000000000000000000000..e5de43248faa42b9e3d353c52c245e1a9631ebd8 --- /dev/null +++ b/queues/sqs/producer.go @@ -0,0 +1,128 @@ +package sqs + +import ( + "os" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" +) + +func NewProducer(requestIDHeaderKey string) queues.IProducer { + region := os.Getenv("AWS_REGION") + if region == "" { + panic(errors.Errorf("environment AWS_REGION is not defined")) + } + if requestIDHeaderKey == "" { + requestIDHeaderKey = "request-id" + } + return &producer{ + region: region, + requestIDHeaderKey: requestIDHeaderKey, + session: nil, + queues: map[string]*Messenger{}, + } +} + +type producer struct { + sync.Mutex + region string + requestIDHeaderKey string + session *session.Session + queues map[string]*Messenger +} + +// Note: Calling code needs SQS IAM permissions +func (producer *producer) Send(event queues.Event) (string, error) { + messenger, ok := producer.queues[event.QueueName] + if !ok { + producer.Lock() + defer producer.Unlock() + messenger, ok = producer.queues[event.QueueName] + if !ok { + envName := strings.ToUpper(event.QueueName + "_QUEUE_URL") + queueURL := os.Getenv(envName) + if queueURL == "" { + return "", errors.Errorf("cannot send to queue(%s) because environment(%s) is undefined", event.QueueName, envName) + } + + // Make an AWS session + sess, err := session.NewSessionWithOptions(session.Options{ + Config: aws.Config{ + Region: aws.String(producer.region), + }, + }) + if err != nil { + return "", errors.Wrapf(err, "failed to create AWS session") + } + + messenger = &Messenger{ + session: sess, + service: sqs.New(sess), + queueURL: queueURL, + } + producer.queues[event.QueueName] = messenger + } //if not defined in mutex + } //if not defined + + if msgID, err := messenger.Send(event); err != nil { + return "", errors.Wrapf(err, "failed to send") + } else { + return msgID, nil + } +} + +// Messenger sends an arbitrary message via SQS to a particular queue URL +type Messenger struct { + producer *producer + session *session.Session + service *sqs.SQS + queueURL string +} + +func (m *Messenger) Send(event queues.Event) (string, error) { + logger.Debugf("Sending event %+v", event) + + //add params as message attributes + msgAttrs := make(map[string]*sqs.MessageAttributeValue) + for key, val := range event.ParamValues { + msgAttrs[key] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(val), + } + } + + msgAttrs[m.producer.requestIDHeaderKey] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(event.RequestIDValue), + } + msgAttrs["type"] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(event.TypeName), + } + + // SQS has max of 15 minutes delay + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html + // if due later than that, queue just for this much time + delaySeconds := int64(time.Until(event.DueTime) / time.Second) + if delaySeconds > 900 { + delaySeconds = 900 + } + + if res, err := m.service.SendMessage(&sqs.SendMessageInput{ + MessageAttributes: msgAttrs, + DelaySeconds: &delaySeconds, + MessageBody: aws.String(event.BodyJSON), + QueueUrl: &m.queueURL, + }); err != nil { + return "", errors.Wrapf(err, "failed to send") + } else { + return *res.MessageId, nil + } +} diff --git a/service/context.go b/service/context.go index 1f172e80adb099336d2f5908ed2f1c346ddaacd2..da6d76918b452aab81809346f3ff80639ae76779 100644 --- a/service/context.go +++ b/service/context.go @@ -20,7 +20,7 @@ func NewContext(base context.Context, values map[string]interface{}) Context { } return Context{ Context: base, - Logger: logger.New().WithFields(values), + Logger: logger.New().WithFields(values).NextColor(), startTime: time.Now(), } } diff --git a/string_utils/string_utils.go b/string_utils/string_utils.go index ee99f665459edc6eda66f29306f015a5eee0e013..d6b4a78e86da3e807acbee006e6a26c286001598 100644 --- a/string_utils/string_utils.go +++ b/string_utils/string_utils.go @@ -51,7 +51,7 @@ func StandardisePhoneNumber(number string) string { // is the first rune/char of the string a 0 if []rune(number)[0] == []rune("0")[0] { // Add south african country code (hardcoded for now) - number = "+27" + number[1:len(number)] + number = "+27" + number[1:] } return number }