package search import ( "encoding/json" "net/http" "reflect" "strings" "time" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logger" ) type TimeSeries interface { Write(StartTime time.Time, EndTime time.Time, data interface{}) error } type timeSeries struct { w *writer indexName string dataType reflect.Type fields []dataField } type dataField struct { name string index []int mapping MappingProperty } //create a time series to write e.g. api logs //the tmpl must be your log data struct consisting of public fields as: // Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) // Xxx time.Time `json:"<name>"` assumes type "date" for opensearch // Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) { if !indexNameRegex.MatchString(indexName) { return nil, errors.Errorf("invalid index_name:\"%s\"", indexName) } structType := reflect.TypeOf(tmpl) if tmpl == nil || structType.Kind() != reflect.Struct { return nil, errors.Errorf("%T is not a struct", tmpl) } ts := &timeSeries{ w: w, indexName: indexName, dataType: structType, fields: []dataField{}, } //define the OpenSearch index mapping indexSpec := Index{ Settings: Settings{ Index: &SettingsIndex{ NumberOfShards: 4, NumberOfReplicas: 0, }, }, Mappings: Mappings{ Properties: map[string]MappingProperty{}, }, } for i := 0; i < structType.NumField(); i++ { structField := structType.Field(i) dataField := dataField{ name: structField.Name, index: structField.Index, mapping: MappingProperty{Type: "text"}, } if jsonTags := strings.SplitN(structField.Tag.Get("json"), ",", 2); len(jsonTags) > 0 && jsonTags[0] != "" { dataField.name = jsonTags[0] } if dataField.name == "" { logger.Debugf("Skip %s unnamed field %+v", structType.Name(), structField) continue } //get default type of search value from field type switch structField.Type.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: dataField.mapping = MappingProperty{Type: "long"} case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: dataField.mapping = MappingProperty{Type: "long"} case reflect.Bool: dataField.mapping = MappingProperty{Type: "boolean"} case reflect.String: dataField.mapping = MappingProperty{Type: "text"} default: if structField.Type == reflect.TypeOf(time.Now()) { dataField.mapping = MappingProperty{Type: "date"} } else { dataField.mapping = MappingProperty{Type: "text"} } } //allow user to change that with a search tag on the field switch structField.Tag.Get("search") { case "": //no change case "keyword": dataField.mapping = MappingProperty{Type: "keyword"} case "long": dataField.mapping = MappingProperty{Type: "long"} case "date": dataField.mapping = MappingProperty{Type: "date"} case "boolean": dataField.mapping = MappingProperty{Type: "boolean"} case "object": dataField.mapping = MappingProperty{Type: "boolean", Enabled: false} default: return nil, errors.Errorf("Unknown search:\"%s\" on index(%s) field(%s)", structField.Tag.Get("search"), indexName, structField.Name) } //add to index spec indexSpec.Mappings.Properties[dataField.name] = dataField.mapping //add to list of fields ts.fields = append(ts.fields, dataField) } //add header fields for all time series to the index spec for n, p := range map[string]MappingProperty{ "@timestamp": {Type: "date"}, "@end_time": {Type: "date"}, "@duration_ms": {Type: "long"}, } { indexSpec.Mappings.Properties[n] = p } //todo: find out what is significance of "@..." in the name - or just convention? Is user allowed to use it too? //create the index if it does not already exist jsonIndexSpec, err := json.Marshal(indexSpec) if err != nil { return nil, errors.Wrapf(err, "failed to marshal index spec") } logger.Debugf("JSON Index Specification: %s", string(jsonIndexSpec)) res, err := w.api.Create( indexName, //index name indexName, //index name also used for document id strings.NewReader(string(jsonIndexSpec))) if err != nil { return nil, errors.Wrapf(err, "failed to create index(%s)", indexName) } switch res.StatusCode { case http.StatusOK: return ts, nil case http.StatusCreated: return ts, nil case http.StatusConflict: //409 = already exists return ts, nil default: return nil, errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } } //data must be of type specified in Writer.TimeSeries(tmpl) func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error { t := reflect.TypeOf(data) if t != ts.dataType { return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.indexName, ts.dataType.Name()) } //store all the data struct field values into a map[string]interface{} that we can marshal //which includes the header fields searchDoc := map[string]interface{}{} v := reflect.ValueOf(data) for _, field := range ts.fields { searchDoc[field.name] = v.FieldByIndex(field.index).Interface() } //set header values searchDoc["@timestamp"] = startTime searchDoc["@end_time"] = endTime searchDoc["@duration_ms"] = endTime.Sub(startTime) / time.Millisecond return ts.w.Write(ts.indexName, searchDoc) }