package search

import (
	"bytes"
	"context"
	"encoding/json"
	"io/ioutil"
	"net/http"
	"reflect"
	"strings"
	"time"

	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
	"gitlab.com/uafrica/go-utils/errors"
	"gitlab.com/uafrica/go-utils/logger"
	"gitlab.com/uafrica/go-utils/reflection"
)

// TimeFormat is the layout for timestamps with an explicit numeric zone offset.
// In a Go layout the offset element is spelled "-07:00"; "+07:00" is not a
// layout element and would be emitted literally on every formatted time.
const TimeFormat = "2006-01-02T15:04:05-07:00"

// TimeSeriesHeader must be embedded as the first, anonymous field of every
// struct written to a time series. Write() overwrites it on each document.
type TimeSeriesHeader struct {
	StartTime  time.Time `json:"@timestamp"`
	EndTime    time.Time `json:"@end_time"`
	DurationMs int64     `json:"@duration_ms"`
}

// TimeSeries writes documents into daily indices and searches across them.
type TimeSeries interface {
	Write(StartTime time.Time, EndTime time.Time, data interface{}) error
	Search(query Query, limit int64) (docs interface{}, totalCount int, err error)
}

// timeSeries is the TimeSeries implementation returned by (*writer).TimeSeries.
type timeSeries struct {
	w            *writer
	name         string       // index name prefix, without the "-<date>" suffix
	dataType     reflect.Type // struct type accepted by Write and returned by Search
	settings     Settings
	mappings     Mappings
	jsonSettings []byte // settings marshalled once at construction
	jsonMappings []byte // mappings marshalled once at construction

	// createdDates remembers the daily index names already prepared by Write.
	// NOTE(review): plain map with no locking in this file - confirm that
	// Write is never called concurrently on the same time series.
	createdDates map[string]bool

	// searchResponseBodyType is SearchResponseBody with ".hits.hits[]._source"
	// replaced by dataType, so responses decode straight into the user type.
	searchResponseBodyType reflect.Type
}

// TimeSeries creates (or returns the previously created) time series with the
// given name, e.g. to write api logs.
//
// Parameters:
//	name must be the openSearch index name prefix without the date, e.g.
//		"uafrica-v3-api-logs". The actual indices in openSearch are called
//		"<name>-<ccyymmdd>", e.g. "uafrica-v3-api-logs-20210102".
//	tmpl must be a value (not a pointer) of your log data struct, starting with
//		an anonymous TimeSeriesHeader, followed by public fields such as:
//		Xxx string    `json:"<name>" search:"keyword|long|date|boolean|object"`
//		Xxx time.Time `json:"<name>"` assumes type "date" for opensearch
//		Xxx int       `json:"<name>"` assumes type "long" for opensearch,
//		                              specify keyword if required
//
// Returns an error when the name is invalid, tmpl is not such a struct, or the
// index settings/mappings cannot be built.
func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
	if !indexNameRegex.MatchString(name) {
		return nil, errors.Errorf("invalid index_name:\"%s\"", name)
	}

	// if already created, just return
	if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
		return existingTimeSeries, nil
	}

	// tmpl == nil is tested first: reflect.TypeOf(nil) is nil and must not be used
	structType := reflect.TypeOf(tmpl)
	if tmpl == nil || structType.Kind() != reflect.Struct {
		return nil, errors.Errorf("%T is not a struct", tmpl)
	}
	if structType.NumField() < 1 ||
		!structType.Field(0).Anonymous ||
		structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
		return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
	}

	ts := &timeSeries{
		w:            w,
		name:         name,
		dataType:     structType,
		createdDates: map[string]bool{},
	}

	// define the OpenSearch index settings and mapping
	ts.settings = Settings{
		Index: &SettingsIndex{
			NumberOfShards:   4,
			NumberOfReplicas: 0,
		},
	}
	properties, err := structMappingProperties(structType)
	if err != nil {
		return nil, errors.Wrapf(err, "cannot map struct %s", structType)
	}
	ts.mappings = Mappings{
		Properties: properties,
	}

	ts.jsonSettings, err = json.Marshal(ts.settings)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to marshal index settings")
	}
	ts.jsonMappings, err = json.Marshal(ts.mappings)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to marshal index mappings")
	}
	logger.Infof("%s Index Mappings: %s", structType, string(ts.jsonMappings))

	// define search response type: similar to SearchResponseBody, but with
	// the document source typed as the user's struct
	ts.searchResponseBodyType, err = reflection.CloneType(
		reflect.TypeOf(SearchResponseBody{}),
		map[string]reflect.Type{
			".hits.hits[]._source": ts.dataType,
		})
	if err != nil {
		return nil, errors.Wrapf(err, "failed to make search response type for time-series")
	}

	w.timeSeriesByName[name] = ts
	return ts, nil
}

