From 1956c10153f5943e64e7f3084e4c9fda2feb6d1f Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Thu, 11 Nov 2021 10:57:17 +0200 Subject: [PATCH] Cleanup and better api-logs (and sqs also possible) with actions list capturing significant actions taken during handlers (off by default) --- api/api.go | 10 +- api/lambda.go | 26 +-- consumer/mem_consumer/consumer.go | 30 +-- consumer/sqs_consumer/consumer.go | 19 +- examples/core/app/users/users.go | 3 +- logs/action.go | 291 ++++++++++++++++++++++++++++++ logs/api-logs.go | 167 ++++------------- logs/config.go | 61 +++++++ logs/sqs-logs.go | 23 +-- queues/README.md | 4 +- queues/event.go | 31 ++-- queues/sqs_producer/producer.go | 6 +- service/README.md | 2 +- service/context.go | 13 ++ 14 files changed, 497 insertions(+), 189 deletions(-) create mode 100644 logs/action.go create mode 100644 logs/config.go diff --git a/api/api.go b/api/api.go index 5cd5e16..718cc6f 100644 --- a/api/api.go +++ b/api/api.go @@ -6,8 +6,10 @@ import ( "runtime/debug" "github.com/aws/aws-lambda-go/lambda" + "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/consumer/mem_consumer" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/queues/sqs_producer" "gitlab.com/uafrica/go-utils/service" @@ -117,15 +119,19 @@ func (api Api) WithEvents(eventHandlers map[string]interface{}) Api { //run and panic on error func (api Api) Run() { //decide local or SQS + var producer queues.Producer if (os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil { //use in-memory channels for async events api.Debugf("Using in-memory channels for async events ...") - api = api.WithProducer(mem_consumer.NewProducer(mem_consumer.New(api.Service, api.localQueueEventHandlers))) + producer = mem_consumer.NewProducer(mem_consumer.New(api.Service, api.localQueueEventHandlers)) } else { //use SQS for async events api.Debugf("Using SQS queue producer for async events ...") - api = api.WithProducer(sqs_producer.New(api.requestIDHeaderKey)) + producer = sqs_producer.New(api.requestIDHeaderKey) } + api = api.WithProducer(producer) + audit.Init(producer) + logs.Init(producer) //run as an AWS Lambda function lambda.Start(api.Handler) diff --git a/api/lambda.go b/api/lambda.go index a36bfbf..19edeae 100644 --- a/api/lambda.go +++ b/api/lambda.go @@ -99,7 +99,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if api.requestIDHeaderKey != "" { res.Headers[api.requestIDHeaderKey] = Ctx.RequestID() } - if err := logs.LogIncomingAPIRequest(Ctx.StartTime(), apiGatewayProxyReq, res); err != nil { + if err := logs.LogIncomingAPIRequest(Ctx.StartTime(), Ctx.RequestID(), Ctx.Claim(), apiGatewayProxyReq, res); err != nil { Ctx.Errorf("failed to log: %+v", err) } }() @@ -126,7 +126,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat } } - Ctx.Debugf("HTTP %s %s ...\n", apiGatewayProxyReq.HTTPMethod, apiGatewayProxyReq.Resource) + Ctx.Tracef("HTTP %s %s ...\n", apiGatewayProxyReq.HTTPMethod, apiGatewayProxyReq.Resource) Ctx.WithFields(map[string]interface{}{ "http_method": Ctx.Request().HTTPMethod, "path": Ctx.Request().Path, @@ -215,7 +215,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat // return // } - Ctx.Debugf("Body: (%T) %+v", bodyStruct, bodyStruct) + Ctx.Tracef("Body: (%T) %+v", bodyStruct, bodyStruct) args = append(args, reflect.ValueOf(bodyStruct)) } @@ -260,15 +260,19 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if len(results) > 1 { responseStruct := results[0].Interface() Ctx.Debugf("Response type: %T", responseStruct) - - var bodyBytes []byte - bodyBytes, err = json.Marshal(responseStruct) - if err != nil { - err = errors.Wrapf(err, "failed to encode response content") - return + if responseString, ok := responseStruct.(string); ok { + res.Headers["Content-Type"] = "application/json" + res.Body = responseString + } else { + var bodyBytes []byte + bodyBytes, err = json.Marshal(responseStruct) + if err != nil { + err = errors.Wrapf(err, "failed to encode response content") + return + } + res.Headers["Content-Type"] = "application/json" + res.Body = string(bodyBytes) } - res.Headers["Content-Type"] = "application/json" - res.Body = string(bodyBytes) } else { //no content delete(res.Headers, "Content-Type") diff --git a/consumer/mem_consumer/consumer.go b/consumer/mem_consumer/consumer.go index 99274c9..0732399 100644 --- a/consumer/mem_consumer/consumer.go +++ b/consumer/mem_consumer/consumer.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) @@ -157,18 +158,18 @@ func (q *queue) process(event queues.Event) error { return errors.Wrapf(err, "invalid message body") } - ctx.WithFields(map[string]interface{}{ - "params": event.ParamValues, - "body": event.BodyJSON, - }).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s): (%T)%v", - "---", //not yet available here - not part of event, and in SQS I think it is passed in SQS layer, so need to extend local channel to include this along with event - q.name, - event.TypeName, - event.DueTime, - recordStruct, - recordStruct) - - ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + //log if not internal queue + if q.name != "AUDIT" && q.name != "API_LOGS" { + ctx.WithFields(map[string]interface{}{ + "params": event.ParamValues, + "body": event.BodyJSON, + }).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s)", + "---", //not yet available here - not part of event, and in SQS I think it is passed in SQS layer, so need to extend local channel to include this along with event + q.name, + event.TypeName, + event.DueTime) + ctx.Tracef("RECV(%s) Request(%T)%v", q.name, recordStruct, recordStruct) + } args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) @@ -180,6 +181,11 @@ func (q *queue) process(event queues.Event) error { } //queue.process() func (q *queue) Send(event queues.Event) (msgID string, err error) { + startTime := time.Now() + defer func() { + logs.LogSQSSent(startTime, event.QueueName, event.TypeName, event.BodyJSON) + }() + event.MessageID = uuid.New().String() q.ch <- event return event.MessageID, nil diff --git a/consumer/sqs_consumer/consumer.go b/consumer/sqs_consumer/consumer.go index bbf2e5b..7974fa0 100644 --- a/consumer/sqs_consumer/consumer.go +++ b/consumer/sqs_consumer/consumer.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" "github.com/google/uuid" + "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" @@ -42,8 +43,11 @@ func New(requestIDHeaderKey string, routes map[string]interface{}) consumer.Cons } } + producer := sqs_producer.New(requestIDHeaderKey) s := service.New(). - WithProducer(sqs_producer.New(requestIDHeaderKey)) + WithProducer(producer) + audit.Init(producer) + logs.Init(producer) return sqsConsumer{ Service: s, @@ -175,10 +179,13 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven return err } - ctx.WithFields(map[string]interface{}{ - "message_index": messageIndex, - "message": message, - }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event()) + //log if not internal queue + if ctx.Event().QueueName != "AUDIT" && ctx.Event().QueueName != "API_LOGS" { + ctx.WithFields(map[string]interface{}{ + "message_index": messageIndex, + "message": message, + }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event()) + } //routing on messageType sqsHandler, err := c.router.Route(messageType) @@ -205,7 +212,7 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven } requestToLog = recordStruct //replace string log with structured log - ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + ctx.Tracef("message (%T) %+v", recordStruct, recordStruct) args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) diff --git a/examples/core/app/users/users.go b/examples/core/app/users/users.go index 5599be9..a7dd6cb 100644 --- a/examples/core/app/users/users.go +++ b/examples/core/app/users/users.go @@ -11,7 +11,6 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/examples/core/email" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/queues" ) type User struct { @@ -113,7 +112,7 @@ func Add(ctx api.Context, params noParams, newUser POSTUser) (User, error) { Subject: "Welcome User", Body: "Your account has been created", } - /*eventID*/ _, err := queues.NewEvent(ctx, "notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) + /*eventID*/ _, err := ctx.NewEvent("notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) if err != nil { ctx.Errorf("failed to notify: %+v", err) } diff --git a/logs/action.go b/logs/action.go new file mode 100644 index 0000000..9bb602d --- /dev/null +++ b/logs/action.go @@ -0,0 +1,291 @@ +package logs + +import ( + "encoding/json" + "sync" + "time" +) + +//Call LogOutgoingAPIRequest() after calling an API end-point as part of a handler, +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error { + endTime := time.Now() + log := ApiCallLog{ + URL: url, + Method: method, + ResponseCode: responseCode, + } + if requestBody != "" { + log.Request = &BodyLog{ + BodySize: len(requestBody), + Body: requestBody, + } + } + if responseBody != "" { + log.Response = &BodyLog{ + BodySize: len(responseBody), + Body: responseBody, + } + } + + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeApiCall, + ApiCall: &log, + }) + actionListMutex.Unlock() + + return nil +} //LogOutgoingAPIRequest() + +//Call LogSQL() after executing any SQL query +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSQL( + startTime time.Time, + sql string, + rowsCount int, //optional nr of rows to report, else 0 + ids []int64, //optional list of ids to report, else nil + err error, //only if failed, else nil +) { + endTime := time.Now() + log := SQLQueryLog{ + SQL: sql, + RowCount: rowsCount, + InsertIDs: ids, + } + if err != nil { + log.Error = err.Error() + } + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSqlQuery, + SQLQuery: &log, + }) + actionListMutex.Unlock() +} + +//Call LogSQSSent() after sending an SQS event +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSQSSent(startTime time.Time, queueName string, messageType string, request interface{}) { + //do not log internal events sent to audit/api-log + if queueName == "API_LOGS" || queueName == "AUDIT" { + return + } + + endTime := time.Now() + log := SQSSentLog{ + QueueName: queueName, + MessageType: messageType, + } + if request != nil { + if requestString, ok := request.(string); ok { + log.Request = &BodyLog{ + BodySize: len(requestString), //do not marshal, else we have double escaped JSON + Body: requestString, + } + } else { + jsonRequest, _ := json.Marshal(request) + log.Request = &BodyLog{ + BodySize: len(jsonRequest), + Body: string(jsonRequest), + } + } + } + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSqsSent, + SQSSent: &log, + }) + actionListMutex.Unlock() +} + +//Call LogSleep() after doing time.Sleep() +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSleep(startTime time.Time) { + endTime := time.Now() + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSleep, + }) + actionListMutex.Unlock() +} + +var ( + actionListMutex sync.Mutex + actionList = []ActionLog{} +) + +type ActionLog struct { + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` + ApiCall *ApiCallLog `json:"api_call,omitempty"` + SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` + SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` +} + +func (action ActionLog) Relative(relTime time.Time) RelativeActionLog { + return RelativeActionLog{ + StartMs: action.StartTime.Sub(relTime).Milliseconds(), + EndMs: action.EndTime.Sub(relTime).Milliseconds(), + DurMs: action.DurMs, + Type: action.Type, + ApiCall: action.ApiCall, + SQSSent: action.SQSSent, + SQLQuery: action.SQLQuery, + } +} + +type RelativeActionLog struct { + StartMs int64 `json:"start_ms" doc:"Start time in milliseconds after start timestamp"` + EndMs int64 `json:"end_ms" doc:"End time in milliseconds after start timestamp"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` + ApiCall *ApiCallLog `json:"api_call,omitempty"` + SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` + SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` +} + +type ActionType string + +var ActionTypeList = []ActionType{ + ActionTypeNone, + ActionTypeApiCall, + ActionTypeSqsSent, + ActionTypeSqlQuery, + ActionTypeSleep, +} + +const ( + ActionTypeNone ActionType = "none" + ActionTypeApiCall ActionType = "api-call" + ActionTypeSqsSent ActionType = "sqs-sent" + ActionTypeSqlQuery ActionType = "sql-query" + ActionTypeSleep ActionType = "sleep" +) + +//APICallLog captures details of an outgoing API call made from a handler +type ApiCallLog struct { + URL string `json:"url"` + Method string `json:"method"` + ResponseCode int `json:"response_code"` + Request *BodyLog `json:"request,omitempty"` + Response *BodyLog `json:"response,omitempty"` +} + +type BodyLog struct { + BodySize int `json:"body_size"` + Body string `json:"body"` +} + +//SQSSentLog captures details of an SQS event sent from a handler +type SQSSentLog struct { + QueueName string `json:"queue_name"` + MessageType string `json:"message_type"` + Request *BodyLog `json:"request,omitempty"` +} + +//SQLQueryLog captures details of an SQL query executed from a handler resulting in either rows returned, ids inserted or an error +type SQLQueryLog struct { + SQL string `json:"sql"` + RowCount int `json:"row_count,omitempty"` + InsertIDs []int64 `json:"insert_ids,omitempty"` + Error string `json:"error,omitempty"` +} + +//compile the relative action list that took place during this handler +//copy then reset actionList for the next handler +//we copy it with relation to this API's start..end time, rather than full timestamps, which are hard to read in the list +//start and end are current total handler period that actions should be inside that +func relativeActionList(startTime, endTime time.Time) []RelativeActionLog { + actionListMutex.Lock() + defer func() { + //after copy/discard, reset (global!) action list for the next handler + actionList = []ActionLog{} + actionListMutex.Unlock() + }() + + cfg := currentLogConfig() + if !cfg.ActionsKeep { + return nil + } + + //todo: runtime config: load temporary from REDIS after N seconds + //which will allow us to monitor better for a short while during trouble shooting + //then something like this to reload every 5min (5min could also be part of config) + // if dynamicExpireTime.Before(time.Now()) { + // dynamicApiConfig := apiConfig //loaded from env at startup + // ...look for keys in REDIS and override ... + // dynamicExpireTime = time.Now().Add(time.Minute*5) + // } + // do this in go routing with sleep... so handlers do not have to check :-) + // and it can also be used for api-log part that is not action list, e.g. for api-log req/res body len etc... + relActionList := []RelativeActionLog{} + for _, action := range actionList { + if action.EndTime.Before(startTime) || action.StartTime.After(endTime) { + continue //not expected - skip actions outside log window + } + + //apply reduction filters to limit string lengths + switch action.Type { + case ActionTypeNone: + case ActionTypeSqlQuery: + if action.SQLQuery != nil && len(action.SQLQuery.SQL) > int(cfg.ActionsMaxSQLLength) { + action.SQLQuery.SQL = action.SQLQuery.SQL[:cfg.ActionsMaxSQLLength] + } + case ActionTypeSqsSent: + if action.SQSSent != nil && action.SQSSent.Request != nil && len(action.SQSSent.Request.Body) > int(cfg.ActionsMaxSQSReqBodyLength) { + action.SQSSent.Request.Body = action.SQSSent.Request.Body[:cfg.ActionsMaxSQSReqBodyLength] + } + case ActionTypeApiCall: + if action.ApiCall != nil { + if action.ApiCall.Request != nil && len(action.ApiCall.Request.Body) > int(cfg.ActionsMaxAPIReqBodyLength) { + action.ApiCall.Request.Body = action.ApiCall.Request.Body[:cfg.ActionsMaxAPIReqBodyLength] + } + if action.ApiCall.Response != nil && len(action.ApiCall.Response.Body) > int(cfg.ActionsMaxAPIResBodyLength) { + action.ApiCall.Response.Body = action.ApiCall.Response.Body[:cfg.ActionsMaxAPIResBodyLength] + } + } + } + + //make relative and append to the list + relActionList = append(relActionList, action.Relative(startTime)) + } + + //also append to the list any nonAction periods greater than thresholdMs + //to indicate significant gaps in the action list that we did not account for + thresholdMs := int64(50) + //make period list, remove all action periods, then we're left with non-action periods :-) + nonActionPeriods := NewPeriods(startTime, endTime) + for _, action := range actionList { + nonActionPeriods = nonActionPeriods.Without(Period{Start: action.StartTime, End: action.EndTime}) + } + for _, nonAction := range nonActionPeriods { + if nonAction.Duration().Milliseconds() > thresholdMs { + relActionList = append(relActionList, ActionLog{ + StartTime: nonAction.Start, + EndTime: nonAction.End, + DurMs: nonAction.Duration().Milliseconds(), + Type: ActionTypeNone, + }.Relative(startTime)) + } + } + return relActionList +} diff --git a/logs/api-logs.go b/logs/api-logs.go index 55a33cc..9552e10 100644 --- a/logs/api-logs.go +++ b/logs/api-logs.go @@ -2,27 +2,31 @@ package logs import ( "net/http" + "sort" "strings" - "sync" "time" "github.com/aws/aws-lambda-go/events" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/queues" - "gitlab.com/uafrica/go-utils/service" ) +var producer queues.Producer + +func Init(p queues.Producer) { + producer = p +} + //Call this at the end of an API request handler to capture the req/res as well as all actions taken during the processing //(note: action list is only reset when this is called - so must be called after each handler, else action list has to be reset at the start) -func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyRequest, res events.APIGatewayProxyResponse) error { - if service.Ctx == nil { - return errors.Errorf("Cannot log without service context") +func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[string]interface{}, req events.APIGatewayProxyRequest, res events.APIGatewayProxyResponse) error { + if producer == nil { + return errors.Errorf("logs queue producer not set") } //todo: filter out some noisy (method+path) endTime := time.Now() - dur := endTime.Sub(startTime) var authType string var authUsername string @@ -40,18 +44,17 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques } } - claim := service.Ctx.Claim() username, _ := claim["user_id"].(string) accountID, _ := claim["account_id"].(int64) apiLog := ApiLog{ StartTime: startTime, EndTime: endTime, - DurMs: int(dur / time.Millisecond), + DurMs: endTime.Sub(startTime).Milliseconds(), Method: req.HTTPMethod, Address: req.RequestContext.DomainName, Path: req.Path, ResponseCode: res.StatusCode, - RequestID: service.Ctx.RequestID(), + RequestID: requestID, InitialAuthType: authType, InitialAuthUsername: authUsername, SourceIP: req.RequestContext.Identity.SourceIP, @@ -69,14 +72,15 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques BodySize: len(res.Body), Body: res.Body, }, - //RelevantID: service.Ctx.,??? + Actions: nil, } - //copy then reset actionList for the next handler - actionListMutex.Lock() - apiLog.Actions = actionList - actionList = []ActionLog{} - actionListMutex.Unlock() + //compile action list + apiLog.Actions = relativeActionList(apiLog.StartTime, apiLog.EndTime) + + //sort action list on startTime, cause actions are added when they end, i.e. ordered by end time + //and all non-actions were appended at the end of the list + sort.Slice(apiLog.Actions, func(i, j int) bool { return apiLog.Actions[i].StartMs < apiLog.Actions[j].StartMs }) //also copy multi-value query parameters to the log as CSV array values for n, as := range req.MultiValueQueryStringParameters { @@ -90,7 +94,7 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques } //todo: filter out sensitive values (e.g. OTP) - if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + if _, err := producer.NewEvent("API_LOGS"). Type("api-log"). RequestID(apiLog.RequestID). Send(apiLog); err != nil { @@ -101,24 +105,24 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques //ApiLog is the SQS event details struct encoded as JSON document, sent to SQS, to be logged for each API handler executed. type ApiLog struct { - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - DurMs int `json:"duration_ms"` //duration in milliseconds - Method string `json:"method"` - Address string `json:"address"` //server address for incoming and outgoing - Path string `json:"path"` - ResponseCode int `json:"response_code"` - RequestID string `json:"request_id"` - InitialAuthUsername string `json:"initial_auth_username,omitempty"` - InitialAuthType string `json:"initial_auth_type,omitempty"` - AccountID int64 `json:"account_id,omitempty"` - Username string `json:"username,omitempty"` - SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API - UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI - RelevantID string `json:"relevant_id,omitempty"` - Request ApiLogRequest `json:"request"` - Response ApiLogResponse `json:"response"` - Actions []ActionLog `json:"actions,omitempty"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Method string `json:"method"` + Address string `json:"address"` //server address for incoming and outgoing + Path string `json:"path"` + ResponseCode int `json:"response_code"` + RequestID string `json:"request_id"` + InitialAuthUsername string `json:"initial_auth_username,omitempty"` + InitialAuthType string `json:"initial_auth_type,omitempty"` + AccountID int64 `json:"account_id,omitempty"` + Username string `json:"username,omitempty"` + SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API + UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI + RelevantID string `json:"relevant_id,omitempty"` + Request ApiLogRequest `json:"request"` + Response ApiLogResponse `json:"response"` + Actions []RelativeActionLog `json:"actions,omitempty"` } type ApiLogRequest struct { @@ -133,98 +137,3 @@ type ApiLogResponse struct { BodySize int `json:"body_size"` //set even when body is truncated/omitted Body string `json:"body,omitempty"` //json content as a string } - -func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error { - endTime := time.Now() - dur := endTime.Sub(startTime) - - apiCallLog := ApiCallLog{ - URL: url, - Method: method, - ResponseCode: responseCode, - Request: ApiCallRequestLog{ - BodySize: len(requestBody), - Body: requestBody, - }, - Response: ApiCallResponseLog{ - BodySize: len(responseBody), - Body: responseBody, - }, - } - - actionListMutex.Lock() - actionList = append(actionList, ActionLog{ - StartTime: startTime, - EndTime: endTime, - DurMs: int(dur / time.Millisecond), - Type: ActionTypeApiCall, - ApiCall: &apiCallLog, - }) - actionListMutex.Unlock() - - return nil -} //LogOutgoingAPIRequest() - -var ( - actionListMutex sync.Mutex - actionList = []ActionLog{} -) - -type ActionLog struct { - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - DurMs int `json:"duration_ms"` //duration in milliseconds - Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` - ApiCall *ApiCallLog `json:"api_call,omitempty"` - SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` - SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` - Sleep *SleepLog `json:"sleep,omitempty"` -} - -type ActionType string - -var ActionTypeList = []ActionType{ - ActionTypeApiCall, - ActionTypeSqsSent, - ActionTypeSqlQuery, - ActionTypeSleep, -} - -const ( - ActionTypeApiCall ActionType = "api-call" - ActionTypeSqsSent ActionType = "sqs-sent" - ActionTypeSqlQuery ActionType = "sql-query" - ActionTypeSleep ActionType = "sleep" -) - -//APICallLog captures details of an outgoing API call made from a handler -type ApiCallLog struct { - URL string `json:"url"` - Method string `json:"method"` - ResponseCode int `json:"response_code"` - Request ApiCallRequestLog `json:"request"` - Response ApiCallResponseLog `json:"response"` -} - -type ApiCallRequestLog struct { - BodySize int `json:"body_size"` - Body string `json:"body"` -} - -type ApiCallResponseLog struct { - BodySize int `json:"body_size"` - Body string `json:"body"` -} - -//SQSSentLog captures details of an SQS event sent from a handler -type SQSSentLog struct { -} - -//SQLQueryLog captures details of an SQL query executed from a handler -type SQLQueryLog struct { -} - -//SleepLog captures details of time spent sleeping from a handler -type SleepLog struct { - //nothing to record apart from the action start/end/dur... -} diff --git a/logs/config.go b/logs/config.go new file mode 100644 index 0000000..f05411e --- /dev/null +++ b/logs/config.go @@ -0,0 +1,61 @@ +package logs + +import ( + "context" + "time" + + "gitlab.com/uafrica/go-utils/config" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/redis" +) + +type Config struct { + ActionsKeep bool `json:"actions_keep" doc:"Set true to keep list of actions in logs"` + ActionsMaxSQLLength int64 `json:"actions_max_sql_length" doc:"Set length of SQL query to keep in action list (default 0 = delete)"` + ActionsMaxSQSReqBodyLength int64 `json:"actions_max_sqs_req_body_length" doc:"Set length of SQS Request body to keep in action list (default 0 = delete)"` + ActionsMaxAPIReqBodyLength int64 `json:"actions_max_api_req_body_length" doc:"Set length of API Request body to keep in action list (default 0 = delete)"` + ActionsMaxAPIResBodyLength int64 `json:"actions_max_api_res_body_length" doc:"Set length of API Response body to keep in action list (default 0 = delete)"` +} + +const configPrefix = "LOGS" + +var ( + logConfig Config + dynamicLogConfig Config + dynamicExpireTime time.Time + redisCli redis.IRedis +) + +func init() { + if err := config.LoadEnv(configPrefix, &logConfig); err != nil { + logger.Errorf("failed to load LOGS config: %+v", err) + } + dynamicLogConfig = logConfig + dynamicExpireTime = time.Now() + + //see if can load overrides from redis + var err error + redisCli, err = redis.New(context.Background()) + if err != nil { + logger.Errorf("Not able to connect to REDIS for runtime %s config: %+v", configPrefix, err) + } +} + +//todo: call only on each use and check expiry time before reading from REDIS again, e.g. reload no faster that 10s +func currentLogConfig() Config { + if redisCli == nil || dynamicExpireTime.After(time.Now()) { + return dynamicLogConfig + } + + //time to attempt reload + //copy static config then overload values which are defined from REDIS + dynamicLogConfig = logConfig + dynamicExpireTime = time.Now().Add(time.Second * 10) + + if err := config.Load(configPrefix, &dynamicLogConfig, redisCli); err != nil { + logger.Errorf("failed to load %s config from REDIS", configPrefix) + } else { + logger.Debugf("Loaded %s config: %+v", configPrefix, dynamicLogConfig) + } + return dynamicLogConfig +} //runtimeConfigLoad diff --git a/logs/sqs-logs.go b/logs/sqs-logs.go index 49f446b..22f45d2 100644 --- a/logs/sqs-logs.go +++ b/logs/sqs-logs.go @@ -8,8 +8,6 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/queues" - "gitlab.com/uafrica/go-utils/service" ) //Call this at the end of an SQS event handler to capture the req and result as well as all actions taken during the processing @@ -20,8 +18,8 @@ func LogSQSRequest(startTime time.Time, req interface{}, handlerErr error, ) error { - if service.Ctx == nil { - return errors.Errorf("Cannot log without service context") + if producer == nil { + return errors.Errorf("logs queue producer not set") } if !sqsLogEnabled { @@ -29,20 +27,24 @@ func LogSQSRequest(startTime time.Time, } endTime := time.Now() - dur := endTime.Sub(startTime) log := ApiLog{ StartTime: startTime, EndTime: endTime, - DurMs: int(dur / time.Millisecond), + DurMs: endTime.Sub(startTime).Milliseconds(), RequestID: requestID, Method: "SQS", Path: messageType, } if req != nil { - if jsonReq, err := json.Marshal(req); err == nil { - log.Request.Body = string(jsonReq) - log.Request.BodySize = len(log.Request.Body) + if reqString, ok := req.(string); ok { + log.Request.Body = reqString //do not marshal else we have double-escaped JSON + log.Request.BodySize = len(reqString) + } else { + if jsonReq, err := json.Marshal(req); err == nil { + log.Request.Body = string(jsonReq) + log.Request.BodySize = len(log.Request.Body) + } } } @@ -60,7 +62,6 @@ func LogSQSRequest(startTime time.Time, //copy then reset actionList for the next handler actionListMutex.Lock() - log.Actions = actionList actionList = []ActionLog{} actionListMutex.Unlock() @@ -69,7 +70,7 @@ func LogSQSRequest(startTime time.Time, //note: we send SQS logs to "API_LOGS" which already exists... should be renamed to simply "LOGS" //it use the same structure, but method="SQS" and path="messageType" and request is the event body //so they can be plotted on the same dashboard visualisation in OpenSearch with all the same filters/metrics - if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + if _, err := producer.NewEvent("API_LOGS"). Type("api-log"). RequestID(requestID). Send(log); err != nil { diff --git a/queues/README.md b/queues/README.md index c592a56..1ac1212 100644 --- a/queues/README.md +++ b/queues/README.md @@ -59,10 +59,10 @@ func MyHandler(ctx queues.Context, body MyBody) (err error) { Notes: * The route name is the name specified in - ```service.NewEvent(...).Type(<name>)``` + ```ctx.NewEvent(...).Type(<name>)``` * The ```body``` should have the same type you used elsewhere in - ```service.NewEvent(...).Send(<type>)``` + ```ctx.NewEvent(...).Send(<type>)``` * The optional body type Validate() method will be called and must return nil before the handler will be called. diff --git a/queues/event.go b/queues/event.go index 376947d..18fca85 100644 --- a/queues/event.go +++ b/queues/event.go @@ -83,23 +83,30 @@ func (event Event) Send(value interface{}) (string, error) { return "", errors.Errorf("send with producer==nil") } if value != nil { - jsonBody, err := json.Marshal(value) - if err != nil { - return "", errors.Wrapf(err, "failed to JSON encode event body") + if valueString, ok := value.(string); ok { + event.BodyJSON = valueString + } else { + jsonBody, err := json.Marshal(value) + if err != nil { + return "", errors.Wrapf(err, "failed to JSON encode event body") + } + event.BodyJSON = string(jsonBody) } - event.BodyJSON = string(jsonBody) } if msgID, err := event.producer.Send(event); err != nil { return "", errors.Wrapf(err, "failed to send event") } else { - log.WithFields(map[string]interface{}{ - "queue": event.QueueName, - "type": event.TypeName, - "due": event.DueTime, - "params": event.ParamValues, - "body": event.BodyJSON, - "msg_id": msgID, - }).Info("Sent event") + //do not log when we send to internal AUDIT/API_LOGS + if event.QueueName != "AUDIT" && event.QueueName != "API_LOGS" { + log.WithFields(map[string]interface{}{ + "queue": event.QueueName, + "type": event.TypeName, + "due": event.DueTime, + "params": event.ParamValues, + "body": event.BodyJSON, + "msg_id": msgID, + }).Info("Sent event") + } return msgID, nil } } diff --git a/queues/sqs_producer/producer.go b/queues/sqs_producer/producer.go index 55a0fd7..649f6db 100644 --- a/queues/sqs_producer/producer.go +++ b/queues/sqs_producer/producer.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" ) @@ -94,7 +95,10 @@ type QueueProducer struct { } func (m *QueueProducer) Send(event queues.Event) (string, error) { - logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event) + startTime := time.Now() + defer func() { + logs.LogSQSSent(startTime, event.QueueName, event.TypeName, event.BodyJSON) + }() //add params as message attributes msgAttrs := make(map[string]*sqs.MessageAttributeValue) diff --git a/service/README.md b/service/README.md index 60b637e..6ee5841 100644 --- a/service/README.md +++ b/service/README.md @@ -167,7 +167,7 @@ The AuditChange() method logs the changes between an original and new value. Events are sent for async processing with ```ctx.NewEvent()...Send()``` as in this example: - if _, err := service.NewEvent(ctx, "BILLING"). + if _, err := ctx.NewEvent(ctx, "BILLING"). Type("provider-invoice"). RequestID(ctx.RequestID()). Delay(time.Second * 5). diff --git a/service/context.go b/service/context.go index 533af78..49595c4 100644 --- a/service/context.go +++ b/service/context.go @@ -9,6 +9,7 @@ import ( "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/string_utils" ) @@ -53,6 +54,10 @@ type Context interface { //write a data change audit event AuditChange(eventType string, orgValue, newValue interface{}) + + //Sleep() does a time.Sleep and record it in log actions so that we can account for the time spent sleeping + //vs e.g. time waiting for outgoing API calls or db queries + Sleep(dur time.Duration) } //values: are added to context and logger @@ -228,3 +233,11 @@ func (ctx *serviceContext) AuditChange(eventType string, orgValue, newValue inte return } } + +func (ctx *serviceContext) Sleep(dur time.Duration) { + if dur > 0 { + startTime := time.Now() + time.Sleep(dur) + logs.LogSleep(startTime) + } +} -- GitLab