diff --git a/api/context.go b/api/context.go index ffe4a5ebc810718fa77f9ef79a8b67df5cfa0669..eea6c5ca30dc49bf99117ef1869d2daab1dc9eb6 100644 --- a/api/context.go +++ b/api/context.go @@ -58,13 +58,8 @@ func (ctx *apiContext) LogAPIRequestAndResponse(res events.APIGatewayProxyRespon //allocate struct for params, populate it from the URL parameters then validate and return the struct func (ctx apiContext) GetRequestParams(paramsStructType reflect.Type) (interface{}, error) { - paramValues := map[string]interface{}{} - for n, v := range ctx.request.QueryStringParameters { - paramValues[n] = v - } paramsStructValuePtr := reflect.New(paramsStructType) - - if err := ctx.extract("params", paramsStructType, paramsStructValuePtr.Elem()); err != nil { + if err := ctx.setParamsInStruct("params", paramsStructType, paramsStructValuePtr.Elem()); err != nil { return nil, errors.Wrapf(err, "failed to put query param values into struct") } if err := ctx.applyClaim("params", paramsStructValuePtr.Interface()); err != nil { @@ -78,30 +73,37 @@ func (ctx apiContext) GetRequestParams(paramsStructType reflect.Type) (interface return paramsStructValuePtr.Elem().Interface(), nil } -func (ctx apiContext) extract(name string, t reflect.Type, v reflect.Value) error { +//extract params into a struct value +func (ctx apiContext) setParamsInStruct(name string, t reflect.Type, v reflect.Value) error { for i := 0; i < t.NumField(); i++ { - f := t.Field(i) - switch f.Type.Kind() { - case reflect.Struct: - if err := ctx.extract(name+"."+f.Name, t.Field(i).Type, v.Field(i)); err != nil { - return errors.Wrapf(err, "failed to fill sub %s.%s", name, f.Name) + tf := t.Field(i) + //enter into anonymous sub-structs + if tf.Anonymous { + if tf.Type.Kind() == reflect.Struct { + if err := ctx.setParamsInStruct(name+"."+tf.Name, t.Field(i).Type, v.Field(i)); err != nil { + return errors.Wrapf(err, "failed on parameters %s.%s", name, tf.Name) + } + continue } - continue - default: + return errors.Errorf("parameters cannot parse into anonymous %s field %s", tf.Type.Kind(), tf.Type.Name()) } - n := (strings.SplitN(f.Tag.Get("json"), ",", 2))[0] + //named field: + //use name from json tag, else lowercase of field name + n := (strings.SplitN(tf.Tag.Get("json"), ",", 2))[0] if n == "" { - n = strings.ToLower(f.Name) + n = strings.ToLower(tf.Name) } if n == "" || n == "-" { - continue + continue //skip fields without name } - //get value(s) from query string + //see if this named param was specified var paramStrValues []string if paramStrValue, isDefined := ctx.request.QueryStringParameters[n]; isDefined { + //specified once in URL if len(paramStrValue) >= 2 && paramStrValue[0] == '[' && paramStrValue[len(paramStrValue)-1] == ']' { + //specified as CSV inside [...] e.g. id=[1,2,3] csvReader := csv.NewReader(strings.NewReader(paramStrValue[1 : len(paramStrValue)-1])) var err error paramStrValues, err = csvReader.Read() @@ -109,38 +111,58 @@ func (ctx apiContext) extract(name string, t reflect.Type, v reflect.Value) erro return errors.Wrapf(err, "invalid CSV: [%s]", paramStrValue) } } else { - paramStrValues = []string{paramStrValue} //single value + //specified as single value only e.g. id=1 + paramStrValues = []string{paramStrValue} } } else { + //specified multiple times e.g. id=1&id=2&id=3 paramStrValues = ctx.request.MultiValueQueryStringParameters[n] } if len(paramStrValues) == 0 { continue //param has no value specified in URL } + valueField := v.Field(i) + if valueField.Kind() == reflect.Ptr { + valueField.Set(reflect.New(valueField.Type().Elem())) + valueField = valueField.Elem() + } + //param is defined >=1 times in URL - if f.Type.Kind() == reflect.Slice { - //iterate over all specified values - for index, paramStrValue := range paramStrValues { - newValuePtr := reflect.New(f.Type.Elem()) - if err := reflection.SetValue(newValuePtr.Elem(), paramStrValue); err != nil { - return errors.Wrapf(err, "failed to set %s[%d]=%s", n, index, paramStrValues[0]) + if tf.Type.Kind() == reflect.Slice { + //this param struct field is a slice, iterate over all specified values + for i, paramStrValue := range paramStrValues { + paramValue, err := parseParamValue(paramStrValue, tf.Type.Elem()) + if err != nil { + return errors.Wrapf(err, "invalid %s[%d]", n, i) } - v.Field(i).Set(reflect.Append(v.Field(i), newValuePtr.Elem())) + valueField.Set(reflect.Append(valueField, paramValue)) } } else { if len(paramStrValues) > 1 { - return errors.Errorf("%s does not support >1 values(%v)", n, strings.Join(paramStrValues, ",")) + return errors.Errorf("parameter %s does not support multiple values [%s]", n, strings.Join(paramStrValues, ",")) } //single value specified - if err := reflection.SetValue(v.Field(i), paramStrValues[0]); err != nil { - return errors.Wrapf(err, "failed to set %s=%s", n, paramStrValues[0]) + paramValue, err := parseParamValue(paramStrValues[0], valueField.Type()) + if err != nil { + return errors.Wrapf(err, "invalid %s", n) } + valueField.Set(paramValue) } } //for each param struct field return nil } +func parseParamValue(s string, t reflect.Type) (reflect.Value, error) { + newValuePtr := reflect.New(t) + if err := json.Unmarshal([]byte("\""+s+"\""), newValuePtr.Interface()); err != nil { + if err := json.Unmarshal([]byte(s), newValuePtr.Interface()); err != nil { + return newValuePtr.Elem(), errors.Wrapf(err, "invalid \"%s\"", s) + } + } + return newValuePtr.Elem(), nil +} + func (ctx apiContext) GetRequestBody(requestStructType reflect.Type) (interface{}, error) { requestStructValuePtr := reflect.New(requestStructType) err := json.Unmarshal([]byte(ctx.request.Body), requestStructValuePtr.Interface()) diff --git a/api/params_test.go b/api/params_test.go index cb5dc0d1c0366d1282f9e69037a502cfdfeec707..b9586e99c1a875103ad79a47e574fab20d197551 100644 --- a/api/params_test.go +++ b/api/params_test.go @@ -2,8 +2,10 @@ package api_test import ( "context" + "encoding/json" "reflect" "testing" + "time" "github.com/aws/aws-lambda-go/events" "gitlab.com/uafrica/go-utils/api" @@ -34,6 +36,7 @@ func TestNested(t *testing.T) { ctx, err = api.New("request-id", nil).NewContext( context.Background(), "123", + //all URL params are specified as string values events.APIGatewayProxyRequest{ QueryStringParameters: map[string]string{ "a": "1", //must be written into P3.P2.P1.A @@ -47,11 +50,11 @@ func TestNested(t *testing.T) { }, }) if err != nil { - t.Fatal(err) + t.Fatalf("ERROR: %+v", err) } if p3d, err := ctx.GetRequestParams(reflect.TypeOf(P3{})); err != nil { - t.Fatal(err) + t.Fatalf("ERROR: %+v", err) } else { p3 := p3d.(P3) t.Logf("p3: %+v", p3) @@ -67,6 +70,77 @@ func TestNested(t *testing.T) { } } +type ParamTypes struct { + GetParams + Nr int64 `json:"nr"` + Name string `json:"name"` + NrOpt *int64 `json:"nr_opt"` + NameOpt *string `json:"name_opt"` + Time1 time.Time `json:"time1"` + Time2 *time.Time `json:"time2"` + Dur1 time.Duration `json:"dur1"` + Dur2 *time.Duration `json:"dur2"` + + //lists of values + NrList []int64 `json:"nrs"` + NameList []string `json:"names"` + NrOptList []*int64 `json:"nrs_opt"` + NameOptList []*string `json:"names_opt"` + Time1List []time.Time `json:"time1s"` + Time2List []*time.Time `json:"time2s"` + Dur1List []time.Duration `json:"dur1s"` + Dur2List []*time.Duration `json:"dur2s"` +} + +func TestTypes(t *testing.T) { + logger.SetGlobalLevel(logger.LevelDebug) + logger.SetGlobalFormat(logger.NewConsole()) + var ctx api.Context + var err error + ctx, err = api.New("request-id", nil).NewContext( + context.Background(), + "123", + //all URL params are specified as string values + events.APIGatewayProxyRequest{ + QueryStringParameters: map[string]string{ + "nr": "1", + "name": "name2", + "nr_opt": "3", + "name_opt": "name4", + "limit": "5", + "time1": "2021-11-23T00:00:00+00:00", + "time2": "2021-11-23T00:00:00+00:00", + "dur1": "4", //nanoseconds + "dur2": "4", //nanoseconds + "nrs": "[1,2,3]", + "nrs_opt": "[4,5,6]", + "names": "[A,B,C]", + "names_opt": "[D,E,F]", + "time1s": "[2021-11-23T00:00:00+00:00]", + "dur1s": "[4,5,6]", //nanoseconds + }, + MultiValueQueryStringParameters: map[string][]string{ + "dur2s": {"11", "12", "13"}, + "time2s": {"2021-11-23T00:00:00+00:00", "2021-11-23T00:00:00+00:00", "2021-11-23T00:00:00+00:00"}, + }, + }) + if err != nil { + t.Fatalf("ERROR: %+v", err) + } + + if pd, err := ctx.GetRequestParams(reflect.TypeOf(ParamTypes{})); err != nil { + t.Fatalf("ERROR: %+v", err) + } else { + p := pd.(ParamTypes) + t.Logf("p: %+v", p) + if p.Nr != 1 || p.Name != "name2" || p.NrOpt == nil || *p.NrOpt != 3 || p.NameOpt == nil || *p.NameOpt != "name4" || p.Limit != 5 { + t.Errorf("Wrong values: %+v", p) + } + jsonParams, _ := json.Marshal(p) + t.Logf("params: %s", string(jsonParams)) + } +} + type PageParams struct { Limit int64 `json:"limit"` Offset int64 `json:"offset"` @@ -94,6 +168,7 @@ func TestGet(t *testing.T) { ctx, err = api.New("request-id", nil).NewContext( context.Background(), "123", + //all URL params are specified as string values events.APIGatewayProxyRequest{ QueryStringParameters: map[string]string{ "id": "1", @@ -109,11 +184,11 @@ func TestGet(t *testing.T) { }, }) if err != nil { - t.Fatal(err) + t.Fatalf("ERROR: %+v", err) } if p3d, err := ctx.GetRequestParams(reflect.TypeOf(MyGetParams{})); err != nil { - t.Fatal(err) + t.Fatalf("ERROR: %+v", err) } else { get := p3d.(MyGetParams) t.Logf("get: %+v", get) diff --git a/reflection/get.go b/reflection/get.go new file mode 100644 index 0000000000000000000000000000000000000000..956d4c2d5f7c0e3abc792015ca84a4a5677d6144 --- /dev/null +++ b/reflection/get.go @@ -0,0 +1,78 @@ +package reflection + +import ( + "fmt" + "reflect" + "strings" + + "gitlab.com/uafrica/go-utils/errors" +) + +//Get() a jq-named element from a value +//e.g. get(reflect.ValueOf(myDoc), ".hits.hits[]._source") +//the result is an array of _source items which may be +//just one field inside the hits object in the list +// +//see usage in search.TimeSeries.Search() to get documents +//from the OpenSearch response structure that is very nested +//and parse using a runtime-created reflect Type, i.e. one +//cannot get it simply by iterating over res.Hits.[]Hits... +func Get(v reflect.Value, key string) (reflect.Value, error) { + return get("", v, key) +} + +func get(name string, v reflect.Value, key string) (reflect.Value, error) { + if key == "" { + return v, nil + } + + switch v.Kind() { + case reflect.Ptr: + return get(name, v.Elem(), key) + + case reflect.Struct: + if key[0] != '.' { + return v, errors.Errorf("get(%s): key=\"%s\" does not start with '.'", name, key) + } + fieldName := key[1:] + remainingKey := "" + index := strings.IndexAny(fieldName, ".[") + if index > 0 { + fieldName = key[1 : index+1] + remainingKey = key[index+1:] + } + t := v.Type() + fieldIndex := 0 + for fieldIndex = 0; fieldIndex < t.NumField(); fieldIndex++ { + if strings.SplitN(t.Field(fieldIndex).Tag.Get("json"), ",", 2)[0] == fieldName { + break + } + } + if fieldIndex >= t.NumField() { + return v, errors.Errorf("%s does not have field %s", name, fieldName) + } + return get(name+"."+fieldName, v.Field(fieldIndex), remainingKey) + + case reflect.Slice: + if !strings.HasPrefix(key, "[]") { + return v, errors.Errorf("canot get %s from slice, expecting \"[]\" in the key", key) + } + + //make array of results from each item in the slice + var result reflect.Value + for i := 0; i < v.Len(); i++ { + if vv, err := get(fmt.Sprintf("%s[%d]", name, i), v.Index(i), key[2:]); err != nil { + return v, errors.Wrapf(err, "failed on %s[%d]", name, i) + } else { + if !result.IsValid() { + result = reflect.MakeSlice(reflect.SliceOf(vv.Type()), 0, v.Len()) + } + result = reflect.Append(result, vv) + } + } + return result, nil + + default: + } + return v, errors.Errorf("Cannot get %s from %s", key, v.Kind()) +} diff --git a/reflection/reflection.go b/reflection/reflection.go index 14faf28a7641715e420fb3a7fa3a396f117cf0ac..b5ddd2590537f221a130a06fc424cb0ad1d8e2bf 100644 --- a/reflection/reflection.go +++ b/reflection/reflection.go @@ -52,7 +52,7 @@ func SetString(field reflect.Value, value string) { return // Field doesn't exist } if field.Kind() != reflect.String { - logger.Error("Claims: Field is not of type String: %v", field.Kind()) + logger.Errorf("Claims: Field is not of type String: %v", field.Kind()) return } field.SetString(value) diff --git a/reflection/type_clone.go b/reflection/type_clone.go new file mode 100644 index 0000000000000000000000000000000000000000..c1427129a73883ce69ae523d8f74052152cb6297 --- /dev/null +++ b/reflection/type_clone.go @@ -0,0 +1,113 @@ +package reflection + +import ( + "reflect" + "strings" + + "gitlab.com/uafrica/go-utils/errors" +) + +//CloneType() clones a type into a new type, replacing some elements of it +//Paramters: +// t is the existing type to be cloned +// replace is a list of items to replace with their new types +// use jq key notation +// if this is empty map/nil, it will clone the type without changes +//Return: +// cloned type or error +// +//Example: +// newType,err := reflection.CloneType( +// reflect.TypeOf(myStruct{}), +// map[string]reflect.Type{ +// ".field1": reflect.TypeOf(float64(0)), +// ".list[].name": reflect.TypeOf(int64(0)), +// }, +// ) +// +//This is not a function you will use everyday, but very useful when needed +//See example usage in search to read OpenSearch responses with the correct +//struct type used to parse ".hits.hits[]._source" so that we do not have to +//unmarshal the JSON, then marshal each _source from map[string]interface{} +//back to json then unmarshal again into the correct type! +//It saves a lot of overhead CPU doing it all at once using the correct type +//nested deep into the response body type. +// +//this function was written for above case... it will likely need extension +//if we want to replace all kinds of other fields, but it meets the current +//requirements. +// +//After parsing, use reflection.Get() with the same key notation to get the +//result from the nested document. +// +//Note current shortcoming: partial matching will apply if you have two +//similarly named fields, e.g. "name" and "name2", then replace instruction +//on "name" may partial match name2. To fix this, we need better function +//than strings.HasPrefix() to check for delimiter/end of name to do full +//word matches, and we need to extend the test to illustrate this. +func CloneType(t reflect.Type, replace map[string]reflect.Type) (reflect.Type, error) { + cloned, err := clone("", t, replace) + if err != nil { + return t, err + } + if len(replace) > 0 { + return t, errors.Errorf("unknown replacements: %+v", replace) + } + return cloned, nil +} + +func clone(name string, t reflect.Type, replace map[string]reflect.Type) (reflect.Type, error) { + switch t.Kind() { + case reflect.Struct: + fields := []reflect.StructField{} + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + n := strings.SplitN(f.Tag.Get("json"), ",", 2)[0] //exclude ,omitempty... + if n == "" { + n = f.Name + } + fieldName := name + "." + n + for replaceName, replaceType := range replace { + if strings.HasPrefix(replaceName, fieldName) { + if replaceName == fieldName { + f.Type = replaceType + delete(replace, replaceName) + } else { + clonedType, err := clone(fieldName, f.Type, replace) + if err != nil { + return t, errors.Wrapf(err, "failed to clone %s", fieldName) + } + f.Type = clonedType + } + } + } + if newType, ok := replace[fieldName]; ok { + f.Type = newType + } + fields = append(fields, f) + } + return reflect.StructOf(fields), nil + + case reflect.Slice: + for replaceName, replaceType := range replace { + if strings.HasPrefix(replaceName, name+"[]") { + if replaceName == name+"[]" { + //full match + delete(replace, replaceName) + return replaceType, nil + } + //partial match + elemType, err := clone(name+"[]", t.Elem(), replace) + if err != nil { + return t, errors.Wrapf(err, "failed to clone slice elem type") + } + return reflect.SliceOf(elemType), nil + } + } + //no match + return t, nil + + default: + } + return t, errors.Errorf("cannot clone %v %s", t.Kind(), t.Name()) +} diff --git a/reflection/type_clone_test.go b/reflection/type_clone_test.go new file mode 100644 index 0000000000000000000000000000000000000000..474bc098c500a7aca52106681e6cdcd072af65c0 --- /dev/null +++ b/reflection/type_clone_test.go @@ -0,0 +1,134 @@ +package reflection_test + +import ( + "encoding/json" + "reflect" + "testing" + + "gitlab.com/uafrica/go-utils/reflection" +) + +func Test1(t *testing.T) { + doc := ` + { + "took":872, + "timed_out":false, + "_shards":{ + "total":38, + "successful":38, + "skipped":0, + "failed":0 + }, + "hits":{ + "total":{ + "value":10, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index": "go-utils-audit-test-20211030", + "_type": "_doc", + "_id": "Tj9l5XwBWRiAneoYazic", + "_score": 1.2039728, + "_source": { + "@timestamp": "2021-10-30T15:03:20.000000+02:00", + "@end_time": "2021-10-30T15:03:21.000000+02:00", + "@duration_ms": 1000, + "test1": "6", + "test2": "ACC_00098", + "test3": 10, + "http": { + "method": "GET", + "path": "/accounts" + }, + "http_method": "GET", + "http_path": "/accounts" + } + } + ] + } + } + ` + + //using default type, documents in _source:{} are parsed to map[string]interface{} + res := SearchResponseBody{} + if err := json.Unmarshal([]byte(doc), &res); err != nil { + t.Fatalf("cannot unmarshal response into default type: %+v", err) + } + for hitIndex, hit := range res.Hits.Hits { + t.Logf("doc[%d]: (%T)%+v", hitIndex, hit.Source, hit.Source) + } + + //create type with own document type to use for _source field: + cloned, err := reflection.CloneType( + reflect.TypeOf(SearchResponseBody{}), + map[string]reflect.Type{ + ".hits.hits[]._source": reflect.TypeOf(myDoc{}), + }) + if err != nil { + t.Fatal(err) + } + t.Logf("cloned type: %v", cloned) + + //unmarshal using new type to have correct type for each hit + resPtrValue := reflect.New(cloned) + if err := json.Unmarshal([]byte(doc), resPtrValue.Interface()); err != nil { + t.Fatalf("failed to decode into cloned type: %+v", err) + } + // clonedRes := resPtrValue.Interface() + // t.Logf("Coned res: %+v", clonedRes) + + //get the replaced values as an array of docs + v, err := reflection.Get(resPtrValue, ".hits.hits[]._source") + if err != nil { + t.Fatalf("Did not get list: %+v", err) + } + docs, ok := v.Interface().([]myDoc) + if !ok { + t.Fatalf("%T is not []myDoc", v.Interface()) + } + if len(docs) != 1 { + t.Fatalf("Got %d != 1", len(docs)) + } + for _, doc := range docs { + t.Logf("doc: (%T)%+v", doc, doc) + } +} + +type SearchResponseBody struct { + Took int `json:"took"` //milliseconds + TimedOut bool `json:"timed_out"` + Shards SearchResponseShards `json:"_shards"` + Hits SearchResponseHits `json:"hits"` +} + +type SearchResponseShards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` +} + +type SearchResponseHits struct { + Total SearchResponseHitsTotal `json:"total"` + MaxScore *float64 `json:"max_score,omitempty"` + Hits []HitDoc `json:"hits"` +} + +type SearchResponseHitsTotal struct { + Value int `json:"value"` //e.g. 0 when no docs matched + Relation string `json:"relation"` //e.g. "eq" +} + +type HitDoc struct { + Index string `json:"_index"` //name of index + Type string `json:"_type"` //_doc + ID string `json:"_id"` + Score float64 `json:"_score"` // + Source map[string]interface{} `json:"_source"` //the document of itemType +} + +type myDoc struct { + Test1 string `json:"test1"` +} diff --git a/search/opensearch_types.go b/search/opensearch_types.go index dba91cd55806f46186789b8b6d2ac15432f515e4..72c048ad80c0655e974af0af735b2cba37f332c6 100644 --- a/search/opensearch_types.go +++ b/search/opensearch_types.go @@ -1,5 +1,7 @@ package search +import "time" + //Mapping configures an index in OpenSearch type Index struct { Settings Settings `json:"settings"` @@ -24,6 +26,7 @@ type MappingProperty struct { Enabled bool `json:"enabled,omitempty"` Fields map[string]MappingFieldProperties `json:"fields,omitempty"` Properties map[string]MappingProperty `json:"properties,omitempty"` + //Index bool `json:"index,omitempty"` //set true to make text field searchable } type MappingFieldProperties struct { @@ -34,3 +37,126 @@ type MappingKeyword struct { Type string `json:"type"` //="keyword" IgnoreAbove int `json:"ignore_above"` //e.g. 256 } + +type SearchRequestBody struct { + Size int64 `json:"size,omitempty"` + Query Query `json:"query"` +} + +type Query struct { + //one of: + Match *QueryNameValue `json:"match,omitempty" doc:"<field>:<value>"` + Term *QueryNameValue `json:"term,omitempty"` + Range *QueryRange `json:"range,omitempty"` + MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"` + Bool *QueryBool `json:"bool,omitempty"` +} + +type QueryMultiMatch struct { + Query string `json:"query" doc:"Text search in below fields"` + Fields []string `json:"fields,omitempty" doc:"List of fields"` +} + +//https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html +type QueryBool struct { + Must []Query `json:"must,omitempty" docs:"List of things that must appear in matching documents and will contribute to the score."` + Filter []Query `json:"filter,omitempty" doc:"List of things that must appear in matching documents. However unlike must the score of the query will be ignored. Filter clauses are executed in filter context, meaning that scoring is ignored and clauses are considered for caching."` + Should []Query `json:"should,omitempty" doc:"List of things that should appear in the matching document."` + MustNot []Query `json:"must_not,omitempty" doc:"List of things that must not appear in the matching documents. Clauses are executed in filter context meaning that scoring is ignored and clauses are considered for caching. Because scoring is ignored, a score of 0 for all documents is returned."` +} + +//<name>:<value> can be shorthanded to just a text value "...", but for sake of go type def, we always use an object meaning the same, allowing more options +//https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html#query-dsl-match-query-short-ex +type QueryNameValue map[string]QueryValue + +type QueryValue struct { + Query string `json:"query"` + Operator string `json:"operator,omitempty"` //defaults to "or", accepted values: or|and + Fuzziness string `json:"fuzziness,omitempty"` //https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness + ZeroTermsQuery string `json:"zero_terms_query,omitempty"` +} + +func QueryValueText(text string) QueryValue { + return QueryValue{Query: text, Operator: "and"} +} + +func QueryValueTime(t time.Time) QueryValue { + return QueryValue{Query: t.String(), Operator: "and"} +} + +type QueryRange map[string]QueryExpr + +type QueryExpr map[string]string //<oper>:<value> e.g. "gte":"10" + +//example of search response body: +// { +// "took":872, +// "timed_out":false, +// "_shards":{ +// "total":38, +// "successful":38, +// "skipped":0, +// "failed":0 +// }, +// "hits":{ +// "total":{ +// "value":0, +// "relation":"eq" +// }, +// "max_score":null, +// "hits":[ +// { +// "_index": "go-utils-audit-test-20211030", +// "_type": "_doc", +// "_id": "Tj9l5XwBWRiAneoYazic", +// "_score": 1.2039728, +// "_source": { +// "@timestamp": "2021-10-30T15:03:20.679481+02:00", +// "@end_time": "2021-10-30T15:03:20.469481+02:00", +// "@duration_ms": -210, +// "test1": "6", +// "test2": "ACC_00098", +// "test3": 10, +// "http": { +// "method": "GET", +// "path": "/accounts" +// }, +// "http_method": "GET", +// "http_path": "/accounts" +// } +// }, +// ] +// } +// } +type SearchResponseBody struct { + Took int `json:"took"` //milliseconds + TimedOut bool `json:"timed_out"` + Shards SearchResponseShards `json:"_shards"` + Hits SearchResponseHits `json:"hits"` +} + +type SearchResponseShards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` +} + +type SearchResponseHits struct { + Total SearchResponseHitsTotal `json:"total"` + MaxScore *float64 `json:"max_score,omitempty"` + Hits []HitDoc `json:"hits"` +} + +type SearchResponseHitsTotal struct { + Value int `json:"value"` //e.g. 0 when no docs matched + Relation string `json:"relation"` //e.g. "eq" +} + +type HitDoc struct { + Index string `json:"_index"` //name of index + Type string `json:"_type"` //_doc + ID string `json:"_id"` + Score float64 `json:"_score"` // + Source map[string]interface{} `json:"_source"` //the document of itemType +} diff --git a/search/writer_test.go b/search/search_test.go similarity index 87% rename from search/writer_test.go rename to search/search_test.go index f85d7a350ca12d6bb0c11aaeb93b979a97d4a8ae..0deef6b0234d07bbf1c1fa2274004b3ced4a47d3 100644 --- a/search/writer_test.go +++ b/search/search_test.go @@ -67,6 +67,27 @@ func test(t *testing.T, c search.Config) { } } + query := search.Query{ + MultiMatch: &search.QueryMultiMatch{ + Query: "GET", + Fields: []string{"http_method"}, + }, + } + + docs, totalCount, err := ts.Search(query, 10) + if err != nil { + t.Errorf("failed to search: %+v", err) + } else { + if docsSlice, ok := docs.([]testStruct); ok { + t.Logf("search result total_count:%d with %d docs", totalCount, len(docsSlice)) + if len(docsSlice) > 10 { + t.Errorf("got %d docs > max 10", len(docsSlice)) + } + } else { + t.Errorf("docs %T is not []testStruct!", docs) + } + } + oldList, err := a.DelOldTimeSeries(indexName, 2) if err != nil { t.Fatalf("failed to del old: %+v", err) diff --git a/search/time_series.go b/search/time_series.go index 1c3e47fbd88ce25f76e7feaf958bd14305c0a6c7..3a13720a1706ca99d1fcb3d93176804ced4bcc84 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -1,16 +1,23 @@ package search import ( + "bytes" + "context" "encoding/json" + "io/ioutil" "net/http" "reflect" "strings" "time" + opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" + "gitlab.com/uafrica/go-utils/reflection" ) +const TimeFormat = "2006-01-02T15:04:05+07:00" + //embed this into your log struct type TimeSeriesHeader struct { StartTime time.Time `json:"@timestamp"` @@ -20,6 +27,7 @@ type TimeSeriesHeader struct { type TimeSeries interface { Write(StartTime time.Time, EndTime time.Time, data interface{}) error + Search(query Query, limit int64) (docs interface{}, totalCount int, err error) } type timeSeries struct { @@ -29,6 +37,8 @@ type timeSeries struct { fields []dataField jsonIndexSpec []byte createdDates map[string]bool + + searchResponseBodyType reflect.Type } type dataField struct { @@ -156,6 +166,20 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { if err != nil { return nil, errors.Wrapf(err, "failed to marshal index spec") } + + //define search response type + //similar to SearchResponseBody + ts.searchResponseBodyType, err = reflection.CloneType( + reflect.TypeOf(SearchResponseBody{}), + map[string]reflect.Type{ + ".hits.hits[]._source": ts.dataType, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to make search response type for time-series") + } + + //new package: copy type recursively, find index of special field and replace when found.... + w.timeSeriesByName[name] = ts return ts, nil } @@ -247,7 +271,7 @@ func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr) } else { if date.Before(timeThreshold) { - logger.Debugf("Deleting index(%s).uuid(%s).docsCount(%d) older than %s days...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, dailyIndexInfo.Settings.Index.DocsCount, timeThreshold) + logger.Debugf("Deleting index(%s).uuid(%s) older than %s days...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, timeThreshold) indicesToDelete = append(indicesToDelete, dailyIndexName) } } @@ -274,6 +298,74 @@ type IndexInfoSettings struct { } type IndexSettings struct { - UUID string `json:"uuid"` - DocsCount int64 `json:"docs.count"` + UUID string `json:"uuid"` + CreationDate string `json:"creation_date"` + NumberOfShards string `json:"number_of_shards"` + NumberOfReplicas string `json:"number_of_replicas"` + ProviderName string `json:"provided_name"` //e.g. "go-utils-audit-test-20211103" +} + +//Search +//Return: +// docs will be a slice of the TimeSeries data type +func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) { + if limit < 0 || limit > 1000 { + err = errors.Errorf("limit=%d not 0..1000", limit) + return + } + + // example search request body for free text + // { + // "size": 5, + // "query": { + // "multi_match": { + // "query": "miller", + // "fields": ["title^2", "director"] + // } + // } + // } + body := SearchRequestBody{ + Size: limit, + Query: query, + } + jsonBody, _ := json.Marshal(body) + search := opensearchapi.SearchRequest{ + Body: bytes.NewReader(jsonBody), + } + + searchResponse, err := search.Do(context.Background(), ts.w.client) + if err != nil { + err = errors.Wrapf(err, "failed to search documents") + return + } + + switch searchResponse.StatusCode { + case http.StatusOK: + default: + resBody, _ := ioutil.ReadAll(searchResponse.Body) + err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody)) + return + } + + resBodyPtrValue := reflect.New(ts.searchResponseBodyType) + if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { + err = errors.Wrapf(err, "cannot decode search response body") + return + } + + hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value") + if err != nil { + err = errors.Wrapf(err, "cannot get total nr of hits") + return + } + if hitsTotalValue.Interface().(int) < 1 { + return nil, 0, nil //no matches + } + + items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source") + if err != nil { + err = errors.Wrapf(err, "cannot get search response documents") + return + } + return items.Interface(), hitsTotalValue.Interface().(int), nil } diff --git a/search/writer.go b/search/writer.go index efcd6bc09b85287d379cfbfa5b943e5a219c3178..8b492beb3d37445ef5c12010aad75e9c54471484 100644 --- a/search/writer.go +++ b/search/writer.go @@ -130,32 +130,6 @@ func (writer writer) Search() ([]interface{}, error) { return nil, errors.Errorf("NYI search result processing: %v", res) } -// // Delete the document. -// delete := opensearchapi.DeleteRequest{ -// Index: IndexName, -// DocumentID: docId, -// } - -// deleteResponse, err := delete.Do(context.Background(), client) -// if err != nil { -// fmt.Println("failed to delete document ", err) -// os.Exit(1) -// } -// fmt.Println("deleting document") -// fmt.Println(deleteResponse) - -// // Delete previously created index. -// deleteIndex := opensearchapi.IndicesDeleteRequest{ -// Index: []string{writer.config.IndexName}, -// } - -// deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) -// if err != nil { -// fmt.Println("failed to delete index ", err) -// os.Exit(1) -// } -// fmt.Println("deleting index", deleteIndexResponse) - type CreateResponse struct { Error *Error `json:"error,omitempty"` } @@ -170,15 +144,6 @@ type IndexResponse struct { Shards *Shards `json:"_shards,omitempty"` Type string `json:"_type,omitempty"` PrimaryTerm int `json:"_primary_term,omitempty"` - // _id:oZJZxXwBPnbPDFcjNpuO - // _index:go-utils-audit-test - // _primary_term:1 - // _seq_no:5 - // _shards:map[failed:0 successful:2 total:2] - // _type:_doc - // _version:1 - // result:created - } type Shards struct {