// structMappingProperties derives the index mapping properties for structType.
//
// Fields of embedded (anonymous) structs are added at the same level. Each
// other field is named by its json tag (falling back to the Go field name),
// gets a default type from its Go type, and may override that type with a
// `search:"keyword|long|date|boolean|object"` tag. Fields that encoding/json
// never emits (unexported, or tagged `json:"-"`) and fields of a type with no
// default mapping are left out, so dynamic mapping applies to the latter.
//
// Returns an error for a non-struct type or an unknown search tag value.
func structMappingProperties(structType reflect.Type) (map[string]MappingProperty, error) {
	if structType.Kind() != reflect.Struct {
		return nil, errors.Errorf("%s is not a struct", structType)
	}

	properties := map[string]MappingProperty{}
	for i := 0; i < structType.NumField(); i++ {
		structField := structType.Field(i)

		// fields of embedded (anonymous) structs are added at the same level
		if structField.Anonymous && structField.Type.Kind() == reflect.Struct {
			subFields, err := structMappingProperties(structField.Type)
			if err != nil {
				return nil, errors.Wrapf(err, "failed to map embedded struct %s", structField.Name)
			}
			for n, v := range subFields {
				properties[n] = v
			}
			continue
		}

		// unexported fields are never part of the JSON document
		if structField.PkgPath != "" {
			continue
		}

		// `json:"-"` excludes the field; note `json:"-,"` names the field "-"
		jsonTag := structField.Tag.Get("json")
		if jsonTag == "-" {
			continue
		}
		fieldName := structField.Name
		if jsonName := strings.SplitN(jsonTag, ",", 2)[0]; jsonName != "" {
			fieldName = jsonName
		}

		// get default type of search value from field type
		fieldMapping, ok, err := defaultMappingForType(structField.Type)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
		}
		if !ok {
			// unknown value type... we do not specify mapping and let it use dynamic mapping
			continue
		}

		// allow user to change that with a search tag on the field
		switch structField.Tag.Get("search") {
		case "":
			// no change
		case "keyword":
			fieldMapping.Type = "keyword"
		case "long":
			fieldMapping.Type = "long"
		case "date":
			fieldMapping.Type = "date"
		case "boolean":
			fieldMapping.Type = "boolean"
		case "object":
			fieldMapping.Type = "object"
			fieldMapping.Enabled = false
		default:
			return nil, errors.Errorf("Unknown search:\"%s\" on field(%s)", structField.Tag.Get("search"), structField.Name)
		}

		// add to index spec
		properties[fieldName] = fieldMapping
	}
	return properties, nil
}

// defaultMappingForType returns the mapping implied by a Go type alone.
// ok is false when the type has no default mapping.
func defaultMappingForType(t reflect.Type) (mapping MappingProperty, ok bool, err error) {
	switch t.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return MappingProperty{Type: "long"}, true, nil
	case reflect.Float32, reflect.Float64:
		return MappingProperty{Type: "float"}, true, nil
	case reflect.Bool:
		return MappingProperty{Type: "boolean"}, true, nil
	case reflect.String:
		return MappingProperty{Type: "text"}, true, nil
	case reflect.Slice:
		// encoding/json writes []byte as a base64 string, not as numbers
		if t.Elem().Kind() == reflect.Uint8 {
			return MappingProperty{}, false, nil
		}
		// do not indicate slice, just map slice items as sub-items
		return defaultMappingForType(t.Elem())
	case reflect.Struct:
		if t == reflect.TypeOf(time.Time{}) {
			return MappingProperty{Type: "date"}, true, nil
		}
		subStructProperties, err := structMappingProperties(t)
		if err != nil {
			return MappingProperty{}, false, err
		}
		return MappingProperty{Properties: subStructProperties}, true, nil
	default:
		return MappingProperty{}, false, nil
	}
}

