diff --git a/go.mod b/go.mod index f05f4d61ae708a5d3b8de7ef2c05e7164eb52484..55391096b8225c1b3def8ea8c8d90b012d098ac0 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,15 @@ go 1.17 require ( github.com/MindscapeHQ/raygun4go v1.1.1 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/aws/aws-lambda-go v1.26.0 - github.com/aws/aws-sdk-go v1.42.12 + github.com/aws/aws-sdk-go v1.43.2 github.com/aws/aws-secretsmanager-caching-go v1.1.0 github.com/go-pg/pg/v10 v10.10.6 github.com/go-redis/redis/v8 v8.11.4 github.com/go-redis/redis_rate/v9 v9.1.2 github.com/google/uuid v1.3.0 - github.com/opensearch-project/opensearch-go v1.0.0 + github.com/opensearch-project/opensearch-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/r3labs/diff/v2 v2.14.2 github.com/sirupsen/logrus v1.8.1 @@ -20,7 +21,6 @@ require ( ) require ( - github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-errors/errors v1.4.1 // indirect @@ -37,7 +37,7 @@ require ( github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect - golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect + golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7 // indirect google.golang.org/appengine v1.6.6 // indirect google.golang.org/protobuf v1.26.0 // indirect diff --git a/go.sum b/go.sum index 7e557f8aa2c3567e91df5e27f0d9f7ec01879008..ce3c9fd3caa620d4e596485d4b1a7a6072b57db4 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,9 @@ github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoU github.com/aws/aws-lambda-go v1.26.0 h1:6ujqBpYF7tdZcBvPIccs98SpeGfrt/UOVEiexfNIdHA= github.com/aws/aws-lambda-go v1.26.0/go.mod h1:jJmlefzPfGnckuHdXX7/80O3BvUUi12XOkbv4w9SGLU= github.com/aws/aws-sdk-go v1.19.23/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.42.12 h1:zVrAgi3/HuMPygZknc+f2KAHcn+Zuq767857hnHBMPA= -github.com/aws/aws-sdk-go v1.42.12/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/aws/aws-sdk-go v1.42.27/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc= +github.com/aws/aws-sdk-go v1.43.2 h1:T6LuKCNu8CYXXDn3xJoldh8FbdvuVH7C9aSuLNrlht0= +github.com/aws/aws-sdk-go v1.43.2/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc= github.com/aws/aws-secretsmanager-caching-go v1.1.0 h1:vcV94XGJ9KouXKYBTMqgrBw96Tae8JKLmoUZ5SbaXNo= github.com/aws/aws-secretsmanager-caching-go v1.1.0/go.mod h1:wahQpJP1dZKMqjGFAjGCqilHkTlN0zReGWocPLbXmxg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -97,8 +98,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/opensearch-project/opensearch-go v1.0.0 h1:8Gh7B7Un5BxuxWAgmzleEF7lpOtC71pCgPp7lKr3ca8= -github.com/opensearch-project/opensearch-go v1.0.0/go.mod h1:FrUl/52DBegRYvK7ISF278AXmjDV647lyTnsLGBR7J4= +github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg8LaZ+DjEzQH9aLN3M= +github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -165,8 +166,8 @@ golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211216030914-fe4d6282115f h1:hEYJvxw1lSnWIl8X9ofsYMklzaDs90JI2az5YMd4fPM= +golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/search/document_store.go b/search/document_store.go index bf4cefe41df7658755e9d961de9e5b8fcd56e5a7..9a6fb1c3518e6be0c9e8a3c1b6e5eff47b831548 100644 --- a/search/document_store.go +++ b/search/document_store.go @@ -101,8 +101,8 @@ func (ds *DocumentStore) Write(id string, data interface{}) error { indexName := ds.name // + "-" + startTime.Format("20060102") if !ds.created { res, err := ds.w.api.Create( - indexName, //index name - indexName, //document id + indexName, // index name + indexName, // document id strings.NewReader(string(ds.jsonSettings))) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) @@ -110,7 +110,7 @@ func (ds *DocumentStore) Write(id string, data interface{}) error { switch res.StatusCode { case http.StatusOK: case http.StatusCreated: - case http.StatusConflict: //409 = already exists + case http.StatusConflict: // 409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } @@ -125,7 +125,7 @@ func (ds *DocumentStore) Write(id string, data interface{}) error { switch res.StatusCode { case http.StatusOK: case http.StatusCreated: - case http.StatusConflict: //409 = already exists + case http.StatusConflict: // 409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } @@ -164,7 +164,7 @@ func (ds *DocumentStore) Search(query Query, limit int64) (res *SearchResponseHi body := SearchRequestBody{ Size: limit, Query: query, - Timeout: "29s", + Timeout: Timeout, } jsonBody, _ := json.Marshal(body) diff --git a/search/opensearch_types.go b/search/opensearch_types.go index 75ccbc81d53523e8de79d1aa64d11079a6711874..7a05e67303e5eea81c09a21ed06de9f0a4a481e9 100644 --- a/search/opensearch_types.go +++ b/search/opensearch_types.go @@ -1,7 +1,6 @@ package search -import "time" - +// Settings // Mapping configures an index in OpenSearch type Settings struct { Index *SettingsIndex `json:"index,omitempty"` @@ -37,69 +36,59 @@ type SearchRequestBody struct { Size int64 `json:"size,omitempty"` // limit From int64 `json:"from,omitempty"` // offset Query Query `json:"query"` - Timeout string `json:"timeout"` // timeout for search + Timeout string `json:"timeout,omitempty"` // timeout for search } +// Query NOTE: We are only using bool filter queries, to be able to always do the correct sorting, not based on the relevancy score +// https://opensearch.org/docs/latest/opensearch/query-dsl/bool/ type Query struct { - //one of: - Match *QueryNameValue `json:"match,omitempty" doc:"<field>:<value>"` + Bool *QueryBool `json:"bool,omitempty"` +} + +type QueryBool struct { + Filter []FilterQuery `json:"filter,omitempty"` // List of things that must appear in matching documents. However, unlike must the score of the query will be ignored. Filter clauses are executed in filter context, meaning that scoring is ignored and clauses are considered for caching + MustNot []FilterQuery `json:"must_not,omitempty"` // List of things that must not appear in the matching documents. Clauses are executed in filter context meaning that scoring is ignored and clauses are considered for caching. Because scoring is ignored, a score of 0 for all documents is returned +} + +type FilterQuery struct { + // one of: + Match *QueryMatch `json:"match,omitempty"` Term *QueryTerm `json:"term,omitempty"` Range *QueryRange `json:"range,omitempty"` MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"` Bool *QueryBool `json:"bool,omitempty"` QueryString *QueryString `json:"query_string,omitempty"` - Wildcard *QueryNameValue `json:"wildcard,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html + Wildcard *QueryWildcard `json:"wildcard,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html } -type QueryTerm map[string]string //<oper>:<value> +type QueryMatch map[string]string -type QueryMultiMatch struct { - Query string `json:"query" doc:"Full value match search in selected fields"` - Fields []string `json:"fields,omitempty" doc:"List of fields"` -} +type QueryTerm map[string]string -type QueryString struct { - Query string `json:"query" doc:"Text search with partial matches, using asterisk for optional or question mark for required wildcards before and/or after text"` - Fields []string `json:"fields,omitempty" doc:"List of fields"` -} - -// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html -type QueryBool struct { - Must []Query `json:"must,omitempty" docs:"List of things that must appear in matching documents and will contribute to the score."` - Filter []Query `json:"filter,omitempty" doc:"List of things that must appear in matching documents. However unlike must the score of the query will be ignored. Filter clauses are executed in filter context, meaning that scoring is ignored and clauses are considered for caching."` - Should []Query `json:"should,omitempty" doc:"List of things that should appear in the matching document."` - MustNot []Query `json:"must_not,omitempty" doc:"List of things that must not appear in the matching documents. Clauses are executed in filter context meaning that scoring is ignored and clauses are considered for caching. Because scoring is ignored, a score of 0 for all documents is returned."` -} - -// <name>:<value> can be shorthanded to just a text value "...", but for sake of go type def, we always use an object meaning the same, allowing more options -// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html#query-dsl-match-query-short-ex -type QueryNameValue map[string]QueryValue +type QueryWildcard map[string]string -type QueryValue struct { - Query string `json:"query,omitempty"` - Operator string `json:"operator,omitempty"` // defaults to "or", accepted values: or|and - Fuzziness string `json:"fuzziness,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness - ZeroTermsQuery string `json:"zero_terms_query,omitempty"` - Value string `json:"value,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html +type QueryMultiMatch struct { + Query string `json:"query"` // Full value match search in selected fields + Fields []string `json:"fields,omitempty" doc:"List of fields"` + Type QueryMultiMatchType `json:"type,omitempty"` } -func QueryValueTextValue(text string) QueryValue { - return QueryValue{Value: text} -} +type QueryMultiMatchType string -func QueryValueText(text string) QueryValue { - return QueryValue{Query: text, Operator: "and"} -} +const ( + QueryMultiMatchTypePhrase QueryMultiMatchType = "phrase" +) -func QueryValueTime(t time.Time) QueryValue { - return QueryValue{Query: t.String(), Operator: "and"} +type QueryString struct { + Query string `json:"query"` // Text search with partial matches, using asterisk for optional or question mark for required wildcards before and/or after text + Fields []string `json:"fields,omitempty" doc:"List of fields"` } type QueryRange map[string]QueryExpr -type QueryExpr map[string]string // <oper>:<value> e.g. "gte":"10" +type QueryExpr map[string]string -// example of search response body: +// SearchResponseBody example: // { // "took":872, // "timed_out":false, @@ -172,7 +161,7 @@ type HitDoc struct { Source map[string]interface{} `json:"_source"` // the document of itemType } -// Get Response Body Example: +// GetResponseBody Example: // { // "_index": "go-utils-search-docs-test", // "_type": "_doc", diff --git a/search/time_series.go b/search/time_series.go index ef2ee84d2b76e632a7c2a9ea889dfa95610bb9c9..b6cb0d3422bdccfb92e2a9e22bec55a364a87f7a 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -10,13 +10,13 @@ import ( "strings" "time" - opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" + "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" ) const TimeFormat = "2006-01-02T15:04:05Z07:00" -const Timeout = "29s" // API Gateway times out after 30s, so return just before that +const Timeout = "25s" // API Gateway times out after 30s, so return before that // TimeSeriesHeader embed this into your log struct type TimeSeriesHeader struct { @@ -52,7 +52,7 @@ func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error return ts, errors.Errorf("invalid index_name:\"%s\"", name) } - //if already created, just return + // if already created, just return if existingTimeSeries, ok := w.timeSeriesByName[name]; ok { return existingTimeSeries, nil } @@ -72,7 +72,7 @@ func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error createdDates: map[string]bool{}, } - //define the OpenSearch index mapping + // define the OpenSearch index mapping ts.settings = Settings{ Index: &SettingsIndex{ NumberOfShards: 4, @@ -145,7 +145,7 @@ func structMappingProperties(structType reflect.Type) (map[string]MappingPropert fieldMapping = MappingProperty{Type: "text"} case reflect.Slice: - //do not indicate slice, just map slice items as sub-items + // do not indicate slice, just map slice items as sub-items subStructProperties, err := structMappingProperties(structField.Type.Elem()) if err != nil { return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name) @@ -216,15 +216,15 @@ func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) erro return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType) } - //get daily search index to write to, from start time + // get daily search index to write to, from start time indexName := ts.name + "-" + startTime.Format("20060102") if _, ok := ts.createdDates[indexName]; !ok { - //create new index for this date - if not exists - + // create new index for this date - if not exists res, err := ts.w.api.Create( indexName, // index name indexName, // index name also used for document id - strings.NewReader(string(ts.jsonSettings))) + strings.NewReader(string(ts.jsonSettings)), + ) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) } @@ -282,7 +282,7 @@ func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) { return nil, nil } - //make list of indices matching specified name e.g. "uafrica-v3-api-api_logs-*" + // make list of indices matching specified name e.g. "uafrica-v3-api-api_logs-*" res, err := ts.w.api.Indices.Get([]string{ts.name + "-*"}, ts.w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"})) if err != nil { return nil, errors.Wrapf(err, "failed to list existing %s-* indices", ts.name) @@ -302,8 +302,8 @@ func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) { // working in local time, assuming the server runs in same location as API user... // so that index rotates at midnight local time rather than midnight UTC t0 := time.Now() - timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) //= midnight yesterday in local time - timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays)) //= N days before that + timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) // = midnight yesterday in local time + timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays)) // = N days before that indicesToDelete := []string{} for dailyIndexName, dailyIndexInfo := range indices { @@ -343,7 +343,7 @@ type IndexSettings struct { CreationDate string `json:"creation_date"` NumberOfShards string `json:"number_of_shards"` NumberOfReplicas string `json:"number_of_replicas"` - ProviderName string `json:"provided_name"` //e.g. "go-utils-audit-test-20211103" + ProviderName string `json:"provided_name"` // e.g. "go-utils-audit-test-20211103" } // Search returns docs indexed on OpenSearch document ID which cat be used in Get(id) @@ -383,16 +383,20 @@ func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, Size: limit, From: offset, Query: query, - Timeout: Timeout, + Timeout: Timeout, // this doesn't work } jsonBody, _ := json.Marshal(body) logs.Info("Search: %s", string(jsonBody)) + search := opensearchapi.SearchRequest{ Index: []string{ts.name + "-*"}, Body: bytes.NewReader(jsonBody), } + trueVal := true + search.AllowPartialSearchResults = &trueVal + searchResponse, err := search.Do(context.Background(), ts.w.client) if err != nil { err = errors.Wrapf(err, "failed to search documents") diff --git a/search/writer.go b/search/writer.go index 7ddd5a2521a7472b631535ac59379848552ff970..72f9d24ce05e6182ea7c3b02a588ee843974fdab 100644 --- a/search/writer.go +++ b/search/writer.go @@ -3,13 +3,13 @@ package search import ( "crypto/tls" "encoding/json" - "net/http" - "strings" - opensearch "github.com/opensearch-project/opensearch-go" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" + "net/http" + "strings" + "time" ) type Writer struct { @@ -34,7 +34,8 @@ func New(config Config) (Writer, error) { searchConfig := opensearch.Config{ Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + ResponseHeaderTimeout: time.Second * 28, // timeout the request just before api gateway times out }, Addresses: config.Addresses, Username: config.Username, @@ -50,6 +51,7 @@ func New(config Config) (Writer, error) { logs.Info("Search client created with config: %+v", searchConfig) w.api = opensearchapi.New(w.client) + return w, nil } @@ -81,8 +83,8 @@ func (writer Writer) Write(indexName string, id string, doc interface{}) (*Index if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil { return nil, errors.Wrapf(err, "failed to decode JSON response") } - //success example: - //res = map[ + // success example: + // res = map[ // _id:oZJZxXwBPnbPDFcjNpuO // _index:go-utils-audit-test // _primary_term:1 @@ -92,8 +94,8 @@ func (writer Writer) Write(indexName string, id string, doc interface{}) (*Index // _version:1 // result:created // ] - //error example: - //res = map[ + // error example: + // res = map[ // error:map[ // reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value // root_cause:[ @@ -101,7 +103,7 @@ func (writer Writer) Write(indexName string, id string, doc interface{}) (*Index // ] // type:mapper_parsing_exception] // status:400 - //] + // ] if res.Error != nil { return nil, errors.Errorf("failed to insert: %v", res.Error.Reason) } @@ -114,7 +116,7 @@ type CreateResponse struct { type IndexResponse struct { Error *Error `json:"error,omitempty"` - Result string `json:"result,omitempty"` //e.g. "created" + Result string `json:"result,omitempty"` // e.g. "created" Index string `json:"_index,omitempty"` ID string `json:"_id,omitempty"` Version int `json:"_version,omitempty"`