From 0bbaf892bb64390bde6bf65d92f3971acf666c5e Mon Sep 17 00:00:00 2001 From: Jan Semmelink <jan@uafrica.com> Date: Tue, 9 Nov 2021 14:25:49 +0200 Subject: [PATCH] Implemented mapping improvements --- search/opensearch_types.go | 7 +- search/search_test.go | 3 +- search/time_series.go | 193 ++++++++++++++++++++++--------------- 3 files changed, 118 insertions(+), 85 deletions(-) diff --git a/search/opensearch_types.go b/search/opensearch_types.go index 72c048a..2c127dd 100644 --- a/search/opensearch_types.go +++ b/search/opensearch_types.go @@ -3,11 +3,6 @@ package search import "time" //Mapping configures an index in OpenSearch -type Index struct { - Settings Settings `json:"settings"` - Mappings Mappings `json:"mappings"` -} - type Settings struct { Index *SettingsIndex `json:"index,omitempty"` } @@ -22,7 +17,7 @@ type Mappings struct { } type MappingProperty struct { - Type string `json:"type"` + Type string `json:"type,omitempty"` //empty for sub-structs described with properties Enabled bool `json:"enabled,omitempty"` Fields map[string]MappingFieldProperties `json:"fields,omitempty"` Properties map[string]MappingProperty `json:"properties,omitempty"` diff --git a/search/search_test.go b/search/search_test.go index 0deef6b..2dec817 100644 --- a/search/search_test.go +++ b/search/search_test.go @@ -107,7 +107,7 @@ type testStruct struct { Test1 string `json:"test1"` Test2 string `json:"test2"` Test3 int `json:"test3"` - HTTP httpData `json:"http"` + HTTP httpData `json:"http"` //this is a sub-struct... HTTPMethod string `json:"http_method" search:"keyword"` HTTPPath string `json:"http_path" search:"keyword"` } @@ -115,6 +115,7 @@ type testStruct struct { type httpData struct { Method string `json:"method" search:"keyword"` Path string `json:"path" search:"keyword"` + Size int `json:"size" search:"long"` } func TestOlderThan(t *testing.T) { diff --git a/search/time_series.go b/search/time_series.go index 3a13720..0e6d95e 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -31,22 +31,18 @@ type TimeSeries interface { } type timeSeries struct { - w *writer - name string - dataType reflect.Type - fields []dataField - jsonIndexSpec []byte - createdDates map[string]bool + w *writer + name string + dataType reflect.Type + settings Settings + mappings Mappings + jsonSettings []byte + jsonMappings []byte + createdDates map[string]bool searchResponseBodyType reflect.Type } -type dataField struct { - name string - index []int - mapping MappingProperty -} - //purpose: // create a time series to write e.g. api logs //parameters: @@ -78,52 +74,108 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { w: w, name: name, dataType: structType, - fields: []dataField{}, createdDates: map[string]bool{}, } //define the OpenSearch index mapping - indexSpec := Index{ - Settings: Settings{ - Index: &SettingsIndex{ - NumberOfShards: 4, - NumberOfReplicas: 0, - }, - }, - Mappings: Mappings{ - Properties: map[string]MappingProperty{}, + ts.settings = Settings{ + Index: &SettingsIndex{ + NumberOfShards: 4, + NumberOfReplicas: 0, }, } + + if properties, err := structMappingProperties(structType); err != nil { + return nil, errors.Wrapf(err, "cannot map struct %s", structType) + } else { + ts.mappings = Mappings{ + Properties: properties, + } + } + + var err error + ts.jsonSettings, err = json.Marshal(ts.settings) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal index settings") + } + ts.jsonMappings, err = json.Marshal(ts.mappings) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal index mappings") + } + logger.Debugf("%s Index Mappings: %s", structType, string(ts.jsonMappings)) + + //define search response type + //similar to SearchResponseBody + ts.searchResponseBodyType, err = reflection.CloneType( + reflect.TypeOf(SearchResponseBody{}), + map[string]reflect.Type{ + ".hits.hits[]._source": ts.dataType, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to make search response type for time-series") + } + + //new package: copy type recursively, find index of special field and replace when found.... + + w.timeSeriesByName[name] = ts + return ts, nil +} + +func structMappingProperties(structType reflect.Type) (map[string]MappingProperty, error) { + properties := map[string]MappingProperty{} for i := 0; i < structType.NumField(); i++ { structField := structType.Field(i) - dataField := dataField{ - name: structField.Name, - index: structField.Index, - mapping: MappingProperty{Type: "text"}, + + fieldName := structField.Name + + //fields of embedded (anonymous) structs are added at the same level + if structField.Anonymous && structField.Type.Kind() == reflect.Struct { + subFields, err := structMappingProperties(structField.Type) + if err != nil { + return nil, errors.Wrapf(err, "failed to map embedded struct %s", fieldName) + } + for n, v := range subFields { + properties[n] = v + } + continue } + if jsonTags := strings.SplitN(structField.Tag.Get("json"), ",", 2); len(jsonTags) > 0 && jsonTags[0] != "" { - dataField.name = jsonTags[0] + fieldName = jsonTags[0] } - if dataField.name == "" { - logger.Debugf("Skip %s unnamed field %+v", structType.Name(), structField) + if fieldName == "" { + logger.Debugf("Skip %s unnamed field %+v", structType, structField) continue } //get default type of search value from field type + fieldMapping := MappingProperty{Type: "text"} switch structField.Type.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - dataField.mapping = MappingProperty{Type: "long"} + fieldMapping = MappingProperty{Type: "long"} case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: - dataField.mapping = MappingProperty{Type: "long"} + fieldMapping = MappingProperty{Type: "long"} case reflect.Bool: - dataField.mapping = MappingProperty{Type: "boolean"} + fieldMapping = MappingProperty{Type: "boolean"} case reflect.String: - dataField.mapping = MappingProperty{Type: "text"} + fieldMapping = MappingProperty{Type: "text"} default: if structField.Type == reflect.TypeOf(time.Now()) { - dataField.mapping = MappingProperty{Type: "date"} + fieldMapping = MappingProperty{Type: "date"} } else { - dataField.mapping = MappingProperty{Type: "text"} + if structField.Type.Kind() == reflect.Struct { + subStructProperties, err := structMappingProperties(structField.Type) + if err != nil { + return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name) + } + fieldMapping = MappingProperty{ + Properties: subStructProperties, + } + } else { + // fieldMapping = MappingProperty{Type: "text"} + // unknown value type... we do not specify mapping and let it use dynamic mapping + continue + } } } @@ -132,56 +184,24 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { case "": //no change case "keyword": - dataField.mapping = MappingProperty{Type: "keyword"} + fieldMapping.Type = "keyword" case "long": - dataField.mapping = MappingProperty{Type: "long"} + fieldMapping.Type = "long" case "date": - dataField.mapping = MappingProperty{Type: "date"} + fieldMapping.Type = "data" case "boolean": - dataField.mapping = MappingProperty{Type: "boolean"} + fieldMapping.Type = "boolean" case "object": - dataField.mapping = MappingProperty{Type: "boolean", Enabled: false} + fieldMapping.Type = "object" + fieldMapping.Enabled = false default: - return nil, errors.Errorf("Unknown search:\"%s\" on timeSeries(%s) field(%s)", structField.Tag.Get("search"), name, structField.Name) + return nil, errors.Errorf("Unknown search:\"%s\" on field(%s)", structField.Tag.Get("search"), structField.Name) } //add to index spec - indexSpec.Mappings.Properties[dataField.name] = dataField.mapping - - //add to list of fields - ts.fields = append(ts.fields, dataField) - } - - //add header fields for all time series to the index spec - for n, p := range map[string]MappingProperty{ - "@timestamp": {Type: "date"}, - "@end_time": {Type: "date"}, - "@duration_ms": {Type: "long"}, - } { - indexSpec.Mappings.Properties[n] = p - } - - var err error - ts.jsonIndexSpec, err = json.Marshal(indexSpec) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal index spec") - } - - //define search response type - //similar to SearchResponseBody - ts.searchResponseBodyType, err = reflection.CloneType( - reflect.TypeOf(SearchResponseBody{}), - map[string]reflect.Type{ - ".hits.hits[]._source": ts.dataType, - }) - if err != nil { - return nil, errors.Wrapf(err, "failed to make search response type for time-series") + properties[fieldName] = fieldMapping } - - //new package: copy type recursively, find index of special field and replace when found.... - - w.timeSeriesByName[name] = ts - return ts, nil + return properties, nil } //data must be of type specified in Writer.TimeSeries(tmpl) @@ -191,20 +211,37 @@ func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) erro } t := reflect.TypeOf(data) if t != ts.dataType { - return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType.Name()) + return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType) } //get daily search index to write to, from start time indexName := ts.name + "-" + startTime.Format("20060102") if _, ok := ts.createdDates[indexName]; !ok { //create new index for this date - if not exists + res, err := ts.w.api.Create( indexName, //index name indexName, //index name also used for document id - strings.NewReader(string(ts.jsonIndexSpec))) + strings.NewReader(string(ts.jsonSettings))) + if err != nil { + return errors.Wrapf(err, "failed to create index(%s)", indexName) + } + switch res.StatusCode { + case http.StatusOK: + case http.StatusCreated: + case http.StatusConflict: //409 = already exists + default: + return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) + } + + res, err = opensearchapi.IndicesPutMappingRequest{ + Index: []string{indexName}, + Body: strings.NewReader(string(ts.jsonMappings)), + }.Do(context.Background(), ts.w.client) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) } + switch res.StatusCode { case http.StatusOK: case http.StatusCreated: -- GitLab