// Write stores one document in the daily index selected by startTime.
//
// data must be a value of the struct type given as tmpl to Writer.TimeSeries().
// Its embedded TimeSeriesHeader is overwritten with startTime, endTime and the
// duration between them; the caller's value is not modified because the header
// is set on a copy.
func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error {
	if data == nil {
		return errors.Errorf("data:nil")
	}
	if t := reflect.TypeOf(data); t != ts.dataType {
		return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType)
	}

	// get daily search index to write to, from start time
	indexName := ts.name + "-" + startTime.Format("20060102")
	if !ts.createdDates[indexName] {
		if err := ts.ensureIndex(indexName); err != nil {
			return err
		}
		ts.createdDates[indexName] = true
	}

	// copy to set the header values
	x := reflect.New(ts.dataType)
	x.Elem().Set(reflect.ValueOf(data))
	x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
		StartTime:  startTime,
		EndTime:    endTime,
		DurationMs: endTime.Sub(startTime).Milliseconds(),
	}))

	res, err := ts.w.Write(indexName, "", x.Elem().Interface())
	if err != nil {
		return err
	}
	logger.Debugf("IndexResponse: %+v", res)
	return nil
}

// ensureIndex prepares the daily index for writing: it issues the create call
// with the settings and then puts the field mappings. HTTP 409 (already
// exists) is accepted for both calls.
func (ts *timeSeries) ensureIndex(indexName string) error {
	// create new index for this date - if not exists
	// NOTE(review): api.Create takes (index, id, body), which looks like the
	// document-create API rather than index-create - confirm that this applies
	// the shard/replica settings as intended.
	createRes, err := ts.w.api.Create(
		indexName, // index name
		indexName, // index name also used for document id
		bytes.NewReader(ts.jsonSettings))
	if err != nil {
		return errors.Wrapf(err, "failed to create index(%s)", indexName)
	}
	defer createRes.Body.Close()
	switch createRes.StatusCode {
	case http.StatusOK, http.StatusCreated, http.StatusConflict: // 409 = already exists
	default:
		return errors.Errorf("failed to create index(%s): %v %s %s", indexName, createRes.StatusCode, createRes.Status(), createRes.String())
	}

	mappingRes, err := opensearchapi.IndicesPutMappingRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(ts.jsonMappings),
	}.Do(context.Background(), ts.w.client)
	if err != nil {
		return errors.Wrapf(err, "failed to put mappings for index(%s)", indexName)
	}
	defer mappingRes.Body.Close()
	switch mappingRes.StatusCode {
	case http.StatusOK, http.StatusCreated, http.StatusConflict: // 409 = already exists
	default:
		return errors.Errorf("failed to put mappings for index(%s): %v %s %s", indexName, mappingRes.StatusCode, mappingRes.Status(), mappingRes.String())
	}
	return nil
}

// DelOldTimeSeries deletes the daily indices of a time series that are older
// than the given number of days.
//
// Parameters:
//	indexName is the index prefix before dash-date, e.g. "api-logs", then this
//		looks for "api-logs-<date>"
//	olderThanDays must be >= 0; 0 deletes nothing
//
// Returns the list of indices selected for deletion, with err==nil if they
// were deleted successfully.
func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) {
	if !indexNameRegex.MatchString(indexName) {
		return nil, errors.Errorf("invalid index_name:\"%s\"", indexName)
	}
	if olderThanDays < 0 {
		return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
	}
	if olderThanDays == 0 {
		return nil, nil
	}

	// make list of indices matching specified name e.g. "uafrica-v3-api-logs-*"
	res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
	if err != nil {
		return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName)
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
	}

	indices := map[string]indexInfo{}
	if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
		return nil, errors.Wrapf(err, "failed to read list of indices")
	}

	// calculate time N days ago
	// working in local time, assuming the server runs in same location as API user...
	// so that index rotates at midnight local time rather than midnight UTC
	t0 := time.Now()
	timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) // = start of today in local time
	// AddDate steps whole calendar days, so the threshold stays on local
	// midnight even when a daylight-saving change falls inside the period
	timeThreshold = timeThreshold.AddDate(0, 0, -olderThanDays) // = N days before that

	prefix := indexName + "-"
	indicesToDelete := []string{}
	for dailyIndexName, dailyIndexInfo := range indices {
		if !strings.HasPrefix(dailyIndexName, prefix) {
			logger.Debugf("Ignore index(%s) not named %s<date>", dailyIndexName, prefix)
			continue
		}
		dateStr := dailyIndexName[len(prefix):]
		date, err := time.ParseInLocation("20060102", dateStr, t0.Location())
		if err != nil {
			logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
			continue
		}
		if date.Before(timeThreshold) {
			logger.Debugf("Deleting index(%s).uuid(%s) older than %s...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, timeThreshold)
			indicesToDelete = append(indicesToDelete, dailyIndexName)
		}
	}

	if len(indicesToDelete) > 0 {
		delRes, err := w.api.Indices.Delete(indicesToDelete)
		if err != nil {
			return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
		}
		defer delRes.Body.Close()
		// a transport-level success can still carry an HTTP error status
		if delRes.StatusCode != http.StatusOK {
			return indicesToDelete, errors.Errorf("failed to delete indices(%s): %v %s %s", indicesToDelete, delRes.StatusCode, delRes.Status(), delRes.String())
		}
		logger.Debugf("Deleted %d daily indices(%s) older than %d days", len(indicesToDelete), indicesToDelete, olderThanDays)
	}
	return indicesToDelete, nil
}

