diff --git a/api/api.go b/api/api.go index 5cd5e16a794f0331ba5550cb33945509d9e64edf..718cc6f74ed5e65c88f4c8cd31ed8c3882bc983d 100644 --- a/api/api.go +++ b/api/api.go @@ -6,8 +6,10 @@ import ( "runtime/debug" "github.com/aws/aws-lambda-go/lambda" + "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/consumer/mem_consumer" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/queues/sqs_producer" "gitlab.com/uafrica/go-utils/service" @@ -117,15 +119,19 @@ func (api Api) WithEvents(eventHandlers map[string]interface{}) Api { //run and panic on error func (api Api) Run() { //decide local or SQS + var producer queues.Producer if (os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil { //use in-memory channels for async events api.Debugf("Using in-memory channels for async events ...") - api = api.WithProducer(mem_consumer.NewProducer(mem_consumer.New(api.Service, api.localQueueEventHandlers))) + producer = mem_consumer.NewProducer(mem_consumer.New(api.Service, api.localQueueEventHandlers)) } else { //use SQS for async events api.Debugf("Using SQS queue producer for async events ...") - api = api.WithProducer(sqs_producer.New(api.requestIDHeaderKey)) + producer = sqs_producer.New(api.requestIDHeaderKey) } + api = api.WithProducer(producer) + audit.Init(producer) + logs.Init(producer) //run as an AWS Lambda function lambda.Start(api.Handler) diff --git a/api/lambda.go b/api/lambda.go index a36bfbf1bccaf0e905b863615b77390ceda83be7..19edeae70990a2a3ce1df13b58f7af18159209c7 100644 --- a/api/lambda.go +++ b/api/lambda.go @@ -99,7 +99,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if api.requestIDHeaderKey != "" { res.Headers[api.requestIDHeaderKey] = Ctx.RequestID() } - if err := logs.LogIncomingAPIRequest(Ctx.StartTime(), apiGatewayProxyReq, res); err != nil { + if err := logs.LogIncomingAPIRequest(Ctx.StartTime(), Ctx.RequestID(), Ctx.Claim(), apiGatewayProxyReq, res); err != nil { Ctx.Errorf("failed to log: %+v", err) } }() @@ -126,7 +126,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat } } - Ctx.Debugf("HTTP %s %s ...\n", apiGatewayProxyReq.HTTPMethod, apiGatewayProxyReq.Resource) + Ctx.Tracef("HTTP %s %s ...\n", apiGatewayProxyReq.HTTPMethod, apiGatewayProxyReq.Resource) Ctx.WithFields(map[string]interface{}{ "http_method": Ctx.Request().HTTPMethod, "path": Ctx.Request().Path, @@ -215,7 +215,7 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat // return // } - Ctx.Debugf("Body: (%T) %+v", bodyStruct, bodyStruct) + Ctx.Tracef("Body: (%T) %+v", bodyStruct, bodyStruct) args = append(args, reflect.ValueOf(bodyStruct)) } @@ -260,15 +260,19 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if len(results) > 1 { responseStruct := results[0].Interface() Ctx.Debugf("Response type: %T", responseStruct) - - var bodyBytes []byte - bodyBytes, err = json.Marshal(responseStruct) - if err != nil { - err = errors.Wrapf(err, "failed to encode response content") - return + if responseString, ok := responseStruct.(string); ok { + res.Headers["Content-Type"] = "application/json" + res.Body = responseString + } else { + var bodyBytes []byte + bodyBytes, err = json.Marshal(responseStruct) + if err != nil { + err = errors.Wrapf(err, "failed to encode response content") + return + } + res.Headers["Content-Type"] = "application/json" + res.Body = string(bodyBytes) } - res.Headers["Content-Type"] = "application/json" - res.Body = string(bodyBytes) } else { //no content delete(res.Headers, "Content-Type") diff --git a/config/struct.go b/config/struct.go index 84053adcaf3b59a1c65c726795bf2fdb7dfec447..04bfb884bc8d265c4f505b898490923544e6a585 100644 --- a/config/struct.go +++ b/config/struct.go @@ -5,6 +5,7 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/string_utils" "gitlab.com/uafrica/go-utils/struct_utils" ) @@ -12,7 +13,47 @@ var ( prefixStructs = map[string]interface{}{} ) -func Load(prefix string, configStructPtr interface{}) error { +func LoadEnv(prefix string, configStructPtr interface{}) error { + return Load(prefix, configStructPtr, string_utils.EnvironmentKeyReader()) +} + +func Load(prefix string, configStructPtr interface{}, keyReader string_utils.KeyReader) error { + if !prefixRegex.MatchString(prefix) { + return errors.Errorf("config(%s) invalid prefix", prefix) + } + + //store before load in case it fails to be still part of docs + prefixStructs[prefix] = configStructPtr + + //read os.Getenv() or other reader... + nv := struct_utils.NamedValuesFromReader(prefix, keyReader) + logger.Debugf("nv: %+v", nv) + + //parse into struct + unused, err := struct_utils.UnmarshalNamedValues(nv, configStructPtr) + if err != nil { + return errors.Wrapf(err, "config(%s) cannot load", prefix) + } + if len(unused) > 0 { + //we still use os.Getenv() elsewhere, so some variables may not be in the struct + //e.g. AUDIT_QUEUE_URL is read from queues/sqs/producer which match config(prefix="AUDIT") + //so we cannot yet fail here, which we should, because config setting not used is often + //a reason for errors, when we try to configure something, then it does not work, and + //we cannot figure out why, but the value we did set, might just be misspelled etc. + //so, for now - do not fail here, just report the unused values + logger.Warnf("Note unused env (might be used elsewhere) for config(%s): %+v", prefix, unused) + //return errors.Errorf("config(%s): unknown %+v", prefix, unused) + } + + if validator, ok := configStructPtr.(Validator); ok { + if err := validator.Validate(); err != nil { + return errors.Wrapf(err, "config(%s) is invalid", prefix) + } + } + return nil +} + +func LoadRedis(prefix string, configStructPtr interface{}) error { if !prefixRegex.MatchString(prefix) { return errors.Errorf("config(%s) invalid prefix", prefix) } diff --git a/config/struct_test.go b/config/struct_test.go index 587f8bdc787c1fdf29a5be6a8d3f6184667043a7..297664f93ac5c3899e7aabfb6239eb13adde201c 100644 --- a/config/struct_test.go +++ b/config/struct_test.go @@ -37,7 +37,7 @@ func TestLoad(t *testing.T) { os.Setenv("TEST_VALUE_HOLIDAYS", "[2021-03-21,2021-04-27,2021-05-01,2021-06-16,2021-08-09,2021-12-16,2021-12-25]") c := Config{} - if err := config.Load("TEST_VALUE", &c); err != nil { + if err := config.LoadEnv("TEST_VALUE", &c); err != nil { t.Fatalf("Cannot load config: %+v", err) } t.Logf("Loaded config: %+v", c) @@ -135,7 +135,7 @@ func TestLogConfig(t *testing.T) { os.Setenv("LOG_OTHER", "1") os.Setenv("LOG_SEARCH_OTHER", "2") c := LogConfig{} - err := config.Load("LOG", &c) + err := config.LoadEnv("LOG", &c) if err != nil { t.Fatalf("Failed: %+v", err) } diff --git a/consumer/mem_consumer/consumer.go b/consumer/mem_consumer/consumer.go index 99274c99b52c114ca4e9615545eec0e6d2019f19..0732399897e6888c35a0ae9c2dcff63283ccc09b 100644 --- a/consumer/mem_consumer/consumer.go +++ b/consumer/mem_consumer/consumer.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) @@ -157,18 +158,18 @@ func (q *queue) process(event queues.Event) error { return errors.Wrapf(err, "invalid message body") } - ctx.WithFields(map[string]interface{}{ - "params": event.ParamValues, - "body": event.BodyJSON, - }).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s): (%T)%v", - "---", //not yet available here - not part of event, and in SQS I think it is passed in SQS layer, so need to extend local channel to include this along with event - q.name, - event.TypeName, - event.DueTime, - recordStruct, - recordStruct) - - ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + //log if not internal queue + if q.name != "AUDIT" && q.name != "API_LOGS" { + ctx.WithFields(map[string]interface{}{ + "params": event.ParamValues, + "body": event.BodyJSON, + }).Infof("RECV(%s) Queue(%s).Type(%s).Due(%s)", + "---", //not yet available here - not part of event, and in SQS I think it is passed in SQS layer, so need to extend local channel to include this along with event + q.name, + event.TypeName, + event.DueTime) + ctx.Tracef("RECV(%s) Request(%T)%v", q.name, recordStruct, recordStruct) + } args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) @@ -180,6 +181,11 @@ func (q *queue) process(event queues.Event) error { } //queue.process() func (q *queue) Send(event queues.Event) (msgID string, err error) { + startTime := time.Now() + defer func() { + logs.LogSQSSent(startTime, event.QueueName, event.TypeName, event.BodyJSON) + }() + event.MessageID = uuid.New().String() q.ch <- event return event.MessageID, nil diff --git a/consumer/sqs_consumer/consumer.go b/consumer/sqs_consumer/consumer.go index bbf2e5ba61122ac030ea812b113d08ad21f842b1..7974fa05718cc5f01836a239947673fd3e0f09e2 100644 --- a/consumer/sqs_consumer/consumer.go +++ b/consumer/sqs_consumer/consumer.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" "github.com/google/uuid" + "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" @@ -42,8 +43,11 @@ func New(requestIDHeaderKey string, routes map[string]interface{}) consumer.Cons } } + producer := sqs_producer.New(requestIDHeaderKey) s := service.New(). - WithProducer(sqs_producer.New(requestIDHeaderKey)) + WithProducer(producer) + audit.Init(producer) + logs.Init(producer) return sqsConsumer{ Service: s, @@ -175,10 +179,13 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven return err } - ctx.WithFields(map[string]interface{}{ - "message_index": messageIndex, - "message": message, - }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event()) + //log if not internal queue + if ctx.Event().QueueName != "AUDIT" && ctx.Event().QueueName != "API_LOGS" { + ctx.WithFields(map[string]interface{}{ + "message_index": messageIndex, + "message": message, + }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event()) + } //routing on messageType sqsHandler, err := c.router.Route(messageType) @@ -205,7 +212,7 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven } requestToLog = recordStruct //replace string log with structured log - ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) + ctx.Tracef("message (%T) %+v", recordStruct, recordStruct) args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) diff --git a/examples/core/app/users/users.go b/examples/core/app/users/users.go index 5599be9e7793a29a67e3aa2fbbef63e15a4e4968..a7dd6cbbc321211c2ed3908b93e8fd2cde62a8f2 100644 --- a/examples/core/app/users/users.go +++ b/examples/core/app/users/users.go @@ -11,7 +11,6 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/examples/core/email" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/queues" ) type User struct { @@ -113,7 +112,7 @@ func Add(ctx api.Context, params noParams, newUser POSTUser) (User, error) { Subject: "Welcome User", Body: "Your account has been created", } - /*eventID*/ _, err := queues.NewEvent(ctx, "notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) + /*eventID*/ _, err := ctx.NewEvent("notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) if err != nil { ctx.Errorf("failed to notify: %+v", err) } diff --git a/logger/global.go b/logger/global.go index c04253667b5c9dff7402c2d2a5e853fce8cd5478..ca25a0273c19b7227d0b5d4106cb02ecad9338e1 100644 --- a/logger/global.go +++ b/logger/global.go @@ -76,3 +76,11 @@ func Debugf(format string, args ...interface{}) { func Debug(args ...interface{}) { globalLogger.log(LevelDebug, 1, fmt.Sprint(args...)) } + +func Tracef(format string, args ...interface{}) { + globalLogger.log(LevelTrace, 1, fmt.Sprintf(format, args...)) +} + +func Trace(args ...interface{}) { + globalLogger.log(LevelTrace, 1, fmt.Sprint(args...)) +} diff --git a/logger/level.go b/logger/level.go index 3014994e7f9fa39006368bf8ad965ffb44e3ced9..2546da64324ae5f999ad05d10ac91970ba8b0096 100644 --- a/logger/level.go +++ b/logger/level.go @@ -16,6 +16,8 @@ func (level Level) String() string { return "info" case LevelDebug: return "debug" + case LevelTrace: + return "trace" } return fmt.Sprintf("Level(%d)", level) } @@ -30,4 +32,5 @@ const ( LevelWarn LevelInfo LevelDebug + LevelTrace ) diff --git a/logger/logger.go b/logger/logger.go index 736846d0ec5117c1f6090c2bf3b2061c87d5a49f..6899dc3300b8a60bbcd33d11ccacc8c1941662de 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -20,6 +20,8 @@ type Logger interface { Info(args ...interface{}) Debugf(format string, args ...interface{}) Debug(args ...interface{}) + Tracef(format string, args ...interface{}) + Trace(args ...interface{}) WithFields(data map[string]interface{}) logger } @@ -87,6 +89,14 @@ func (l logger) Debug(args ...interface{}) { l.log(LevelDebug, 1, fmt.Sprint(args...)) } +func (l logger) Tracef(format string, args ...interface{}) { + l.log(LevelTrace, 1, fmt.Sprintf(format, args...)) +} + +func (l logger) Trace(args ...interface{}) { + l.log(LevelTrace, 1, fmt.Sprint(args...)) +} + func (l logger) log(level Level, skip int, msg string) { if level <= l.level && l.writer != nil { entry := Entry{ diff --git a/logs/action.go b/logs/action.go new file mode 100644 index 0000000000000000000000000000000000000000..9bb602d8bc347521fa36ee5464acfeff159af29e --- /dev/null +++ b/logs/action.go @@ -0,0 +1,291 @@ +package logs + +import ( + "encoding/json" + "sync" + "time" +) + +//Call LogOutgoingAPIRequest() after calling an API end-point as part of a handler, +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error { + endTime := time.Now() + log := ApiCallLog{ + URL: url, + Method: method, + ResponseCode: responseCode, + } + if requestBody != "" { + log.Request = &BodyLog{ + BodySize: len(requestBody), + Body: requestBody, + } + } + if responseBody != "" { + log.Response = &BodyLog{ + BodySize: len(responseBody), + Body: responseBody, + } + } + + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeApiCall, + ApiCall: &log, + }) + actionListMutex.Unlock() + + return nil +} //LogOutgoingAPIRequest() + +//Call LogSQL() after executing any SQL query +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSQL( + startTime time.Time, + sql string, + rowsCount int, //optional nr of rows to report, else 0 + ids []int64, //optional list of ids to report, else nil + err error, //only if failed, else nil +) { + endTime := time.Now() + log := SQLQueryLog{ + SQL: sql, + RowCount: rowsCount, + InsertIDs: ids, + } + if err != nil { + log.Error = err.Error() + } + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSqlQuery, + SQLQuery: &log, + }) + actionListMutex.Unlock() +} + +//Call LogSQSSent() after sending an SQS event +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSQSSent(startTime time.Time, queueName string, messageType string, request interface{}) { + //do not log internal events sent to audit/api-log + if queueName == "API_LOGS" || queueName == "AUDIT" { + return + } + + endTime := time.Now() + log := SQSSentLog{ + QueueName: queueName, + MessageType: messageType, + } + if request != nil { + if requestString, ok := request.(string); ok { + log.Request = &BodyLog{ + BodySize: len(requestString), //do not marshal, else we have double escaped JSON + Body: requestString, + } + } else { + jsonRequest, _ := json.Marshal(request) + log.Request = &BodyLog{ + BodySize: len(jsonRequest), + Body: string(jsonRequest), + } + } + } + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSqsSent, + SQSSent: &log, + }) + actionListMutex.Unlock() +} + +//Call LogSleep() after doing time.Sleep() +//to capture the details +//and add it to the current handler log story for reporting/metrics +func LogSleep(startTime time.Time) { + endTime := time.Now() + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: endTime.Sub(startTime).Milliseconds(), + Type: ActionTypeSleep, + }) + actionListMutex.Unlock() +} + +var ( + actionListMutex sync.Mutex + actionList = []ActionLog{} +) + +type ActionLog struct { + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` + ApiCall *ApiCallLog `json:"api_call,omitempty"` + SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` + SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` +} + +func (action ActionLog) Relative(relTime time.Time) RelativeActionLog { + return RelativeActionLog{ + StartMs: action.StartTime.Sub(relTime).Milliseconds(), + EndMs: action.EndTime.Sub(relTime).Milliseconds(), + DurMs: action.DurMs, + Type: action.Type, + ApiCall: action.ApiCall, + SQSSent: action.SQSSent, + SQLQuery: action.SQLQuery, + } +} + +type RelativeActionLog struct { + StartMs int64 `json:"start_ms" doc:"Start time in milliseconds after start timestamp"` + EndMs int64 `json:"end_ms" doc:"End time in milliseconds after start timestamp"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` + ApiCall *ApiCallLog `json:"api_call,omitempty"` + SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` + SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` +} + +type ActionType string + +var ActionTypeList = []ActionType{ + ActionTypeNone, + ActionTypeApiCall, + ActionTypeSqsSent, + ActionTypeSqlQuery, + ActionTypeSleep, +} + +const ( + ActionTypeNone ActionType = "none" + ActionTypeApiCall ActionType = "api-call" + ActionTypeSqsSent ActionType = "sqs-sent" + ActionTypeSqlQuery ActionType = "sql-query" + ActionTypeSleep ActionType = "sleep" +) + +//APICallLog captures details of an outgoing API call made from a handler +type ApiCallLog struct { + URL string `json:"url"` + Method string `json:"method"` + ResponseCode int `json:"response_code"` + Request *BodyLog `json:"request,omitempty"` + Response *BodyLog `json:"response,omitempty"` +} + +type BodyLog struct { + BodySize int `json:"body_size"` + Body string `json:"body"` +} + +//SQSSentLog captures details of an SQS event sent from a handler +type SQSSentLog struct { + QueueName string `json:"queue_name"` + MessageType string `json:"message_type"` + Request *BodyLog `json:"request,omitempty"` +} + +//SQLQueryLog captures details of an SQL query executed from a handler resulting in either rows returned, ids inserted or an error +type SQLQueryLog struct { + SQL string `json:"sql"` + RowCount int `json:"row_count,omitempty"` + InsertIDs []int64 `json:"insert_ids,omitempty"` + Error string `json:"error,omitempty"` +} + +//compile the relative action list that took place during this handler +//copy then reset actionList for the next handler +//we copy it with relation to this API's start..end time, rather than full timestamps, which are hard to read in the list +//start and end are current total handler period that actions should be inside that +func relativeActionList(startTime, endTime time.Time) []RelativeActionLog { + actionListMutex.Lock() + defer func() { + //after copy/discard, reset (global!) action list for the next handler + actionList = []ActionLog{} + actionListMutex.Unlock() + }() + + cfg := currentLogConfig() + if !cfg.ActionsKeep { + return nil + } + + //todo: runtime config: load temporary from REDIS after N seconds + //which will allow us to monitor better for a short while during trouble shooting + //then something like this to reload every 5min (5min could also be part of config) + // if dynamicExpireTime.Before(time.Now()) { + // dynamicApiConfig := apiConfig //loaded from env at startup + // ...look for keys in REDIS and override ... + // dynamicExpireTime = time.Now().Add(time.Minute*5) + // } + // do this in go routing with sleep... so handlers do not have to check :-) + // and it can also be used for api-log part that is not action list, e.g. for api-log req/res body len etc... + relActionList := []RelativeActionLog{} + for _, action := range actionList { + if action.EndTime.Before(startTime) || action.StartTime.After(endTime) { + continue //not expected - skip actions outside log window + } + + //apply reduction filters to limit string lengths + switch action.Type { + case ActionTypeNone: + case ActionTypeSqlQuery: + if action.SQLQuery != nil && len(action.SQLQuery.SQL) > int(cfg.ActionsMaxSQLLength) { + action.SQLQuery.SQL = action.SQLQuery.SQL[:cfg.ActionsMaxSQLLength] + } + case ActionTypeSqsSent: + if action.SQSSent != nil && action.SQSSent.Request != nil && len(action.SQSSent.Request.Body) > int(cfg.ActionsMaxSQSReqBodyLength) { + action.SQSSent.Request.Body = action.SQSSent.Request.Body[:cfg.ActionsMaxSQSReqBodyLength] + } + case ActionTypeApiCall: + if action.ApiCall != nil { + if action.ApiCall.Request != nil && len(action.ApiCall.Request.Body) > int(cfg.ActionsMaxAPIReqBodyLength) { + action.ApiCall.Request.Body = action.ApiCall.Request.Body[:cfg.ActionsMaxAPIReqBodyLength] + } + if action.ApiCall.Response != nil && len(action.ApiCall.Response.Body) > int(cfg.ActionsMaxAPIResBodyLength) { + action.ApiCall.Response.Body = action.ApiCall.Response.Body[:cfg.ActionsMaxAPIResBodyLength] + } + } + } + + //make relative and append to the list + relActionList = append(relActionList, action.Relative(startTime)) + } + + //also append to the list any nonAction periods greater than thresholdMs + //to indicate significant gaps in the action list that we did not account for + thresholdMs := int64(50) + //make period list, remove all action periods, then we're left with non-action periods :-) + nonActionPeriods := NewPeriods(startTime, endTime) + for _, action := range actionList { + nonActionPeriods = nonActionPeriods.Without(Period{Start: action.StartTime, End: action.EndTime}) + } + for _, nonAction := range nonActionPeriods { + if nonAction.Duration().Milliseconds() > thresholdMs { + relActionList = append(relActionList, ActionLog{ + StartTime: nonAction.Start, + EndTime: nonAction.End, + DurMs: nonAction.Duration().Milliseconds(), + Type: ActionTypeNone, + }.Relative(startTime)) + } + } + return relActionList +} diff --git a/logs/api-logs.go b/logs/api-logs.go index 55a33cc371463545a65eb385f8ef50d763dd10ed..9552e107271595e2ee3d516b70171c9328e934c5 100644 --- a/logs/api-logs.go +++ b/logs/api-logs.go @@ -2,27 +2,31 @@ package logs import ( "net/http" + "sort" "strings" - "sync" "time" "github.com/aws/aws-lambda-go/events" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/queues" - "gitlab.com/uafrica/go-utils/service" ) +var producer queues.Producer + +func Init(p queues.Producer) { + producer = p +} + //Call this at the end of an API request handler to capture the req/res as well as all actions taken during the processing //(note: action list is only reset when this is called - so must be called after each handler, else action list has to be reset at the start) -func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyRequest, res events.APIGatewayProxyResponse) error { - if service.Ctx == nil { - return errors.Errorf("Cannot log without service context") +func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[string]interface{}, req events.APIGatewayProxyRequest, res events.APIGatewayProxyResponse) error { + if producer == nil { + return errors.Errorf("logs queue producer not set") } //todo: filter out some noisy (method+path) endTime := time.Now() - dur := endTime.Sub(startTime) var authType string var authUsername string @@ -40,18 +44,17 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques } } - claim := service.Ctx.Claim() username, _ := claim["user_id"].(string) accountID, _ := claim["account_id"].(int64) apiLog := ApiLog{ StartTime: startTime, EndTime: endTime, - DurMs: int(dur / time.Millisecond), + DurMs: endTime.Sub(startTime).Milliseconds(), Method: req.HTTPMethod, Address: req.RequestContext.DomainName, Path: req.Path, ResponseCode: res.StatusCode, - RequestID: service.Ctx.RequestID(), + RequestID: requestID, InitialAuthType: authType, InitialAuthUsername: authUsername, SourceIP: req.RequestContext.Identity.SourceIP, @@ -69,14 +72,15 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques BodySize: len(res.Body), Body: res.Body, }, - //RelevantID: service.Ctx.,??? + Actions: nil, } - //copy then reset actionList for the next handler - actionListMutex.Lock() - apiLog.Actions = actionList - actionList = []ActionLog{} - actionListMutex.Unlock() + //compile action list + apiLog.Actions = relativeActionList(apiLog.StartTime, apiLog.EndTime) + + //sort action list on startTime, cause actions are added when they end, i.e. ordered by end time + //and all non-actions were appended at the end of the list + sort.Slice(apiLog.Actions, func(i, j int) bool { return apiLog.Actions[i].StartMs < apiLog.Actions[j].StartMs }) //also copy multi-value query parameters to the log as CSV array values for n, as := range req.MultiValueQueryStringParameters { @@ -90,7 +94,7 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques } //todo: filter out sensitive values (e.g. OTP) - if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + if _, err := producer.NewEvent("API_LOGS"). Type("api-log"). RequestID(apiLog.RequestID). Send(apiLog); err != nil { @@ -101,24 +105,24 @@ func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyReques //ApiLog is the SQS event details struct encoded as JSON document, sent to SQS, to be logged for each API handler executed. type ApiLog struct { - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - DurMs int `json:"duration_ms"` //duration in milliseconds - Method string `json:"method"` - Address string `json:"address"` //server address for incoming and outgoing - Path string `json:"path"` - ResponseCode int `json:"response_code"` - RequestID string `json:"request_id"` - InitialAuthUsername string `json:"initial_auth_username,omitempty"` - InitialAuthType string `json:"initial_auth_type,omitempty"` - AccountID int64 `json:"account_id,omitempty"` - Username string `json:"username,omitempty"` - SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API - UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI - RelevantID string `json:"relevant_id,omitempty"` - Request ApiLogRequest `json:"request"` - Response ApiLogResponse `json:"response"` - Actions []ActionLog `json:"actions,omitempty"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int64 `json:"duration_ms"` //duration in milliseconds + Method string `json:"method"` + Address string `json:"address"` //server address for incoming and outgoing + Path string `json:"path"` + ResponseCode int `json:"response_code"` + RequestID string `json:"request_id"` + InitialAuthUsername string `json:"initial_auth_username,omitempty"` + InitialAuthType string `json:"initial_auth_type,omitempty"` + AccountID int64 `json:"account_id,omitempty"` + Username string `json:"username,omitempty"` + SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API + UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI + RelevantID string `json:"relevant_id,omitempty"` + Request ApiLogRequest `json:"request"` + Response ApiLogResponse `json:"response"` + Actions []RelativeActionLog `json:"actions,omitempty"` } type ApiLogRequest struct { @@ -133,98 +137,3 @@ type ApiLogResponse struct { BodySize int `json:"body_size"` //set even when body is truncated/omitted Body string `json:"body,omitempty"` //json content as a string } - -func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error { - endTime := time.Now() - dur := endTime.Sub(startTime) - - apiCallLog := ApiCallLog{ - URL: url, - Method: method, - ResponseCode: responseCode, - Request: ApiCallRequestLog{ - BodySize: len(requestBody), - Body: requestBody, - }, - Response: ApiCallResponseLog{ - BodySize: len(responseBody), - Body: responseBody, - }, - } - - actionListMutex.Lock() - actionList = append(actionList, ActionLog{ - StartTime: startTime, - EndTime: endTime, - DurMs: int(dur / time.Millisecond), - Type: ActionTypeApiCall, - ApiCall: &apiCallLog, - }) - actionListMutex.Unlock() - - return nil -} //LogOutgoingAPIRequest() - -var ( - actionListMutex sync.Mutex - actionList = []ActionLog{} -) - -type ActionLog struct { - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - DurMs int `json:"duration_ms"` //duration in milliseconds - Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` - ApiCall *ApiCallLog `json:"api_call,omitempty"` - SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` - SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` - Sleep *SleepLog `json:"sleep,omitempty"` -} - -type ActionType string - -var ActionTypeList = []ActionType{ - ActionTypeApiCall, - ActionTypeSqsSent, - ActionTypeSqlQuery, - ActionTypeSleep, -} - -const ( - ActionTypeApiCall ActionType = "api-call" - ActionTypeSqsSent ActionType = "sqs-sent" - ActionTypeSqlQuery ActionType = "sql-query" - ActionTypeSleep ActionType = "sleep" -) - -//APICallLog captures details of an outgoing API call made from a handler -type ApiCallLog struct { - URL string `json:"url"` - Method string `json:"method"` - ResponseCode int `json:"response_code"` - Request ApiCallRequestLog `json:"request"` - Response ApiCallResponseLog `json:"response"` -} - -type ApiCallRequestLog struct { - BodySize int `json:"body_size"` - Body string `json:"body"` -} - -type ApiCallResponseLog struct { - BodySize int `json:"body_size"` - Body string `json:"body"` -} - -//SQSSentLog captures details of an SQS event sent from a handler -type SQSSentLog struct { -} - -//SQLQueryLog captures details of an SQL query executed from a handler -type SQLQueryLog struct { -} - -//SleepLog captures details of time spent sleeping from a handler -type SleepLog struct { - //nothing to record apart from the action start/end/dur... -} diff --git a/logs/config.go b/logs/config.go new file mode 100644 index 0000000000000000000000000000000000000000..f05411edaf4fdf65b0901e0435cb5266e8901760 --- /dev/null +++ b/logs/config.go @@ -0,0 +1,61 @@ +package logs + +import ( + "context" + "time" + + "gitlab.com/uafrica/go-utils/config" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/redis" +) + +type Config struct { + ActionsKeep bool `json:"actions_keep" doc:"Set true to keep list of actions in logs"` + ActionsMaxSQLLength int64 `json:"actions_max_sql_length" doc:"Set length of SQL query to keep in action list (default 0 = delete)"` + ActionsMaxSQSReqBodyLength int64 `json:"actions_max_sqs_req_body_length" doc:"Set length of SQS Request body to keep in action list (default 0 = delete)"` + ActionsMaxAPIReqBodyLength int64 `json:"actions_max_api_req_body_length" doc:"Set length of API Request body to keep in action list (default 0 = delete)"` + ActionsMaxAPIResBodyLength int64 `json:"actions_max_api_res_body_length" doc:"Set length of API Response body to keep in action list (default 0 = delete)"` +} + +const configPrefix = "LOGS" + +var ( + logConfig Config + dynamicLogConfig Config + dynamicExpireTime time.Time + redisCli redis.IRedis +) + +func init() { + if err := config.LoadEnv(configPrefix, &logConfig); err != nil { + logger.Errorf("failed to load LOGS config: %+v", err) + } + dynamicLogConfig = logConfig + dynamicExpireTime = time.Now() + + //see if can load overrides from redis + var err error + redisCli, err = redis.New(context.Background()) + if err != nil { + logger.Errorf("Not able to connect to REDIS for runtime %s config: %+v", configPrefix, err) + } +} + +//todo: call only on each use and check expiry time before reading from REDIS again, e.g. reload no faster that 10s +func currentLogConfig() Config { + if redisCli == nil || dynamicExpireTime.After(time.Now()) { + return dynamicLogConfig + } + + //time to attempt reload + //copy static config then overload values which are defined from REDIS + dynamicLogConfig = logConfig + dynamicExpireTime = time.Now().Add(time.Second * 10) + + if err := config.Load(configPrefix, &dynamicLogConfig, redisCli); err != nil { + logger.Errorf("failed to load %s config from REDIS", configPrefix) + } else { + logger.Debugf("Loaded %s config: %+v", configPrefix, dynamicLogConfig) + } + return dynamicLogConfig +} //runtimeConfigLoad diff --git a/logs/periods.go b/logs/periods.go new file mode 100644 index 0000000000000000000000000000000000000000..536727df91ebfff8deb69a21826ad00f17ded2cf --- /dev/null +++ b/logs/periods.go @@ -0,0 +1,103 @@ +package logs + +import ( + "time" +) + +type Period struct { + Start time.Time `json:"start_time"` + End time.Time `json:"end_time"` +} + +func (p Period) Duration() time.Duration { + return p.End.Sub(p.Start) +} + +type Periods []Period + +func NewPeriods(start time.Time, end time.Time) Periods { + if end.Before(start) { + return []Period{} + } + return []Period{{Start: start, End: end}} +} + +func (ps Periods) Without(p Period) Periods { + if len(ps) == 0 { + return ps //nothing left to take from + } + if p.End.Before(ps[0].Start) { + return ps //before first period + } + if p.Start.After(ps[len(ps)-1].End) { + return ps //after last period + } + + //logger.Debugf("Start: %+v", ps) + nextIndex := 0 + for nextIndex < len(ps) && ps[nextIndex].End.Before(p.Start) { + //logger.Debugf("skip[%d]: %s > %s", nextIndex, p.Start, ps[nextIndex].End) + nextIndex++ + } + toDelete := []int{} + for nextIndex < len(ps) && ps[nextIndex].End.Before(p.End) { + if ps[nextIndex].Start.Before(p.Start) { + //trim tail + //logger.Debugf("tail[%d] %s->%s", nextIndex, ps[nextIndex].End, p.Start) + ps[nextIndex].End = p.Start + } else { + //delete this period completely and move to next + toDelete = append(toDelete, nextIndex) + //logger.Debugf("delete[%d] %s..%s", nextIndex, ps[nextIndex].Start, ps[nextIndex].End) + } + nextIndex++ + } + if nextIndex < len(ps) && ps[nextIndex].End.After(p.End) { + if ps[nextIndex].Start.Before(p.Start) { + //remove part of this period + ps = append(ps, Period{Start: p.End, End: ps[nextIndex].End}) + ps[nextIndex].End = p.Start + //logger.Debugf("split[%d]", nextIndex) + } else { + if ps[nextIndex].Start.Before(p.End) { + //trim head of period to start after removed peroid, then stop + //logger.Debugf("head[%d] %s->%s", nextIndex, ps[nextIndex].Start, p.End) + ps[nextIndex].Start = p.End + } + } + } + + //delete selected periods completely + newPS := []Period{} + for i, p := range ps { + if len(toDelete) > 0 && i == toDelete[0] { + toDelete = toDelete[1:] + } else { + newPS = append(newPS, p) + } + } + //logger.Debugf("final: %+v", newPS) + return newPS +} + +//Span is (last.end - first.start) +func (ps Periods) Span() time.Duration { + if len(ps) > 0 { + return ps[len(ps)-1].End.Sub(ps[0].Start) + } + return time.Duration(0) +} + +//Duration is sum of all period durations +func (ps Periods) Duration() time.Duration { + dur := time.Duration(0) + for _, p := range ps { + dur += p.Duration() + } + return dur +} + +//Gaps is (Span - Duration), i.e. time between periods +func (ps Periods) Gaps() time.Duration { + return ps.Span() - ps.Duration() +} diff --git a/logs/periods_test.go b/logs/periods_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e8804248aa15cd348d13b3077c259b35d330709b --- /dev/null +++ b/logs/periods_test.go @@ -0,0 +1,59 @@ +package logs_test + +import ( + "testing" + "time" + + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" +) + +func TestPeriods(t *testing.T) { + logger.SetGlobalFormat(logger.NewConsole()) + logger.SetGlobalLevel(logger.LevelDebug) + t0 := time.Date(2021, 01, 01, 0, 0, 0, 0, time.Now().Location()) + ps := logs.NewPeriods(t0, t0.Add(time.Hour)) + t.Log(ps) + //ps: 0..60 + + //split[0] + ps1 := ps.Without(logs.Period{Start: t0.Add(time.Minute * 5), End: t0.Add(time.Minute * 10)}) + t.Log(ps1) + //-(5..10) -> ps1: 0..5, 10..60 + + //split[1] + ps2 := ps1.Without(logs.Period{Start: t0.Add(time.Minute * 15), End: t0.Add(time.Minute * 20)}) + t.Log(ps2) + //-(15..20) -> ps1: 0..5, 10..15, 20..60 + + //trim head[2] + ps3 := ps2.Without(logs.Period{Start: t0.Add(time.Minute * 18), End: t0.Add(time.Minute * 21)}) + t.Log(ps3) + //-(18..21) -> ps1: 0..5, 10..15, 21..60 + + //trim tail[1] + ps4 := ps3.Without(logs.Period{Start: t0.Add(time.Minute * 14), End: t0.Add(time.Minute * 19)}) + t.Log(ps4) + //-(14..19) -> ps1: 0..5, 10..14, 21..60 + + //tail, delete, head + ps5 := ps4.Without(logs.Period{Start: t0.Add(time.Minute * 4), End: t0.Add(time.Minute * 22)}) + t.Log(ps5) + //-(4..22) -> ps1: 0..4, 22..60 + + //over start + ps6 := ps5.Without(logs.Period{Start: t0.Add(-time.Minute * 1), End: t0.Add(time.Minute * 2)}) + t.Log(ps6) + //-(-1..2) -> ps1: 2..4, 22..60 + + //over end + ps7 := ps6.Without(logs.Period{Start: t0.Add(time.Minute * 50), End: t0.Add(time.Minute * 120)}) + t.Log(ps7) + //-(50..120) -> ps1: 2..4, 22..50 + + //all + ps8 := ps7.Without(logs.Period{Start: t0.Add(time.Minute * 0), End: t0.Add(time.Minute * 120)}) + t.Log(ps8) + //-(0..120) -> ps1: nil + +} diff --git a/logs/sqs-logs.go b/logs/sqs-logs.go index 49f446b6d0d0b98ca45e66f07f19538bb617848e..22f45d21f13ec945238fac83e263c325adf8c1de 100644 --- a/logs/sqs-logs.go +++ b/logs/sqs-logs.go @@ -8,8 +8,6 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/queues" - "gitlab.com/uafrica/go-utils/service" ) //Call this at the end of an SQS event handler to capture the req and result as well as all actions taken during the processing @@ -20,8 +18,8 @@ func LogSQSRequest(startTime time.Time, req interface{}, handlerErr error, ) error { - if service.Ctx == nil { - return errors.Errorf("Cannot log without service context") + if producer == nil { + return errors.Errorf("logs queue producer not set") } if !sqsLogEnabled { @@ -29,20 +27,24 @@ func LogSQSRequest(startTime time.Time, } endTime := time.Now() - dur := endTime.Sub(startTime) log := ApiLog{ StartTime: startTime, EndTime: endTime, - DurMs: int(dur / time.Millisecond), + DurMs: endTime.Sub(startTime).Milliseconds(), RequestID: requestID, Method: "SQS", Path: messageType, } if req != nil { - if jsonReq, err := json.Marshal(req); err == nil { - log.Request.Body = string(jsonReq) - log.Request.BodySize = len(log.Request.Body) + if reqString, ok := req.(string); ok { + log.Request.Body = reqString //do not marshal else we have double-escaped JSON + log.Request.BodySize = len(reqString) + } else { + if jsonReq, err := json.Marshal(req); err == nil { + log.Request.Body = string(jsonReq) + log.Request.BodySize = len(log.Request.Body) + } } } @@ -60,7 +62,6 @@ func LogSQSRequest(startTime time.Time, //copy then reset actionList for the next handler actionListMutex.Lock() - log.Actions = actionList actionList = []ActionLog{} actionListMutex.Unlock() @@ -69,7 +70,7 @@ func LogSQSRequest(startTime time.Time, //note: we send SQS logs to "API_LOGS" which already exists... should be renamed to simply "LOGS" //it use the same structure, but method="SQS" and path="messageType" and request is the event body //so they can be plotted on the same dashboard visualisation in OpenSearch with all the same filters/metrics - if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + if _, err := producer.NewEvent("API_LOGS"). Type("api-log"). RequestID(requestID). Send(log); err != nil { diff --git a/queues/README.md b/queues/README.md index c592a569ecf9057956dbb2edfe8d71bba8b0f806..1ac12125d6ef156d35320068073c366eb26c069b 100644 --- a/queues/README.md +++ b/queues/README.md @@ -59,10 +59,10 @@ func MyHandler(ctx queues.Context, body MyBody) (err error) { Notes: * The route name is the name specified in - ```service.NewEvent(...).Type(<name>)``` + ```ctx.NewEvent(...).Type(<name>)``` * The ```body``` should have the same type you used elsewhere in - ```service.NewEvent(...).Send(<type>)``` + ```ctx.NewEvent(...).Send(<type>)``` * The optional body type Validate() method will be called and must return nil before the handler will be called. diff --git a/queues/event.go b/queues/event.go index 376947d423b3b6b1eaf2131ad864004333e35a43..18fca85535378b4134d61d70db8bd3af00d4ca2f 100644 --- a/queues/event.go +++ b/queues/event.go @@ -83,23 +83,30 @@ func (event Event) Send(value interface{}) (string, error) { return "", errors.Errorf("send with producer==nil") } if value != nil { - jsonBody, err := json.Marshal(value) - if err != nil { - return "", errors.Wrapf(err, "failed to JSON encode event body") + if valueString, ok := value.(string); ok { + event.BodyJSON = valueString + } else { + jsonBody, err := json.Marshal(value) + if err != nil { + return "", errors.Wrapf(err, "failed to JSON encode event body") + } + event.BodyJSON = string(jsonBody) } - event.BodyJSON = string(jsonBody) } if msgID, err := event.producer.Send(event); err != nil { return "", errors.Wrapf(err, "failed to send event") } else { - log.WithFields(map[string]interface{}{ - "queue": event.QueueName, - "type": event.TypeName, - "due": event.DueTime, - "params": event.ParamValues, - "body": event.BodyJSON, - "msg_id": msgID, - }).Info("Sent event") + //do not log when we send to internal AUDIT/API_LOGS + if event.QueueName != "AUDIT" && event.QueueName != "API_LOGS" { + log.WithFields(map[string]interface{}{ + "queue": event.QueueName, + "type": event.TypeName, + "due": event.DueTime, + "params": event.ParamValues, + "body": event.BodyJSON, + "msg_id": msgID, + }).Info("Sent event") + } return msgID, nil } } diff --git a/queues/sqs_producer/producer.go b/queues/sqs_producer/producer.go index 55a0fd7e20bb12d5bacccce505793518636d6bbc..649f6dbad7a998c07bb31a4c53a816ca51dfa08c 100644 --- a/queues/sqs_producer/producer.go +++ b/queues/sqs_producer/producer.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" ) @@ -94,7 +95,10 @@ type QueueProducer struct { } func (m *QueueProducer) Send(event queues.Event) (string, error) { - logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event) + startTime := time.Now() + defer func() { + logs.LogSQSSent(startTime, event.QueueName, event.TypeName, event.BodyJSON) + }() //add params as message attributes msgAttrs := make(map[string]*sqs.MessageAttributeValue) diff --git a/redis/redis.go b/redis/redis.go index 23d667e5a1b39bed142a5409419b364f5617f1f9..5dac4995280bbb7ac8ac91bbedebffd2f22dd850 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -10,9 +10,11 @@ import ( "github.com/go-redis/redis/v8" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/string_utils" ) type IRedis interface { + string_utils.KeyReader Del(key string) error SetJSON(key string, value interface{}) error SetJSONIndefinitely(key string, value interface{}) error @@ -21,7 +23,6 @@ type IRedis interface { SetString(key string, value string) error SetStringIndefinitely(key string, value string) error SetStringForDur(key string, value string, dur time.Duration) error - GetString(key string) (value string, ok bool) } type redisWithContext struct { @@ -67,14 +68,18 @@ func (r redisWithContext) SetJSONForDur(key string, value interface{}, dur time. if r.client == nil { return errors.Errorf("REDIS disabled: cannot set JSON key(%s) = (%T)%v", key, value, value) } - jsonBytes, err := json.Marshal(value) - if err != nil { - return errors.Wrapf(err, "failed to JSON encode key(%s) = (%T)", key, value) + valueStr, ok := value.(string) + if !ok { + jsonBytes, err := json.Marshal(value) + if err != nil { + return errors.Wrapf(err, "failed to JSON encode key(%s) = (%T)", key, value) + } + valueStr = string(jsonBytes) } - if _, err = r.client.Set(r.Context, key, string(jsonBytes), dur).Result(); err != nil { + if _, err := r.client.Set(r.Context, key, valueStr, dur).Result(); err != nil { return errors.Wrapf(err, "failed to set JSON key(%s)", key) } - logger.Debugf("REDIS.SetJSON(%s)=%s (%T) (exp: %v)", key, string(jsonBytes), value, dur) + logger.Debugf("REDIS.SetJSON(%s)=%s (%T) (exp: %v)", key, valueStr, value, dur) return nil } @@ -130,6 +135,23 @@ func (r redisWithContext) GetString(key string) (string, bool) { return value, true } +func (r redisWithContext) Keys(prefix string) []string { + if r.client == nil { + return nil + } + value, err := r.client.Keys(r.Context, prefix+"*").Result() + if err != nil { /* Actual error */ + if err != redis.Nil { /* other than no keys match */ + logger.Errorf("Error fetching redis keys(%s*): %+v", prefix, err) + } else { + logger.Errorf("Failed: %+v", err) + } + return nil //no matches + } + logger.Debugf("Keys(%s): %+v", prefix, value) + return value +} + //global connection to REDIS used in all context var globalClient *redis.Client diff --git a/search/time_series.go b/search/time_series.go index 0e6d95ef35f2439c51d6d9de980dcbe888d0a387..152e81fab3fa8502356eb4e893cd937ffaafbc6b 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -258,7 +258,7 @@ func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) erro x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{ StartTime: startTime, EndTime: endTime, - DurationMs: int64(endTime.Sub(startTime) / time.Millisecond), + DurationMs: endTime.Sub(startTime).Milliseconds(), })) return ts.w.Write(indexName, x.Elem().Interface()) } diff --git a/search/writer.go b/search/writer.go index 8b492beb3d37445ef5c12010aad75e9c54471484..5347c2b2f4b4df136988c7896a697d8b883023d3 100644 --- a/search/writer.go +++ b/search/writer.go @@ -10,6 +10,7 @@ import ( opensearch "github.com/opensearch-project/opensearch-go" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" ) type Writer interface { @@ -26,21 +27,22 @@ func New(config Config) (Writer, error) { timeSeriesByName: map[string]TimeSeries{}, } - // Initialize the client with SSL/TLS enabled. - var err error - w.client, err = opensearch.NewClient(opensearch.Config{ + searchConfig := opensearch.Config{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, Addresses: config.Addresses, Username: config.Username, Password: config.Password, - }) + } + // Initialize the client with SSL/TLS enabled. + var err error + w.client, err = opensearch.NewClient(searchConfig) if err != nil { return nil, errors.Wrapf(err, "cannot initialize opensearch connection") } // Print OpenSearch version information on console. - //fmt.Println(client.Info()) + logger.Debugf("Search client created with config: %+v", searchConfig) w.api = opensearchapi.New(w.client) return w, nil @@ -58,13 +60,17 @@ func (writer writer) Write(indexName string, doc interface{}) error { if writer.client == nil { return errors.Errorf("writer closed") } - jsonDoc, err := json.Marshal(doc) - if err != nil { - return errors.Wrapf(err, "failed to JSON encode document") + jsonDocStr, ok := doc.(string) + if !ok { + jsonDoc, err := json.Marshal(doc) + if err != nil { + return errors.Wrapf(err, "failed to JSON encode document") + } + jsonDocStr = string(jsonDoc) } indexResponse, err := writer.api.Index( indexName, - strings.NewReader(string(jsonDoc)), + strings.NewReader(jsonDocStr), ) if err != nil { return errors.Wrapf(err, "failed to index document") diff --git a/service/README.md b/service/README.md index 60b637eafeaf45df8e242fe74c937442ab93dc4d..6ee5841321774968c539d0bb4794d66da69259e8 100644 --- a/service/README.md +++ b/service/README.md @@ -167,7 +167,7 @@ The AuditChange() method logs the changes between an original and new value. Events are sent for async processing with ```ctx.NewEvent()...Send()``` as in this example: - if _, err := service.NewEvent(ctx, "BILLING"). + if _, err := ctx.NewEvent(ctx, "BILLING"). Type("provider-invoice"). RequestID(ctx.RequestID()). Delay(time.Second * 5). diff --git a/service/context.go b/service/context.go index 533af78892b47915af5901b4fc2cf9e1c81233d8..49595c4e41efb4a280844ba665998d145a298eaf 100644 --- a/service/context.go +++ b/service/context.go @@ -9,6 +9,7 @@ import ( "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/string_utils" ) @@ -53,6 +54,10 @@ type Context interface { //write a data change audit event AuditChange(eventType string, orgValue, newValue interface{}) + + //Sleep() does a time.Sleep and record it in log actions so that we can account for the time spent sleeping + //vs e.g. time waiting for outgoing API calls or db queries + Sleep(dur time.Duration) } //values: are added to context and logger @@ -228,3 +233,11 @@ func (ctx *serviceContext) AuditChange(eventType string, orgValue, newValue inte return } } + +func (ctx *serviceContext) Sleep(dur time.Duration) { + if dur > 0 { + startTime := time.Now() + time.Sleep(dur) + logs.LogSleep(startTime) + } +} diff --git a/struct_utils/named_values_to_struct.go b/struct_utils/named_values_to_struct.go index 2f118e4dcdcb90411227f39c5ddd60d2a071db5d..81c58360fad45405036a43ae412fc36d84fd1486 100644 --- a/struct_utils/named_values_to_struct.go +++ b/struct_utils/named_values_to_struct.go @@ -3,13 +3,14 @@ package struct_utils import ( "encoding/csv" "encoding/json" - "os" "reflect" "sort" "strconv" "strings" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/string_utils" ) //Purpose: @@ -34,24 +35,31 @@ import ( // // MY_LIB_CONFIG_ADDRS=["55 Crescent, Town", "12 Big Street, City"] -> 2 values including commas because of quoted CSV func NamedValuesFromEnv(prefix string) map[string][]string { + return NamedValuesFromReader(prefix, string_utils.EnvironmentKeyReader()) +} + +func NamedValuesFromReader(prefix string, reader string_utils.KeyReader) map[string][]string { + if reader == nil { + return nil + } result := map[string][]string{} prefix += "_" - for _, env := range os.Environ() { - if strings.HasPrefix(env, prefix) { - parts := strings.SplitN(env[len(prefix):], "=", 2) - if len(parts) == 2 { - //name = parts[0], value = parts[1] - value := parts[1] - result[strings.ToLower(parts[0])] = []string{value} + for _, key := range reader.Keys(prefix) { + value, ok := reader.GetString(key) + key = key[len(prefix):] + if !ok { + logger.Debugf("Key(%s) undefined", key) + continue + } + logger.Debugf("key(%s)=\"%s\"", key, value) + result[strings.ToLower(key)] = []string{value} - //split only if valid CSV between [...] - if value[0] == '[' && value[len(value)-1] == ']' { - csvReader := csv.NewReader(strings.NewReader(value[1 : len(value)-1])) - csvValues, csvErr := csvReader.Read() //this automatically removes quotes around some/all CSV inside the [...] - if csvErr == nil { - result[strings.ToLower(parts[0])] = csvValues - } - } + //split only if valid CSV between [...] + if value[0] == '[' && value[len(value)-1] == ']' { + csvReader := csv.NewReader(strings.NewReader(value[1 : len(value)-1])) + csvValues, csvErr := csvReader.Read() //this automatically removes quotes around some/all CSV inside the [...] + if csvErr == nil { + result[strings.ToLower(key)] = csvValues } } }