From 26739ba2c975e17b2d534309d9245044151f762d Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Tue, 2 Nov 2021 10:15:47 +0200 Subject: [PATCH] Update search --- api/audit.go | 49 +++++++++ api/lambda.go | 2 + audit/file_audit.go | 8 +- config/config.go | 35 ++++++ config/struct.go | 110 +++++++++++++++++++ config/struct_test.go | 70 ++++++++++++ examples/core/api/main.go | 6 -- go.mod | 1 + go.sum | 4 + search/README.md | 18 ++++ search/config.go | 33 ++++++ search/dev/docker-compose.yml | 80 ++++++++++++++ search/opensearch_types.go | 36 +++++++ search/time_series.go | 179 +++++++++++++++++++++++++++++++ search/writer.go | 195 ++++++++++++++++++++++++++++++++++ search/writer_test.go | 78 ++++++++++++++ 16 files changed, 897 insertions(+), 7 deletions(-) create mode 100644 api/audit.go create mode 100644 config/config.go create mode 100644 config/struct.go create mode 100644 config/struct_test.go create mode 100644 search/README.md create mode 100644 search/config.go create mode 100644 search/dev/docker-compose.yml create mode 100644 search/opensearch_types.go create mode 100644 search/time_series.go create mode 100644 search/writer.go create mode 100644 search/writer_test.go diff --git a/api/audit.go b/api/audit.go new file mode 100644 index 0000000..602ad08 --- /dev/null +++ b/api/audit.go @@ -0,0 +1,49 @@ +package api + +// func x() { +// x := map[string]opensearch.MappingProperty{ +// "account_id": {Type: "keyword"}, +// "http_method": {Type: "keyword"}, +// "path": {Type: "keyword"}, +// "id": {Type: "long"}, +// "request_id": {Type: "text"}, //, Fields: MappingFieldProperties{Keyword: MappingKeyword{Type:"keyword", IgnoreAbove: 256}}}, +// "initial_auth_type": {Type: "text"}, +// "initial_auth_username": {Type: "text"}, +// "ip": {Type: "text"}, +// "relevant_id": {Type: "text"}, +// "user_agent": {Type: "text"}, +// "user_id": {Type: "long"}, +// "request": { +// Properties: map[string]MappingProperty{ +// "body": {Type: "object", 
Enabled: false}, +// "query": { +// Properties: map[string]MappingProperty{ +// "account_id": {Type: "keyword"}, +// "id": {Type: "text"}, +// "ids": {Type: "text"}, +// "invoice_id": {Type: "text"}, +// "shipment_id": {Type: "text"}, +// "user_id": {Type: "text"}, +// "limit": {Type: "text"}, +// "offset": {Type: "text"}, +// "search": {Type: "text"}, +// "order": {Type: "text"}, +// "order_by": {Type: "text"}, +// "include_relations": {Type: "keyword"}, +// }, +// }, +// }, +// }, +// "response_code": {Type: "long"}, +// "response_size": {Type: "long"}, +// "response": { +// Properties: map[string]MappingProperty{ +// "body": {Type: "object", Enabled: false}, +// }, +// }, +// }, +// }, +// } + +// } +// } diff --git a/api/lambda.go b/api/lambda.go index d18f131..cacfdd0 100644 --- a/api/lambda.go +++ b/api/lambda.go @@ -99,6 +99,8 @@ func (api Api) Handler(baseCtx context.Context, apiGatewayProxyReq events.APIGat res.Headers[api.requestIDHeaderKey] = ctx.RequestID() } if err := api.Service.WriteValues(ctx.StartTime(), time.Now(), ctx.RequestID(), map[string]interface{}{ + "direction": "incoming", + "type": "api", "request_id": ctx.RequestID(), "request": ctx.Request(), "response": res}, diff --git a/audit/file_audit.go b/audit/file_audit.go index 176851d..7d340e1 100644 --- a/audit/file_audit.go +++ b/audit/file_audit.go @@ -13,7 +13,7 @@ func File(f *os.File) Auditor { if f == nil { panic(errors.Errorf("cannot create file auditor with f=nil")) } - return fileAudit{ + return &fileAudit{ f: f, } } @@ -23,6 +23,9 @@ type fileAudit struct { } func (fa fileAudit) WriteValues(startTime, endTime time.Time, requestID string, values map[string]interface{}) error { + if fa.f == nil { + return errors.Errorf("auditor is closed") + } obj := map[string]interface{}{ "start_time": startTime, "end_time": endTime, @@ -41,6 +44,9 @@ func (fa fileAudit) WriteValues(startTime, endTime time.Time, requestID string, } func (fa fileAudit) WriteEvent(requestID string, event Event) error 
{ + if fa.f == nil { + return errors.Errorf("auditor is closed") + } obj := map[string]interface{}{ "start_time": event.Timestamp, "end_time": event.Timestamp, diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..a4bed6b --- /dev/null +++ b/config/config.go @@ -0,0 +1,35 @@ +package config + +import ( + "encoding/json" + "os" + + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" +) + +func MustGetOrDefault(valuePtr interface{}, name string) { + //update value if defined: first try the env value as JSON, then as a plain (quoted) string + envValueStr := os.Getenv(name) + if envValueStr != "" { + if err := json.Unmarshal([]byte(envValueStr), valuePtr); err != nil { + if err := json.Unmarshal([]byte("\""+envValueStr+"\""), valuePtr); err != nil { + panic(errors.Wrapf(err, "cannot parse environment %s=%s into %T", name, envValueStr, valuePtr)) //keep the parse error as cause + } + } + logger.Debugf("CONFIG: Set %s=%s", name, envValueStr) + } + + if validator, ok := valuePtr.(Validator); ok { + if err := validator.Validate(); err != nil { + panic(errors.Wrapf(err, "Invalid %s=(%T)%+v", name, valuePtr, valuePtr)) //keep the validation reason as cause + } + logger.Debugf("CONFIG: Validated %s", name) + } + + logger.Debugf("CONFIG: %s=(%T)%+v", name, valuePtr, valuePtr) +} + +type Validator interface { + Validate() error +} diff --git a/config/struct.go b/config/struct.go new file mode 100644 index 0000000..90d0b30 --- /dev/null +++ b/config/struct.go @@ -0,0 +1,110 @@ +package config + +import ( + "encoding/json" + "os" + "reflect" + "regexp" + "sort" + "strconv" + "strings" + + "gitlab.com/uafrica/go-utils/errors" +) + +func Load(prefix string, configPtr interface{}) error { + if !prefixRegex.MatchString(prefix) { + return errors.Errorf("invalid config prefix \"%s\"", prefix) + } + if configPtr == nil { + return errors.Errorf("Load(nil)") + } + t := reflect.TypeOf(configPtr) + if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct { + return errors.Errorf("%T is not &struct", configPtr) + } + v := reflect.ValueOf(configPtr) + return load(prefix, 
t.Elem(), v.Elem()) +} + +type nameValue struct { + name string + value string +} + +func load(prefix string, t reflect.Type, ptrValue reflect.Value) error { + switch t.Kind() { + case reflect.Struct: + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + if err := load(prefix+"_"+strings.ToUpper(f.Name), f.Type, ptrValue.Field(i)); err != nil { + return errors.Wrapf(err, "cannot load field") + } + } + + case reflect.Slice: + //expect JSON list of values or just one value + s := os.Getenv(prefix) + if s != "" { + if err := json.Unmarshal([]byte(s), ptrValue.Addr().Interface()); err != nil { + return errors.Wrapf(err, "cannot read env %s=%s into %s", prefix, s, t.Name()) + } + } else { + //see if _1, _2, ... is used then construct a list with those values + //(only applies to list of strings) + values := map[string]string{} + for _, x := range os.Environ() { + parts := strings.SplitN(x, "=", 2) + if len(parts) == 2 && strings.HasPrefix(parts[0], prefix+"_") { + values[parts[0]] = parts[1] + } + } + if len(values) > 0 { + //add in sorted order + list := []nameValue{} + for n, v := range values { + list = append(list, nameValue{name: n, value: v}) + } + sort.Slice(list, func(i, j int) bool { + return list[i].name < list[j].name + }) + s := "" + for _, nv := range list { + if t.Elem().Kind() == reflect.String { + s += ",\"" + nv.value + "\"" //quoted + } else { + s += "," + nv.value //unquoted + } + } + s = "[" + s[1:] + "]" + if err := json.Unmarshal([]byte(s), ptrValue.Addr().Interface()); err != nil { + return errors.Wrapf(err, "cannot read env %s=%s into %s", prefix, s, t.Name()) + } + } + } + + case reflect.String: + s := os.Getenv(prefix) + if s != "" { + ptrValue.Set(reflect.ValueOf(s)) + } + + case reflect.Int64: + s := os.Getenv(prefix) + if s != "" { + i64, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return errors.Errorf("%s=%s not integer value", prefix, s) + } + ptrValue.Set(reflect.ValueOf(i64)) + } + + default: + return errors.Errorf("cannot 
load config %s_... into %s kind %s", prefix, t.Name(), t.Kind()) + } + return nil +} + +const prefixPattern = `[A-Z]([A-Z0-9_]*[A-Z0-9])*` + +var prefixRegex = regexp.MustCompile("^" + prefixPattern + "$") diff --git a/config/struct_test.go b/config/struct_test.go new file mode 100644 index 0000000..2f153ef --- /dev/null +++ b/config/struct_test.go @@ -0,0 +1,70 @@ +package config_test + +import ( + "os" + "testing" + + "gitlab.com/uafrica/go-utils/config" + "gitlab.com/uafrica/go-utils/logger" +) + +func TestLoad(t *testing.T) { + logger.SetGlobalFormat(logger.NewConsole()) + logger.SetGlobalLevel(logger.LevelDebug) + + os.Setenv("TEST_A", "123") + os.Setenv("TEST_B", "abc") + os.Setenv("TEST_C", "789") + + //list of value must be valid JSON, i.e. if list of int or list of string, it must be unquoted or quoted as expected by JSON: + os.Setenv("TEST_L", "[1,2,3]") + os.Setenv("TEST_M", "[\"7\", \"8\"]") + + //list of string entries can also be defined with _1, _2, ... postfixes + //the key value must be unique but has no significance apart from ordering, + //so if you comment out _3, then _1, _2 and _4 will result in 3 entries in the list + //as long as they are unique keys + os.Setenv("TEST_N_1", "111") + os.Setenv("TEST_N_2", "222") + //os.Setenv("TEST_N_3", "333") + os.Setenv("TEST_N_4", "444") + + os.Setenv("TEST_P_1", "111") + os.Setenv("TEST_P_2", "222") + //os.Setenv("TEST_N_3", "333") + os.Setenv("TEST_P_4", "444") + + c := Config{} + if err := config.Load("TEST", &c); err != nil { + t.Fatalf("Cannot load config: %+v", err) + } + + if c.A != "123" || c.B != "abc" || c.C != 789 { + t.Fatalf("Loaded wrong values: %+v", c) + } + if len(c.L) != 3 || c.L[0] != 1 || c.L[1] != 2 || c.L[2] != 3 { + t.Fatalf("Loaded wrong values: %+v", c) + } + if len(c.M) != 2 || c.M[0] != "7" || c.M[1] != "8" { + t.Fatalf("Loaded wrong values for M: %+v", c.M) + } + t.Logf("M=%+v", c.M) + if len(c.N) != 3 || c.N[0] != "111" || c.N[1] != "222" || c.N[2] != "444" { + 
t.Fatalf("Loaded wrong values for N: %+v", c.N) + } + t.Logf("N=%+v", c.N) + if len(c.P) != 3 || c.P[0] != 111 || c.P[1] != 222 || c.P[2] != 444 { + t.Fatalf("Loaded wrong values for P: %+v", c.P) + } + t.Logf("P=%+v", c.P) +} + +type Config struct { + A string `json:"a"` + B string `json:"b"` + C int64 `json:"c"` + L []int64 `json:"l"` + M []string `json:"m"` + N []string `json:"n"` + P []int64 `json:"p"` +} diff --git a/examples/core/api/main.go b/examples/core/api/main.go index 34402bc..b690732 100644 --- a/examples/core/api/main.go +++ b/examples/core/api/main.go @@ -1,7 +1,6 @@ package main import ( - "flag" "math/rand" "net/http" "os" @@ -17,10 +16,6 @@ import ( func main() { logger.SetGlobalLevel(logger.LevelDebug) logger.SetGlobalFormat(logger.NewConsole()) - - localPort := flag.Int("port", 0, "Run with local HTTP server in this port (default: Run as lambda)") - flag.Parse() - api.New("uafrica-request-id", app.ApiRoutes()). WithStarter("db", db.Connector("core")). WithCheck("claims", claimsChecker{}). @@ -29,7 +24,6 @@ func main() { WithCORS(cors{}). WithAuditor(audit.File(os.Stdout)). WithEvents(app.QueueRoutes()). //only used when LOG_LEVEL="debug" - WithLocalPort(localPort). 
//if nil will still run as lambda Run() } diff --git a/go.mod b/go.mod index 4aae0d6..fe7d1c1 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( require ( github.com/golang/protobuf v1.5.2 // indirect + github.com/opensearch-project/opensearch-go v1.0.0 github.com/r3labs/diff/v2 v2.14.0 github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect diff --git a/go.sum b/go.sum index 7deed11..f98637f 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -89,6 +90,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= +github.com/opensearch-project/opensearch-go v1.0.0 h1:8Gh7B7Un5BxuxWAgmzleEF7lpOtC71pCgPp7lKr3ca8= +github.com/opensearch-project/opensearch-go v1.0.0/go.mod h1:FrUl/52DBegRYvK7ISF278AXmjDV647lyTnsLGBR7J4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -187,6 +190,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/search/README.md b/search/README.md new file mode 100644 index 0000000..b956a80 --- /dev/null +++ b/search/README.md @@ -0,0 +1,18 @@ +# Search +This package is a wrapper around the AWS OpenSearch package to provide a consistent way to: +* log time series data (e.g. api-logs) +* store and search for documents + +## Time Series Logs +We use a time series, for example, to store API logs, writing a log record at the end of each API handler, with a start timestamp and duration as well as key values like HTTP method, path, request-id, URL parameters, response code etc. + +The indexed logs are then mainly used from the OpenSearch dashboard to visualise the system usage and performance, and we can set up alerts on conditions to detect failures and poor performance. One can also read the logs to assist with troubleshooting. + +The indexed logs can also be queried from our API to provide insights to account users, but the account users do not directly interact with OpenSearch. + +## Document Store and Search +We use a document store and search to provide quick text searches from the API. 
In this case a copy of a document (e.g. an order) is stored in OpenSearch when the order is created or updated. Updates overwrite the existing document, so there will only be one copy of each document. + +When a user is looking for an order, the API provides an end-point to search orders, e.g. for "lcd screen". The API then does an OpenSearch query in the orders index, gets N results, reads those orders from the orders table in the database (not OpenSearch) and returns those results to the user. + +We therefore use OpenSearch only for searching and returning a list of document ids, then read the documents from the database. A document is typically an "order" but can also be anything else that we need to do free text searches on. \ No newline at end of file diff --git a/search/config.go b/search/config.go new file mode 100644 index 0000000..0adce93 --- /dev/null +++ b/search/config.go @@ -0,0 +1,33 @@ +package search + +import ( + "regexp" + + "gitlab.com/uafrica/go-utils/errors" +) + +type Config struct { + Addresses []string `json:"addresses" doc:"List of server addresses. Requires at least one, e.g. \"https://localhost:9200\" for local testing"` + Username string `json:"username" doc:"User name for HTTP basic auth. Defaults to admin for local testing."` + Password string `json:"password" doc:"User password for HTTP basic auth. Defaults to admin for local testing."` + //TLSCertificateFilename string `json:"tls_certificate_filename" doc:"Filename to load TLS certificate. 
Defaults to insecure connection when not specified."` + //IndexName string `json:"index_name" doc:"OpenSearch index name should be lowercase with dashes"` +} + +func (c *Config) Validate() error { + if len(c.Addresses) == 0 { + return errors.Errorf("missing addresses") + //c.Addresses = []string{"https://localhost:9200"} + } + if c.Username == "" { + c.Username = "admin" + } + if c.Password == "" { + c.Password = "admin" + } + return nil +} + +const indexNamePattern = `[a-z]([a-z0-9-]*[a-z0-9])*` + +var indexNameRegex = regexp.MustCompile("^" + indexNamePattern + "$") diff --git a/search/dev/docker-compose.yml b/search/dev/docker-compose.yml new file mode 100644 index 0000000..ba1b3fd --- /dev/null +++ b/search/dev/docker-compose.yml @@ -0,0 +1,80 @@ +version: '3' +services: + opensearch-node1: + image: opensearchproject/opensearch:latest + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster + - node.name=opensearch-node1 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_master_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems + hard: 65536 + volumes: + - opensearch-data1:/usr/share/opensearch/data + ports: + - 9200:9200 + - 9600:9600 # required for Performance Analyzer + networks: + - opensearch-net + opensearch-node2: + image: opensearchproject/opensearch:latest + container_name: opensearch-node2 + environment: + - cluster.name=opensearch-cluster + - node.name=opensearch-node2 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_master_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true + - 
"OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - opensearch-data2:/usr/share/opensearch/data + networks: + - opensearch-net + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:latest + container_name: opensearch-dashboards + ports: + - 5601:5601 + expose: + - "5601" + environment: + OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' + networks: + - opensearch-net + + kibana: + container_name: opensearch-kibana + image: docker.elastic.co/kibana/kibana:7.11.0 + depends_on: + - opensearch-node1 + - opensearch-node2 + ports: + - 5602:5601 + expose: + - "5602" + environment: + ELASTICSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' + networks: + - opensearch-net + +volumes: + opensearch-data1: + opensearch-data2: + +networks: + opensearch-net: diff --git a/search/opensearch_types.go b/search/opensearch_types.go new file mode 100644 index 0000000..dba91cd --- /dev/null +++ b/search/opensearch_types.go @@ -0,0 +1,36 @@ +package search + +//Mapping configures an index in OpenSearch +type Index struct { + Settings Settings `json:"settings"` + Mappings Mappings `json:"mappings"` +} + +type Settings struct { + Index *SettingsIndex `json:"index,omitempty"` +} + +type SettingsIndex struct { + NumberOfShards int `json:"number_of_shards,omitempty"` + NumberOfReplicas int `json:"number_of_replicas,omitempty"` +} + +type Mappings struct { + Properties map[string]MappingProperty `json:"properties,omitempty"` +} + +type MappingProperty struct { + Type string `json:"type"` + Enabled bool `json:"enabled,omitempty"` + Fields map[string]MappingFieldProperties `json:"fields,omitempty"` + Properties map[string]MappingProperty `json:"properties,omitempty"` +} + +type MappingFieldProperties struct { + Keyword *MappingKeyword `json:"keyword"` +} + +type MappingKeyword struct { + Type string `json:"type"` //="keyword" 
+ IgnoreAbove int `json:"ignore_above"` //e.g. 256 +} diff --git a/search/time_series.go b/search/time_series.go new file mode 100644 index 0000000..28e0179 --- /dev/null +++ b/search/time_series.go @@ -0,0 +1,179 @@ +package search + +import ( + "encoding/json" + "net/http" + "reflect" + "strings" + "time" + + "gitlab.com/uafrica/go-utils/errors" + "gitlab.com/uafrica/go-utils/logger" +) + +type TimeSeries interface { + Write(StartTime time.Time, EndTime time.Time, data interface{}) error +} + +type timeSeries struct { + w *writer + indexName string + dataType reflect.Type + fields []dataField +} + +type dataField struct { + name string + index []int + mapping MappingProperty +} + +//create a time series to write e.g. api logs +//the tmpl must be your log data struct consisting of public fields as: +// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) +// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch +// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required +func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) { + if !indexNameRegex.MatchString(indexName) { + return nil, errors.Errorf("invalid index_name:\"%s\"", indexName) + } + structType := reflect.TypeOf(tmpl) + if tmpl == nil || structType.Kind() != reflect.Struct { + return nil, errors.Errorf("%T is not a struct", tmpl) + } + + ts := &timeSeries{ + w: w, + indexName: indexName, + dataType: structType, + fields: []dataField{}, + } + + //define the OpenSearch index mapping + indexSpec := Index{ + Settings: Settings{ + Index: &SettingsIndex{ + NumberOfShards: 4, + NumberOfReplicas: 0, + }, + }, + Mappings: Mappings{ + Properties: map[string]MappingProperty{}, + }, + } + for i := 0; i < structType.NumField(); i++ { + structField := structType.Field(i) + dataField := dataField{ + name: structField.Name, + index: structField.Index, + mapping: MappingProperty{Type: "text"}, + } + if jsonTags := 
strings.SplitN(structField.Tag.Get("json"), ",", 2); len(jsonTags) > 0 && jsonTags[0] != "" { + dataField.name = jsonTags[0] + } + if dataField.name == "" { + logger.Debugf("Skip %s unnamed field %+v", structType.Name(), structField) + continue + } + + //get default type of search value from field type + switch structField.Type.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + dataField.mapping = MappingProperty{Type: "long"} + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + dataField.mapping = MappingProperty{Type: "long"} + case reflect.Bool: + dataField.mapping = MappingProperty{Type: "boolean"} + case reflect.String: + dataField.mapping = MappingProperty{Type: "text"} + default: + if structField.Type == reflect.TypeOf(time.Now()) { + dataField.mapping = MappingProperty{Type: "date"} + } else { + dataField.mapping = MappingProperty{Type: "text"} + } + } + + //allow user to override the default mapping type with a search tag on the field + switch structField.Tag.Get("search") { + case "": + //no change + case "keyword": + dataField.mapping = MappingProperty{Type: "keyword"} + case "long": + dataField.mapping = MappingProperty{Type: "long"} + case "date": + dataField.mapping = MappingProperty{Type: "date"} + case "boolean": + dataField.mapping = MappingProperty{Type: "boolean"} + case "object": + dataField.mapping = MappingProperty{Type: "object", Enabled: false} //NOTE(review): Enabled is tagged omitempty, so false is never marshalled - confirm intended + default: + return nil, errors.Errorf("Unknown search:\"%s\" on index(%s) field(%s)", structField.Tag.Get("search"), indexName, structField.Name) + } + + //add to index spec + indexSpec.Mappings.Properties[dataField.name] = dataField.mapping + + //add to list of fields + ts.fields = append(ts.fields, dataField) + } + + //add header fields for all time series to the index spec + for n, p := range map[string]MappingProperty{ + "@timestamp": {Type: "date"}, + "@end_time": {Type: "date"}, + "@duration_ms": {Type: "long"}, + } { + 
indexSpec.Mappings.Properties[n] = p + } + + //todo: find out what is significance of "@..." in the name - or just convention? Is user allowed to use it too? + + //create the index if it does not already exist + jsonIndexSpec, err := json.Marshal(indexSpec) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal index spec") + } + logger.Debugf("JSON Index Specification: %s", string(jsonIndexSpec)) + res, err := w.api.Create( + indexName, //index name + indexName, //index name also used for document id + strings.NewReader(string(jsonIndexSpec))) + if err != nil { + return nil, errors.Wrapf(err, "failed to create index(%s)", indexName) + } + switch res.StatusCode { + case http.StatusOK: + return ts, nil + case http.StatusCreated: + return ts, nil + case http.StatusConflict: //409 = already exists + return ts, nil + default: + return nil, errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) + } +} + +//data must be of type specified in Writer.TimeSeries(tmpl) +func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error { + t := reflect.TypeOf(data) + if t != ts.dataType { + return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.indexName, ts.dataType.Name()) + } + + //store all the data struct field values into a map[string]interface{} that we can marshal + //which includes the header fields + searchDoc := map[string]interface{}{} + + v := reflect.ValueOf(data) + for _, field := range ts.fields { + searchDoc[field.name] = v.FieldByIndex(field.index).Interface() + } + + //set header values + searchDoc["@timestamp"] = startTime + searchDoc["@end_time"] = endTime + searchDoc["@duration_ms"] = endTime.Sub(startTime) / time.Millisecond + return ts.w.Write(ts.indexName, searchDoc) +} diff --git a/search/writer.go b/search/writer.go new file mode 100644 index 0000000..a36e05f --- /dev/null +++ b/search/writer.go @@ -0,0 +1,195 @@ +package search + +import ( 
+ "context" + "crypto/tls" + "encoding/json" + "net/http" + "strings" + + opensearch "github.com/opensearch-project/opensearch-go" + opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" + "gitlab.com/uafrica/go-utils/errors" +) + +type Writer interface { + TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) +} + +func New(config Config) (Writer, error) { + if err := config.Validate(); err != nil { + return nil, errors.Wrapf(err, "invalid config") + } + w := &writer{ + config: config, + } + + // Initialize the client with SSL/TLS enabled. + var err error + w.client, err = opensearch.NewClient(opensearch.Config{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Addresses: config.Addresses, + Username: config.Username, + Password: config.Password, + }) + if err != nil { + return nil, errors.Wrapf(err, "cannot initialize opensearch connection") + } + // Print OpenSearch version information on console. + //fmt.Println(client.Info()) + + w.api = opensearchapi.New(w.client) + return w, nil +} + +//implements audit.Auditor +type writer struct { + config Config + client *opensearch.Client + api *opensearchapi.API +} + +func (writer writer) Write(indexName string, doc map[string]interface{}) error { + if writer.client == nil { + return errors.Errorf("writer closed") + } + jsonDoc, err := json.Marshal(doc) + if err != nil { + return errors.Wrapf(err, "failed to JSON encode document") + } + indexResponse, err := writer.api.Index( + indexName, + strings.NewReader(string(jsonDoc)), + ) + if err != nil { + return errors.Wrapf(err, "failed to index document") + } + var res IndexResponse + if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil { + return errors.Wrapf(err, "failed to decode JSON response") + } + //success example: + //res = map[ + // _id:oZJZxXwBPnbPDFcjNpuO + // _index:go-utils-audit-test + // _primary_term:1 + // _seq_no:5 + // _shards:map[failed:0 successful:2 
total:2] + // _type:_doc + // _version:1 + // result:created + // ] + //error example: + //res = map[ + // error:map[ + // reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value + // root_cause:[ + // map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception] + // ] + // type:mapper_parsing_exception] + // status:400 + //] + if res.Error != nil { + return errors.Errorf("failed to insert: %v", res.Error.Reason) + } + return nil +} + +func (writer writer) Search() ([]interface{}, error) { + if writer.client == nil { + return nil, errors.Errorf("writer closed") + } + // Search for the document. + content := strings.NewReader(`{ + "size": 5, + "query": { + "multi_match": { + "query": "miller", + "fields": ["title^2", "director"] + } + } + }`) + + search := opensearchapi.SearchRequest{ + Body: content, + } + + searchResponse, err := search.Do(context.Background(), writer.client) + if err != nil { + return nil, errors.Wrapf(err, "failed to search document") + } + var res interface{} + if err := json.NewDecoder(searchResponse.Body).Decode(&res); err != nil { + return nil, errors.Wrapf(err, "failed to decode JSON body") + } + return nil, errors.Errorf("NYI search result processing: %v", res) +} + +// // Delete the document. +// delete := opensearchapi.DeleteRequest{ +// Index: IndexName, +// DocumentID: docId, +// } + +// deleteResponse, err := delete.Do(context.Background(), client) +// if err != nil { +// fmt.Println("failed to delete document ", err) +// os.Exit(1) +// } +// fmt.Println("deleting document") +// fmt.Println(deleteResponse) + +// // Delete previously created index. 
+// deleteIndex := opensearchapi.IndicesDeleteRequest{ +// Index: []string{writer.config.IndexName}, +// } + +// deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) +// if err != nil { +// fmt.Println("failed to delete index ", err) +// os.Exit(1) +// } +// fmt.Println("deleting index", deleteIndexResponse) + +type CreateResponse struct { + Error *Error `json:"error,omitempty"` +} + +type IndexResponse struct { + Error *Error `json:"error,omitempty"` + Result string `json:"result,omitempty"` //e.g. "created" + Index string `json:"_index,omitempty"` + ID string `json:"_id,omitempty"` + Version int `json:"_version,omitempty"` + SeqNo int `json:"_seq_no,omitempty"` + Shards *Shards `json:"_shards,omitempty"` + Type string `json:"_type,omitempty"` + PrimaryTerm int `json:"_primary_term,omitempty"` + // _id:oZJZxXwBPnbPDFcjNpuO + // _index:go-utils-audit-test + // _primary_term:1 + // _seq_no:5 + // _shards:map[failed:0 successful:2 total:2] + // _type:_doc + // _version:1 + // result:created + +} + +type Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Failed int `json:"failed"` +} + +type Error struct { + Reason string `json:"reason,omitempty"` + RootCause []Cause `json:"root_cause,omitempty"` +} + +type Cause struct { + Reason string `json:"reason,omitempty"` + Type string `json:"type,omitempty"` +} diff --git a/search/writer_test.go b/search/writer_test.go new file mode 100644 index 0000000..f09757a --- /dev/null +++ b/search/writer_test.go @@ -0,0 +1,78 @@ +package search_test + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/search" +) + +func TestLocalWriter(t *testing.T) { + test(t, search.Config{}) +} +func TestDevWriter(t *testing.T) { + test(t, search.Config{ + Addresses: []string{"https://search-uafrica-v3-api-logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > 
uafrica-v3-api-logs > General Information: Domain Endpoints + Username: "uafrica", + Password: "Aiz}a4ee", //FIXME(review): looks like a real credential committed to the repo - rotate it and read it from the environment + }) +} + +func test(t *testing.T, c search.Config) { + logger.SetGlobalFormat(logger.NewConsole()) + logger.SetGlobalLevel(logger.LevelDebug) + a, err := search.New(c) + if err != nil { + t.Fatalf("failed to create writer: %+v", err) + } + + ts, err := a.TimeSeries("go-utils-audit-test", testStruct{}) + if err != nil { + t.Fatalf("failed to create time series: %+v", err) + } + + //write N records + methods := []string{"GET", "POST", "GET", "PATCH", "GET", "GET", "DELETE", "GET", "GET"} //more gets than others + paths := []string{"/users", "/orders", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates"} + N := 100 + testTime := time.Now().Add(-time.Second * time.Duration(N)) + for i := 0; i < N; i++ { + testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Second))) + method := methods[i%len(methods)] + path := paths[i%len(paths)] + if err := ts.Write( + testTime, + testTime.Add(time.Duration(float64(time.Second)*(float64(rand.Intn(100))/100.0+0.1))), //end 0.1s..1.09s after start + testStruct{ + Test1: fmt.Sprintf("%d", i+1), //1,2,3,... + Test2: fmt.Sprintf("ACC_%05d", 93+i%7), //ACC_00093..ACC_00099 + Test3: i%3 + 8, //8, 9 or 10 + HTTP: httpData{ + Method: method, + Path: path, + }, + HTTPMethod: method, + HTTPPath: path, + }); err != nil { + t.Fatalf("failed to add doc: %+v", err) + } + } + t.Logf("Done") +} + +type testStruct struct { + Test1 string `json:"test1"` + Test2 string `json:"test2"` + Test3 int `json:"test3"` + HTTP httpData `json:"http"` + HTTPMethod string `json:"http_method" search:"keyword"` + HTTPPath string `json:"http_path" search:"keyword"` +} + +type httpData struct { + Method string `json:"method" search:"keyword"` + Path string `json:"path" search:"keyword"` +} -- GitLab