// indexInfo is the per-index entry decoded from the Indices.Get response used
// by DelOldTimeSeries (a JSON object keyed by index name).
type indexInfo struct {
	Aliases  map[string]interface{} `json:"aliases"`
	Mappings Mappings               `json:"mappings"`
	Settings IndexInfoSettings      `json:"settings"`
}

// IndexInfoSettings is the "settings" object of an index info entry.
type IndexInfoSettings struct {
	Index IndexSettings `json:"index"`
}

// IndexSettings is the "settings.index" object of an index info entry.
// All values are decoded as strings.
type IndexSettings struct {
	UUID             string `json:"uuid"`
	CreationDate     string `json:"creation_date"`
	NumberOfShards   string `json:"number_of_shards"`
	NumberOfReplicas string `json:"number_of_replicas"`
	ProviderName     string `json:"provided_name"` // e.g. "go-utils-audit-test-20211103"
}

// Search runs query over all daily indices of this time series ("<name>-*").
//
// Parameters:
//	query is sent as the "query" of the search request body
//	limit is the maximum number of documents to return, 0..1000
//
// Returns:
//	docs is a slice of the TimeSeries data type, or nil when nothing matched
//	totalCount is the total number of hits reported by the search
func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) {
	if ts == nil {
		return nil, 0, errors.Errorf("time series == nil")
	}
	if limit < 0 || limit > 1000 {
		return nil, 0, errors.Errorf("limit=%d not 0..1000", limit)
	}

	// example search request body for free text
	// 	{
	// 	  "size": 5,
	// 	  "query": {
	// 		"multi_match": {
	// 		  "query": "miller",
	// 		  "fields": ["title^2", "director"]
	// 		}
	// 	  }
	//  }
	body := SearchRequestBody{
		Size:  limit,
		Query: query,
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to marshal search request body")
	}

	search := opensearchapi.SearchRequest{
		Index: []string{ts.name + "-*"},
		Body:  bytes.NewReader(jsonBody),
	}
	searchResponse, err := search.Do(context.Background(), ts.w.client)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to search documents")
	}
	defer searchResponse.Body.Close()

	if searchResponse.StatusCode != http.StatusOK {
		// best effort: the body is only used to enrich the error message
		resBody, _ := ioutil.ReadAll(searchResponse.Body)
		return nil, 0, errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
	}

	// decode into the response type cloned for this time series' data type
	resBodyPtrValue := reflect.New(ts.searchResponseBodyType)
	if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
		return nil, 0, errors.Wrapf(err, "cannot decode search response body")
	}

	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get total nr of hits")
	}
	// checked assertion: report an unexpected type as an error, not a panic
	total, ok := hitsTotalValue.Interface().(int)
	if !ok {
		return nil, 0, errors.Errorf("total nr of hits has type %T, expected int", hitsTotalValue.Interface())
	}
	if total < 1 {
		return nil, 0, nil // no matches
	}

	items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get search response documents")
	}
	return items.Interface(), total, nil
}