diff --git a/api/README.md b/api/README.md index a464c77100fefa86aa1fbef21ba3cdbeee40b810..1435a4923f7bcfe3e7200523bc42ea4eacc88135 100644 --- a/api/README.md +++ b/api/README.md @@ -204,9 +204,7 @@ API documentation will eventually be generated from doc tags in your structs. Th # API Logger -API uses the go-utils/logger at the moment. Eventually it will be possible to use other loggers, and customise api logs. Audits can already be customised using ```api.WithAuditor()```. - -By default, go-utils/logger writes JSON records. At the start of the example api main function the logger is configured to write console format and log at DEBUG level... more to come on this front too including triggers for full debug in production on selected handlers or specific events, and logging different levels for selected code packages to avoid log clutter without having to remove debug from code. +API Logs are written from global variables using logs.LogIncomingAPIRequest() # Router Path Parameters diff --git a/api/api.go b/api/api.go index 219cfa743d43c0f82ef1e4d728408e728eacfbaa..5cd5e16a794f0331ba5550cb33945509d9e64edf 100644 --- a/api/api.go +++ b/api/api.go @@ -6,10 +6,10 @@ import ( "runtime/debug" "github.com/aws/aws-lambda-go/lambda" - "gitlab.com/uafrica/go-utils/audit" + "gitlab.com/uafrica/go-utils/consumer/mem_consumer" "gitlab.com/uafrica/go-utils/errors" - queues_mem "gitlab.com/uafrica/go-utils/queues/mem" - queues_sqs "gitlab.com/uafrica/go-utils/queues/sqs" + "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/queues/sqs_producer" "gitlab.com/uafrica/go-utils/service" "gitlab.com/uafrica/go-utils/string_utils" ) @@ -52,19 +52,13 @@ type Api struct { } //wrap Service.WithStarter to return api, else cannot be chained -func (api Api) WithStarter(name string, starter service.IStarter) Api { +func (api Api) WithStarter(name string, starter service.Starter) Api { api.Service = api.Service.WithStarter(name, starter) return api } //wrap else cannot be chained -func (api Api) WithAuditor(auditor audit.Auditor) Api { - api.Service = api.Service.WithAuditor(auditor) - return api -} - -//wrap else cannot be chained -func (api Api) WithProducer(producer service.Producer) Api { +func (api Api) WithProducer(producer queues.Producer) Api { api.Service = api.Service.WithProducer(producer) return api } @@ -126,11 +120,11 @@ func (api Api) Run() { if (os.Getenv("LOG_LEVEL") == "debug") && api.localQueueEventHandlers != nil { //use in-memory channels for async events api.Debugf("Using in-memory channels for async events ...") - api = api.WithProducer(queues_mem.NewProducer(queues_mem.NewConsumer(api.Service, api.localQueueEventHandlers))) + api = api.WithProducer(mem_consumer.NewProducer(mem_consumer.New(api.Service, api.localQueueEventHandlers))) } else { //use SQS for async events api.Debugf("Using SQS queue producer for async events ...") - api = api.WithProducer(queues_sqs.NewProducer(api.requestIDHeaderKey)) + api = api.WithProducer(sqs_producer.New(api.requestIDHeaderKey)) } //run as an AWS Lambda function diff --git a/api/lambda.go b/api/lambda.go index 3d2479e19a8ff1b07802421cbae78317b94fdf1d..a36bfbf1bccaf0e905b863615b77390ceda83be7 100644 --- a/api/lambda.go +++ b/api/lambda.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/logs" ) func (api Api) NewContext(baseCtx context.Context, requestID string, request events.APIGatewayProxyRequest) (Context, error) { @@ -57,7 +58,6 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if err != nil { return res, err } - //report handler crashes if api.crashReporter != nil { @@ -99,14 +99,8 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat if api.requestIDHeaderKey != "" { res.Headers[api.requestIDHeaderKey] = Ctx.RequestID() } - if err := api.Service.WriteValues(Ctx.StartTime(), time.Now(), Ctx.RequestID(), map[string]interface{}{ - "direction": "incoming", - "type": "api", - "request_id": Ctx.RequestID(), - "request": Ctx.Request(), - "response": res}, - ); err != nil { - Ctx.Errorf("failed to audit: %+v", err) + if err := logs.LogIncomingAPIRequest(Ctx.StartTime(), apiGatewayProxyReq, res); err != nil { + Ctx.Errorf("failed to log: %+v", err) } }() diff --git a/audit/audit.go b/audit/audit.go deleted file mode 100644 index 2fdf2bfc864d40cf2e791f941d5c49fe775b98dd..0000000000000000000000000000000000000000 --- a/audit/audit.go +++ /dev/null @@ -1,8 +0,0 @@ -package audit - -import "time" - -type Auditor interface { - WriteValues(startTime, endTime time.Time, requestID string, values map[string]interface{}) error - WriteEvent(requestID string, event Event) error -} diff --git a/audit/event.go b/audit/change.go similarity index 79% rename from audit/event.go rename to audit/change.go index dc61743472d209b2f5d99e39027e908a7198d0e7..e1d9649f761fa17677e7388a4e515ca45575d609 100644 --- a/audit/event.go +++ b/audit/change.go @@ -9,26 +9,60 @@ import ( "time" "github.com/r3labs/diff/v2" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/reflection" ) -type Event struct { +var producer queues.Producer + +func Init(p queues.Producer) { + producer = p +} + +func SaveDataChange( + requestID string, + source string, + eventType string, + orgValue interface{}, + newValue interface{}, +) error { + if producer == nil { + return errors.Errorf("audit queue producer not set") + } + + changeRecord, err := NewChangeRecord(source, eventType, orgValue, newValue) + if err != nil { + return errors.Wrapf(err, "fail to determine changes") + } + if _, err := producer.NewEvent("AUDIT"). + Type("audit"). + RequestID(requestID). + Send(changeRecord); err != nil { + return errors.Wrapf(err, "failed to send data change record") + } + return nil +} + +type ChangeRecord struct { ID int64 `json:"id"` ObjectID string `json:"object_id"` Type string `json:"type"` Source string `json:"source"` Timestamp time.Time `json:"timestamp"` - Change map[string]interface{} `json:"change"` + Changes map[string]interface{} `json:"changes"` } +//purpose: +// Creates a record describing a change of data //parameters: // source could be "" then defaults to "SYSTEM" or specify the user name that made the change // orgValue and newValue could be nil -// they are compared and changes are logged -func NewEvent(source string, eventType string, orgValue, newValue interface{}) (Event, error) { +// they are compared and changes are recorded +func NewChangeRecord(source string, eventType string, orgValue, newValue interface{}) (ChangeRecord, error) { changelog, err := diff.Diff(orgValue, newValue) if err != nil { - return Event{}, err + return ChangeRecord{}, err } changes := map[string]interface{}{} @@ -129,12 +163,12 @@ func NewEvent(source string, eventType string, orgValue, newValue interface{}) ( objectIDString = getStringValue(newValue, "Key") } - event := Event{ + event := ChangeRecord{ ObjectID: objectIDString, Source: source, Type: eventType, Timestamp: time.Now(), - Change: changes, + Changes: changes, } if event.Source == "" { event.Source = "SYSTEM" diff --git a/audit/file_audit.go b/audit/file_audit.go deleted file mode 100644 index 7d340e1bd065ac93ddbe1485781df3cda14d4d57..0000000000000000000000000000000000000000 --- a/audit/file_audit.go +++ /dev/null @@ -1,65 +0,0 @@ -package audit - -import ( - "encoding/json" - "os" - "time" - - "gitlab.com/uafrica/go-utils/errors" -) - -//creates auditor that writes to file, which could be os.Stderr or os.Stdout for debugging -func File(f *os.File) Auditor { - if f == nil { - panic(errors.Errorf("cannot create file auditor with f=nil")) - } - return &fileAudit{ - f: f, - } -} - -type fileAudit struct { - f *os.File -} - -func (fa fileAudit) WriteValues(startTime, endTime time.Time, requestID string, values map[string]interface{}) error { - if fa.f == nil { - return errors.Errorf("auditor is closed") - } - obj := map[string]interface{}{ - "start_time": startTime, - "end_time": endTime, - "duration": endTime.Sub(startTime), - "request_id": requestID, - "values": values, - } - jsonObj, err := json.Marshal(obj) - if err != nil { - return errors.Wrapf(err, "failed to JSON encode audit values") - } - if _, err := fa.f.Write(jsonObj); err != nil { - return errors.Wrapf(err, "failed to write audit values to file") - } - return nil -} - -func (fa fileAudit) WriteEvent(requestID string, event Event) error { - if fa.f == nil { - return errors.Errorf("auditor is closed") - } - obj := map[string]interface{}{ - "start_time": event.Timestamp, - "end_time": event.Timestamp, - "duration": 0, - "request_id": requestID, - "values": event, - } - jsonObj, err := json.Marshal(obj) - if err != nil { - return errors.Wrapf(err, "failed to JSON encode audit event") - } - if _, err := fa.f.Write(jsonObj); err != nil { - return errors.Wrapf(err, "failed to write audit event to file") - } - return nil -} diff --git a/audit/no_audit.go b/audit/no_audit.go deleted file mode 100644 index 930f4134f7c4f2a820bea8ca64e6982bd90d4656..0000000000000000000000000000000000000000 --- a/audit/no_audit.go +++ /dev/null @@ -1,20 +0,0 @@ -package audit - -import ( - "time" -) - -//creates auditor that writes nothiong -func None() Auditor { - return noAudit{} -} - -type noAudit struct{} - -func (noAudit) WriteValues(startTime, endTime time.Time, requestID string, values map[string]interface{}) error { - return nil -} - -func (noAudit) WriteEvent(requestID string, event Event) error { - return nil -} diff --git a/consumer/README.md b/consumer/README.md new file mode 100644 index 0000000000000000000000000000000000000000..2d8745e39563b1157a71b7b26f79dc5929bf4e23 --- /dev/null +++ b/consumer/README.md @@ -0,0 +1,4 @@ +# Consumer + +Consumes a queue of events in the same way that API processes HTTP requests. +Consumer is a type of service, just like API and CRON are also types of services. diff --git a/consumer/check.go b/consumer/check.go new file mode 100644 index 0000000000000000000000000000000000000000..3b77c4f07994ceb0a78d7ab36d1d80c46e1bba52 --- /dev/null +++ b/consumer/check.go @@ -0,0 +1,7 @@ +package consumer + +import "gitlab.com/uafrica/go-utils/service" + +type Checker interface { + Check(service.Context) (interface{}, error) +} diff --git a/queues/consumer.go b/consumer/consumer.go similarity index 70% rename from queues/consumer.go rename to consumer/consumer.go index ec4c50db8dc3f9b01f514ed232dac34285337e86..6278ae91c0d81c9bffcff33d5b5656191bd32640 100644 --- a/queues/consumer.go +++ b/consumer/consumer.go @@ -1,10 +1,10 @@ -package queues +package consumer import "gitlab.com/uafrica/go-utils/service" //IConsumer is the interface implemented by both mem and sqs consumer type Consumer interface { - WithStarter(name string, starter service.IStarter) Consumer + WithStarter(name string, starter service.Starter) Consumer Run() ProcessFile(filename string) error } diff --git a/queues/context.go b/consumer/context.go similarity index 84% rename from queues/context.go rename to consumer/context.go index c3321fecabf9238b4dd28da4616ede30bbb3dc99..ef182c8b0bfb033ed072928ec2d202cf4efecbaf 100644 --- a/queues/context.go +++ b/consumer/context.go @@ -1,4 +1,4 @@ -package queues +package consumer import ( "context" @@ -6,13 +6,14 @@ import ( "reflect" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) //Context within a consumer to process an event type Context interface { service.Context - Event() service.Event //the event start started this context in the consumer + Event() queues.Event //the event start started this context in the consumer GetRecord(recordType reflect.Type) (interface{}, error) //extract struct value from event data } @@ -20,10 +21,10 @@ var contextInterfaceType = reflect.TypeOf((*Context)(nil)).Elem() type queuesContext struct { service.Context - event service.Event + event queues.Event } -func NewContext(service service.Service, event service.Event) (Context, error) { +func NewContext(service service.Service, event queues.Event) (Context, error) { baseCtx := context.Background() serviceContext, err := service.NewContext(baseCtx, event.RequestIDValue, map[string]interface{}{ "message_type": event.TypeName, @@ -39,7 +40,7 @@ func NewContext(service service.Service, event service.Event) (Context, error) { return ctx, nil } -func (ctx queuesContext) Event() service.Event { +func (ctx queuesContext) Event() queues.Event { return ctx.event } diff --git a/queues/handler.go b/consumer/handler.go similarity index 98% rename from queues/handler.go rename to consumer/handler.go index 36ba9ec40d196177c7ce54f6e638471ba3935f0a..bc09bd749c64125e21c985d98ff088e1c89be7eb 100644 --- a/queues/handler.go +++ b/consumer/handler.go @@ -1,4 +1,4 @@ -package queues +package consumer import ( "reflect" diff --git a/queues/mem/README.md b/consumer/mem_consumer/README.md similarity index 100% rename from queues/mem/README.md rename to consumer/mem_consumer/README.md diff --git a/queues/mem/consumer.go b/consumer/mem_consumer/consumer.go similarity index 76% rename from queues/mem/consumer.go rename to consumer/mem_consumer/consumer.go index 317d1dc33622fc5e4f3377c3cd139252c69b8f8c..99274c99b52c114ca4e9615545eec0e6d2019f19 100644 --- a/queues/mem/consumer.go +++ b/consumer/mem_consumer/consumer.go @@ -1,4 +1,4 @@ -package mem +package mem_consumer import ( "encoding/json" @@ -10,55 +10,49 @@ import ( "time" "github.com/google/uuid" - "gitlab.com/uafrica/go-utils/audit" + "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/service" ) -func NewConsumer(s service.Service, routes map[string]interface{}) queues.Consumer { +func New(s service.Service, routes map[string]interface{}) consumer.Consumer { if s == nil { panic("NewConsumer(service==nil)") } - router, err := queues.NewRouter(routes) + router, err := consumer.NewRouter(routes) if err != nil { panic(fmt.Sprintf("cannot create router: %+v", err)) } - c := &consumer{ + c := &memConsumer{ Service: s, router: router, queues: map[string]*queue{}, } //create a producer that will produce into this consumer - c.producer = &producer{ + c.producer = &memProducer{ consumer: c, } c.Service = c.Service.WithProducer(c.producer) return c } -type consumer struct { +type memConsumer struct { sync.Mutex service.Service - router queues.Router - producer *producer + router consumer.Router + producer queues.Producer queues map[string]*queue } //wrap Service.WithStarter to return cron, else cannot be chained -func (consumer *consumer) WithStarter(name string, starter service.IStarter) queues.Consumer { +func (consumer *memConsumer) WithStarter(name string, starter service.Starter) consumer.Consumer { consumer.Service = consumer.Service.WithStarter(name, starter) return consumer } -//wrap else cannot be chained -func (consumer *consumer) WithAuditor(auditor audit.Auditor) queues.Consumer { - consumer.Service = consumer.Service.WithAuditor(auditor) - return consumer -} - -func (consumer *consumer) Queue(name string) (*queue, error) { +func (consumer *memConsumer) Queue(name string) (*queue, error) { consumer.Lock() defer consumer.Unlock() q, ok := consumer.queues[name] @@ -66,7 +60,7 @@ func (consumer *consumer) Queue(name string) (*queue, error) { q = &queue{ consumer: consumer, name: name, - ch: make(chan service.Event), + ch: make(chan queues.Event), } go q.run() consumer.queues[name] = q @@ -76,20 +70,20 @@ func (consumer *consumer) Queue(name string) (*queue, error) { //do not call this - when using local producer, the consumer is automatically running //for each queue you send to, and processing from q.run() -func (consumer *consumer) Run() { +func (consumer *memConsumer) Run() { panic(errors.Errorf("DO NOT RUN LOCAL CONSUMER")) } -func (consumer *consumer) ProcessFile(filename string) error { +func (consumer *memConsumer) ProcessFile(filename string) error { f, err := os.Open(filename) if err != nil { return errors.Wrapf(err, "failed to open queue event file %s", filename) } defer f.Close() - var event service.Event + var event queues.Event if err := json.NewDecoder(f).Decode(&event); err != nil { - return errors.Wrapf(err, "failed to read service.Event from file %s", filename) + return errors.Wrapf(err, "failed to read queues.Event from file %s", filename) } q := queue{ @@ -107,16 +101,16 @@ func (consumer *consumer) ProcessFile(filename string) error { } type queue struct { - consumer *consumer + consumer *memConsumer name string - ch chan service.Event + ch chan queues.Event } func (q *queue) run() { // logger.Debugf("Q(%s) Start", q.name) for event := range q.ch { //process in background because some event processing sends to itself then wait for some responses on new events on the same queue!!! - go func(event service.Event) { + go func(event queues.Event) { // logger.Debugf("Q(%s) process start: %+v", q.name, event) err := q.process(event) if err != nil { @@ -129,7 +123,7 @@ func (q *queue) run() { // logger.Debugf("Q(%s) STOPPED", q.name) } -func (q *queue) process(event service.Event) error { +func (q *queue) process(event queues.Event) error { //todo: create context with logger rand.Seed(time.Now().Unix()) @@ -137,7 +131,7 @@ func (q *queue) process(event service.Event) error { // if q.crashReporter != nil { // defer q.crashReporter.Catch(ctx) // } - ctx, err := queues.NewContext(q.consumer.Service, event) + ctx, err := consumer.NewContext(q.consumer.Service, event) if err != nil { return err } @@ -147,7 +141,7 @@ func (q *queue) process(event service.Event) error { if err != nil { return errors.Wrapf(err, "unhandled event type(%v)", event.TypeName) } - handler, ok := sqsHandler.(queues.Handler) + handler, ok := sqsHandler.(consumer.Handler) if !ok { return errors.Errorf("messageType(%v) unsupported signature: %T", event.TypeName, sqsHandler) } @@ -185,7 +179,7 @@ func (q *queue) process(event service.Event) error { return nil } //queue.process() -func (q *queue) Send(event service.Event) (msgID string, err error) { +func (q *queue) Send(event queues.Event) (msgID string, err error) { event.MessageID = uuid.New().String() q.ch <- event return event.MessageID, nil diff --git a/queues/mem/producer.go b/consumer/mem_consumer/producer.go similarity index 54% rename from queues/mem/producer.go rename to consumer/mem_consumer/producer.go index b32c4c44057d4e135c1e1c741ac94d8da2266237..f1b51f070368c46af97a88daa0d748d47ebe06e8 100644 --- a/queues/mem/producer.go +++ b/consumer/mem_consumer/producer.go @@ -1,29 +1,29 @@ -package mem +package mem_consumer import ( + "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" "gitlab.com/uafrica/go-utils/queues" - "gitlab.com/uafrica/go-utils/service" ) //can only produce locally if also consuming local -func NewProducer(memConsumer queues.Consumer) service.Producer { - if memConsumer == nil { - panic(errors.Errorf("cannot product locally without consumer")) +func NewProducer(consumer consumer.Consumer) queues.Producer { + if consumer == nil { + panic(errors.Errorf("cannot produce local events without mem consumer")) } - mc, ok := memConsumer.(*consumer) + mc, ok := consumer.(*memConsumer) if !ok { - panic(errors.Errorf("NewProducer(%T) is not a mem consumer", memConsumer)) + panic(errors.Errorf("NewProducer(consumer=%T) is not a mem consumer", consumer)) } return mc.producer } -type producer struct { - consumer *consumer +type memProducer struct { + consumer *memConsumer } -func (producer *producer) Send(event service.Event) (string, error) { +func (producer *memProducer) Send(event queues.Event) (string, error) { logger.Debugf("MEM producer.queue(%s) Sending event %+v", event.QueueName, event) q, err := producer.consumer.Queue(event.QueueName) if err != nil { @@ -38,3 +38,7 @@ func (producer *producer) Send(event service.Event) (string, error) { logger.Debugf("MEM producer.queue(%s) SENT event %+v", event.QueueName, event) return msgID, nil } + +func (producer *memProducer) NewEvent(queueName string) queues.Event { + return queues.NewEvent(producer, queueName) +} diff --git a/queues/router.go b/consumer/router.go similarity index 99% rename from queues/router.go rename to consumer/router.go index f2357365b0e2b7a16d10bfd8ab741c91f3b22ba7..744cf1a5d5ddb8b3f861861b8799b758dfb06e29 100644 --- a/queues/router.go +++ b/consumer/router.go @@ -1,4 +1,4 @@ -package queues +package consumer import ( "fmt" diff --git a/queues/sqs/consumer.go b/consumer/sqs_consumer/consumer.go similarity index 67% rename from queues/sqs/consumer.go rename to consumer/sqs_consumer/consumer.go index 1c31e5f6412fb8f29b15f016a645e9c12874326c..bbf2e5ba61122ac030ea812b113d08ad21f842b1 100644 --- a/queues/sqs/consumer.go +++ b/consumer/sqs_consumer/consumer.go @@ -1,4 +1,4 @@ -package sqs +package sqs_consumer import ( "context" @@ -15,18 +15,20 @@ import ( "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" "github.com/google/uuid" - "gitlab.com/uafrica/go-utils/audit" + "gitlab.com/uafrica/go-utils/consumer" "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/queues/sqs_producer" "gitlab.com/uafrica/go-utils/service" ) -func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queues.Consumer { +func New(requestIDHeaderKey string, routes map[string]interface{}) consumer.Consumer { env := os.Getenv("ENVIRONMENT") //todo: support config loading for local dev and env for lambda in prod if env == "" { env = "dev" } - router, err := queues.NewRouter(routes) + router, err := consumer.NewRouter(routes) if err != nil { panic(fmt.Sprintf("cannot create router: %+v", err)) } @@ -41,43 +43,38 @@ func NewConsumer(requestIDHeaderKey string, routes map[string]interface{}) queue } s := service.New(). - WithProducer(NewProducer(requestIDHeaderKey)) - return consumer{ + WithProducer(sqs_producer.New(requestIDHeaderKey)) + + return sqsConsumer{ Service: s, env: env, router: router, requestIDHeaderKey: requestIDHeaderKey, ConstantMessageType: sqsMessageType, - checks: map[string]queues.ICheck{}, + checks: map[string]consumer.Checker{}, } } -type consumer struct { +type sqsConsumer struct { service.Service env string - router queues.Router + router consumer.Router requestIDHeaderKey string ConstantMessageType string //from os.Getenv("SQS_MESSAGE_TYPE") - checks map[string]queues.ICheck + checks map[string]consumer.Checker } //wrap Service.WithStarter to return cron, else cannot be chained -func (consumer consumer) WithStarter(name string, starter service.IStarter) queues.Consumer { - consumer.Service = consumer.Service.WithStarter(name, starter) - return consumer -} - -//wrap else cannot be chained -func (consumer consumer) WithAuditor(auditor audit.Auditor) queues.Consumer { - consumer.Service = consumer.Service.WithAuditor(auditor) - return consumer +func (c sqsConsumer) WithStarter(name string, starter service.Starter) consumer.Consumer { + c.Service = c.Service.WithStarter(name, starter) + return c } -func (consumer consumer) Run() { - lambda.Start(consumer.Handler) +func (c sqsConsumer) Run() { + lambda.Start(c.Handler) } -func (consumer consumer) ProcessFile(filename string) error { +func (c sqsConsumer) ProcessFile(filename string) error { f, err := os.Open(filename) if err != nil { return errors.Wrapf(err, "failed to open queue event file %s", filename) @@ -89,7 +86,7 @@ func (consumer consumer) ProcessFile(filename string) error { return errors.Wrapf(err, "failed to read sqs event from file %s", filename) } - if consumer.Handler( + if c.Handler( lambdacontext.NewContext( context.Background(), &lambdacontext.LambdaContext{ @@ -106,7 +103,7 @@ func (consumer consumer) ProcessFile(filename string) error { return nil } -func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error { +func (c sqsConsumer) Handler(baseCtx context.Context, lambdaEvent events.SQSEvent) error { //todo: create context with logger rand.Seed(time.Now().Unix()) @@ -115,17 +112,18 @@ func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQS // defer sqs.crashReporter.Catch(ctx) // } - if consumer.ConstantMessageType != "" { + if c.ConstantMessageType != "" { //legacy mode for fixed message type as used in shiplogic //where the whole instance is started for a specific SQS_MESSAGE_TYPE defined in environment - handler, err := consumer.router.Route(consumer.ConstantMessageType) + handler, err := c.router.Route(c.ConstantMessageType) if err != nil { - return errors.Wrapf(err, "messageType=%s not handled", consumer.ConstantMessageType) //checked on startup - should never get here!!! + return errors.Wrapf(err, "messageType=%s not handled", c.ConstantMessageType) //checked on startup - should never get here!!! } if msgHandler, ok := handler.(func(events.SQSEvent) error); !ok { - return errors.Wrapf(err, "SQS_MESSAGE_TYPE=%s: handler signature %T not supported", consumer.ConstantMessageType, handler) + return errors.Wrapf(err, "SQS_MESSAGE_TYPE=%s: handler signature %T not supported", c.ConstantMessageType, handler) } else { + //call the handler return msgHandler(lambdaEvent) } } else { @@ -133,20 +131,36 @@ func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQS //process all message records in this event: for messageIndex, message := range lambdaEvent.Records { //get request-id for this message record + startTime := time.Now() requestID := "" - if requestIDAttr, ok := message.MessageAttributes[consumer.requestIDHeaderKey]; ok { + if requestIDAttr, ok := message.MessageAttributes[c.requestIDHeaderKey]; ok { requestID = *requestIDAttr.StringValue } messageType := "" + var requestToLog interface{} + var handlerErr error + requestToLog = message.Body //will be logged as string if failed before parsing body into struct + defer func() { + if err := logs.LogSQSRequest( + startTime, + requestID, + messageType, + requestToLog, + handlerErr, + ); err != nil { + c.Errorf("failed to log: %+v", err) + } + }() + if messageTypeAttr, ok := message.MessageAttributes["type"]; !ok || messageTypeAttr.StringValue == nil { - consumer.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required + c.Errorf("ignoring message without messageType") //todo: could support generic handler for these... not yet required continue } else { messageType = *messageTypeAttr.StringValue } - event := service.Event{ + event := queues.Event{ //producer: nil, MessageID: message.MessageId, QueueName: "N/A", //not sure how to get queue name from lambda Event... would be good to log it, may be in os.Getenv(???)? @@ -156,7 +170,7 @@ func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQS BodyJSON: message.Body, } - ctx, err := queues.NewContext(consumer.Service, event) + ctx, err := consumer.NewContext(c.Service, event) if err != nil { return err } @@ -167,12 +181,12 @@ func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQS }).Infof("Queue(%s) Start SQS Handler Event: %v", ctx.Event().QueueName, ctx.Event()) //routing on messageType - sqsHandler, err := consumer.router.Route(messageType) + sqsHandler, err := c.router.Route(messageType) if err != nil { ctx.Errorf("Unhandled sqs messageType(%v): %v", messageType, err) continue } - handler, ok := sqsHandler.(queues.Handler) + handler, ok := sqsHandler.(consumer.Handler) if !ok { ctx.Errorf("messageType(%v) unsupported signature: %T", messageType, sqsHandler) continue @@ -189,13 +203,15 @@ func (consumer consumer) Handler(baseCtx context.Context, lambdaEvent events.SQS ctx.Errorf("invalid message: %+v", err) continue } + requestToLog = recordStruct //replace string log with structured log ctx.Debugf("message (%T) %+v", recordStruct, recordStruct) args = append(args, reflect.ValueOf(recordStruct)) results := handler.FuncValue.Call(args) if len(results) > 0 && !results[0].IsNil() { - ctx.Errorf("handler failed: %+v", results[0].Interface().(error)) + handlerErr = results[0].Interface().(error) + ctx.Errorf("handler failed: %+v", handlerErr) } } } diff --git a/cron/cron.go b/cron/cron.go index 5a9dff266b6ef2411f276cefa5599b1199b839ed..114e106cfde429a8bed1e8cc5d741bb4d94fcd2e 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -8,7 +8,6 @@ import ( "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/lambdacontext" "github.com/google/uuid" - "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" "gitlab.com/uafrica/go-utils/service" @@ -44,17 +43,11 @@ type Cron struct { } //wrap Service.WithStarter to return cron, else cannot be chained -func (cron Cron) WithStarter(name string, starter service.IStarter) Cron { +func (cron Cron) WithStarter(name string, starter service.Starter) Cron { cron.Service = cron.Service.WithStarter(name, starter) return cron } -//wrap else cannot be chained -func (cron Cron) WithAuditor(auditor audit.Auditor) Cron { - cron.Service = cron.Service.WithAuditor(auditor) - return cron -} - //add a check to startup of each context //they will be called in the sequence they were added //if check return error, processing stops and err is returned diff --git a/cron/handler.go b/cron/handler.go index 956e14cb3d20992cfcacac4b34dc1f58a1e7b108..814a41ead1696e110d73ee03b7aad7f158175da5 100644 --- a/cron/handler.go +++ b/cron/handler.go @@ -22,7 +22,7 @@ func NewHandler(fnc interface{}) (Handler, error) { return h, errors.Errorf("returns %d results instead of (error)", fncType.NumOut()) } - //arg[0] must implement interface queues.Context + //arg[0] must implement interface consumer.Context if fncType.In(0) != contextInterfaceType && !fncType.In(0).Implements(contextInterfaceType) { return h, errors.Errorf("first arg %v does not implement %v", fncType.In(0), contextInterfaceType) diff --git a/examples/core/api/main.go b/examples/core/api/main.go index b6907329733375ebc52cf6bbf257accd783ea2d7..5fe8aa1c8002719e75a26797e18943067342fc11 100644 --- a/examples/core/api/main.go +++ b/examples/core/api/main.go @@ -6,7 +6,6 @@ import ( "os" "gitlab.com/uafrica/go-utils/api" - "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/examples/core/app" "gitlab.com/uafrica/go-utils/examples/core/db" @@ -22,7 +21,6 @@ func main() { WithCheck("maintenance", maint{}). WithCheck("rate", rateLimiter{}). WithCORS(cors{}). - WithAuditor(audit.File(os.Stdout)). WithEvents(app.QueueRoutes()). //only used when LOG_LEVEL="debug" Run() } diff --git a/examples/core/app/users/users.go b/examples/core/app/users/users.go index 93e094a2aaf803f1c3809d6bb00b17d747458fc6..5599be9e7793a29a67e3aa2fbbef63e15a4e4968 100644 --- a/examples/core/app/users/users.go +++ b/examples/core/app/users/users.go @@ -11,7 +11,7 @@ import ( "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/examples/core/email" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/service" + "gitlab.com/uafrica/go-utils/queues" ) type User struct { @@ -113,7 +113,7 @@ func Add(ctx api.Context, params noParams, newUser POSTUser) (User, error) { Subject: "Welcome User", Body: "Your account has been created", } - /*eventID*/ _, err := service.NewEvent(ctx, "notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) + /*eventID*/ _, err := queues.NewEvent(ctx, "notify").RequestID(ctx.RequestID()).Type("email").Delay(time.Second * 5).Params(map[string]string{}).Send(email) if err != nil { ctx.Errorf("failed to notify: %+v", err) } diff --git a/examples/core/cron/main.go b/examples/core/cron/main.go index f4f400ffcb1c19be1e908276c0ec8d373502942b..66a494b7907815e92449cb379723cda9703ed363 100644 --- a/examples/core/cron/main.go +++ b/examples/core/cron/main.go @@ -25,7 +25,6 @@ func main() { cron.New(app.CronRoutes()). WithStarter("db", db.Connector("core")). - //WithAuditor(audit{}). Run(invokeArnPtr) } diff --git a/examples/core/db/database.go b/examples/core/db/database.go index 8021c42f85802f829e1640840ef425c057abcfe7..f694c23fcf4cd0f6ce7af2867bedf22484fa39f8 100644 --- a/examples/core/db/database.go +++ b/examples/core/db/database.go @@ -7,14 +7,14 @@ import ( "gitlab.com/uafrica/go-utils/service" ) -func Connector(dbName string) service.IStarter { +func Connector(dbName string) service.Starter { return &connector{ name: dbName, conn: 0, } } -//connector implements service.IStarter +//connector implements service.Starter type connector struct { name string conn int diff --git a/examples/core/email/notify.go b/examples/core/email/notify.go index 206f2be9ee540314b853eeb68843298a2233467b..07a1149cec6d7854b88dfaf4cc71a3a7f96aad4b 100644 --- a/examples/core/email/notify.go +++ b/examples/core/email/notify.go @@ -1,8 +1,6 @@ package email -import ( - "gitlab.com/uafrica/go-utils/queues" -) +import "gitlab.com/uafrica/go-utils/service" type Message struct { From string @@ -13,7 +11,7 @@ type Message struct { Body string } -func Notify(ctx queues.Context, msg Message) error { +func Notify(ctx service.Context, msg Message) error { ctx.Debugf("Pretending to send email: %+v", msg) return nil } diff --git a/examples/core/sqs/main.go b/examples/core/sqs/main.go index 7a019a7cf89b1d58996f51440fdd44cfa659c49f..276fb5123898b4de7d95293b308ea8c7083e4649 100644 --- a/examples/core/sqs/main.go +++ b/examples/core/sqs/main.go @@ -4,10 +4,10 @@ import ( "flag" "gitlab.com/uafrica/go-utils/config" + "gitlab.com/uafrica/go-utils/consumer/sqs_consumer" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/examples/core/db" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/queues/sqs" ) func main() { @@ -16,7 +16,7 @@ func main() { sqsRoutes := map[string]interface{}{} - consumer := sqs.NewConsumer("uafrica-request-id", sqsRoutes). + consumer := sqs_consumer.New("uafrica-request-id", sqsRoutes). WithStarter("db", db.Connector("core")) if reqFile != nil && *reqFile != "" { diff --git a/logs/README.md b/logs/README.md new file mode 100644 index 0000000000000000000000000000000000000000..75ec01279321db81b613de502bc76e92867d3974 --- /dev/null +++ b/logs/README.md @@ -0,0 +1,43 @@ +# Logs + +This package provides functions to log API, SQS and CRON events, capturing for example an API method and path along with request and response details, or SQS event details along with the time it spent in the queue before being processed. + +These logs are sent to a SQS queue. The handling of the queue event and capturing of the log, is done as SQS handlers, not as part of this package. + +Do not confuse this with logger, which write free-format log entries to stdout. + +# API-LOGS + +At the end of each API handler, an api-log is captured to describe the incoming API request and response. Also part of this log is a list of actions takens during the handler, including: +* API calls made +* SQS Events sent +* Database Statements executed +* Time slept + +Each of those has a start/end time and duration, and they are summed and it should add up to most of the API-Log total duration. +If there is a big difference between the summed time and the total duration, then we are doing something that takes time that we are not monitoring which should be investigated. +The total time spent sleeping, waiting for db calls, waiting for outgoing API calls, is logged in the API log. +This can be logged and broken down per path and method to see where the API is spending most time, and that could be investigated and optimised to improvie performance. + +# SQS-LOGS + +SQS logs are written at the end of a SQS event handler, similar to API logs. +Since SQS is used to write API logs, those handlers should not be logged, as it will create a circular infinite queue ramping up huge bills. +To be safe, SQS logs are therefore DISABLED by default. +It should only be enabled for things like provider rate requests or any SQS handler that is part of the functionality of the system doing async work, not handlers that are part of the infrastructure. + +SQS log will also write to the API_LOGS queue and the same index in OpenSearch (can review and change this in future) +It logs with method "SQS" and path is the message type. +That means we can log durations and through put in the same way and on the same graph as API when needed +It also captures the actions taken as part of the handler, in the same way it is captured for API. + +So when one finds some action takes too long in API, and move it to an SQS handler, the change will be visibile on the dashboard and indicate the improvement or not if your change did not have the desired effect. + +That it is idea. + +We can easily disable SQS logs and we can easily move it to another index in OpenSearch if necessary. Will have to try it for a while an see if it is useful in the current form or not. + +# CRON-LOGS + +In the same way we log API/SQS, it will be useful to monitor crons with a bit of output, e.g. nr of items deleted by a janitor etc. +One can get that currently from cloud watch if the logs are not disabled, and CloudWatch should ideally not be source of metrics, but that is currently the case, so not changing it yet. \ No newline at end of file diff --git a/logs/api-logs.go b/logs/api-logs.go new file mode 100644 index 0000000000000000000000000000000000000000..a2e6d47afaa1d82725ccd74bce9a6530697d7ac5 --- /dev/null +++ b/logs/api-logs.go @@ -0,0 +1,230 @@ +package logs + +import ( + "net/http" + "strings" + "sync" + "time" + + "github.com/aws/aws-lambda-go/events" + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/service" +) + +//Call this at the end of an API request handler to capture the req/res as well as all actions taken during the processing +//(note: action list is only reset when this is called - so must be called after each handler, else action list has to be reset at the start) +func LogIncomingAPIRequest(startTime time.Time, req events.APIGatewayProxyRequest, res events.APIGatewayProxyResponse) error { + if service.Ctx == nil { + return errors.Errorf("Cannot log without service context") + } + + //todo: filter out some noisy (method+path) + + endTime := time.Now() + dur := endTime.Sub(startTime) + + var authType string + var authUsername string + if req.RequestContext.Identity.CognitoAuthenticationType != "" { + authType = "cognito" + split := strings.Split(req.RequestContext.Identity.CognitoAuthenticationProvider, ":") + if len(split) > 0 { + authUsername = split[len(split)-1] //= part after last ':' + } + } else { + authType = "iam" + split := strings.Split(req.RequestContext.Identity.UserArn, ":user/") + if len(split) > 0 { + authUsername = split[len(split)-1] //= part after ':user/' + } + } + + claim := service.Ctx.Claim() + username, _ := claim["user_id"].(string) + accountID, _ := claim["account_id"].(int64) + apiLog := ApiLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: int(dur / time.Millisecond), + Method: req.HTTPMethod, + Address: req.RequestContext.DomainName, + Path: req.Path, + ResponseCode: res.StatusCode, + RequestID: service.Ctx.RequestID(), + InitialAuthType: authType, + InitialAuthUsername: authUsername, + SourceIP: req.RequestContext.Identity.SourceIP, + UserAgent: req.RequestContext.Identity.UserAgent, + Username: username, + AccountID: accountID, + Request: ApiLogRequest{ + Headers: req.Headers, + QueryParameters: req.QueryStringParameters, + BodySize: len(req.Body), + Body: req.Body, + }, + Response: ApiLogResponse{ + Headers: res.Headers, + BodySize: len(res.Body), + Body: res.Body, + }, + //RelevantID: service.Ctx.,??? + } + + //copy then reset actionList for the next handler + actionListMutex.Lock() + apiLog.Actions = actionList + actionList = []ActionLog{} + actionListMutex.Unlock() + + //also copy multi-value query parameters to the log as CSV array values + for n, as := range req.MultiValueQueryStringParameters { + apiLog.Request.QueryParameters[n] = "[" + strings.Join(as, ",") + "]" + } + + //todo: filter out excessive req/res body content per (method+path) + //todo: also need to do for all actions... + if apiLog.Method == http.MethodGet { + apiLog.Response.Body = "<not logged>" + } + + //todo: filter out sensitive values (e.g. OTP) + if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + Type("incoming-api-log"). + RequestID(apiLog.RequestID). + Send(apiLog); err != nil { + return errors.Wrapf(err, "failed to send incoming-api-log") + } + return nil +} //LogIncomingAPIRequest() + +//ApiLog is the SQS event details struct encoded as JSON document, sent to SQS, to be logged for each API handler executed. +type ApiLog struct { + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int `json:"duration_ms"` //duration in milliseconds + Method string `json:"method"` + Address string `json:"address"` //server address for incoming and outgoing + Path string `json:"path"` + ResponseCode int `json:"response_code"` + RequestID string `json:"request_id"` + InitialAuthUsername string `json:"initial_auth_username,omitempty"` + InitialAuthType string `json:"initial_auth_type,omitempty"` + AccountID int64 `json:"account_id,omitempty"` + Username string `json:"username,omitempty"` + SourceIP string `json:"source_ip,omitempty"` //only logged for incoming API + UserAgent string `json:"user_agent,omitempty"` //only for incoming, indicate type of browser when UI + RelevantID string `json:"relevant_id,omitempty"` + Request ApiLogRequest `json:"request"` + Response ApiLogResponse `json:"response"` + Actions []ActionLog `json:"actions,omitempty"` +} + +type ApiLogRequest struct { + Headers map[string]string `json:"headers,omitempty"` + QueryParameters map[string]string `json:"query_parameters,omitempty"` + BodySize int `json:"body_size"` //set even when body is truncated/omitted + Body string `json:"body,omitempty"` //json body as a string +} + +type ApiLogResponse struct { + Headers map[string]string `json:"headers,omitempty"` + BodySize int `json:"body_size"` //set even when body is truncated/omitted + Body string `json:"body,omitempty"` //json content as a string +} + +func LogOutgoingAPIRequest(url string, method string, requestBody string, responseBody string, responseCode int, startTime time.Time) error { + endTime := time.Now() + dur := endTime.Sub(startTime) + + apiCallLog := ApiCallLog{ + URL: url, + Method: method, + ResponseCode: responseCode, + Request: ApiCallRequestLog{ + BodySize: len(requestBody), + Body: requestBody, + }, + Response: ApiCallResponseLog{ + BodySize: len(responseBody), + Body: responseBody, + }, + } + + actionListMutex.Lock() + actionList = append(actionList, ActionLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: int(dur / time.Millisecond), + Type: ActionTypeApiCall, + ApiCall: &apiCallLog, + }) + actionListMutex.Unlock() + + return nil +} //LogOutgoingAPIRequest() + +var ( + actionListMutex sync.Mutex + actionList = []ActionLog{} +) + +type ActionLog struct { + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + DurMs int `json:"duration_ms"` //duration in milliseconds + Type ActionType `json:"type" doc:"Type is api-call|sqs-sent|sql-query|sleep"` + ApiCall *ApiCallLog `json:"api_call,omitempty"` + SQSSent *SQSSentLog `json:"sqs_sent,omitempty"` + SQLQuery *SQLQueryLog `json:"sql_query,omitempty"` + Sleep *SleepLog `json:"sleep,omitempty"` +} + +type ActionType string + +var ActionTypeList = []ActionType{ + ActionTypeApiCall, + ActionTypeSqsSent, + ActionTypeSqlQuery, + ActionTypeSleep, +} + +const ( + ActionTypeApiCall ActionType = "api-call" + ActionTypeSqsSent ActionType = "sqs-sent" + ActionTypeSqlQuery ActionType = "sql-query" + ActionTypeSleep ActionType = "sleep" +) + +//APICallLog captures details of an outgoing API call made from a handler +type ApiCallLog struct { + URL string `json:"url"` + Method string `json:"method"` + ResponseCode int `json:"response_code"` + Request ApiCallRequestLog `json:"request"` + Response ApiCallResponseLog `json:"response"` +} + +type ApiCallRequestLog struct { + BodySize int `json:"body_size"` + Body string `json:"body"` +} + +type ApiCallResponseLog struct { + BodySize int `json:"body_size"` + Body string `json:"body"` +} + +//SQSSentLog captures details of an SQS event sent from a handler +type SQSSentLog struct { +} + +//SQLQueryLog captures details of an SQL query executed from a handler +type SQLQueryLog struct { +} + +//SleepLog captures details of time spent sleeping from a handler +type SleepLog struct { + //nothing to record apart from the action start/end/dur... +} diff --git a/logs/cron.logs.go b/logs/cron.logs.go new file mode 100644 index 0000000000000000000000000000000000000000..1f58fd5265d3e22a565749d75fac4d43fce58c90 --- /dev/null +++ b/logs/cron.logs.go @@ -0,0 +1,3 @@ +package logs + +//todo... currently monitored from CloudWatch... diff --git a/logs/sqs-logs.go b/logs/sqs-logs.go new file mode 100644 index 0000000000000000000000000000000000000000..49f446b6d0d0b98ca45e66f07f19538bb617848e --- /dev/null +++ b/logs/sqs-logs.go @@ -0,0 +1,96 @@ +package logs + +import ( + "encoding/json" + "fmt" + "os" + "time" + + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" + "gitlab.com/uafrica/go-utils/service" +) + +//Call this at the end of an SQS event handler to capture the req and result as well as all actions taken during the processing +//(note: action list is only reset when this is called - so must be called after each handler, else action list has to be reset at the start) +func LogSQSRequest(startTime time.Time, + requestID string, //from API + messageType string, + req interface{}, + handlerErr error, +) error { + if service.Ctx == nil { + return errors.Errorf("Cannot log without service context") + } + + if !sqsLogEnabled { + return nil + } + + endTime := time.Now() + dur := endTime.Sub(startTime) + log := ApiLog{ + StartTime: startTime, + EndTime: endTime, + DurMs: int(dur / time.Millisecond), + RequestID: requestID, + Method: "SQS", + Path: messageType, + } + + if req != nil { + if jsonReq, err := json.Marshal(req); err == nil { + log.Request.Body = string(jsonReq) + log.Request.BodySize = len(log.Request.Body) + } + } + + if handlerErr == nil { + log.ResponseCode = 0 + } else { + log.ResponseCode = 1 + errorInfo := ErrorInfo{ + Error: handlerErr.Error(), + Details: fmt.Sprintf("%+v", handlerErr), + } + jsonError, _ := json.Marshal(errorInfo) + log.Response.Body = string(jsonError) + } + + //copy then reset actionList for the next handler + actionListMutex.Lock() + log.Actions = actionList + actionList = []ActionLog{} + actionListMutex.Unlock() + + //todo: filter out sensitive values (e.g. OTP) + + //note: we send SQS logs to "API_LOGS" which already exists... should be renamed to simply "LOGS" + //it use the same structure, but method="SQS" and path="messageType" and request is the event body + //so they can be plotted on the same dashboard visualisation in OpenSearch with all the same filters/metrics + if _, err := queues.NewEvent(service.Ctx, "API_LOGS"). + Type("api-log"). + RequestID(requestID). + Send(log); err != nil { + return errors.Wrapf(err, "failed to send api-log for SQS") + } + return nil +} + +var sqsLogEnabled = false + +func init() { + envSetting := os.Getenv("SQS_LOGS_ENABLED") + if envSetting == "true" { + sqsLogEnabled = true + } + //if consuming from API_LOGS, do not enable else we will consume and send to our own queue! + + logger.Infof("Environment SQS_LOGS_ENABLED=\"%s\" -> sqsLogsEnabled=%v", envSetting, sqsLogEnabled) +} + +type ErrorInfo struct { + Error string `json:"error"` + Details string `json:"details"` +} diff --git a/queues/audit.go b/queues/audit.go deleted file mode 100644 index 16d50098946c8aecc76aa2b26bffb9011e13fdda..0000000000000000000000000000000000000000 --- a/queues/audit.go +++ /dev/null @@ -1,55 +0,0 @@ -package queues - -import ( - "time" - - "gitlab.com/uafrica/go-utils/audit" - "gitlab.com/uafrica/go-utils/errors" - "gitlab.com/uafrica/go-utils/service" -) - -//create auditor that push to a queue using the specified producer -func Auditor(queueName string, messageType string, producer service.ProducerLogger) audit.Auditor { - if producer == nil { - panic(errors.Errorf("cannot create auditor with producer=nil")) - } - if queueName == "" { - queueName = "AUDIT" - } - if messageType == "" { - messageType = "audit" - } - return auditor{ - producer: producer, - queueName: queueName, - messageType: messageType, - } -} - -type auditor struct { - producer service.ProducerLogger - queueName string - messageType string -} - -func (a auditor) WriteEvent(requestID string, event audit.Event) error { - _, err := service.NewEvent(a.producer, a.queueName). - RequestID(requestID). - Type(a.messageType). - Send(event) - if err != nil { - return errors.Wrapf(err, "failed to write audit event") - } - return nil -} - -func (a auditor) WriteValues(startTime, endTime time.Time, requestID string, values map[string]interface{}) error { - _, err := service.NewEvent(a.producer, a.queueName). - RequestID(requestID). - Type(a.messageType). - Send(values) - if err != nil { - return errors.Wrapf(err, "failed to write audit values") - } - return nil -} diff --git a/queues/check.go b/queues/check.go deleted file mode 100644 index f0148eef81b12737ba3b85d1082f0ecf99711956..0000000000000000000000000000000000000000 --- a/queues/check.go +++ /dev/null @@ -1,5 +0,0 @@ -package queues - -type ICheck interface { - Check(Context) (interface{}, error) -} diff --git a/service/event.go b/queues/event.go similarity index 91% rename from service/event.go rename to queues/event.go index 08f6c98c6d78bcbf9471c68ac79b36e76a3a5d59..376947d423b3b6b1eaf2131ad864004333e35a43 100644 --- a/service/event.go +++ b/queues/event.go @@ -1,4 +1,4 @@ -package service +package queues import ( "encoding/json" @@ -9,12 +9,7 @@ import ( "gitlab.com/uafrica/go-utils/logger" ) -type ProducerLogger interface { - Producer - logger.Logger -} - -func NewEvent(producer ProducerLogger, queueName string) Event { +func NewEvent(producer Producer, queueName string) Event { if producer == nil { panic(errors.Errorf("NewEvent(producer=nil)")) } @@ -31,7 +26,7 @@ func NewEvent(producer ProducerLogger, queueName string) Event { } type Event struct { - producer ProducerLogger + producer Producer MessageID string //assigned by implementation (AWS/mem/..) QueueName string //queue determine sequencing, items in same queue are delivered one-after-the-other, other queues may deliver concurrent to this queue TypeName string //type determines which handler processes the event @@ -81,6 +76,8 @@ func (event Event) Params(params map[string]string) Event { return event } +var log = logger.New() + func (event Event) Send(value interface{}) (string, error) { if event.producer == nil { return "", errors.Errorf("send with producer==nil") @@ -95,7 +92,7 @@ func (event Event) Send(value interface{}) (string, error) { if msgID, err := event.producer.Send(event); err != nil { return "", errors.Wrapf(err, "failed to send event") } else { - event.producer.WithFields(map[string]interface{}{ + log.WithFields(map[string]interface{}{ "queue": event.QueueName, "type": event.TypeName, "due": event.DueTime, diff --git a/service/producer.go b/queues/producer.go similarity index 55% rename from service/producer.go rename to queues/producer.go index 050c56f8134cd77432b3383e827d163933f60aa8..00df1db12444c7c33d67c9ca08861bbba1f022b8 100644 --- a/service/producer.go +++ b/queues/producer.go @@ -1,7 +1,7 @@ -package service +package queues //Producer sends an event for async processing type Producer interface { + NewEvent(queueName string) Event Send(event Event) (msgID string, err error) - //todo: method to request an event after some delay with incrementing attempt nr } diff --git a/queues/sqs/README.md b/queues/sqs_producer/README.md similarity index 100% rename from queues/sqs/README.md rename to queues/sqs_producer/README.md diff --git a/queues/sqs/producer.go b/queues/sqs_producer/producer.go similarity index 90% rename from queues/sqs/producer.go rename to queues/sqs_producer/producer.go index ce10639f7eef631b83195b07d2a0a582e25a3995..55a0fd7e20bb12d5bacccce505793518636d6bbc 100644 --- a/queues/sqs/producer.go +++ b/queues/sqs_producer/producer.go @@ -1,4 +1,4 @@ -package sqs +package sqs_producer import ( "os" @@ -11,10 +11,10 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" - "gitlab.com/uafrica/go-utils/service" + "gitlab.com/uafrica/go-utils/queues" ) -func NewProducer(requestIDHeaderKey string) service.Producer { +func New(requestIDHeaderKey string) queues.Producer { region := os.Getenv("AWS_REGION") if region == "" { panic(errors.Errorf("environment AWS_REGION is not defined")) @@ -39,8 +39,12 @@ type producer struct { queues map[string]*QueueProducer } +func (producer *producer) NewEvent(queueName string) queues.Event { + return queues.NewEvent(producer, queueName) +} + // Note: Calling code needs SQS IAM permissions -func (producer *producer) Send(event service.Event) (string, error) { +func (producer *producer) Send(event queues.Event) (string, error) { logger.Debugf("SQS producer.Send(%+v)", event) messenger, ok := producer.queues[event.QueueName] if !ok { @@ -89,7 +93,7 @@ type QueueProducer struct { queueURL string } -func (m *QueueProducer) Send(event service.Event) (string, error) { +func (m *QueueProducer) Send(event queues.Event) (string, error) { logger.Debugf("SQS producer.queue(%s) Sending event %+v", m.queueURL, event) //add params as message attributes diff --git a/service/README.md b/service/README.md index ce8c48cd7ee32ccf67df1508fc992c7af2efa312..60b637eafeaf45df8e242fe74c937442ab93dc4d 100644 --- a/service/README.md +++ b/service/README.md @@ -34,7 +34,7 @@ Example: Where package db then defines: - func Connector(dbName string) service.IStarter { + func Connector(dbName string) service.Starter { return &connector{ dbName: dbName, dbConn: nil, @@ -157,16 +157,11 @@ So: ```claim,ok := ctx.Get("claims").(MyClaimStruct)``` * All fields in params or body structs matching claim names will be overwritten by the time a handler is called. -# Audits - -Audit records are written with: -* ctx.AuditChange(), or -* ctx.AuditWrite() +# Data Change Audits +Data Change audit records are written with ctx.AuditChange() The AuditChange() method logs the changes between an original and new value. -The AuditWrite() logs all the data given to it. -A handler may write 0..N audit record, there is no check. In general, audits are written to capture changes, and when a handler changes multiple database records, they could all be audited. # Sending Async Events diff --git a/service/context.go b/service/context.go index ce4acc20d11e657db6d6a7209e271f326f7da06a..533af78892b47915af5901b4fc2cf9e1c81233d8 100644 --- a/service/context.go +++ b/service/context.go @@ -9,6 +9,7 @@ import ( "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/string_utils" ) @@ -18,8 +19,7 @@ var Ctx Context type Context interface { context.Context logger.Logger - Producer - audit.Auditor + queues.Producer RequestID() string MillisecondsSinceStart() int64 @@ -51,7 +51,7 @@ type Context interface { ValueOrDefault(name string, defaultValue interface{}) interface{} Data() map[string]interface{} - //write an audit event + //write a data change audit event AuditChange(eventType string, orgValue, newValue interface{}) } @@ -78,7 +78,6 @@ func (s service) NewContext(base context.Context, requestID string, values map[s Context: base, Logger: l, Producer: s.Producer, - Auditor: s.Auditor, startTime: time.Now(), requestID: requestID, data: map[string]interface{}{}, @@ -99,15 +98,13 @@ func (s service) NewContext(base context.Context, requestID string, values map[s Ctx.Debugf("Start(%s)=(%T)%+v", starterName, starterData, starterData) } - return Ctx, nil } type serviceContext struct { context.Context logger.Logger - Producer - audit.Auditor + queues.Producer startTime time.Time requestID string claim map[string]interface{} @@ -220,17 +217,14 @@ func (ctx *serviceContext) ValueOrDefault(name string, defaultValue interface{}) func (ctx *serviceContext) AuditChange(eventType string, orgValue, newValue interface{}) { username, _ := ctx.Claim()["username"].(string) - event, err := audit.NewEvent( + if err := audit.SaveDataChange( + ctx.requestID, username, //use username as source (will default to "SYSTEM" if undefined) eventType, orgValue, newValue, - ) - if err != nil { - ctx.Errorf("failed to define audit event: %+v", err) + ); err != nil { + ctx.Errorf("failed to save data change: %+v", err) return } - if err := ctx.Auditor.WriteEvent(ctx.requestID, event); err != nil { - ctx.Errorf("failed to audit change: %+v", err) - } } diff --git a/service/service.go b/service/service.go index 0a80933a8f94edc92eaf69d8817a390c1b474f2c..3eda7b06bf3ba17c23c70bf2edff773b29c52148 100644 --- a/service/service.go +++ b/service/service.go @@ -4,19 +4,17 @@ import ( "context" "os" - "gitlab.com/uafrica/go-utils/audit" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/queues" "gitlab.com/uafrica/go-utils/string_utils" ) type Service interface { logger.Logger - Producer - audit.Auditor - WithStarter(name string, starter IStarter) Service - WithProducer(producer Producer) Service - WithAuditor(auditor audit.Auditor) Service + queues.Producer + WithStarter(name string, starter Starter) Service + WithProducer(producer queues.Producer) Service NewContext(base context.Context, requestID string, values map[string]interface{}) (Context, error) } @@ -28,18 +26,16 @@ func New() Service { return service{ Producer: nil, Logger: logger.New().WithFields(map[string]interface{}{"env": env}), - Auditor: audit.None(), env: env, - starters: map[string]IStarter{}, + starters: map[string]Starter{}, } } type service struct { - logger.Logger //for logging outside of context - Producer //for sending async events - audit.Auditor - env string - starters map[string]IStarter + logger.Logger //for logging outside of context + queues.Producer //for sending async events + env string + starters map[string]Starter } func (s service) Env() string { @@ -59,7 +55,7 @@ func (s service) Env() string { //you can implement one starter that does everything and return a struct or //implement one for your db, one for rate limit, one for ... //the name must be snake-case, e.g. "this_is_my_starter_name" -func (s service) WithStarter(name string, starter IStarter) Service { +func (s service) WithStarter(name string, starter Starter) Service { if !string_utils.IsSnakeCase(name) { panic(errors.Errorf("invalid starter name=\"%s\", expecting snake_case names only", name)) } @@ -73,16 +69,9 @@ func (s service) WithStarter(name string, starter IStarter) Service { return s } -func (s service) WithProducer(producer Producer) Service { +func (s service) WithProducer(producer queues.Producer) Service { if producer != nil { s.Producer = producer } return s } - -func (s service) WithAuditor(auditor audit.Auditor) Service { - if auditor != nil { - s.Auditor = auditor - } - return s -} diff --git a/service/start.go b/service/start.go index c30c11733164a66a642b6d89f1ce1f264525e465..143a4e15f533bfec0f903b6e2b3344116a5df752 100644 --- a/service/start.go +++ b/service/start.go @@ -1,6 +1,6 @@ package service -type IStarter interface { +type Starter interface { //called at the start of api/cron/queues processing, before checks, e.g. to ensure we have db connection //i.e. setup things that does not depend on the request/event details //if you need the request details, you need to implement a check for each of the api, cron and/or queue as needed, not a Start() method.