diff --git a/search/document_store.go b/search/document_store.go index 7690f33c03a2dd9582ccdf8c5aeac7ad5b658759..bf4cefe41df7658755e9d961de9e5b8fcd56e5a7 100644 --- a/search/document_store.go +++ b/search/document_store.go @@ -12,20 +12,17 @@ import ( opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" - "gitlab.com/uafrica/go-utils/reflection" ) type DocumentStore struct { - w *Writer - name string - dataType reflect.Type - settings Settings - mappings Mappings - jsonSettings []byte - jsonMappings []byte - created bool - searchResponseBodyType reflect.Type - getResponseBodyType reflect.Type + w *Writer + name string + dataType reflect.Type + settings Settings + mappings Mappings + jsonSettings []byte + jsonMappings []byte + created bool } // NewDocumentStore purpose: @@ -42,7 +39,7 @@ func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, return ds, errors.Errorf("invalid index_name:\"%s\"", name) } - //if already created, just return + // if already created, just return if existingDocumentStore, ok := w.documentStoreByName[name]; ok { return existingDocumentStore, nil } @@ -59,7 +56,7 @@ func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, created: false, } - //define the OpenSearch index mapping + // define the OpenSearch index mapping ds.settings = Settings{ Index: &SettingsIndex{ NumberOfShards: 4, @@ -86,32 +83,11 @@ func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, } logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings)) - //define search response type - //similar to SearchResponseBody - ds.searchResponseBodyType, err = reflection.CloneType( - reflect.TypeOf(SearchResponseBody{}), - map[string]reflect.Type{ - ".hits.hits[]._source": ds.dataType, - }) - if err != nil { - return ds, errors.Wrapf(err, "failed to make search response type for document store") - } - - //define get response type - //similar to GetResponseBody - ds.getResponseBodyType, err = reflection.CloneType( - reflect.TypeOf(GetResponseBody{}), - map[string]reflect.Type{ - "._source": ds.dataType, - }) - if err != nil { - return ds, errors.Wrapf(err, "failed to make get response type for document store") - } w.documentStoreByName[name] = ds return ds, nil } -//data must be of type specified in Writer.TimeSeries(tmpl) +// data must be of type specified in Writer.TimeSeries(tmpl) func (ds *DocumentStore) Write(id string, data interface{}) error { if data == nil { return errors.Errorf("data:nil") @@ -121,7 +97,7 @@ func (ds *DocumentStore) Write(id string, data interface{}) error { return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType) } - //get daily search index to write to, from start time + // get daily search index to write to, from start time indexName := ds.name // + "-" + startTime.Format("20060102") if !ds.created { res, err := ds.w.api.Create( @@ -163,12 +139,12 @@ func (ds *DocumentStore) Write(id string, data interface{}) error { return nil } -//Search -//Return: +// Search +// Return: // docs will be a slice of the DocumentStore data type -func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) { +func (ds *DocumentStore) Search(query Query, limit int64) (res *SearchResponseHits, err error) { if ds == nil { - return nil, 0, errors.Errorf("document store == nil") + return nil, errors.Errorf("document store == nil") } if limit < 0 || limit > 1000 { err = errors.Errorf("limit=%d not 0..1000", limit) @@ -186,8 +162,9 @@ func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCo // } // } body := SearchRequestBody{ - Size: limit, - Query: query, + Size: limit, + Query: query, + Timeout: "29s", } jsonBody, _ := json.Marshal(body) @@ -213,34 +190,18 @@ func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCo bodyData, _ := ioutil.ReadAll(searchResponse.Body) logs.Info("Response Body: %s", string(bodyData)) - resBodyPtrValue := reflect.New(ds.searchResponseBodyType) - // if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { - if err = json.Unmarshal(bodyData, resBodyPtrValue.Interface()); err != nil { - err = errors.Wrapf(err, "cannot decode search response body") - return - } - - logs.Info("Response Parsed: %+v", resBodyPtrValue.Interface()) - - hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value") + var response SearchResponseBody + err = json.Unmarshal(bodyData, &response) if err != nil { - err = errors.Wrapf(err, "cannot get total nr of hits") + logs.Info("search response body: %s", string(bodyData)) + err = errors.Wrapf(err, "cannot decode search response body") return } - if hitsTotalValue.Interface().(int) < 1 { - return nil, 0, nil //no matches - } - foundIDs, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._id") - if err != nil { - err = errors.Wrapf(err, "cannot get search response documents") - return - } - //logs.Errorf("items: (%T) %+v", foundIDs.Interface(), foundIDs.Interface()) - return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil + return &response.Hits, nil } -func (ds *DocumentStore) Get(id string) (doc interface{}, err error) { +func (ds *DocumentStore) Get(id string) (res *GetResponseBody, err error) { if ds == nil { return nil, errors.Errorf("document store == nil") } @@ -263,28 +224,18 @@ func (ds *DocumentStore) Get(id string) (doc interface{}, err error) { return } - resBodyPtrValue := reflect.New(ds.getResponseBodyType) - if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { - err = errors.Wrapf(err, "cannot decode get response body") - return - } + bodyData, _ := ioutil.ReadAll(getResponse.Body) + logs.Info("Get Response Body: %s", string(bodyData)) - foundVar, err := reflection.Get(resBodyPtrValue, ".found") + var response GetResponseBody + err = json.Unmarshal(bodyData, &response) if err != nil { - err = errors.Wrapf(err, "cannot get found value") + logs.Info("GET response body: %s", string(bodyData)) + err = errors.Wrapf(err, "cannot unmarshal GET response body") return } - if found, ok := foundVar.Interface().(bool); !ok || !found { - return nil, nil //not found - } - //found - source, err := reflection.Get(resBodyPtrValue, "._source") - if err != nil { - err = errors.Wrapf(err, "cannot get document from get response") - return - } - return source.Interface(), nil + return &response, nil } func (ds *DocumentStore) Delete(id string) (err error) { diff --git a/search/document_store_test.go b/search/document_store_test.go deleted file mode 100644 index fb7cd16436c2254f6123e9342cbfc65eb030b402..0000000000000000000000000000000000000000 --- a/search/document_store_test.go +++ /dev/null @@ -1,222 +0,0 @@ -package search_test - -import ( - "fmt" - "math/rand" - "strings" - "testing" - "time" - - "github.com/google/uuid" - "gitlab.com/uafrica/go-utils/search" -) - -func TestLocalDocuments(t *testing.T) { - testDocuments(t, search.Config{ - Addresses: []string{"https://localhost:9200"}, - }) -} - -func TestDevDocuments(t *testing.T) { - testDocuments(t, search.Config{ - Addresses: []string{"https://search-uafrica-v3-api-api_logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > uafrica-v3-api-api_logs > General Information: Domain Endpoints - Username: "uafrica", - Password: "Aiz}a4ee", - }) -} - -func testDocuments(t *testing.T, c search.Config) { - //logs.SetGlobalFormat(logs.NewConsole()) - //logs.SetGlobalLevel(logs.LevelDebug) - a, err := search.New(c) - if err != nil { - t.Fatalf("failed to create writer: %+v", err) - } - - indexName := "go-utils-search-docs-test" - ds, err := a.NewDocumentStore(indexName, SearchOrder{}) - if err != nil { - t.Fatalf("failed to create document store: %+v", err) - } - - //write N documents - buyers := []string{"Joe", "Anne", "Griet", "Kobus", "Caleb", "Roger", "Susan", "Maria", "Sandra"} - sellers := []string{"Hannelie", "Angelica", "Louis", "Bertus", "Bongi", "Vusi", "Andrew", "Joseph"} - nextItem := 0 - itemInfos := []testItemInfo{ - {Name: "Samsung 17\" LCD Monitor", Cost: 0}, - {Name: "Acer 15\" LED Monitor", Cost: 0}, - {Name: "Apple M1 16\" MAC", Cost: 0}, - {Name: "Red Dress size M", Cost: 0}, - {Name: "Grey Shorts size 115cm", Cost: 0}, - {Name: "Black Student Prince School Shoes Boys Size 8", Cost: 0}, - {Name: "Black Student Prince School Shoes Boys Size 9", Cost: 0}, - {Name: "Black Student Prince School Shoes Boys Size 10", Cost: 0}, - {Name: "Black Student Prince School Shoes Boys Size 11", Cost: 0}, - {Name: "Black Student Prince School Shoes Girst Size 6", Cost: 0}, - {Name: "Black Student Prince School Shoes Girst Size 7", Cost: 0}, - {Name: "Black Student Prince School Shoes Girst Size 8", Cost: 0}, - {Name: "Faber Castell HB Pencil", Cost: 0}, - {Name: "Faber Castell 2H Pencil", Cost: 0}, - {Name: "Faber Castell 4H Pencil", Cost: 0}, - {Name: "12 Colour Crayons", Cost: 0}, - {Name: "Steadler Rubber", Cost: 0}, - } - N := 100 - for i := 0; i < N; i++ { - //make a random document - id := uuid.New().String() - buyer := buyers[rand.Intn(len(buyers))] - seller := sellers[rand.Intn(len(sellers))] - doc := SearchOrder{ - ID: int64(i + 1), - AccountID: int64(i + 2), - AccountOrderNumber: int64(i * 2), - ChannelID: int64(i), - ChannelOrderNumber: fmt.Sprintf("CHO-%d", i), - ChannelOrderReference: fmt.Sprintf("REF-%05d", i), - CustomerName: buyer, - CustomerEmail: strings.ToLower(buyer + "@home.net"), - CustomerPhone: "123456789", - DeliveryAddress: "My Street", - Currency: "ZAR", - Items: []SearchOrderItem{}, - TotalPrice: 0, - TotalWeightKg: 0, - TotalQty: 0, - TotalFulfilledQty: 0, - Status: OrderStatusNew, - PaymentStatus: OrderPaymentStatusUnpaid, - TimeCreated: time.Now(), - TimeModified: nil, - Tags: "1(blue),2(red)", - BuyerSelectedShippingCost: 1.23, - BuyerSelectedShippingProvider: seller, - } - for i := 0; i < rand.Intn(5)+1; i++ { - itemInfo := itemInfos[nextItem] - nextItem++ - if nextItem >= len(itemInfos) { - nextItem = 0 - } - item := SearchOrderItem{ - SKU: fmt.Sprintf("SKU-%s-%03d", itemInfo.Name[:3], i), - Description: itemInfo.Name, - UnitPrice: itemInfo.Cost, - Qty: rand.Intn(3) + 1, - } - doc.Items = append(doc.Items, item) - doc.TotalPrice += item.UnitPrice * float64(item.Qty) - doc.TotalQty += item.Qty - } - - //add to the search - if err := ds.Write(id, doc); err != nil { - t.Fatalf("failed to add doc: %+v", err) - } - } - - //search some of the documents - query := search.Query{ - MultiMatch: &search.QueryMultiMatch{ - Query: buyers[0], - Fields: []string{"buyer"}, - }, - } - ids, totalCount, err := ds.Search(query, 10) - if err != nil { - t.Errorf("failed to search: %+v", err) - } else { - t.Logf("search result total_count:%d with %d ids", totalCount, len(ids)) - if len(ids) > 10 { - t.Errorf("got %d docs > max 10", len(ids)) - } - for _, id := range ids { - if getDoc, err := ds.Get(id); err != nil { - t.Fatalf("failed to get: %+v", err) - } else { - t.Logf(" GOT: %s: %+v", id, getDoc) - } - } - } - t.Logf("Done") -} - -// type testDocument struct { -// //UUID string `json:"uuid"` -// Buyer string `json:"buyer" search:"keyword"` -// Seller string `json:"seller" search:"keyword"` -// Items []docItem `json:"items"` -// TotalCost float64 `json:"total_cost"` -// TotalItems int `json:"total_items"` -// } - -// type docItem struct { -// Description string `json:"description" search:"keyword"` -// UnitCost float64 `json:"unit_cost"` -// Qty int `json:"qty"` -// } - -type testItemInfo struct { - Name string - Cost float64 -} - -type SearchOrder struct { - ID int64 `json:"id"` - AccountID int64 `json:"account_id" search:"keyword"` - AccountOrderNumber int64 `json:"account_order_number"` - ChannelID int64 `json:"channel,omitempty" search:"keyword"` - ChannelOrderNumber string `json:"channel_order_number,omitempty"` - ChannelOrderReference string `json:"channel_order_reference,omitempty"` - CustomerName string `json:"customer_name,omitempty"` - CustomerEmail string `json:"customer_email,omitempty"` - CustomerPhone string `json:"customer_phone,omitempty"` - DeliveryAddress string `json:"delivery_address,omitempty"` - Currency string `json:"currency" search:"keyword"` - Items []SearchOrderItem `json:"items,omitempty"` - TotalPrice float64 `json:"total_price"` - TotalWeightKg float64 `json:"total_weight_kg"` - TotalQty int `json:"total_qty"` - TotalFulfilledQty int `json:"total_fulfilled_qty"` - Status OrderStatus `json:"status" search:"keyword"` - PaymentStatus OrderPaymentStatus `json:"payment_status" search:"keyword"` - TimeCreated time.Time `json:"time_created"` - TimeModified *time.Time `json:"time_modified,omitempty"` - Tags string `json:"tags" search:"keyword"` //CSV or tags sorted, so [A,B] and [B,A] both -> "A,B" keyword - BuyerSelectedShippingCost float64 `json:"buyer_selected_shipping_cost,omitempty"` - BuyerSelectedShippingProvider string `json:"buyer_selected_shipping_provider,omitempty"` -} - -type SearchOrderItem struct { - SKU string `json:"sku" search:"keyword"` - Description string `json:"description" search:"keyword"` - Vendor string `json:"vendor" search:"keyword"` - UnitPrice float64 `json:"unit_price"` - UnitWeightKg float64 `json:"unit_weight_kg"` - Qty int `json:"qty"` - FulfilledQty int `json:"fulfilled_qty"` - TotalPrice float64 `json:"total_price"` - TotalWeightKg float64 `json:"total_weight_kg"` -} - -type OrderStatus string - -const ( - OrderStatusNew OrderStatus = "new" - OrderStatusCompleted OrderStatus = "completed" - OrderStatusCancelled OrderStatus = "cancelled" -) - -type OrderPaymentStatus string - -const ( - OrderPaymentStatusUnpaid OrderPaymentStatus = "unpaid" - OrderPaymentStatusPending OrderPaymentStatus = "pending" - OrderPaymentStatusPartiallyPaid OrderPaymentStatus = "partially-paid" - OrderPaymentStatusPaid OrderPaymentStatus = "paid" - OrderPaymentStatusPartiallyRefunded OrderPaymentStatus = "partially-refunded" - OrderPaymentStatusRefunded OrderPaymentStatus = "refunded" - OrderPaymentStatusVoided OrderPaymentStatus = "voided" - OrderPaymentStatusAuthorised OrderPaymentStatus = "authorised" -) diff --git a/search/opensearch_types.go b/search/opensearch_types.go index 6ddbfac65f5a36339ed1b79fd9cf8f224db9ed18..75ccbc81d53523e8de79d1aa64d11079a6711874 100644 --- a/search/opensearch_types.go +++ b/search/opensearch_types.go @@ -17,11 +17,10 @@ type Mappings struct { } type MappingProperty struct { - Type string `json:"type,omitempty"` //empty for sub-structs described with properties + Type string `json:"type,omitempty"` // empty for sub-structs described with properties Enabled bool `json:"enabled,omitempty"` Fields map[string]MappingFieldProperties `json:"fields,omitempty"` Properties map[string]MappingProperty `json:"properties,omitempty"` - //Index bool `json:"index,omitempty"` //set true to make text field searchable } type MappingFieldProperties struct { @@ -29,15 +28,16 @@ type MappingFieldProperties struct { } type MappingKeyword struct { - Type string `json:"type"` //="keyword" - IgnoreAbove int `json:"ignore_above"` //e.g. 256 + Type string `json:"type"` // ="keyword" + IgnoreAbove int `json:"ignore_above"` // e.g. 256 } type SearchRequestBody struct { - Sort []map[string]string `json:"sort,omitempty"` // order and order_by - Size int64 `json:"size,omitempty"` // limit - From int64 `json:"from,omitempty"` // offset - Query Query `json:"query"` + Sort []map[string]string `json:"sort,omitempty"` // order and order_by + Size int64 `json:"size,omitempty"` // limit + From int64 `json:"from,omitempty"` // offset + Query Query `json:"query"` + Timeout string `json:"timeout"` // timeout for search } type Query struct { @@ -48,7 +48,7 @@ type Query struct { MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"` Bool *QueryBool `json:"bool,omitempty"` QueryString *QueryString `json:"query_string,omitempty"` - Wildcard *QueryNameValue `json:"wildcard,omitempty"` //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html + Wildcard *QueryNameValue `json:"wildcard,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html } type QueryTerm map[string]string //<oper>:<value> @@ -77,10 +77,10 @@ type QueryNameValue map[string]QueryValue type QueryValue struct { Query string `json:"query,omitempty"` - Operator string `json:"operator,omitempty"` //defaults to "or", accepted values: or|and - Fuzziness string `json:"fuzziness,omitempty"` //https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness + Operator string `json:"operator,omitempty"` // defaults to "or", accepted values: or|and + Fuzziness string `json:"fuzziness,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness ZeroTermsQuery string `json:"zero_terms_query,omitempty"` - Value string `json:"value,omitempty"` //https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html + Value string `json:"value,omitempty"` // https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-wildcard-query.html } func QueryValueTextValue(text string) QueryValue { @@ -97,9 +97,9 @@ func QueryValueTime(t time.Time) QueryValue { type QueryRange map[string]QueryExpr -type QueryExpr map[string]string //<oper>:<value> e.g. "gte":"10" +type QueryExpr map[string]string // <oper>:<value> e.g. "gte":"10" -//example of search response body: +// example of search response body: // { // "took":872, // "timed_out":false, @@ -140,7 +140,7 @@ type QueryExpr map[string]string //<oper>:<value> e.g. "gte":"10" // } // } type SearchResponseBody struct { - Took int `json:"took"` //milliseconds + Took int `json:"took"` // milliseconds TimedOut bool `json:"timed_out"` Shards SearchResponseShards `json:"_shards"` Hits SearchResponseHits `json:"hits"` @@ -160,16 +160,16 @@ type SearchResponseHits struct { } type SearchResponseHitsTotal struct { - Value int `json:"value"` //e.g. 0 when no docs matched - Relation string `json:"relation"` //e.g. "eq" + Value int `json:"value"` // e.g. 0 when no docs matched + Relation string `json:"relation"` // e.g. "eq" } type HitDoc struct { - Index string `json:"_index"` //name of index - Type string `json:"_type"` //_doc + Index string `json:"_index"` // name of index + Type string `json:"_type"` // _doc ID string `json:"_id"` Score float64 `json:"_score"` // - Source map[string]interface{} `json:"_source"` //the document of itemType + Source map[string]interface{} `json:"_source"` // the document of itemType } // Get Response Body Example: @@ -184,12 +184,12 @@ type HitDoc struct { // "_source": { ... } // } type GetResponseBody struct { - Index string `json:"_index"` //name of index - Type string `json:"_type"` //_doc + Index string `json:"_index"` // name of index + Type string `json:"_type"` // _doc ID string `json:"_id"` Version int `json:"_version"` SeqNo int `json:"_seq_no"` PrimaryTerm int `json:"_primary_term"` Found bool `json:"found"` - Source map[string]interface{} `json:"_source"` //the document of itemType + Source map[string]interface{} `json:"_source"` // the document of itemType } diff --git a/search/search_test.go b/search/search_test.go deleted file mode 100644 index 9a3ee68440703dec1885ef15225eb0124fadd75f..0000000000000000000000000000000000000000 --- a/search/search_test.go +++ /dev/null @@ -1,139 +0,0 @@ -package search_test - -import ( - "fmt" - "math/rand" - "sort" - "testing" - "time" - - "gitlab.com/uafrica/go-utils/search" -) - -func TestLocalWriter(t *testing.T) { - test(t, search.Config{ - Addresses: []string{"https://localhost:9200"}, - }) -} - -func TestDevWriter(t *testing.T) { - test(t, search.Config{ - Addresses: []string{"https://search-uafrica-v3-api-api_logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > uafrica-v3-api-api_logs > General Information: Domain Endpoints - Username: "uafrica", - Password: "Aiz}a4ee", - }) -} - -func test(t *testing.T, c search.Config) { - //logs.SetGlobalFormat(logs.NewConsole()) - //logs.SetGlobalLevel(logs.LevelDebug) - a, err := search.New(c) - if err != nil { - t.Fatalf("failed to create writer: %+v", err) - } - - indexName := "go-utils-audit-test" - ts, err := a.NewTimeSeries(indexName, testStruct{}) - if err != nil { - t.Fatalf("failed to create time series: %+v", err) - } - - //write N records - methods := []string{"GET", "POST", "GET", "PATCH", "GET", "GET", "DELETE", "GET", "GET"} //more gets than others - paths := []string{"/users", "/orders", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates"} - N := 100 - testTime := time.Now().Add(-time.Hour * time.Duration(N)) - for i := 0; i < N; i++ { - testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Hour))) - method := methods[i%len(methods)] - path := paths[i%len(paths)] - if err := ts.Write( - testTime, - testTime.Add(-time.Duration(float64(time.Second)*(float64(rand.Intn(100))/100.0+0.1))), - testStruct{ - TimeSeriesHeader: search.TimeSeriesHeader{}, - Test1: fmt.Sprintf("%d", i+1), //1,2,3,... - Test2: fmt.Sprintf("ACC_%05d", 93+i%7), //ACC_00093..ACC00100 - Test3: i%3 + 8, //8,9, or 10 - HTTP: httpData{ - Method: method, - Path: path, - }, - HTTPMethod: method, - HTTPPath: path, - }); err != nil { - t.Fatalf("failed to add doc: %+v", err) - } - } - - //query := search.Query{ - // MultiMatch: &search.QueryMultiMatch{ - // Query: "GET", - // Fields: []string{"http_method"}, - // }, - //} - - //docs, totalCount, err := ts.Search(query, 10) - //if err != nil { - // t.Errorf("failed to search: %+v", err) - //} else { - // if docsSlice, ok := docs.([]testStruct); ok { - // t.Logf("search result total_count:%d with %d docs", totalCount, len(docsSlice)) - // if len(docsSlice) > 10 { - // t.Errorf("got %d docs > max 10", len(docsSlice)) - // } - // } else { - // t.Errorf("docs %T is not []testStruct!", docs) - // } - //} - - oldList, err := ts.DelOldTimeSeries(2) - if err != nil { - t.Fatalf("failed to del old: %+v", err) - } - sort.Slice(oldList, func(i, j int) bool { return oldList[i] < oldList[j] }) - - t.Logf("Deleted %d old series", len(oldList)) - //indexes deleted depends on current time, so not verifying - // if len(oldList) != 2 || oldList[0] != "go-utils-audit-test-20211029" || oldList[1] != "go-utils-audit-test-20211030" { - // t.Fatalf("Did not delete expected indices") - // } - t.Logf("Done") -} - -type testStruct struct { - search.TimeSeriesHeader - Test1 string `json:"test1"` - Test2 string `json:"test2"` - Test3 int `json:"test3"` - HTTP httpData `json:"http"` //this is a sub-struct... - HTTPMethod string `json:"http_method" search:"keyword"` - HTTPPath string `json:"http_path" search:"keyword"` -} - -type httpData struct { - Method string `json:"method" search:"keyword"` - Path string `json:"path" search:"keyword"` - Size int `json:"size" search:"long"` -} - -func TestOlderThan(t *testing.T) { - local := time.Now().Location() - - //time now for test (using a fixed value in SAST) - t0, _ := time.ParseInLocation("2006-01-02 15:04:05", "2021-10-20 14:15:16", local) - t.Logf("t0=%s", t0) - - //threshold is 2 days older, applying at midnight in location SAST - olderThanDays := 2 - t.Logf("n=%d", olderThanDays) - - t1 := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, local) - t.Logf("t1=%s", t1) - t1 = t1.Add(-time.Hour * 24 * time.Duration(olderThanDays)) - t.Logf("Threshold = %s", t1) -} - -func TestTime(t *testing.T) { - t.Logf("Time: %s", time.Now().Format("2006-01-02T15:04:05Z07:00")) -} diff --git a/search/time_series.go b/search/time_series.go index 7e46f3d6e61a8d2926cddb3bfe45d50aeaa53456..ef2ee84d2b76e632a7c2a9ea889dfa95610bb9c9 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -13,10 +13,10 @@ import ( opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" - "gitlab.com/uafrica/go-utils/reflection" ) const TimeFormat = "2006-01-02T15:04:05Z07:00" +const Timeout = "29s" // API Gateway times out after 30s, so return just before that // TimeSeriesHeader embed this into your log struct type TimeSeriesHeader struct { @@ -34,9 +34,6 @@ type TimeSeries struct { jsonSettings []byte jsonMappings []byte createdDates map[string]bool - - searchResponseBodyType reflect.Type - getResponseBodyType reflect.Type } // NewTimeSeries purpose: @@ -102,28 +99,6 @@ func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error } logs.Info("%s Index Mappings: %s", structType, string(ts.jsonMappings)) - //define search response type - //similar to SearchResponseBody - ts.searchResponseBodyType, err = reflection.CloneType( - reflect.TypeOf(SearchResponseBody{}), - map[string]reflect.Type{ - ".hits.hits[]._source": ts.dataType, - }) - if err != nil { - return ts, errors.Wrapf(err, "failed to make search response type for time-series") - } - - // define get response type - // similar to GetResponseBody - ts.getResponseBodyType, err = reflection.CloneType( - reflect.TypeOf(GetResponseBody{}), - map[string]reflect.Type{ - "._source": ts.dataType, - }) - if err != nil { - return ts, errors.Wrapf(err, "failed to make get response type for time-series") - } - w.timeSeriesByName[name] = ts return ts, nil } @@ -135,7 +110,7 @@ func structMappingProperties(structType reflect.Type) (map[string]MappingPropert fieldName := structField.Name - //fields of embedded (anonymous) structs are added at the same level + // fields of embedded (anonymous) structs are added at the same level if structField.Anonymous && structField.Type.Kind() == reflect.Struct { subFields, err := structMappingProperties(structField.Type) if err != nil { @@ -151,11 +126,11 @@ func structMappingProperties(structType reflect.Type) (map[string]MappingPropert fieldName = jsonTags[0] } if fieldName == "" { - logs.Info("Skip %s unnamed field %+v", structType, structField) + logs.Info("skip %s unnamed field %+v", structType, structField) continue } - //get default type of search value from field type + // get default type of search value from field type fieldMapping := MappingProperty{Type: "text"} switch structField.Type.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: @@ -231,7 +206,7 @@ func structMappingProperties(structType reflect.Type) (map[string]MappingPropert return properties, nil } -//data must be of type specified in Writer.TimeSeries(tmpl) +// data must be of type specified in Writer.TimeSeries(tmpl) func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) error { if data == nil { return errors.Errorf("data:nil") @@ -247,8 +222,8 @@ func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) erro //create new index for this date - if not exists res, err := ts.w.api.Create( - indexName, //index name - indexName, //index name also used for document id + indexName, // index name + indexName, // index name also used for document id strings.NewReader(string(ts.jsonSettings))) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) @@ -256,7 +231,7 @@ func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) erro switch res.StatusCode { case http.StatusOK: case http.StatusCreated: - case http.StatusConflict: //409 = already exists + case http.StatusConflict: // 409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } @@ -271,14 +246,14 @@ func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) erro switch res.StatusCode { case http.StatusOK: case http.StatusCreated: - case http.StatusConflict: //409 = already exists + case http.StatusConflict: // 409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } ts.createdDates[indexName] = true } - //copy to set the header values + // copy to set the header values x := reflect.New(ts.dataType) x.Elem().Set(reflect.ValueOf(data)) x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{ @@ -323,9 +298,9 @@ func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) { return nil, errors.Wrapf(err, "failed to read list of indices") } - //calculate time N days ago - //working in local time, assuming the server runs in same location as API user... - //so that index rotates at midnight local time rather than midnight UTC + // calculate time N days ago + // working in local time, assuming the server runs in same location as API user... + // so that index rotates at midnight local time rather than midnight UTC t0 := time.Now() timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) //= midnight yesterday in local time timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays)) //= N days before that @@ -352,7 +327,7 @@ func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) { return indicesToDelete, nil } -//output from GET /_cat/indices for each index in the list +// output from GET /_cat/indices for each index in the list type indexInfo struct { Aliases map[string]interface{} `json:"aliases"` Mappings Mappings `json:"mappings"` @@ -384,9 +359,9 @@ type IndexSettings struct { // } // } // docs will be a slice of the TimeSeries data type -func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (docs map[string]interface{}, totalCount int, err error) { +func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (openSearchResult *SearchResponseHits, err error) { if ts == nil { - return nil, 0, errors.Errorf("time series == nil") + return nil, errors.Errorf("time series == nil") } if limit < 0 || limit > 1000 { err = errors.Errorf("limit=%d not 0..1000", limit) @@ -404,10 +379,11 @@ func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, // } // } body := SearchRequestBody{ - Sort: sort, - Size: limit, - From: offset, - Query: query, + Sort: sort, + Size: limit, + From: offset, + Query: query, + Timeout: Timeout, } jsonBody, _ := json.Marshal(body) @@ -434,47 +410,23 @@ func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, bodyData, _ := ioutil.ReadAll(searchResponse.Body) logs.Info("Response Body: %s", string(bodyData)) - resBodyPtrValue := reflect.New(ts.searchResponseBodyType) - if err = json.Unmarshal(bodyData, resBodyPtrValue.Interface()); err != nil { + var response SearchResponseBody + err = json.Unmarshal(bodyData, &response) + if err != nil { logs.Info("search response body: %s", string(bodyData)) err = errors.Wrapf(err, "cannot decode search response body") return } - hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value") - if err != nil { - err = errors.Wrapf(err, "cannot get total nr of hits") - return - } - if hitsTotalValue.Interface().(int) < 1 { - return nil, 0, nil //no matches - } - - hits, err := reflection.Get(resBodyPtrValue, ".hits.hits[]") - if err != nil { - err = errors.Wrapf(err, "cannot get search response documents") - return - } - - docs = map[string]interface{}{} - if !hits.IsValid() { - return - } - for i := 0; i < hits.Len(); i++ { - hit := hits.Index(i) - index := hit.Field(0).Interface().(string) // HitDoc.Index - id := hit.Field(2).Interface().(string) // HitDoc.ID - docs[index+"/"+id] = hit.Field(4).Interface() // HitDoc.Source - } - return docs, hitsTotalValue.Interface().(int), nil + return &response.Hits, nil } // Get takes the id returned in Search() // The id is uuid assigned by OpenSearch when documents are added with Write(). // The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl) -func (ts *TimeSeries) Get(id string) (doc interface{}, err error) { +func (ts *TimeSeries) Get(id string) (res *GetResponseBody, err error) { if ts == nil { - return nil, errors.Errorf("document store == nil") + return nil, errors.Errorf("time series == nil") } parts := strings.SplitN(id, "/", 2) get := opensearchapi.GetRequest{ @@ -496,26 +448,16 @@ func (ts *TimeSeries) Get(id string) (doc interface{}, err error) { return } - resBodyPtrValue := reflect.New(ts.getResponseBodyType) - if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { - err = errors.Wrapf(err, "cannot decode get response body") - return - } + bodyData, _ := ioutil.ReadAll(getResponse.Body) + logs.Info("Get Response Body: %s", string(bodyData)) - foundVar, err := reflection.Get(resBodyPtrValue, ".found") + var response GetResponseBody + err = json.Unmarshal(bodyData, &response) if err != nil { - err = errors.Wrapf(err, "cannot get found value") + logs.Info("GET response body: %s", string(bodyData)) + err = errors.Wrapf(err, "cannot unmarshal GET response body") return } - if found, ok := foundVar.Interface().(bool); !ok || !found { - return nil, nil //not found - } - //found - source, err := reflection.Get(resBodyPtrValue, "._source") - if err != nil { - err = errors.Wrapf(err, "cannot get document from get response") - return - } - return source.Interface(), nil + return &response, nil }