Skip to content
Snippets Groups Projects
Commit d83ef134 authored by Jan Semmelink's avatar Jan Semmelink
Browse files

Merge branch '12-search-retrieval' into 'main'

Search package improvements to retrieve documents with text searches from OpenSearch

See merge request uafrica/go-utils!6
parents 54e1dee7 cfe7a39f
Branches
Tags v1.3.6
1 merge request!6Search package improvements to retrieve documents with text searches from OpenSearch
......@@ -58,13 +58,8 @@ func (ctx *apiContext) LogAPIRequestAndResponse(res events.APIGatewayProxyRespon
//allocate struct for params, populate it from the URL parameters then validate and return the struct
func (ctx apiContext) GetRequestParams(paramsStructType reflect.Type) (interface{}, error) {
paramValues := map[string]interface{}{}
for n, v := range ctx.request.QueryStringParameters {
paramValues[n] = v
}
paramsStructValuePtr := reflect.New(paramsStructType)
if err := ctx.extract("params", paramsStructType, paramsStructValuePtr.Elem()); err != nil {
if err := ctx.setParamsInStruct("params", paramsStructType, paramsStructValuePtr.Elem()); err != nil {
return nil, errors.Wrapf(err, "failed to put query param values into struct")
}
if err := ctx.applyClaim("params", paramsStructValuePtr.Interface()); err != nil {
......@@ -78,30 +73,37 @@ func (ctx apiContext) GetRequestParams(paramsStructType reflect.Type) (interface
return paramsStructValuePtr.Elem().Interface(), nil
}
func (ctx apiContext) extract(name string, t reflect.Type, v reflect.Value) error {
//extract params into a struct value
func (ctx apiContext) setParamsInStruct(name string, t reflect.Type, v reflect.Value) error {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
switch f.Type.Kind() {
case reflect.Struct:
if err := ctx.extract(name+"."+f.Name, t.Field(i).Type, v.Field(i)); err != nil {
return errors.Wrapf(err, "failed to fill sub %s.%s", name, f.Name)
tf := t.Field(i)
//enter into anonymous sub-structs
if tf.Anonymous {
if tf.Type.Kind() == reflect.Struct {
if err := ctx.setParamsInStruct(name+"."+tf.Name, t.Field(i).Type, v.Field(i)); err != nil {
return errors.Wrapf(err, "failed on parameters %s.%s", name, tf.Name)
}
continue
default:
}
return errors.Errorf("parameters cannot parse into anonymous %s field %s", tf.Type.Kind(), tf.Type.Name())
}
n := (strings.SplitN(f.Tag.Get("json"), ",", 2))[0]
//named field:
//use name from json tag, else lowercase of field name
n := (strings.SplitN(tf.Tag.Get("json"), ",", 2))[0]
if n == "" {
n = strings.ToLower(f.Name)
n = strings.ToLower(tf.Name)
}
if n == "" || n == "-" {
continue
continue //skip fields without name
}
//get value(s) from query string
//see if this named param was specified
var paramStrValues []string
if paramStrValue, isDefined := ctx.request.QueryStringParameters[n]; isDefined {
//specified once in URL
if len(paramStrValue) >= 2 && paramStrValue[0] == '[' && paramStrValue[len(paramStrValue)-1] == ']' {
//specified as CSV inside [...] e.g. id=[1,2,3]
csvReader := csv.NewReader(strings.NewReader(paramStrValue[1 : len(paramStrValue)-1]))
var err error
paramStrValues, err = csvReader.Read()
......@@ -109,38 +111,58 @@ func (ctx apiContext) extract(name string, t reflect.Type, v reflect.Value) erro
return errors.Wrapf(err, "invalid CSV: [%s]", paramStrValue)
}
} else {
paramStrValues = []string{paramStrValue} //single value
//specified as single value only e.g. id=1
paramStrValues = []string{paramStrValue}
}
} else {
//specified multiple times e.g. id=1&id=2&id=3
paramStrValues = ctx.request.MultiValueQueryStringParameters[n]
}
if len(paramStrValues) == 0 {
continue //param has no value specified in URL
}
valueField := v.Field(i)
if valueField.Kind() == reflect.Ptr {
valueField.Set(reflect.New(valueField.Type().Elem()))
valueField = valueField.Elem()
}
//param is defined >=1 times in URL
if f.Type.Kind() == reflect.Slice {
//iterate over all specified values
for index, paramStrValue := range paramStrValues {
newValuePtr := reflect.New(f.Type.Elem())
if err := reflection.SetValue(newValuePtr.Elem(), paramStrValue); err != nil {
return errors.Wrapf(err, "failed to set %s[%d]=%s", n, index, paramStrValues[0])
if tf.Type.Kind() == reflect.Slice {
//this param struct field is a slice, iterate over all specified values
for i, paramStrValue := range paramStrValues {
paramValue, err := parseParamValue(paramStrValue, tf.Type.Elem())
if err != nil {
return errors.Wrapf(err, "invalid %s[%d]", n, i)
}
v.Field(i).Set(reflect.Append(v.Field(i), newValuePtr.Elem()))
valueField.Set(reflect.Append(valueField, paramValue))
}
} else {
if len(paramStrValues) > 1 {
return errors.Errorf("%s does not support >1 values(%v)", n, strings.Join(paramStrValues, ","))
return errors.Errorf("parameter %s does not support multiple values [%s]", n, strings.Join(paramStrValues, ","))
}
//single value specified
if err := reflection.SetValue(v.Field(i), paramStrValues[0]); err != nil {
return errors.Wrapf(err, "failed to set %s=%s", n, paramStrValues[0])
paramValue, err := parseParamValue(paramStrValues[0], valueField.Type())
if err != nil {
return errors.Wrapf(err, "invalid %s", n)
}
valueField.Set(paramValue)
}
} //for each param struct field
return nil
}
// parseParamValue converts the text of one URL parameter into a new value of
// type t.
//
// URL parameters always arrive as plain text, so two JSON decodings are tried:
//  1. the text as a JSON string, which fits string fields and types that
//     unmarshal from a quoted value (e.g. time.Time);
//  2. the text as a raw JSON literal, which fits numbers and booleans,
//     e.g. "1" into an int64.
//
// The JSON string in step 1 is produced with json.Marshal rather than by
// wrapping the text in quotes, so quotes, backslashes and control characters
// in the parameter reach a string field verbatim instead of breaking the JSON
// or being interpreted as escape sequences.
//
// Returns the parsed value. When the text fits t in neither form, the error
// wraps the failure of the raw-literal decoding.
func parseParamValue(s string, t reflect.Type) (reflect.Value, error) {
	newValuePtr := reflect.New(t)

	// first attempt: the parameter text as a correctly escaped JSON string
	if quoted, err := json.Marshal(s); err == nil {
		if err := json.Unmarshal(quoted, newValuePtr.Interface()); err == nil {
			return newValuePtr.Elem(), nil
		}
	}

	// second attempt: the parameter text as a raw JSON literal
	if err := json.Unmarshal([]byte(s), newValuePtr.Interface()); err != nil {
		return newValuePtr.Elem(), errors.Wrapf(err, "invalid \"%s\"", s)
	}
	return newValuePtr.Elem(), nil
}
func (ctx apiContext) GetRequestBody(requestStructType reflect.Type) (interface{}, error) {
requestStructValuePtr := reflect.New(requestStructType)
err := json.Unmarshal([]byte(ctx.request.Body), requestStructValuePtr.Interface())
......
......@@ -2,8 +2,10 @@ package api_test
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/aws/aws-lambda-go/events"
"gitlab.com/uafrica/go-utils/api"
......@@ -34,6 +36,7 @@ func TestNested(t *testing.T) {
ctx, err = api.New("request-id", nil).NewContext(
context.Background(),
"123",
//all URL params are specified as string values
events.APIGatewayProxyRequest{
QueryStringParameters: map[string]string{
"a": "1", //must be written into P3.P2.P1.A
......@@ -47,11 +50,11 @@ func TestNested(t *testing.T) {
},
})
if err != nil {
t.Fatal(err)
t.Fatalf("ERROR: %+v", err)
}
if p3d, err := ctx.GetRequestParams(reflect.TypeOf(P3{})); err != nil {
t.Fatal(err)
t.Fatalf("ERROR: %+v", err)
} else {
p3 := p3d.(P3)
t.Logf("p3: %+v", p3)
......@@ -67,6 +70,77 @@ func TestNested(t *testing.T) {
}
}
// ParamTypes is the parameter struct parsed by TestTypes. It declares each
// supported value type as a plain field, as an optional (pointer) field and as
// a list, so one request exercises all of them.
type ParamTypes struct {
	GetParams // embedded; TestTypes reads Limit through it

	// single values
	Nr int64 `json:"nr"`
	Name string `json:"name"`
	NrOpt *int64 `json:"nr_opt"`
	NameOpt *string `json:"name_opt"`
	Time1 time.Time `json:"time1"`
	Time2 *time.Time `json:"time2"`
	Dur1 time.Duration `json:"dur1"`
	Dur2 *time.Duration `json:"dur2"`

	// lists of values
	NrList []int64 `json:"nrs"`
	NameList []string `json:"names"`
	NrOptList []*int64 `json:"nrs_opt"`
	NameOptList []*string `json:"names_opt"`
	Time1List []time.Time `json:"time1s"`
	Time2List []*time.Time `json:"time2s"`
	Dur1List []time.Duration `json:"dur1s"`
	Dur2List []*time.Duration `json:"dur2s"`
}
// TestTypes parses a request whose URL parameters cover every field of
// ParamTypes and checks the single-value fields that were populated.
func TestTypes(t *testing.T) {
	logger.SetGlobalLevel(logger.LevelDebug)
	logger.SetGlobalFormat(logger.NewConsole())

	// all URL params are specified as string values
	request := events.APIGatewayProxyRequest{
		QueryStringParameters: map[string]string{
			"nr":        "1",
			"name":      "name2",
			"nr_opt":    "3",
			"name_opt":  "name4",
			"limit":     "5",
			"time1":     "2021-11-23T00:00:00+00:00",
			"time2":     "2021-11-23T00:00:00+00:00",
			"dur1":      "4", // nanoseconds
			"dur2":      "4", // nanoseconds
			"nrs":       "[1,2,3]",
			"nrs_opt":   "[4,5,6]",
			"names":     "[A,B,C]",
			"names_opt": "[D,E,F]",
			"time1s":    "[2021-11-23T00:00:00+00:00]",
			"dur1s":     "[4,5,6]", // nanoseconds
		},
		MultiValueQueryStringParameters: map[string][]string{
			"dur2s":  {"11", "12", "13"},
			"time2s": {"2021-11-23T00:00:00+00:00", "2021-11-23T00:00:00+00:00", "2021-11-23T00:00:00+00:00"},
		},
	}

	var ctx api.Context
	var err error
	ctx, err = api.New("request-id", nil).NewContext(context.Background(), "123", request)
	if err != nil {
		t.Fatalf("ERROR: %+v", err)
	}

	parsed, err := ctx.GetRequestParams(reflect.TypeOf(ParamTypes{}))
	if err != nil {
		t.Fatalf("ERROR: %+v", err)
	}
	p := parsed.(ParamTypes)
	t.Logf("p: %+v", p)

	wrong := p.Nr != 1 ||
		p.Name != "name2" ||
		p.NrOpt == nil || *p.NrOpt != 3 ||
		p.NameOpt == nil || *p.NameOpt != "name4" ||
		p.Limit != 5
	if wrong {
		t.Errorf("Wrong values: %+v", p)
	}

	jsonParams, _ := json.Marshal(p)
	t.Logf("params: %s", string(jsonParams))
}
type PageParams struct {
Limit int64 `json:"limit"`
Offset int64 `json:"offset"`
......@@ -94,6 +168,7 @@ func TestGet(t *testing.T) {
ctx, err = api.New("request-id", nil).NewContext(
context.Background(),
"123",
//all URL params are specified as string values
events.APIGatewayProxyRequest{
QueryStringParameters: map[string]string{
"id": "1",
......@@ -109,11 +184,11 @@ func TestGet(t *testing.T) {
},
})
if err != nil {
t.Fatal(err)
t.Fatalf("ERROR: %+v", err)
}
if p3d, err := ctx.GetRequestParams(reflect.TypeOf(MyGetParams{})); err != nil {
t.Fatal(err)
t.Fatalf("ERROR: %+v", err)
} else {
get := p3d.(MyGetParams)
t.Logf("get: %+v", get)
......
package reflection
import (
"fmt"
"reflect"
"strings"
"gitlab.com/uafrica/go-utils/errors"
)
// Get returns the element of v that is addressed by a jq-style key.
//
// Example:
//   Get(reflect.ValueOf(myDoc), ".hits.hits[]._source")
// returns a slice holding the _source item of every entry in hits.hits.
//
// It exists for values whose type is only known at run-time, where the
// fields cannot be reached with ordinary Go selectors. See the usage in
// search.TimeSeries.Search(), which reads documents out of the deeply nested
// OpenSearch response after parsing it with a run-time created reflect.Type.
func Get(v reflect.Value, key string) (reflect.Value, error) {
	const rootName = "" // the top-level value has no path of its own
	found, err := get(rootName, v, key)
	return found, err
}
// get is the recursive worker behind Get().
//
// Parameters:
//   name is the path walked so far; it is only used in error messages
//   v    is the value reached so far
//   key  is the part of the jq-style key that still has to be resolved
//
// How each kind of value is handled:
//   - empty key: v itself is the result
//   - pointer:   dereferenced, then resolved with the same key
//   - struct:    the key must start with ".<field>"; the field is found by its
//                json tag name, or by its Go field name when it has no json
//                name (the same naming rule clone() uses)
//   - slice:     the key must start with "[]"; the rest of the key is resolved
//                on every element and the results are collected in a new slice
//
// The element type of a slice result is derived from the static type of v, so
// an empty slice yields a valid, empty result slice. Without that, the result
// would be an invalid reflect.Value and the caller's Interface() would panic.
//
// Returns the addressed value, or an error when the key does not fit the value.
func get(name string, v reflect.Value, key string) (reflect.Value, error) {
	if key == "" {
		return v, nil
	}
	switch v.Kind() {
	case reflect.Ptr:
		return get(name, v.Elem(), key)

	case reflect.Struct:
		fieldName, remainingKey, err := splitGetKey(name, key)
		if err != nil {
			return v, err
		}
		fieldIndex, found := getFieldIndex(v.Type(), fieldName)
		if !found {
			return v, errors.Errorf("%s does not have field %s", name, fieldName)
		}
		return get(name+"."+fieldName, v.Field(fieldIndex), remainingKey)

	case reflect.Slice:
		if !strings.HasPrefix(key, "[]") {
			return v, errors.Errorf("cannot get %s from slice, expecting \"[]\" in the key", key)
		}
		elemKey := key[2:]
		// know the result type up front, so that it does not depend on the
		// slice having at least one element
		elemType, err := getResultType(name+"[]", v.Type().Elem(), elemKey)
		if err != nil {
			return v, errors.Wrapf(err, "failed on %s[]", name)
		}
		// make array of results from each item in the slice
		result := reflect.MakeSlice(reflect.SliceOf(elemType), 0, v.Len())
		for i := 0; i < v.Len(); i++ {
			vv, err := get(fmt.Sprintf("%s[%d]", name, i), v.Index(i), elemKey)
			if err != nil {
				return v, errors.Wrapf(err, "failed on %s[%d]", name, i)
			}
			result = reflect.Append(result, vv)
		}
		return result, nil

	default:
	}
	return v, errors.Errorf("Cannot get %s from %s", key, v.Kind())
}

// getResultType mirrors get() on types: it returns the type of the value that
// get() produces when key is resolved on a value of type t.
func getResultType(name string, t reflect.Type, key string) (reflect.Type, error) {
	if key == "" {
		return t, nil
	}
	switch t.Kind() {
	case reflect.Ptr:
		return getResultType(name, t.Elem(), key)

	case reflect.Struct:
		fieldName, remainingKey, err := splitGetKey(name, key)
		if err != nil {
			return nil, err
		}
		fieldIndex, found := getFieldIndex(t, fieldName)
		if !found {
			return nil, errors.Errorf("%s does not have field %s", name, fieldName)
		}
		return getResultType(name+"."+fieldName, t.Field(fieldIndex).Type, remainingKey)

	case reflect.Slice:
		if !strings.HasPrefix(key, "[]") {
			return nil, errors.Errorf("cannot get %s from slice, expecting \"[]\" in the key", key)
		}
		elemType, err := getResultType(name+"[]", t.Elem(), key[2:])
		if err != nil {
			return nil, err
		}
		return reflect.SliceOf(elemType), nil

	default:
	}
	return nil, errors.Errorf("Cannot get %s from %s", key, t.Kind())
}

// splitGetKey splits a key of the form ".<field><rest>" into the field name
// and the rest, where the rest starts at the next '.' or '['.
func splitGetKey(name string, key string) (fieldName string, remainingKey string, err error) {
	if key[0] != '.' {
		return "", "", errors.Errorf("get(%s): key=\"%s\" does not start with '.'", name, key)
	}
	fieldName = key[1:]
	if index := strings.IndexAny(fieldName, ".["); index > 0 {
		remainingKey = fieldName[index:]
		fieldName = fieldName[:index]
	}
	return fieldName, remainingKey, nil
}

// getFieldIndex finds the struct field addressed by fieldName: the name from
// the field's json tag, or the Go field name when the tag gives no name.
func getFieldIndex(t reflect.Type, fieldName string) (int, bool) {
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		n := strings.SplitN(f.Tag.Get("json"), ",", 2)[0] //exclude ,omitempty...
		if n == "" {
			n = f.Name
		}
		if n == fieldName {
			return i, true
		}
	}
	return 0, false
}
......@@ -52,7 +52,7 @@ func SetString(field reflect.Value, value string) {
return // Field doesn't exist
}
if field.Kind() != reflect.String {
logger.Error("Claims: Field is not of type String: %v", field.Kind())
logger.Errorf("Claims: Field is not of type String: %v", field.Kind())
return
}
field.SetString(value)
......
package reflection
import (
"reflect"
"strings"
"gitlab.com/uafrica/go-utils/errors"
)
// CloneType clones a type into a new type, replacing some elements of it.
//
// Parameters:
//   t       is the existing type to be cloned
//   replace is a list of items to replace with their new types, keyed in jq
//           notation; an empty or nil map clones the type without changes.
//           The map is not modified.
//
// Return:
//   the cloned type, or t together with an error when cloning failed or when
//   a key in replace did not match any element of t
//
// Example:
//   newType, err := reflection.CloneType(
//     reflect.TypeOf(myStruct{}),
//     map[string]reflect.Type{
//       ".field1":      reflect.TypeOf(float64(0)),
//       ".list[].name": reflect.TypeOf(int64(0)),
//     },
//   )
//
// Key notation:
//   ".name"   addresses a struct field by its json tag name, or by its Go
//             field name when the field has no json name
//   "[]"      steps into the elements of a slice
//   A key that ends in "[]" replaces the slice type itself with the new type.
//   Names are matched as whole words: an instruction for ".name2" never
//   applies to a field called "name".
//
// This is not a function you will use every day, but it is very useful when
// needed. See the usage in search, which reads OpenSearch responses with the
// correct struct type for ".hits.hits[]._source". That avoids unmarshalling
// the JSON, marshalling each _source from map[string]interface{} back to JSON
// and unmarshalling it again into the correct type.
//
// The function was written for that case: only structs and slices can be
// stepped into, so it will need extension to replace elements nested in other
// kinds of types.
//
// After parsing, use reflection.Get() with the same key notation to get the
// result from the nested document.
func CloneType(t reflect.Type, replace map[string]reflect.Type) (reflect.Type, error) {
	// clone() removes every instruction it applies; work on a private copy so
	// the caller's map is left untouched
	pending := make(map[string]reflect.Type, len(replace))
	for key, newType := range replace {
		pending[key] = newType
	}

	cloned, err := clone("", t, pending)
	if err != nil {
		return t, err
	}
	if len(pending) > 0 {
		return t, errors.Errorf("unknown replacements: %+v", pending)
	}
	return cloned, nil
}

// clone is the recursive worker behind CloneType().
//
// Parameters:
//   name    is the jq-style path of t inside the type being cloned
//   t       is the type to clone; only structs and slices are supported
//   replace holds the instructions not yet applied; every instruction that
//           is applied is deleted from it
//
// Returns the cloned type, or t and an error when t cannot be cloned.
func clone(name string, t reflect.Type, replace map[string]reflect.Type) (reflect.Type, error) {
	switch t.Kind() {
	case reflect.Struct:
		fields := make([]reflect.StructField, 0, t.NumField())
		for i := 0; i < t.NumField(); i++ {
			f := t.Field(i)
			n := strings.SplitN(f.Tag.Get("json"), ",", 2)[0] //exclude ,omitempty...
			if n == "" {
				n = f.Name
			}
			fieldName := name + "." + n
			if replaceType, ok := replace[fieldName]; ok {
				//full match: this field gets the new type
				f.Type = replaceType
				delete(replace, fieldName)
			} else if hasNestedReplacement(replace, fieldName) {
				//an instruction addresses something inside this field
				clonedType, err := clone(fieldName, f.Type, replace)
				if err != nil {
					return t, errors.Wrapf(err, "failed to clone %s", fieldName)
				}
				f.Type = clonedType
			}
			fields = append(fields, f)
		}
		return reflect.StructOf(fields), nil

	case reflect.Slice:
		elemName := name + "[]"
		if replaceType, ok := replace[elemName]; ok {
			//full match
			delete(replace, elemName)
			return replaceType, nil
		}
		if hasNestedReplacement(replace, elemName) {
			//partial match
			elemType, err := clone(elemName, t.Elem(), replace)
			if err != nil {
				return t, errors.Wrapf(err, "failed to clone slice elem type")
			}
			return reflect.SliceOf(elemType), nil
		}
		//no match
		return t, nil

	default:
	}
	return t, errors.Errorf("cannot clone %v %s", t.Kind(), t.Name())
}

// hasNestedReplacement reports whether replace has a key that addresses
// something inside path, i.e. a key made of path followed by '.' or '['.
// Requiring the delimiter makes the match a whole-word match, so that path
// ".name" is not matched by key ".name2".
func hasNestedReplacement(replace map[string]reflect.Type, path string) bool {
	for key := range replace {
		if len(key) > len(path) && strings.HasPrefix(key, path) {
			if next := key[len(path)]; next == '.' || next == '[' {
				return true
			}
		}
	}
	return false
}
package reflection_test
import (
"encoding/json"
"reflect"
"testing"
"gitlab.com/uafrica/go-utils/reflection"
)
// Test1 parses an OpenSearch search response twice: once with the default
// SearchResponseBody type, and once with a clone of that type in which
// ".hits.hits[]._source" is replaced by myDoc. It then uses reflection.Get()
// to extract the parsed documents and checks their type, count and content.
func Test1(t *testing.T) {
	doc := `
{
"took":872,
"timed_out":false,
"_shards":{
"total":38,
"successful":38,
"skipped":0,
"failed":0
},
"hits":{
"total":{
"value":10,
"relation":"eq"
},
"max_score":null,
"hits":[
{
"_index": "go-utils-audit-test-20211030",
"_type": "_doc",
"_id": "Tj9l5XwBWRiAneoYazic",
"_score": 1.2039728,
"_source": {
"@timestamp": "2021-10-30T15:03:20.000000+02:00",
"@end_time": "2021-10-30T15:03:21.000000+02:00",
"@duration_ms": 1000,
"test1": "6",
"test2": "ACC_00098",
"test3": 10,
"http": {
"method": "GET",
"path": "/accounts"
},
"http_method": "GET",
"http_path": "/accounts"
}
}
]
}
}
`
	//using default type, documents in _source:{} are parsed to map[string]interface{}
	res := SearchResponseBody{}
	if err := json.Unmarshal([]byte(doc), &res); err != nil {
		t.Fatalf("cannot unmarshal response into default type: %+v", err)
	}
	for hitIndex, hit := range res.Hits.Hits {
		t.Logf("doc[%d]: (%T)%+v", hitIndex, hit.Source, hit.Source)
	}

	//create type with own document type to use for _source field:
	cloned, err := reflection.CloneType(
		reflect.TypeOf(SearchResponseBody{}),
		map[string]reflect.Type{
			".hits.hits[]._source": reflect.TypeOf(myDoc{}),
		})
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("cloned type: %v", cloned)

	//unmarshal using new type to have correct type for each hit
	resPtrValue := reflect.New(cloned)
	if err := json.Unmarshal([]byte(doc), resPtrValue.Interface()); err != nil {
		t.Fatalf("failed to decode into cloned type: %+v", err)
	}
	// clonedRes := resPtrValue.Interface()
	// t.Logf("Cloned res: %+v", clonedRes)

	//get the replaced values as an array of docs
	v, err := reflection.Get(resPtrValue, ".hits.hits[]._source")
	if err != nil {
		t.Fatalf("Did not get list: %+v", err)
	}
	docs, ok := v.Interface().([]myDoc)
	if !ok {
		t.Fatalf("%T is not []myDoc", v.Interface())
	}
	if len(docs) != 1 {
		t.Fatalf("Got %d != 1", len(docs))
	}
	for _, parsedDoc := range docs {
		t.Logf("doc: (%T)%+v", parsedDoc, parsedDoc)
	}
	//the document content must have been decoded into the replaced type
	if docs[0].Test1 != "6" {
		t.Fatalf("Got test1=%q != \"6\"", docs[0].Test1)
	}
}
// SearchResponseBody is the top level of the search response document that
// Test1 parses and then clones with a different _source type.
type SearchResponseBody struct {
	Took int `json:"took"` //milliseconds
	TimedOut bool `json:"timed_out"`
	Shards SearchResponseShards `json:"_shards"`
	Hits SearchResponseHits `json:"hits"`
}
// SearchResponseShards is the "_shards" object of the search response.
type SearchResponseShards struct {
	Total int `json:"total"`
	Successful int `json:"successful"`
	Skipped int `json:"skipped"`
	Failed int `json:"failed"`
}
// SearchResponseHits is the outer "hits" object of the search response; its
// own "hits" field is the list of matching documents.
type SearchResponseHits struct {
	Total SearchResponseHitsTotal `json:"total"`
	MaxScore *float64 `json:"max_score,omitempty"` //pointer: the response may carry null
	Hits []HitDoc `json:"hits"`
}
// SearchResponseHitsTotal is the "hits.total" object of the search response.
type SearchResponseHitsTotal struct {
	Value int `json:"value"` //e.g. 0 when no docs matched
	Relation string `json:"relation"` //e.g. "eq"
}
// HitDoc is one entry of "hits.hits". Source holds the document as a generic
// map; Test1 replaces the type of this field with myDoc in the cloned type.
type HitDoc struct {
	Index string `json:"_index"` //name of index
	Type string `json:"_type"` //_doc
	ID string `json:"_id"`
	Score float64 `json:"_score"` //
	Source map[string]interface{} `json:"_source"` //the document of itemType
}
// myDoc is the document type Test1 substitutes for the _source field; it
// declares only the "test1" field of the stored document.
type myDoc struct {
	Test1 string `json:"test1"`
}
package search
import "time"
//Index configures an index in OpenSearch
type Index struct {
Settings Settings `json:"settings"`
......@@ -24,6 +26,7 @@ type MappingProperty struct {
Enabled bool `json:"enabled,omitempty"`
Fields map[string]MappingFieldProperties `json:"fields,omitempty"`
Properties map[string]MappingProperty `json:"properties,omitempty"`
//Index bool `json:"index,omitempty"` //set true to make text field searchable
}
type MappingFieldProperties struct {
......@@ -34,3 +37,126 @@ type MappingKeyword struct {
Type string `json:"type"` //="keyword"
IgnoreAbove int `json:"ignore_above"` //e.g. 256
}
// SearchRequestBody is the JSON body of a search request.
type SearchRequestBody struct {
	Size int64 `json:"size,omitempty"` //max nr of documents to return; left out of the JSON when 0
	Query Query `json:"query"`
}
// Query is one search query clause. Each field is a different kind of clause
// and is left out of the JSON when nil.
type Query struct {
	//one of:
	Match *QueryNameValue `json:"match,omitempty" doc:"<field>:<value>"`
	Term *QueryNameValue `json:"term,omitempty"`
	Range *QueryRange `json:"range,omitempty"`
	MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"`
	Bool *QueryBool `json:"bool,omitempty"`
}
// QueryMultiMatch is a "multi_match" clause: a text search over a list of
// fields.
type QueryMultiMatch struct {
	Query string `json:"query" doc:"Text search in below fields"`
	Fields []string `json:"fields,omitempty" doc:"List of fields"`
}
// QueryBool is a "bool" clause, which combines other query clauses.
// Every field carries its description in a "doc" struct tag.
//
//https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html
type QueryBool struct {
	Must []Query `json:"must,omitempty" doc:"List of things that must appear in matching documents and will contribute to the score."`
	Filter []Query `json:"filter,omitempty" doc:"List of things that must appear in matching documents. However unlike must the score of the query will be ignored. Filter clauses are executed in filter context, meaning that scoring is ignored and clauses are considered for caching."`
	Should []Query `json:"should,omitempty" doc:"List of things that should appear in the matching document."`
	MustNot []Query `json:"must_not,omitempty" doc:"List of things that must not appear in the matching documents. Clauses are executed in filter context meaning that scoring is ignored and clauses are considered for caching. Because scoring is ignored, a score of 0 for all documents is returned."`
}
//<name>:<value> can be shorthanded to just a text value "...", but for sake of go type def, we always use an object meaning the same, allowing more options
//https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html#query-dsl-match-query-short-ex
// QueryNameValue maps a field name to the value to match in that field.
type QueryNameValue map[string]QueryValue
// QueryValue is the value to match in a field, with optional match settings.
type QueryValue struct {
	Query string `json:"query"`
	Operator string `json:"operator,omitempty"` //defaults to "or", accepted values: or|and
	Fuzziness string `json:"fuzziness,omitempty"` //https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#fuzziness
	ZeroTermsQuery string `json:"zero_terms_query,omitempty"`
}

// QueryValueText makes a QueryValue that matches text with operator "and".
func QueryValueText(text string) QueryValue {
	return QueryValue{Query: text, Operator: "and"}
}

// QueryValueTime makes a QueryValue that matches the time t with operator
// "and".
//
// The time is written in RFC 3339 format with sub-second precision, e.g.
// "2021-11-23T01:02:03+02:00". time.Time.String() must not be used for this:
// its output is meant for debugging only and may carry a monotonic clock
// reading.
func QueryValueTime(t time.Time) QueryValue {
	return QueryValue{Query: t.Format(time.RFC3339Nano), Operator: "and"}
}
// QueryRange is a "range" clause: it maps a field name to the expression that
// the field value must satisfy.
type QueryRange map[string]QueryExpr

// QueryExpr maps an operator to the value it compares with.
type QueryExpr map[string]string //<oper>:<value> e.g. "gte":"10"
// SearchResponseBody is the top level of the JSON body of a search response.
//
// Example of a search response body:
//   {
//     "took":872,
//     "timed_out":false,
//     "_shards":{
//       "total":38,
//       "successful":38,
//       "skipped":0,
//       "failed":0
//     },
//     "hits":{
//       "total":{
//         "value":0,
//         "relation":"eq"
//       },
//       "max_score":null,
//       "hits":[
//         {
//           "_index": "go-utils-audit-test-20211030",
//           "_type": "_doc",
//           "_id": "Tj9l5XwBWRiAneoYazic",
//           "_score": 1.2039728,
//           "_source": {
//             "@timestamp": "2021-10-30T15:03:20.679481+02:00",
//             "@end_time": "2021-10-30T15:03:20.469481+02:00",
//             "@duration_ms": -210,
//             "test1": "6",
//             "test2": "ACC_00098",
//             "test3": 10,
//             "http": {
//               "method": "GET",
//               "path": "/accounts"
//             },
//             "http_method": "GET",
//             "http_path": "/accounts"
//           }
//         }
//       ]
//     }
//   }
type SearchResponseBody struct {
	Took int `json:"took"` //milliseconds
	TimedOut bool `json:"timed_out"`
	Shards SearchResponseShards `json:"_shards"`
	Hits SearchResponseHits `json:"hits"`
}
// SearchResponseShards is the "_shards" object of a search response.
type SearchResponseShards struct {
	Total int `json:"total"`
	Successful int `json:"successful"`
	Skipped int `json:"skipped"`
	Failed int `json:"failed"`
}
// SearchResponseHits is the outer "hits" object of a search response; its own
// "hits" field is the list of matching documents.
type SearchResponseHits struct {
	Total SearchResponseHitsTotal `json:"total"`
	MaxScore *float64 `json:"max_score,omitempty"` //pointer: the response may carry null
	Hits []HitDoc `json:"hits"`
}
// SearchResponseHitsTotal is the "hits.total" object of a search response.
type SearchResponseHitsTotal struct {
	Value int `json:"value"` //e.g. 0 when no docs matched
	Relation string `json:"relation"` //e.g. "eq"
}
// HitDoc is one entry of "hits.hits" in a search response. Source holds the
// matched document as a generic map.
type HitDoc struct {
	Index string `json:"_index"` //name of index
	Type string `json:"_type"` //_doc
	ID string `json:"_id"`
	Score float64 `json:"_score"` //
	Source map[string]interface{} `json:"_source"` //the document of itemType
}
......@@ -67,6 +67,27 @@ func test(t *testing.T, c search.Config) {
}
}
query := search.Query{
MultiMatch: &search.QueryMultiMatch{
Query: "GET",
Fields: []string{"http_method"},
},
}
docs, totalCount, err := ts.Search(query, 10)
if err != nil {
t.Errorf("failed to search: %+v", err)
} else {
if docsSlice, ok := docs.([]testStruct); ok {
t.Logf("search result total_count:%d with %d docs", totalCount, len(docsSlice))
if len(docsSlice) > 10 {
t.Errorf("got %d docs > max 10", len(docsSlice))
}
} else {
t.Errorf("docs %T is not []testStruct!", docs)
}
}
oldList, err := a.DelOldTimeSeries(indexName, 2)
if err != nil {
t.Fatalf("failed to del old: %+v", err)
......
package search
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"
"reflect"
"strings"
"time"
opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logger"
"gitlab.com/uafrica/go-utils/reflection"
)
// TimeFormat is the Go time layout for timestamps with a numeric zone offset,
// e.g. "2021-11-23T01:02:03+02:00".
//
// The zone offset must be written as "-07:00" in a Go layout. "+07:00" is not
// a layout element, so it would be copied into every formatted time as the
// literal text "+07:00" regardless of the actual zone.
const TimeFormat = "2006-01-02T15:04:05-07:00"
//embed this into your log struct
type TimeSeriesHeader struct {
StartTime time.Time `json:"@timestamp"`
......@@ -20,6 +27,7 @@ type TimeSeriesHeader struct {
// TimeSeries gives access to the documents of one time series.
type TimeSeries interface {
	// Write takes the start time, end time and data of one document.
	// NOTE(review): presumably stores it as a new document in the time
	// series; the implementation is not visible here - confirm.
	Write(StartTime time.Time, EndTime time.Time, data interface{}) error

	// Search runs query and returns at most limit matching documents.
	// docs is a slice of the time series data type, and totalCount is the
	// total number of hits reported in the search response.
	Search(query Query, limit int64) (docs interface{}, totalCount int, err error)
}
type timeSeries struct {
......@@ -29,6 +37,8 @@ type timeSeries struct {
fields []dataField
jsonIndexSpec []byte
createdDates map[string]bool
searchResponseBodyType reflect.Type
}
type dataField struct {
......@@ -156,6 +166,20 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal index spec")
}
//define search response type
//similar to SearchResponseBody
ts.searchResponseBodyType, err = reflection.CloneType(
reflect.TypeOf(SearchResponseBody{}),
map[string]reflect.Type{
".hits.hits[]._source": ts.dataType,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to make search response type for time-series")
}
//new package: copy type recursively, find index of special field and replace when found....
w.timeSeriesByName[name] = ts
return ts, nil
}
......@@ -247,7 +271,7 @@ func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string
logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
} else {
if date.Before(timeThreshold) {
logger.Debugf("Deleting index(%s).uuid(%s).docsCount(%d) older than %s days...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, dailyIndexInfo.Settings.Index.DocsCount, timeThreshold)
logger.Debugf("Deleting index(%s).uuid(%s) older than %s days...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, timeThreshold)
indicesToDelete = append(indicesToDelete, dailyIndexName)
}
}
......@@ -275,5 +299,73 @@ type IndexInfoSettings struct {
type IndexSettings struct {
UUID string `json:"uuid"`
DocsCount int64 `json:"docs.count"`
CreationDate string `json:"creation_date"`
NumberOfShards string `json:"number_of_shards"`
NumberOfReplicas string `json:"number_of_replicas"`
ProviderName string `json:"provided_name"` //e.g. "go-utils-audit-test-20211103"
}
// Search runs a query against OpenSearch and returns the matching documents.
//
// Parameters:
//   query is the query to execute
//   limit is the maximum number of documents to return, 0..1000. It is sent
//         as the request size, which is left out of the request when 0.
//
// Return:
//   docs will be a slice of the TimeSeries data type, or nil when no
//        documents were returned
//   totalCount is the total number of hits reported in the response
//   err is set when the limit is invalid, the request fails, the response
//        status is not 200 OK or the response body cannot be processed
func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) {
	if limit < 0 || limit > 1000 {
		err = errors.Errorf("limit=%d not 0..1000", limit)
		return
	}

	// example search request body for free text
	// {
	//   "size": 5,
	//   "query": {
	//     "multi_match": {
	//       "query": "miller",
	//       "fields": ["title^2", "director"]
	//     }
	//   }
	// }
	body := SearchRequestBody{
		Size:  limit,
		Query: query,
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to marshal search request body")
	}

	// NOTE(review): the request sets no Index, so it is presumably not
	// restricted to the indices of this time series - confirm this is intended.
	search := opensearchapi.SearchRequest{
		Body: bytes.NewReader(jsonBody),
	}
	searchResponse, err := search.Do(context.Background(), ts.w.client)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to search documents")
	}
	//the body must be closed on every path, else the connection is leaked
	defer searchResponse.Body.Close()

	if searchResponse.StatusCode != http.StatusOK {
		//best effort: the body is only used to describe the failure
		resBody, _ := ioutil.ReadAll(searchResponse.Body)
		return nil, 0, errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
	}

	//decode into the response type that has the time series data type
	//in place of the generic _source
	resBodyPtrValue := reflect.New(ts.searchResponseBodyType)
	if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
		return nil, 0, errors.Wrapf(err, "cannot decode search response body")
	}

	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get total nr of hits")
	}
	hitsTotal, ok := hitsTotalValue.Interface().(int)
	if !ok {
		return nil, 0, errors.Errorf("total nr of hits is %T, not int", hitsTotalValue.Interface())
	}
	if hitsTotal < 1 {
		return nil, 0, nil //no matches
	}

	items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get search response documents")
	}
	if !items.IsValid() {
		//no documents in the response although hits were counted;
		//Interface() would panic on an invalid value
		return nil, hitsTotal, nil
	}
	return items.Interface(), hitsTotal, nil
}
......@@ -130,32 +130,6 @@ func (writer writer) Search() ([]interface{}, error) {
return nil, errors.Errorf("NYI search result processing: %v", res)
}
// // Delete the document.
// delete := opensearchapi.DeleteRequest{
// Index: IndexName,
// DocumentID: docId,
// }
// deleteResponse, err := delete.Do(context.Background(), client)
// if err != nil {
// fmt.Println("failed to delete document ", err)
// os.Exit(1)
// }
// fmt.Println("deleting document")
// fmt.Println(deleteResponse)
// // Delete previously created index.
// deleteIndex := opensearchapi.IndicesDeleteRequest{
// Index: []string{writer.config.IndexName},
// }
// deleteIndexResponse, err := deleteIndex.Do(context.Background(), client)
// if err != nil {
// fmt.Println("failed to delete index ", err)
// os.Exit(1)
// }
// fmt.Println("deleting index", deleteIndexResponse)
type CreateResponse struct {
Error *Error `json:"error,omitempty"`
}
......@@ -170,15 +144,6 @@ type IndexResponse struct {
Shards *Shards `json:"_shards,omitempty"`
Type string `json:"_type,omitempty"`
PrimaryTerm int `json:"_primary_term,omitempty"`
// _id:oZJZxXwBPnbPDFcjNpuO
// _index:go-utils-audit-test
// _primary_term:1
// _seq_no:5
// _shards:map[failed:0 successful:2 total:2]
// _type:_doc
// _version:1
// result:created
}
type Shards struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment