diff --git a/reflection/get.go b/reflection/get.go index 99349f1dcd809f98948b4b787f7ce870884d4e7f..956d4c2d5f7c0e3abc792015ca84a4a5677d6144 100644 --- a/reflection/get.go +++ b/reflection/get.go @@ -8,6 +8,15 @@ import ( "gitlab.com/uafrica/go-utils/errors" ) +//Get() a jq-named element from a value +//e.g. get(reflect.ValueOf(myDoc), ".hits.hits[]._source") +//the result is an array of _source items which may be +//just one field inside the hits object in the list +// +//see usage in search.TimeSeries.Search() to get documents +//from the OpenSearch response structure that is very nested +//and parse using a runtime-created reflect Type, i.e. one +//cannot get it simply by iterating over res.Hits.[]Hits... func Get(v reflect.Value, key string) (reflect.Value, error) { return get("", v, key) } diff --git a/reflection/type_clone.go b/reflection/type_clone.go index 9cc2d1cbfcb81d787910db54ecb569cd9af0f856..9c3fad742e70126df972fd2fac55ed6b89ba15af 100644 --- a/reflection/type_clone.go +++ b/reflection/type_clone.go @@ -7,6 +7,44 @@ import ( "gitlab.com/uafrica/go-utils/errors" ) +//CloneType() clones a type into a new type, replacing some elements of it +//Paramters: +// t is the existing type to be cloned +// replace is a list of items to replace with their new types +// use jq key notation +// if this is empty map/nil, it will clone the type without changes +//Return: +// cloned type or error +// +//Example: +// newType,err := reflection.CloneType( +// reflect.TypeOf(myStruct{}), +// map[string]reflect.Type{ +// ".field1": reflect.TypeOf(float64(0)), +// ".list[].name": reflect.TypeOf(int64(0)), +// }, +// ) +// +//This is not a function you will use everyday, but very useful when needed +//See example usage in search to read OpenSearch responses with the correct +//struct type used to parse ".hits.hits[]._source" so that we do not have to +//unmarshal the JSON, then marshal each _source from map[string]interface{} +//back to json then unmarshal again into the correct type! +//It saves a lot of overhead CPU doing it all at once using the correct type +//nested deep into the response body type. +// +//this function was written for above case... it will likely need extension +//if we want to replace all kinds of other fields, but it meets the current +//requirements. +// +//After parsing, use reflection.Get() with the same key notation to get the +//result from the nested document. +// +//Note current shortcoming: partial matching will apply if you have two +//similarly named fields, e.g. "name" and "name2", then replace instruction +//on "name" may partial match name2. To fix this, we need better function +//than strings.HasPrefix() to check for delimiter/end of name to do full +//word matches, and we need to extend the test to illustrate this. func CloneType(t reflect.Type, replace map[string]reflect.Type) (reflect.Type, error) { return clone("", t, replace) } @@ -26,6 +64,7 @@ func clone(name string, t reflect.Type, replace map[string]reflect.Type) (reflec if strings.HasPrefix(replaceName, fieldName) { if replaceName == fieldName { f.Type = replaceType + delete(replace, replaceName) } else { clonedType, err := clone(fieldName, f.Type, replace) if err != nil { @@ -47,20 +86,19 @@ func clone(name string, t reflect.Type, replace map[string]reflect.Type) (reflec if strings.HasPrefix(replaceName, name+"[]") { if replaceName == name+"[]" { //full match + delete(replace, replaceName) return replaceType, nil - } else { - elemType, err := clone(name+"[]", t.Elem(), replace) - if err != nil { - return t, errors.Wrapf(err, "failed to clone slice elem type") - } - return reflect.SliceOf(elemType), nil } - } else { - return t, nil + //partial match + elemType, err := clone(name+"[]", t.Elem(), replace) + if err != nil { + return t, errors.Wrapf(err, "failed to clone slice elem type") + } + return reflect.SliceOf(elemType), nil } } - - return t, errors.Errorf("NYI") + //no match + return t, nil default: } diff --git a/search/time_series.go b/search/time_series.go index 1c3e47fbd88ce25f76e7feaf958bd14305c0a6c7..fc004ff1a365c36eab9df9f0a3584e7711646e56 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -1,14 +1,18 @@ package search import ( + "bytes" + "context" "encoding/json" "net/http" "reflect" "strings" "time" + opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/reflection" ) //embed this into your log struct @@ -20,6 +24,7 @@ type TimeSeriesHeader struct { type TimeSeries interface { Write(StartTime time.Time, EndTime time.Time, data interface{}) error + Search(limit int) (docs []interface{}, totalCount int, err error) } type timeSeries struct { @@ -29,6 +34,8 @@ type timeSeries struct { fields []dataField jsonIndexSpec []byte createdDates map[string]bool + + searchResponseBodyType reflect.Type } type dataField struct { @@ -156,6 +163,20 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { if err != nil { return nil, errors.Wrapf(err, "failed to marshal index spec") } + + //define search response type + //similar to SearchResponseBody + ts.searchResponseBodyType, err = reflection.CloneType( + reflect.TypeOf([]reflect.StructField{}), + map[string]reflect.Type{ + ".hits.hits[]._slice": ts.dataType, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to make search response type for time-series") + } + + //new package: copy type recursively, find index of special field and replace when found.... + w.timeSeriesByName[name] = ts return ts, nil } @@ -277,3 +298,156 @@ type IndexSettings struct { UUID string `json:"uuid"` DocsCount int64 `json:"docs.count"` } + +func (ts *timeSeries) Search(limit int) (docs []interface{}, totalCount int, err error) { + if limit > 1000 { + err = errors.Errorf("limit=%d > 1000", limit) + return + } + + // example search request body for free text + // { + // "size": 5, + // "query": { + // "multi_match": { + // "query": "miller", + // "fields": ["title^2", "director"] + // } + // } + // } + body := SearchRequestBody{ + Size: limit, + Query: &SearchQuery{ + MultiMatch: &QueryMultiMatch{ + Query: "GET", //keyword to find + Fields: []string{"http_method"}, //, "title^2", "directory"}, + }, + }, + } + jsonBody, _ := json.Marshal(body) + search := opensearchapi.SearchRequest{ + Body: bytes.NewReader(jsonBody), + } + + searchResponse, err := search.Do(context.Background(), ts.w.client) + if err != nil { + err = errors.Wrapf(err, "failed to search documents") + return + } + + switch searchResponse.StatusCode { + case http.StatusOK: + default: + err = errors.Errorf("Search failed with HTTP status %v", searchResponse.StatusCode) + return + } + + resBodyPtrValue := reflect.New(ts.searchResponseBodyType) + if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { + err = errors.Wrapf(err, "cannot decode search response body") + return + } + + hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value") + if err != nil { + err = errors.Wrapf(err, "cannot get total nr of hits") + return + } + if hitsTotalValue.Interface().(int) < 1 { + return nil, 0, nil //no matches + } + + items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source") + if err != nil { + err = errors.Wrapf(err, "cannot get search response documents") + return + } + return items.Interface().([]interface{}), hitsTotalValue.Interface().(int), nil +} + +type SearchRequestBody struct { + Size int `json:"size,omitempty"` + Query *SearchQuery `json:"query,omitempty"` +} + +type SearchQuery struct { + MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"` +} + +type QueryMultiMatch struct { + Query string `json:"query"` + Fields []string `json:"fields"` +} + +//example of search response body: +// { +// "took":872, +// "timed_out":false, +// "_shards":{ +// "total":38, +// "successful":38, +// "skipped":0, +// "failed":0 +// }, +// "hits":{ +// "total":{ +// "value":0, +// "relation":"eq" +// }, +// "max_score":null, +// "hits":[ +// { +// "_index": "go-utils-audit-test-20211030", +// "_type": "_doc", +// "_id": "Tj9l5XwBWRiAneoYazic", +// "_score": 1.2039728, +// "_source": { +// "@timestamp": "2021-10-30T15:03:20.679481+02:00", +// "@end_time": "2021-10-30T15:03:20.469481+02:00", +// "@duration_ms": -210, +// "test1": "6", +// "test2": "ACC_00098", +// "test3": 10, +// "http": { +// "method": "GET", +// "path": "/accounts" +// }, +// "http_method": "GET", +// "http_path": "/accounts" +// } +// }, +// ] +// } +// } +type SearchResponseBody struct { + Took int `json:"took"` //milliseconds + TimedOut bool `json:"timed_out"` + Shards SearchResponseShards `json:"_shards"` + Hits SearchResponseHits `json:"hits"` +} + +type SearchResponseShards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` +} + +type SearchResponseHits struct { + Total SearchResponseHitsTotal `json:"total"` + MaxScore *float64 `json:"max_score,omitempty"` + Hits []HitDoc `json:"hits"` +} + +type SearchResponseHitsTotal struct { + Value int `json:"value"` //e.g. 0 when no docs matched + Relation string `json:"relation"` //e.g. "eq" +} + +type HitDoc struct { + Index string `json:"_index"` //name of index + Type string `json:"_type"` //_doc + ID string `json:"_id"` + Score float64 `json:"_score"` // + Source map[string]interface{} `json:"_source"` //the document of itemType +} diff --git a/search/writer.go b/search/writer.go index efcd6bc09b85287d379cfbfa5b943e5a219c3178..8b492beb3d37445ef5c12010aad75e9c54471484 100644 --- a/search/writer.go +++ b/search/writer.go @@ -130,32 +130,6 @@ func (writer writer) Search() ([]interface{}, error) { return nil, errors.Errorf("NYI search result processing: %v", res) } -// // Delete the document. -// delete := opensearchapi.DeleteRequest{ -// Index: IndexName, -// DocumentID: docId, -// } - -// deleteResponse, err := delete.Do(context.Background(), client) -// if err != nil { -// fmt.Println("failed to delete document ", err) -// os.Exit(1) -// } -// fmt.Println("deleting document") -// fmt.Println(deleteResponse) - -// // Delete previously created index. -// deleteIndex := opensearchapi.IndicesDeleteRequest{ -// Index: []string{writer.config.IndexName}, -// } - -// deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) -// if err != nil { -// fmt.Println("failed to delete index ", err) -// os.Exit(1) -// } -// fmt.Println("deleting index", deleteIndexResponse) - type CreateResponse struct { Error *Error `json:"error,omitempty"` } @@ -170,15 +144,6 @@ type IndexResponse struct { Shards *Shards `json:"_shards,omitempty"` Type string `json:"_type,omitempty"` PrimaryTerm int `json:"_primary_term,omitempty"` - // _id:oZJZxXwBPnbPDFcjNpuO - // _index:go-utils-audit-test - // _primary_term:1 - // _seq_no:5 - // _shards:map[failed:0 successful:2 total:2] - // _type:_doc - // _version:1 - // result:created - } type Shards struct { diff --git a/search/writer_test.go b/search/writer_test.go index f85d7a350ca12d6bb0c11aaeb93b979a97d4a8ae..333a254b6a8a5807db4e2e0670bab6ae39f7b9f3 100644 --- a/search/writer_test.go +++ b/search/writer_test.go @@ -67,6 +67,13 @@ func test(t *testing.T, c search.Config) { } } + docs, totalCount, err := ts.Search(10) + if err != nil { + t.Errorf("failed to search: %+v", err) + } else { + t.Logf("search: %d: %+v", totalCount, docs) + } + oldList, err := a.DelOldTimeSeries(indexName, 2) if err != nil { t.Fatalf("failed to del old: %+v", err)