Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • bob-public-utils/bobgroup-go-utils
1 result
Select Git revision
Loading items
Show changes
Commits on Source (4)
Showing with 145 additions and 1254 deletions
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Connect to server",
"type": "go",
"request": "attach",
"mode": "remote",
"remotePath": "${workspaceFolder}",
"port": 5986,
"host": "127.0.0.1"
}
]
}
\ No newline at end of file
# Config
Only used for local development on the terminal console, to set ENV from a JSON file that simulates the environment created by AWS for our lambda images.
## How it works:
When api/cron/sqs starts (see V3),
They check if running local for testing (i.e. command line option used),
If so they look for config.local.json in the current directory or any parent directory
They generally find the one in the project repo top level directory
Then set all the values in that file in the env
package config
import (
"encoding/json"
"os"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
// MustGetOrDefault parses the named environment variable into valuePtr,
// leaving the caller's existing (default) value untouched when the variable
// is not set. The raw value is first parsed as JSON; if that fails it is
// retried as a quoted JSON string so plain scalar values (e.g. NAME=foo)
// also work. If valuePtr implements Validator, the resulting value is
// validated. Any parse or validation failure panics, so this is intended
// for start-up configuration only.
func MustGetOrDefault(valuePtr interface{}, name string) {
	//update value if defined
	envValueStr := os.Getenv(name)
	if envValueStr != "" {
		if err := json.Unmarshal([]byte(envValueStr), valuePtr); err != nil {
			//retry as a quoted JSON string for plain scalar values
			if err := json.Unmarshal([]byte("\""+envValueStr+"\""), valuePtr); err != nil {
				panic(errors.Errorf("cannot parse environment %s=%s into %T", name, envValueStr, valuePtr))
			}
		}
		logger.Debugf("CONFIG: Set %s=%s", name, envValueStr)
	}
	if validator, ok := valuePtr.(Validator); ok {
		if err := validator.Validate(); err != nil {
			//fix: include the underlying validation error, previously discarded
			panic(errors.Errorf("Invalid %s=(%T)%+v: %+v", name, valuePtr, valuePtr, err))
		}
		logger.Debugf("CONFIG: Validated %s", name)
	}
	logger.Debugf("CONFIG: %s=(%T)%+v", name, valuePtr, valuePtr)
}
// Validator is implemented by configuration values that can check their own
// contents after being loaded; Validate returns nil when the value is usable.
type Validator interface {
Validate() error
}
package config
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/struct_utils"
)
// Doc writes documentation for all config structs registered via
// Load/LoadEnv/LoadRedis to f.
// format==1 writes a markdown table; any other value dumps the raw entries.
// When showValues is true, the current environment values for each entry are
// included as an extra column.
func Doc(f *os.File, showValues bool, format int) {
	if f == nil {
		return //nowhere to write
	}
	//collect one entry per documented struct field over all registered prefixes
	entries := []docEntry{}
	for prefix, structPtr := range prefixStructs {
		prefixEntries := docStruct(prefix, reflect.TypeOf(structPtr).Elem(), reflect.ValueOf(structPtr).Elem())
		if showValues {
			//attach the current env values (if defined) to each entry
			nv := struct_utils.NamedValuesFromEnv(prefix)
			for i, e := range prefixEntries {
				name := strings.ToLower(e.Env[len(prefix)+1:])
				if values, ok := nv[name]; ok {
					e.Current = values
					prefixEntries[i] = e
					delete(nv, name)
				}
			}
		}
		entries = append(entries, prefixEntries...)
	}
	//stable, readable listing ordered by env name
	sort.Slice(entries, func(i, j int) bool { return entries[i].Env < entries[j].Env })
	switch format {
	case 1: //Mark Down
		fmt.Fprintf(f, "# Configuration from Environment\n")
		fmt.Fprintf(f, "\n")
		if !showValues {
			fmt.Fprintf(f, "|Environment|Type|Default|Description & Rules|\n")
			fmt.Fprintf(f, "|---|---|---|---|\n")
		} else {
			fmt.Fprintf(f, "|Environment|Type|Default|Description & Rules|Values|\n")
			fmt.Fprintf(f, "|---|---|---|---|---|\n")
		}
		for _, e := range entries {
			//combine description and rules into one column
			//(fix: rules used to be dropped when the description was empty)
			text := e.Text
			if e.Rules != "" {
				if text != "" {
					text += "; " + e.Rules
				} else {
					text = e.Rules
				}
			}
			fmt.Fprintf(f, "|%s|%s|%s|%s|",
				e.Env,
				e.Type,
				e.Default,
				text)
			if showValues {
				if len(e.Current) == 0 {
					fmt.Fprintf(f, "(Not Defined)|") //no values
				} else {
					if len(e.Current) == 1 {
						fmt.Fprintf(f, "%s|", e.Current[0]) //only one value
					} else {
						fmt.Fprintf(f, "%s|", strings.Join(e.Current, ", ")) //multiple values
					}
				}
			}
			fmt.Fprintf(f, "\n")
		}
	default:
		//just dump it
		fmt.Fprintf(f, "=====[ CONFIGURATION ]=====\n")
		for _, e := range entries {
			fmt.Fprintf(f, "%+v\n", e)
		}
	}
}
// docStruct recursively collects one docEntry per JSON-tagged field of the
// struct type t (with value v), naming each entry prefix+"_"+TAG (uppercased).
// Anonymous embedded structs are flattened under the same prefix; named
// sub-structs extend the prefix with their own tag.
func docStruct(prefix string, t reflect.Type, v reflect.Value) (entries []docEntry) {
	logger.Debugf("docStruct(%s, %s)", prefix, t.Name())
	entries = []docEntry{}
	for fieldIndex := 0; fieldIndex < t.NumField(); fieldIndex++ {
		field := t.Field(fieldIndex)
		if field.Anonymous {
			//flatten embedded structs into the current prefix; other embedded
			//kinds are skipped
			if field.Type.Kind() == reflect.Struct {
				entries = append(entries, docStruct(prefix, field.Type, v.Field(fieldIndex))...)
			}
			continue
		}
		jsonTag := strings.SplitN(field.Tag.Get("json"), ",", 2)[0]
		if jsonTag == "" || jsonTag == "-" {
			continue //field is not loaded from the environment
		}
		envName := prefix + "_" + strings.ToUpper(jsonTag)
		switch field.Type.Kind() {
		case reflect.Struct:
			//named sub-struct: recurse with the extended prefix
			entries = append(entries, docStruct(envName, field.Type, v.Field(fieldIndex))...)
		case reflect.Slice:
			entries = append(entries, docEntry{
				Env:     envName,
				Type:    "list of " + field.Type.Elem().Name(),
				Text:    field.Tag.Get("doc"),
				Default: field.Tag.Get("default"),
				Rules:   field.Tag.Get("rules"),
				Value:   v.Field(fieldIndex),
			})
		default:
			entries = append(entries, docEntry{
				Env:     envName,
				Type:    field.Type.Name(),
				Text:    field.Tag.Get("doc"),
				Default: field.Tag.Get("default"),
				Rules:   field.Tag.Get("rules"),
				Value:   v.Field(fieldIndex),
			})
		}
	}
	return entries
}
// docEntry describes one environment-configurable struct field for Doc().
type docEntry struct {
Env string // full environment variable name, e.g. "PREFIX_FIELD"
Type string // type name, or "list of <elem>" for slice fields
Text string // from the `doc:"..."` struct tag
Default string // from the `default:"..."` struct tag
Rules string // from the `rules:"..."` struct tag
Value reflect.Value // the struct field value at registration time
Current []string // current env value(s); only filled by Doc(showValues=true)
}
# Configuration from Environment
|Environment|Type|Default|Description & Rules|Values|
|---|---|---|---|---|
|API_LOGS_CLEANUP_DAYS|int64||Nr of days to keep before cleanup. Default 31.|N/A|
|API_LOGS_INDEX_NAME|string||Name of index for api-logs (lowercase alpha-numerics with dashes, default: uafrica-v3-api-logs)|N/A|
|API_LOGS_MAX_RESPONSE_SIZE|int64||Maximum length of response body stored. Defaults to 1024.|N/A|
|API_LOGS_SEARCH_ADDRESSES|list of string||List of server addresses. Requires at least one, e.g. "https://localhost:9200" for local testing|[https://search-uafrica-v3-api-logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/]|
|API_LOGS_SEARCH_PASSWORD|string||User password for HTTP basic auth. Defaults to admin for local testing.|[Aiz}a4ee]|
|API_LOGS_SEARCH_USERNAME|string||User name for HTTP basic auth. Defaults to admin for local testing.|[uafrica]|
|AUDIT_MAX_RESPONSE_SIZE|int64||Maximum length of response body stored. Defaults to 1024.|N/A|
\ No newline at end of file
package config
import (
"encoding/json"
"fmt"
"os"
"path"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
// LoadLocal finds config.local.json in the current working directory or the
// nearest parent directory containing one, then sets each top-level JSON
// value in the process environment with os.Setenv.
// Only intended for local development, to simulate the environment created
// by AWS for deployed lambda images.
func LoadLocal() error {
	configDir, err := os.Getwd()
	if err != nil {
		return errors.Wrapf(err, "cannot get working directory")
	}
	configFilename := "config.local.json"
	for {
		fn := configDir + "/" + configFilename
		f, err := os.Open(fn)
		if err != nil {
			logger.Debugf("%s not found in %s", configFilename, configDir)
			//NOTE(review): path.Dir assumes slash-separated paths while
			//os.Getwd returns OS-native paths - filepath.Dir would be more
			//portable; confirm whether Windows support matters here.
			parentDir := path.Dir(configDir)
			if parentDir == configDir {
				return errors.Errorf("did not find file %s in working dir or any parent dir", configFilename)
			}
			configDir = parentDir
			continue
		}
		//close explicitly (rather than defer inside the loop) so the handle
		//is released as soon as decoding completes
		var config map[string]interface{}
		decodeErr := json.NewDecoder(f).Decode(&config)
		f.Close()
		if decodeErr != nil {
			return errors.Wrapf(decodeErr, "failed to decode JSON from file %s", fn)
		}
		for n, v := range config {
			vs := fmt.Sprintf("%v", v)
			os.Setenv(n, vs)
			logger.Debugf("Defined local config %s=%s", n, vs)
		}
		return nil
	}
} //LoadLocal()
package config
import (
"regexp"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/string_utils"
"gitlab.com/uafrica/go-utils/struct_utils"
)
var (
// prefixStructs holds every config struct registered via Load/LoadEnv/
// LoadRedis, keyed by its env prefix, so that Doc() can document them later.
prefixStructs = map[string]interface{}{}
)
// LoadEnv loads configStructPtr from environment variables with the given
// prefix; it is Load() with the os.Getenv-backed key reader.
func LoadEnv(prefix string, configStructPtr interface{}) error {
return Load(prefix, configStructPtr, string_utils.EnvironmentKeyReader())
}
// Load reads named values with the given prefix from keyReader, parses them
// into configStructPtr, and registers the struct so Doc() can document it.
// Values that do not map to a struct field are logged but tolerated; if the
// struct implements Validator, it is validated after loading.
func Load(prefix string, configStructPtr interface{}, keyReader string_utils.KeyReader) error {
	if !prefixRegex.MatchString(prefix) {
		return errors.Errorf("config(%s) invalid prefix", prefix)
	}

	//register first so the struct is documented even if loading fails
	prefixStructs[prefix] = configStructPtr

	//read os.Getenv() or other reader...
	namedValues := struct_utils.NamedValuesFromReader(prefix, keyReader)
	logger.Debugf("nv: %+v", namedValues)

	//parse into the caller's struct
	unused, err := struct_utils.UnmarshalNamedValues(namedValues, configStructPtr)
	if err != nil {
		return errors.Wrapf(err, "config(%s) cannot load", prefix)
	}
	if len(unused) > 0 {
		//some variables are still read directly with os.Getenv elsewhere
		//(e.g. AUDIT_QUEUE_URL in queues/sqs/producer matches prefix "AUDIT"),
		//so unknown values cannot be treated as fatal yet - they are only
		//reported to help diagnose misspelled settings
		logger.Warnf("Note unused env (might be used elsewhere) for config(%s): %+v", prefix, unused)
		//return errors.Errorf("config(%s): unknown %+v", prefix, unused)
	}

	validator, isValidator := configStructPtr.(Validator)
	if isValidator {
		if validateErr := validator.Validate(); validateErr != nil {
			return errors.Wrapf(validateErr, "config(%s) is invalid", prefix)
		}
	}
	return nil
}
// LoadRedis reads values with the given prefix from os.Getenv into
// configStructPtr and registers the struct for Doc() output.
// NOTE(review): this is a near duplicate of Load (using NamedValuesFromEnv
// instead of a KeyReader) - consider delegating to Load/LoadEnv once the two
// readers are confirmed to behave identically.
func LoadRedis(prefix string, configStructPtr interface{}) error {
if !prefixRegex.MatchString(prefix) {
return errors.Errorf("config(%s) invalid prefix", prefix)
}
//store before load in case it fails to be still part of docs
prefixStructs[prefix] = configStructPtr
//read os.Getenv()
nv := struct_utils.NamedValuesFromEnv(prefix)
//parse into struct
unused, err := struct_utils.UnmarshalNamedValues(nv, configStructPtr)
if err != nil {
return errors.Wrapf(err, "config(%s) cannot load", prefix)
}
if len(unused) > 0 {
//we still use os.Getenv() elsewhere, so some variables may not be in the struct
//e.g. AUDIT_QUEUE_URL is read from queues/sqs/producer which match config(prefix="AUDIT")
//so we cannot yet fail here, which we should, because config setting not used is often
//a reason for errors, when we try to configure something, then it does not work, and
//we cannot figure out why, but the value we did set, might just be misspelled etc.
//so, for now - do not fail here, just report the unused values
logger.Warnf("Note unused env (might be used elsewhere) for config(%s): %+v", prefix, unused)
//return errors.Errorf("config(%s): unknown %+v", prefix, unused)
}
if validator, ok := configStructPtr.(Validator); ok {
if err := validator.Validate(); err != nil {
return errors.Wrapf(err, "config(%s) is invalid", prefix)
}
}
return nil
}
// prefixPattern restricts config prefixes to uppercase alpha-numeric names
// (underscores allowed) that start with a letter and do not end with an
// underscore, e.g. "API_LOGS".
const prefixPattern = `[A-Z]([A-Z0-9_]*[A-Z0-9])*`
// prefixRegex matches a complete prefix against prefixPattern.
var prefixRegex = regexp.MustCompile("^" + prefixPattern + "$")
package config_test
import (
"encoding/json"
"fmt"
"os"
"strings"
"testing"
"time"
"gitlab.com/uafrica/go-utils/config"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
// TestLoad covers parsing of booleans, integers, int lists (both a single
// JSON-style value and numbered env entries), a timestamp and a custom date
// list from the environment.
func TestLoad(t *testing.T) {
	logger.SetGlobalFormat(logger.NewConsole())
	logger.SetGlobalLevel(logger.LevelDebug)
	testEnv := map[string]string{
		//booleans
		"TEST_VALUE_ENABLE_CACHE": "true",
		"TEST_VALUE_DISABLE_LOG":  "true",
		"TEST_VALUE_ADMIN":        "false",
		//integers
		"TEST_VALUE_MAX_SIZE": "12",
		"TEST_VALUE_SEQ1":     "[4,5,6]", //list in one value
		"TEST_VALUE_SEQ2_10":  "10",      //numbered list elements
		"TEST_VALUE_SEQ2_20":  "20",
		"TEST_VALUE_SEQ2_4":   "4",
		"TEST_VALUE_SEQ2_15":  "15",
		"TEST_VALUE_SEQ2":     "100",
		"TEST_VALUE_CUTOFF":   "2021-11-20T12:00:00+02:00",
		"TEST_VALUE_HOLIDAYS": "[2021-03-21,2021-04-27,2021-05-01,2021-06-16,2021-08-09,2021-12-16,2021-12-25]",
	}
	for name, value := range testEnv {
		os.Setenv(name, value)
	}
	c := Config{}
	if err := config.LoadEnv("TEST_VALUE", &c); err != nil {
		t.Fatalf("Cannot load config: %+v", err)
	}
	t.Logf("Loaded config: %+v", c)
	if !c.EnableCache || !c.DisableLog || c.Admin {
		t.Fatalf("wrong bool values: %+v", c)
	}
	if c.MaxSize != 12 {
		t.Fatalf("wrong nr values: %+v", c)
	}
	wantSeq1 := []int{4, 5, 6}
	seq1OK := len(c.Seq1) == len(wantSeq1)
	for i := 0; seq1OK && i < len(wantSeq1); i++ {
		seq1OK = c.Seq1[i] == wantSeq1[i]
	}
	if !seq1OK {
		t.Fatalf("wrong seq1: %+v", c)
	}
	//numbered elements sort on element number, with the bare value first
	wantSeq2 := []int64{100, 4, 10, 15, 20}
	seq2OK := len(c.Seq2) == len(wantSeq2)
	for i := 0; seq2OK && i < len(wantSeq2); i++ {
		seq2OK = c.Seq2[i] == wantSeq2[i]
	}
	if !seq2OK {
		t.Fatalf("wrong seq2: %+v", c)
	}
	if c.Cutoff.UTC().Format("2006-01-02 15:04:05") != "2021-11-20 10:00:00" {
		t.Fatalf("wrong cutoff")
	}
	wantHolidays := []string{"2021-03-21", "2021-04-27", "2021-05-01", "2021-06-16", "2021-08-09", "2021-12-16", "2021-12-25"}
	holidaysOK := len(c.Holidays) == len(wantHolidays)
	for i := 0; holidaysOK && i < len(wantHolidays); i++ {
		holidaysOK = c.Holidays[i].String() == wantHolidays[i]
	}
	if !holidaysOK {
		t.Fatalf("wrong holidays")
	}
	{
		t.Logf("config(TEST) = %+v", c)
		e := json.NewEncoder(os.Stdout)
		e.SetIndent("", " ")
		e.Encode(c)
	}
}
// Config is the struct loaded by TestLoad; it exercises booleans, numbers,
// int slices, a timestamp and a custom date type.
type Config struct {
EnableCache bool `json:"enable_cache"`
DisableLog bool `json:"disable_log"`
Admin bool `json:"admin"`
MaxSize int64 `json:"max_size"`
Seq1 []int `json:"seq1"` // set as one JSON-style list value in the test
Seq2 []int64 `json:"seq2"` // set via numbered env entries plus a bare value
Cutoff time.Time `json:"cutoff"`
Holidays []Date `json:"holidays"` // custom type parsed via Date.UnmarshalJSON
}
// Date holds a calendar date as plain year, month and day numbers.
type Date struct {
	Y, M, D int
}

// Scan parses an (optionally quoted) CCYY-MM-DD value into d.
func (d *Date) Scan(value []byte) error {
	text := strings.Trim(string(value), "\"")
	parsed, err := time.ParseInLocation("2006-01-02", text, time.Now().Location())
	if err != nil {
		return errors.Errorf("%s is not CCYY-MM-DD", text)
	}
	d.Y, d.M, d.D = parsed.Year(), int(parsed.Month()), parsed.Day()
	return nil
}

// UnmarshalJSON parses a JSON date string via Scan.
func (d *Date) UnmarshalJSON(value []byte) error {
	return d.Scan(value)
}

// String renders the date as CCYY-MM-DD.
func (d Date) String() string {
	return fmt.Sprintf("%04d-%02d-%02d", d.Y, d.M, d.D)
}

// MarshalJSON renders the date as a quoted CCYY-MM-DD JSON string.
func (d Date) MarshalJSON() ([]byte, error) {
	return []byte(`"` + d.String() + `"`), nil
}
// SearchConfig is a sub-struct used to test list loading under a nested prefix.
type SearchConfig struct {
Addresses []string `json:"addresses"`
}
// LogConfig exercises both an anonymous embedded sub-struct (loaded under
// LOG_SEARCH_* in TestLogConfig) and a named sub-struct (LOG_SEARCH2_*).
type LogConfig struct {
SearchConfig `json:"search"`
Search2 SearchConfig `json:"search2"`
IndexName string `json:"index_name"`
}
// TestLogConfig covers loading into embedded and named sub-structs, and
// confirms that unrelated LOG_* variables do not break loading.
func TestLogConfig(t *testing.T) {
	logger.SetGlobalFormat(logger.NewConsole())
	logger.SetGlobalLevel(logger.LevelDebug)
	for name, value := range map[string]string{
		"LOG_INDEX_NAME":        "abc",
		"LOG_SEARCH_ADDRESSES":  "[A,B,C]",
		"LOG_SEARCH2_ADDRESSES": "[D,E,F]",
		"LOG_OTHER":             "1", //extra values must be tolerated
		"LOG_SEARCH_OTHER":      "2",
	} {
		os.Setenv(name, value)
	}
	c := LogConfig{}
	if err := config.LoadEnv("LOG", &c); err != nil {
		t.Fatalf("Failed: %+v", err)
	}
	t.Logf("Loaded: %+v", c)
	if c.IndexName != "abc" {
		t.Fatalf("wrong index_name:%s", c.IndexName)
	}
	wantAddresses := []string{"A", "B", "C"}
	addrOK := len(c.Addresses) == len(wantAddresses)
	for i := 0; addrOK && i < len(wantAddresses); i++ {
		addrOK = c.Addresses[i] == wantAddresses[i]
	}
	if !addrOK {
		t.Fatalf("wrong addresses:%+v", c.Addresses)
	}
	wantSearch2 := []string{"D", "E", "F"}
	search2OK := len(c.Search2.Addresses) == len(wantSearch2)
	for i := 0; search2OK && i < len(wantSearch2); i++ {
		search2OK = c.Search2.Addresses[i] == wantSearch2[i]
	}
	if !search2OK {
		t.Fatalf("wrong search2 addresses:%+v", c.Search2.Addresses)
	}
}
......@@ -12,7 +12,6 @@ import (
"github.com/google/uuid"
"gitlab.com/uafrica/go-utils/consumer"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
"gitlab.com/uafrica/go-utils/queues"
"gitlab.com/uafrica/go-utils/service"
)
......@@ -181,11 +180,6 @@ func (q *queue) process(event queues.Event) error {
} //queue.process()
func (q *queue) Send(event queues.Event) (msgID string, err error) {
startTime := time.Now()
defer func() {
logs.LogSQSSent(startTime, event.QueueName, event.TypeName, event.BodyJSON)
}()
event.MessageID = uuid.New().String()
q.ch <- event
return event.MessageID, nil
......
......@@ -135,28 +135,13 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven
//process all message records in this event:
for messageIndex, message := range lambdaEvent.Records {
//get request-id for this message record
startTime := time.Now()
requestID := ""
if requestIDAttr, ok := message.MessageAttributes[c.requestIDHeaderKey]; ok {
requestID = *requestIDAttr.StringValue
}
messageType := ""
var requestToLog interface{}
var handlerErr error
requestToLog = message.Body //will be logged as string if failed before parsing body into struct
defer func() {
if err := logs.LogSQSRequest(
startTime,
requestID,
messageType,
requestToLog,
handlerErr,
); err != nil {
c.Errorf("failed to log: %+v", err)
}
}()
if messageTypeAttr, ok := message.MessageAttributes["type"]; !ok || messageTypeAttr.StringValue == nil {
c.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required
continue
......@@ -210,7 +195,6 @@ func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEven
ctx.Errorf("invalid message: %+v", err)
continue
}
requestToLog = recordStruct //replace string log with structured log
ctx.Tracef("message (%T) %+v", recordStruct, recordStruct)
args = append(args, reflect.ValueOf(recordStruct))
......
package main
import (
"flag"
"gitlab.com/uafrica/go-utils/config"
"gitlab.com/uafrica/go-utils/consumer/sqs_consumer"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/examples/core/db"
"gitlab.com/uafrica/go-utils/logger"
)
// main runs the example SQS consumer. With -req=<file> it loads local config
// and processes a single event from the file, otherwise it runs the consumer
// loop until terminated.
func main() {
	reqFile := flag.String("req", "", "Request file to process for testing.")
	flag.Parse()

	sqsRoutes := map[string]interface{}{}
	sqsHandler := sqs_consumer.New("uafrica-request-id", sqsRoutes).
		WithStarter("db", db.Connector("core"))

	//command-line testing mode: process one event from file, then exit
	if reqFile != nil && *reqFile != "" {
		if err := config.LoadLocal(); err != nil {
			panic(errors.Errorf("Failed to load local config: %+v", err))
		}
		if err := sqsHandler.ProcessFile(*reqFile); err != nil {
			panic(errors.Errorf("processing failed: %+v", err))
		}
		logger.Debugf("Stop after processing event from file.")
		return
	}

	sqsHandler.Run()
}
......@@ -5,7 +5,6 @@ go 1.17
require (
github.com/aws/aws-lambda-go v1.26.0
github.com/aws/aws-sdk-go v1.40.50
github.com/aws/aws-secretsmanager-caching-go v1.1.0
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.13.0
......@@ -28,9 +27,6 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/text v0.3.7
mellium.im/sasl v0.2.1 // indirect
)
require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/opensearch-project/opensearch-go v1.0.0
github.com/r3labs/diff/v2 v2.14.0
......
# Logs
This package provides functions to log API, SQS and CRON events, capturing for example an API method and path along with request and response details, or SQS event details along with the time it spent in the queue before being processed.
These logs are sent to a SQS queue. The handling of the queue event and capturing of the log, is done as SQS handlers, not as part of this package.
Do not confuse this with logger, which write free-format log entries to stdout.
# API-LOGS
At the end of each API handler, an api-log is captured to describe the incoming API request and response. Also part of this log is a list of actions taken during the handler, including:
* API calls made
* SQS Events sent
* Database Statements executed
* Time slept
Each of those has a start/end time and duration, and they are summed and it should add up to most of the API-Log total duration.
If there is a big difference between the summed time and the total duration, then we are doing something that takes time that we are not monitoring which should be investigated.
The total time spent sleeping, waiting for db calls, waiting for outgoing API calls, is logged in the API log.
This can be logged and broken down per path and method to see where the API is spending most time, and that could be investigated and optimised to improve performance.
# SQS-LOGS
SQS logs are written at the end of a SQS event handler, similar to API logs.
Since SQS is used to write API logs, those handlers should not be logged, as it will create a circular infinite queue ramping up huge bills.
To be safe, SQS logs are therefore DISABLED by default.
It should only be enabled for things like provider rate requests or any SQS handler that is part of the functionality of the system doing async work, not handlers that are part of the infrastructure.
SQS log will also write to the API_LOGS queue and the same index in OpenSearch (can review and change this in future)
It logs with method "SQS" and path is the message type.
That means we can log durations and throughput in the same way and on the same graph as API when needed
It also captures the actions taken as part of the handler, in the same way it is captured for API.
So when one finds some action takes too long in an API handler and moves it to an SQS handler, the change will be visible on the dashboard, indicating whether or not the change had the desired effect.
That is the idea.
We can easily disable SQS logs and we can easily move them to another index in OpenSearch if necessary. We will have to try it for a while and see if it is useful in the current form or not.
# CRON-LOGS
In the same way we log API/SQS, it will be useful to monitor crons with a bit of output, e.g. nr of items deleted by a janitor etc.
One can get that currently from cloud watch if the logs are not disabled, and CloudWatch should ideally not be source of metrics, but that is currently the case, so not changing it yet.
\ No newline at end of file
package logs
import (
"encoding/json"
"sync"
"time"
)
//Call LogOutgoingAPIRequest() after calling an API end-point as part of a handler,
//to capture the details
//and add it to the current handler log story for reporting/metrics
// LogOutgoingAPIRequest records an outgoing API call made while handling a
// request on the current handler's action list for reporting/metrics.
// Call it after the call completes; startTime is when the call began.
// It always returns nil.
func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error {
	endTime := time.Now()
	callLog := ApiCallLog{
		URL:          url,
		Method:       method,
		ResponseCode: responseCode,
	}
	//empty bodies are not recorded at all
	if requestBody != "" {
		callLog.Request = &BodyLog{BodySize: len(requestBody), Body: requestBody}
	}
	if responseBody != "" {
		callLog.Response = &BodyLog{BodySize: len(responseBody), Body: responseBody}
	}
	entry := ActionLog{
		StartTime: startTime,
		EndTime:   endTime,
		DurMs:     endTime.Sub(startTime).Milliseconds(),
		Type:      ActionTypeApiCall,
		ApiCall:   &callLog,
	}
	actionListMutex.Lock()
	actionList = append(actionList, entry)
	actionListMutex.Unlock()
	return nil
} //LogOutgoingAPIRequest()
//Call LogSQL() after executing any SQL query
//to capture the details
//and add it to the current handler log story for reporting/metrics
// LogSQL records an executed SQL query on the current handler's action list
// for reporting/metrics. Call it after the query completes.
func LogSQL(
	startTime time.Time,
	sql string,
	rowsCount int, //optional nr of rows to report, else 0
	ids []int64, //optional list of ids to report, else nil
	err error, //only if failed, else nil
) {
	endTime := time.Now()
	queryLog := SQLQueryLog{
		SQL:       sql,
		RowCount:  rowsCount,
		InsertIDs: ids,
	}
	if err != nil {
		queryLog.Error = err.Error()
	}
	entry := ActionLog{
		StartTime: startTime,
		EndTime:   endTime,
		DurMs:     endTime.Sub(startTime).Milliseconds(),
		Type:      ActionTypeSqlQuery,
		SQLQuery:  &queryLog,
	}
	actionListMutex.Lock()
	actionList = append(actionList, entry)
	actionListMutex.Unlock()
}
//Call LogSQSSent() after sending an SQS event
//to capture the details
//and add it to the current handler log story for reporting/metrics
// LogSQSSent records an SQS event sent from a handler on the current action
// list for reporting/metrics. Events to the internal API_LOGS and AUDIT
// queues are skipped, since logging those would log the logging itself.
func LogSQSSent(startTime time.Time, queueName string, messageType string, request interface{}) {
	//do not log internal events sent to audit/api-log
	if queueName == "API_LOGS" || queueName == "AUDIT" {
		return
	}
	endTime := time.Now()
	sentLog := SQSSentLog{
		QueueName:   queueName,
		MessageType: messageType,
	}
	switch req := request.(type) {
	case nil:
		//no request body to record
	case string:
		//use the string as-is: marshalling would double-escape JSON
		sentLog.Request = &BodyLog{BodySize: len(req), Body: req}
	default:
		//best effort: an empty body is recorded if marshalling fails
		jsonRequest, _ := json.Marshal(request)
		sentLog.Request = &BodyLog{BodySize: len(jsonRequest), Body: string(jsonRequest)}
	}
	entry := ActionLog{
		StartTime: startTime,
		EndTime:   endTime,
		DurMs:     endTime.Sub(startTime).Milliseconds(),
		Type:      ActionTypeSqsSent,
		SQSSent:   &sentLog,
	}
	actionListMutex.Lock()
	actionList = append(actionList, entry)
	actionListMutex.Unlock()
}
func LogSearch(startTime time.Time, index string, query string) {
endTime := time.Now()
actionListMutex.Lock()
actionList = append(actionList, ActionLog{
StartTime: startTime,
EndTime: endTime,
DurMs: endTime.Sub(startTime).Milliseconds(),
Type: ActionTypeSearch,
Search: &SearchLog{
Index: index,
Query: query,
},
})
actionListMutex.Unlock()
}
//Call LogSleep() after doing time.Sleep()
//to capture the details
//and add it to the current handler log story for reporting/metrics
func LogSleep(startTime time.Time) {
endTime := time.Now()
actionListMutex.Lock()
actionList = append(actionList, ActionLog{
StartTime: startTime,
EndTime: endTime,
DurMs: endTime.Sub(startTime).Milliseconds(),
Type: ActionTypeSleep,
})
actionListMutex.Unlock()
}
var (
// actionListMutex guards actionList, which is appended to by the Log*
// helpers above and copied+reset by relativeActionList at the end of a handler.
actionListMutex sync.Mutex
actionList = []ActionLog{}
)
type ActionLog struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
DurMs int64 `json:"duration_ms"` //duration in milliseconds
Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"`
ApiCall *ApiCallLog `json:"api_call,omitempty"`
SQSSent *SQSSentLog `json:"sqs_sent,omitempty"`
SQLQuery *SQLQueryLog `json:"sql_query,omitempty"`
Search *SearchLog `json:"search"`
}
func (action ActionLog) Relative(relTime time.Time) RelativeActionLog {
return RelativeActionLog{
StartMs: action.StartTime.Sub(relTime).Milliseconds(),
EndMs: action.EndTime.Sub(relTime).Milliseconds(),
DurMs: action.DurMs,
Type: action.Type,
ApiCall: action.ApiCall,
SQSSent: action.SQSSent,
SQLQuery: action.SQLQuery,
}
}
type RelativeActionLog struct {
StartMs int64 `json:"start_ms" doc:"Start time in milliseconds after start timestamp"`
EndMs int64 `json:"end_ms" doc:"End time in milliseconds after start timestamp"`
DurMs int64 `json:"duration_ms"` //duration in milliseconds
Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep" search:"keyword"`
ApiCall *ApiCallLog `json:"api_call,omitempty"`
SQSSent *SQSSentLog `json:"sqs_sent,omitempty"`
SQLQuery *SQLQueryLog `json:"sql_query,omitempty"`
}
type ActionType string
var ActionTypeList = []ActionType{
ActionTypeNone,
ActionTypeApiCall,
ActionTypeSqsSent,
ActionTypeSqlQuery,
ActionTypeSearch,
ActionTypeSleep,
}
const (
ActionTypeNone ActionType = "none"
ActionTypeApiCall ActionType = "api-call"
ActionTypeSqsSent ActionType = "sqs-sent"
ActionTypeSqlQuery ActionType = "sql-query"
ActionTypeSearch ActionType = "search"
ActionTypeSleep ActionType = "sleep"
)
//APICallLog captures details of an outgoing API call made from a handler
type ApiCallLog struct {
URL string `json:"url" search:"keyword"`
Method string `json:"method" search:"keyword"`
ResponseCode int `json:"response_code" search:"keyword"`
Request *BodyLog `json:"request,omitempty"`
Response *BodyLog `json:"response,omitempty"`
}
type BodyLog struct {
BodySize int `json:"body_size"`
Body string `json:"body"`
}
//SQSSentLog captures details of an SQS event sent from a handler
type SQSSentLog struct {
QueueName string `json:"queue_name" search:"keyword"`
MessageType string `json:"message_type" search:"keyword"`
Request *BodyLog `json:"request,omitempty"`
}
//SQLQueryLog captures details of an SQL query executed from a handler resulting in either rows returned, ids inserted or an error
type SQLQueryLog struct {
SQL string `json:"sql"`
RowCount int `json:"row_count,omitempty"`
InsertIDs []int64 `json:"insert_ids,omitempty"`
Error string `json:"error,omitempty"`
}
type SearchLog struct {
Index string `json:"index"`
Query string `json:"query"`
}
//compile the relative action list that took place during this handler
//copy then reset actionList for the next handler
//we copy it with relation to this API's start..end time, rather than full timestamps, which are hard to read in the list
//start and end are current total handler period that actions should be inside that
//Returns nil when cfg.ActionsKeep is false (the list is still reset). Gaps
//between recorded actions longer than 50ms are appended as ActionTypeNone
//entries so unaccounted handler time is visible in the output.
func relativeActionList(startTime, endTime time.Time) []RelativeActionLog {
actionListMutex.Lock()
defer func() {
//after copy/discard, reset (global!) action list for the next handler
actionList = []ActionLog{}
actionListMutex.Unlock()
}()
cfg := currentLogConfig()
if !cfg.ActionsKeep {
return nil
}
//todo: runtime config: load temporary from REDIS after N seconds
//which will allow us to monitor better for a short while during trouble shooting
//then something like this to reload every 5min (5min could also be part of config)
// if dynamicExpireTime.Before(time.Now()) {
// dynamicApiConfig := apiConfig //loaded from env at startup
// ...look for keys in REDIS and override ...
// dynamicExpireTime = time.Now().Add(time.Minute*5)
// }
// do this in go routing with sleep... so handlers do not have to check :-)
// and it can also be used for api-log part that is not action list, e.g. for api-log req/res body len etc...
relActionList := []RelativeActionLog{}
for _, action := range actionList {
if action.EndTime.Before(startTime) || action.StartTime.After(endTime) {
continue //not expected - skip actions outside log window
}
//apply reduction filters to limit string lengths
//NOTE(review): the truncations below write through pointers shared with the
//entries still in actionList; harmless here since the list is reset on
//return, but confirm nothing else retains these detail structs.
switch action.Type {
case ActionTypeNone:
case ActionTypeSqlQuery:
if action.SQLQuery != nil && len(action.SQLQuery.SQL) > int(cfg.ActionsMaxSQLLength) {
action.SQLQuery.SQL = action.SQLQuery.SQL[:cfg.ActionsMaxSQLLength]
}
case ActionTypeSqsSent:
if action.SQSSent != nil && action.SQSSent.Request != nil && len(action.SQSSent.Request.Body) > int(cfg.ActionsMaxSQSReqBodyLength) {
action.SQSSent.Request.Body = action.SQSSent.Request.Body[:cfg.ActionsMaxSQSReqBodyLength]
}
case ActionTypeApiCall:
if action.ApiCall != nil {
if action.ApiCall.Request != nil && len(action.ApiCall.Request.Body) > int(cfg.ActionsMaxAPIReqBodyLength) {
action.ApiCall.Request.Body = action.ApiCall.Request.Body[:cfg.ActionsMaxAPIReqBodyLength]
}
if action.ApiCall.Response != nil && len(action.ApiCall.Response.Body) > int(cfg.ActionsMaxAPIResBodyLength) {
action.ApiCall.Response.Body = action.ApiCall.Response.Body[:cfg.ActionsMaxAPIResBodyLength]
}
}
case ActionTypeSearch:
if action.Search != nil {
if len(action.Search.Query) > int(cfg.ActionsMaxSearchQueryLength) {
action.Search.Query = action.Search.Query[:cfg.ActionsMaxSearchQueryLength]
}
}
}
//make relative and append to the list
relActionList = append(relActionList, action.Relative(startTime))
}
//also append to the list any nonAction periods greater than thresholdMs
//to indicate significant gaps in the action list that we did not account for
thresholdMs := int64(50)
//make period list, remove all action periods, then we're left with non-action periods :-)
nonActionPeriods := NewPeriods(startTime, endTime)
for _, action := range actionList {
nonActionPeriods = nonActionPeriods.Without(Period{Start: action.StartTime, End: action.EndTime})
}
for _, nonAction := range nonActionPeriods {
if nonAction.Duration().Milliseconds() > thresholdMs {
relActionList = append(relActionList, ActionLog{
StartTime: nonAction.Start,
EndTime: nonAction.End,
DurMs: nonAction.Duration().Milliseconds(),
Type: ActionTypeNone,
}.Relative(startTime))
}
}
return relActionList
}
package logs
import (
"net/http"
"sort"
"net/url"
"os"
"strconv"
"strings"
"time"
......@@ -13,6 +13,24 @@ import (
"gitlab.com/uafrica/go-utils/queues"
)
var (
// MaxReqBodyLength and MaxResBodyLength limit how much of the request and
// response body is stored on each api-log entry (default 1024 bytes each;
// overridable via the environment, see init below).
MaxReqBodyLength int = 1024
MaxResBodyLength int = 1024
)
// init allows the body-length limits to be overridden from the environment;
// unparsable or negative values are ignored and the defaults kept.
func init() {
if s := os.Getenv("API_LOGS_MAX_REQ_BODY_LENGTH"); s != "" {
if i64, err := strconv.ParseInt(s, 10, 64); err == nil && i64 >= 0 {
MaxReqBodyLength = int(i64)
}
}
if s := os.Getenv("API_LOGS_MAX_RES_BODY_LENGTH"); s != "" {
if i64, err := strconv.ParseInt(s, 10, 64); err == nil && i64 >= 0 {
MaxResBodyLength = int(i64)
}
}
}
var producer queues.Producer
func Init(p queues.Producer) {
......@@ -47,6 +65,7 @@ func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[stri
}
}
userID, _ := claim["UserID"].(int64)
username, _ := claim["Username"].(string)
accountID, _ := claim["AccountID"].(int64)
if accountID == 0 {
......@@ -60,6 +79,7 @@ func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[stri
StartTime: startTime,
EndTime: endTime,
DurMs: endTime.Sub(startTime).Milliseconds(),
Type: "api-incoming",
Method: req.HTTPMethod,
Address: req.RequestContext.DomainName,
Path: req.Path,
......@@ -69,28 +89,31 @@ func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[stri
InitialAuthUsername: authUsername,
SourceIP: req.RequestContext.Identity.SourceIP,
UserAgent: req.RequestContext.Identity.UserAgent,
UserID: userID,
Username: username,
AccountID: accountID,
Request: ApiLogRequest{
Headers: req.Headers,
QueryParameters: req.QueryStringParameters,
BodySize: len(req.Body),
Body: req.Body,
//see below: Body: req.Body,
},
Response: ApiLogResponse{
Headers: res.Headers,
BodySize: len(res.Body),
Body: res.Body,
//see below: Body: res.Body,
},
Actions: nil,
}
//compile action list
apiLog.Actions = relativeActionList(apiLog.StartTime, apiLog.EndTime)
//sort action list on startTime, cause actions are added when they end, i.e. ordered by end time
//and all non-actions were appended at the end of the list
sort.Slice(apiLog.Actions, func(i, j int) bool { return apiLog.Actions[i].StartMs < apiLog.Actions[j].StartMs })
if apiLog.Request.BodySize > MaxReqBodyLength {
apiLog.Request.Body = req.Body[:MaxReqBodyLength] + "..."
} else {
apiLog.Request.Body = req.Body
}
if apiLog.Response.BodySize > MaxResBodyLength {
apiLog.Response.Body = res.Body[:MaxResBodyLength] + "..."
} else {
apiLog.Response.Body = res.Body
}
//also copy multi-value query parameters to the log as CSV array values
for n, as := range req.MultiValueQueryStringParameters {
......@@ -99,11 +122,11 @@ func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[stri
//todo: filter out excessive req/res body content per (method+path)
//todo: also need to do for all actions...
if apiLog.Method == http.MethodGet {
apiLog.Response.Body = "<not logged>"
}
// if apiLog.Method == http.MethodGet {
// apiLog.Response.Body = "<not logged>"
// }
logger.Debugf("Send api-log to SQS: %+v", apiLog)
// logger.Debugf("Send api-log to SQS: %+v", apiLog)
//todo: filter out sensitive values (e.g. OTP)
if _, err := producer.NewEvent("API_LOGS").
......@@ -115,11 +138,82 @@ func LogIncomingAPIRequest(startTime time.Time, requestID string, claim map[stri
return nil
} //LogIncomingAPIRequest()
//LogOutgoingAPIRequest captures the details of an API end-point called as
//part of a handler and adds it to the current handler log story for
//reporting/metrics. Call it after the outgoing request has completed.
//Request/response bodies longer than MaxReqBodyLength/MaxResBodyLength are
//truncated with a trailing "...". Returns an error when the logs queue
//producer is not set or the event could not be sent.
func LogOutgoingAPIRequest(startTime time.Time, requestID string, claim map[string]interface{}, urlString string, method string, requestBody string, responseBody string, responseCode int) error {
	if producer == nil {
		return errors.Errorf("logs queue producer not set")
	}

	//todo: filter out some noisy (method+path)
	endTime := time.Now()

	//claim values are optional; zero values are used when absent
	userID, _ := claim["UserID"].(int64)
	username, _ := claim["Username"].(string)
	accountID, _ := claim["AccountID"].(int64)

	//url.Parse returns a nil *url.URL on failure, so guard before
	//dereferencing (previously parsedURL.Path/.Host would panic on an
	//unparsable URL); on failure the raw URL string is logged as the path
	path := urlString
	address := ""
	params := map[string]string{}
	if parsedURL, err := url.Parse(urlString); err == nil {
		path = parsedURL.Path
		address = parsedURL.Host
		for n, v := range parsedURL.Query() {
			params[n] = strings.Join(v, ",") //multi-values logged as CSV
		}
	}

	apiLog := ApiLog{
		StartTime:    startTime,
		EndTime:      endTime,
		DurMs:        endTime.Sub(startTime).Milliseconds(),
		Type:         "api-outgoing",
		Method:       method,
		Path:         path,
		Address:      address,
		ResponseCode: responseCode,
		RequestID:    requestID,
		UserID:       userID,
		Username:     username,
		AccountID:    accountID,
		Request: ApiLogRequest{
			//Headers: req.Headers,
			QueryParameters: params,
			BodySize:        len(requestBody),
			//Body is set below after truncation
		},
		Response: ApiLogResponse{
			//Headers: res.Headers,
			BodySize: len(responseBody),
			//Body is set below after truncation
		},
	}

	//truncate bodies so log events stay small; BodySize keeps the full size
	apiLog.Request.Body = requestBody
	if apiLog.Request.BodySize > MaxReqBodyLength {
		apiLog.Request.Body = requestBody[:MaxReqBodyLength] + "..."
	}
	apiLog.Response.Body = responseBody
	if apiLog.Response.BodySize > MaxResBodyLength {
		apiLog.Response.Body = responseBody[:MaxResBodyLength] + "..."
	}

	//todo: filter out sensitive values (e.g. OTP)
	if _, err := producer.NewEvent("API_LOGS").
		Type("api-log").
		RequestID(apiLog.RequestID).
		Send(apiLog); err != nil {
		return errors.Wrapf(err, "failed to send api-log")
	}
	return nil
} //LogOutgoingAPIRequest()
//ApiLog is the SQS event details struct encoded as JSON document, sent to SQS, to be logged for each API handler executed.
type ApiLog struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
DurMs int64 `json:"duration_ms"` //duration in milliseconds
Type string `json:"type"` //incoming-api or outgoing-api
Method string `json:"method"`
Address string `json:"address"` //server address for incoming and outgoing
Path string `json:"path"`
......@@ -128,13 +222,13 @@ type ApiLog struct {
InitialAuthUsername string `json:"initial_auth_username,omitempty"`
InitialAuthType string `json:"initial_auth_type,omitempty"`
AccountID int64 `json:"account_id,omitempty"`
UserID int64 `json:"user_id,omitempty"`
Username string `json:"username,omitempty"`
SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API
UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI
RelevantID string `json:"relevant_id,omitempty"`
Request ApiLogRequest `json:"request"`
Response ApiLogResponse `json:"response"`
Actions []RelativeActionLog `json:"actions,omitempty"`
}
type ApiLogRequest struct {
......
package logs
import (
"context"
"time"
"gitlab.com/uafrica/go-utils/config"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/redis"
)
//Config holds runtime-tunable LOGS settings that control how much detail is
//kept in the action lists attached to log events. It is loaded from the
//environment (prefix "LOGS") at startup, and individual values may be
//overridden from REDIS (see currentLogConfig()).
type Config struct {
	ActionsKeep                 bool  `json:"actions_keep" doc:"Set true to keep list of actions in logs"`
	ActionsMaxSQLLength         int64 `json:"actions_max_sql_length" doc:"Set length of SQL query to keep in action list (default 0 = delete)"`
	ActionsMaxSQSReqBodyLength  int64 `json:"actions_max_sqs_req_body_length" doc:"Set length of SQS Request body to keep in action list (default 0 = delete)"`
	ActionsMaxAPIReqBodyLength  int64 `json:"actions_max_api_req_body_length" doc:"Set length of API Request body to keep in action list (default 0 = delete)"`
	ActionsMaxAPIResBodyLength  int64 `json:"actions_max_api_res_body_length" doc:"Set length of API Response body to keep in action list (default 0 = delete)"`
	ActionsMaxSearchQueryLength int64 `json:"actions_max_search_query_length" doc:"Set length of search query to keep in action list (default 0 = delete)"`
}
//configPrefix is the key prefix for Config values in the environment and in REDIS.
const configPrefix = "LOGS"

var (
	logConfig         Config       //static config loaded from the environment at startup
	dynamicLogConfig  Config       //effective config: logConfig overlaid with REDIS overrides
	dynamicExpireTime time.Time    //when dynamicLogConfig must next be refreshed from REDIS
	redisCli          redis.IRedis //nil when the REDIS connection could not be established
)
//init loads the static LOGS config from the environment and connects to
//REDIS for runtime overrides. A failed REDIS connection is logged but not
//fatal: redisCli stays nil and only the static config will ever be used.
func init() {
	if err := config.LoadEnv(configPrefix, &logConfig); err != nil {
		logger.Errorf("failed to load LOGS config: %+v", err)
	}
	//start with the static config; the already-passed expiry time makes the
	//first call to currentLogConfig() attempt a REDIS reload immediately
	dynamicLogConfig = logConfig
	dynamicExpireTime = time.Now()

	//see if can load overrides from redis
	var err error
	redisCli, err = redis.New(context.Background())
	if err != nil {
		logger.Errorf("Not able to connect to REDIS for runtime %s config: %+v", configPrefix, err)
	}
}
//currentLogConfig returns the effective LOGS config: the static environment
//config overlaid with any REDIS overrides. REDIS is consulted at most once
//every 10 seconds; between reloads (or when REDIS is unavailable) the cached
//copy is returned.
//NOTE(review): package-level cache state is read/written without a lock -
//confirm this is only called from a single goroutine per lambda invocation.
func currentLogConfig() Config {
	if redisCli == nil || dynamicExpireTime.After(time.Now()) {
		return dynamicLogConfig //cache still fresh, or no REDIS to consult
	}

	//cache expired: rebuild from the static config, then overlay REDIS values
	dynamicLogConfig = logConfig
	dynamicExpireTime = time.Now().Add(10 * time.Second)
	err := config.Load(configPrefix, &dynamicLogConfig, redisCli)
	if err != nil {
		logger.Errorf("failed to load %s config from REDIS", configPrefix)
		return dynamicLogConfig
	}
	logger.Debugf("Loaded %s config: %+v", configPrefix, dynamicLogConfig)
	return dynamicLogConfig
} //currentLogConfig()
package logs
//todo... currently monitored from CloudWatch...
package logs
import (
"time"
)
type Period struct {
Start time.Time `json:"start_time"`
End time.Time `json:"end_time"`
}
func (p Period) Duration() time.Duration {
return p.End.Sub(p.Start)
}
type Periods []Period
func NewPeriods(start time.Time, end time.Time) Periods {
if end.Before(start) {
return []Period{}
}
return []Period{{Start: start, End: end}}
}
//Without returns the receiver's periods with period p subtracted: any overlap
//with p is removed, trimming, splitting or deleting individual periods as
//needed. It assumes ps is sorted by time and non-overlapping (as produced by
//NewPeriods and previous Without calls).
//NOTE(review): the receiver's backing array is modified in place (Start/End
//fields are trimmed), so callers must use the returned value and not rely on
//the original slice afterwards.
func (ps Periods) Without(p Period) Periods {
	if len(ps) == 0 {
		return ps //nothing left to take from
	}
	if p.End.Before(ps[0].Start) {
		return ps //before first period
	}
	if p.Start.After(ps[len(ps)-1].End) {
		return ps //after last period
	}
	//logger.Debugf("Start: %+v", ps)
	//skip over periods that end before the removed period starts
	nextIndex := 0
	for nextIndex < len(ps) && ps[nextIndex].End.Before(p.Start) {
		//logger.Debugf("skip[%d]: %s > %s", nextIndex, p.Start, ps[nextIndex].End)
		nextIndex++
	}
	//each period ending inside p either loses its tail (if it started before p)
	//or is swallowed entirely (marked for deletion below)
	toDelete := []int{}
	for nextIndex < len(ps) && ps[nextIndex].End.Before(p.End) {
		if ps[nextIndex].Start.Before(p.Start) {
			//trim tail
			//logger.Debugf("tail[%d] %s->%s", nextIndex, ps[nextIndex].End, p.Start)
			ps[nextIndex].End = p.Start
		} else {
			//delete this period completely and move to next
			toDelete = append(toDelete, nextIndex)
			//logger.Debugf("delete[%d] %s..%s", nextIndex, ps[nextIndex].Start, ps[nextIndex].End)
		}
		nextIndex++
	}
	//the first period that ends after p either fully spans p (split into two)
	//or only starts inside p (trim its head)
	if nextIndex < len(ps) && ps[nextIndex].End.After(p.End) {
		if ps[nextIndex].Start.Before(p.Start) {
			//remove part of this period
			//NOTE(review): the second half of the split is appended at the end
			//of the slice, which leaves the list out of time order when later
			//periods exist - confirm callers sort afterwards (the api-log code
			//sorts actions before use) or that chained Without calls tolerate it.
			ps = append(ps, Period{Start: p.End, End: ps[nextIndex].End})
			ps[nextIndex].End = p.Start
			//logger.Debugf("split[%d]", nextIndex)
		} else {
			if ps[nextIndex].Start.Before(p.End) {
				//trim head of period to start after removed peroid, then stop
				//logger.Debugf("head[%d] %s->%s", nextIndex, ps[nextIndex].Start, p.End)
				ps[nextIndex].Start = p.End
			}
		}
	}
	//delete selected periods completely (toDelete indexes are in ascending order)
	newPS := []Period{}
	for i, p := range ps {
		if len(toDelete) > 0 && i == toDelete[0] {
			toDelete = toDelete[1:]
		} else {
			newPS = append(newPS, p)
		}
	}
	//logger.Debugf("final: %+v", newPS)
	return newPS
}
//Span returns the full covered range (last.End - first.Start), including any
//gaps between periods; zero for an empty list.
func (ps Periods) Span() time.Duration {
	if len(ps) == 0 {
		return 0
	}
	first := ps[0]
	last := ps[len(ps)-1]
	return last.End.Sub(first.Start)
}
//Duration returns the sum of the individual period durations,
//i.e. the covered time excluding gaps between periods.
func (ps Periods) Duration() time.Duration {
	var total time.Duration
	for _, period := range ps {
		total += period.Duration()
	}
	return total
}
//Gaps is (Span - Duration), i.e. the total idle time between periods.
func (ps Periods) Gaps() time.Duration {
	return ps.Span() - ps.Duration()
}
package logs_test
import (
"testing"
"time"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/logs"
)
//TestPeriods exercises Periods.Without through a chained sequence of
//subtractions, covering split, tail-trim, head-trim, full delete,
//overlap-before-start, overlap-past-end and remove-all cases. The expected
//remaining minutes are noted in the comments after each step.
//NOTE(review): results are only t.Log'ed, never asserted - consider comparing
//against the expected period lists so regressions actually fail the test.
func TestPeriods(t *testing.T) {
	logger.SetGlobalFormat(logger.NewConsole())
	logger.SetGlobalLevel(logger.LevelDebug)
	t0 := time.Date(2021, 01, 01, 0, 0, 0, 0, time.Now().Location())
	ps := logs.NewPeriods(t0, t0.Add(time.Hour))
	t.Log(ps)
	//ps: 0..60

	//split[0]
	ps1 := ps.Without(logs.Period{Start: t0.Add(time.Minute * 5), End: t0.Add(time.Minute * 10)})
	t.Log(ps1)
	//-(5..10) -> ps1: 0..5, 10..60

	//split[1]
	ps2 := ps1.Without(logs.Period{Start: t0.Add(time.Minute * 15), End: t0.Add(time.Minute * 20)})
	t.Log(ps2)
	//-(15..20) -> ps1: 0..5, 10..15, 20..60

	//trim head[2]
	ps3 := ps2.Without(logs.Period{Start: t0.Add(time.Minute * 18), End: t0.Add(time.Minute * 21)})
	t.Log(ps3)
	//-(18..21) -> ps1: 0..5, 10..15, 21..60

	//trim tail[1]
	ps4 := ps3.Without(logs.Period{Start: t0.Add(time.Minute * 14), End: t0.Add(time.Minute * 19)})
	t.Log(ps4)
	//-(14..19) -> ps1: 0..5, 10..14, 21..60

	//tail, delete, head
	ps5 := ps4.Without(logs.Period{Start: t0.Add(time.Minute * 4), End: t0.Add(time.Minute * 22)})
	t.Log(ps5)
	//-(4..22) -> ps1: 0..4, 22..60

	//over start
	ps6 := ps5.Without(logs.Period{Start: t0.Add(-time.Minute * 1), End: t0.Add(time.Minute * 2)})
	t.Log(ps6)
	//-(-1..2) -> ps1: 2..4, 22..60

	//over end
	ps7 := ps6.Without(logs.Period{Start: t0.Add(time.Minute * 50), End: t0.Add(time.Minute * 120)})
	t.Log(ps7)
	//-(50..120) -> ps1: 2..4, 22..50

	//all
	ps8 := ps7.Without(logs.Period{Start: t0.Add(time.Minute * 0), End: t0.Add(time.Minute * 120)})
	t.Log(ps8)
	//-(0..120) -> ps1: nil
}
package logs
import (
"encoding/json"
"fmt"
"os"
"time"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
)
//LogSQSRequest must be called at the end of an SQS event handler to capture
//the request and result for the log pipeline. The event reuses the ApiLog
//structure with Method="SQS" and Path=<messageType> so SQS handler metrics
//can share API-log dashboards/filters in OpenSearch.
//(note: action list is only reset when this is called - so must be called
//after each handler, else action list has to be reset at the start)
func LogSQSRequest(startTime time.Time,
	requestID string, //from API
	messageType string, //logged as the Path of the event
	req interface{}, //event body: logged as-is when a string, else JSON-marshalled
	handlerErr error, //nil on success; logged as ResponseCode=1 with details otherwise
) error {
	if producer == nil {
		return errors.Errorf("logs queue producer not set")
	}
	//logging is opt-in via the SQS_LOGS_ENABLED environment variable
	if !sqsLogEnabled {
		return nil
	}
	endTime := time.Now()
	log := ApiLog{
		StartTime: startTime,
		EndTime:   endTime,
		DurMs:     endTime.Sub(startTime).Milliseconds(),
		RequestID: requestID,
		Method:    "SQS",
		Path:      messageType,
	}
	//NOTE(review): unlike the API log functions, the request body is not
	//truncated to MaxReqBodyLength here - large SQS events are logged in
	//full; confirm whether that is intended.
	if req != nil {
		if reqString, ok := req.(string); ok {
			log.Request.Body = reqString //do not marshal else we have double-escaped JSON
			log.Request.BodySize = len(reqString)
		} else {
			//non-string bodies are JSON-marshalled; marshal errors leave the
			//body empty rather than failing the log call
			if jsonReq, err := json.Marshal(req); err == nil {
				log.Request.Body = string(jsonReq)
				log.Request.BodySize = len(log.Request.Body)
			}
		}
	}
	//there is no HTTP status for SQS: ResponseCode 0 indicates success and 1
	//indicates a handler error, with the error details as the response body
	if handlerErr == nil {
		log.ResponseCode = 0
	} else {
		log.ResponseCode = 1
		errorInfo := ErrorInfo{
			Error:   handlerErr.Error(),
			Details: fmt.Sprintf("%+v", handlerErr),
		}
		jsonError, _ := json.Marshal(errorInfo) //error ignored: ErrorInfo holds only strings
		log.Response.Body = string(jsonError)
	}
	//reset actionList for the next handler
	//NOTE(review): despite the original "copy then reset" intent, the collected
	//actions are discarded without being copied into log.Actions (compare
	//LogIncomingAPIRequest) - confirm whether SQS logs should include them.
	actionListMutex.Lock()
	actionList = []ActionLog{}
	actionListMutex.Unlock()
	//todo: filter out sensitive values (e.g. OTP)
	//note: we send SQS logs to "API_LOGS" which already exists... should be renamed to simply "LOGS"
	//it use the same structure, but method="SQS" and path="messageType" and request is the event body
	//so they can be plotted on the same dashboard visualisation in OpenSearch with all the same filters/metrics
	if _, err := producer.NewEvent("API_LOGS").
		Type("api-log").
		RequestID(requestID).
		Send(log); err != nil {
		return errors.Wrapf(err, "failed to send api-log for SQS")
	}
	return nil
}
//sqsLogEnabled controls whether LogSQSRequest() actually sends log events.
var sqsLogEnabled = false

//init enables SQS request logging only when the SQS_LOGS_ENABLED environment
//variable is exactly "true".
//If consuming from API_LOGS, do not enable else we will consume and send to our own queue!
func init() {
	envSetting := os.Getenv("SQS_LOGS_ENABLED")
	sqsLogEnabled = envSetting == "true"
	logger.Infof("Environment SQS_LOGS_ENABLED=\"%s\" -> sqsLogsEnabled=%v", envSetting, sqsLogEnabled)
}
//ErrorInfo is the JSON response body logged for a failed SQS handler:
//Error holds err.Error() and Details the full "%+v" formatted error.
type ErrorInfo struct {
	Error   string `json:"error"`
	Details string `json:"details"`
}