package search

import (
	"bytes"
	"context"
	"encoding/json"
	"net/http"
	"reflect"
	"strings"
	"time"

	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
	"gitlab.com/uafrica/go-utils/errors"
	"gitlab.com/uafrica/go-utils/logger"
	"gitlab.com/uafrica/go-utils/reflection"
)

// timeSeriesDateLayout is the date suffix appended to a time series name to
// form the daily index name, e.g. "uafrica-v3-api-logs-20210102".
const timeSeriesDateLayout = "20060102"

// TimeSeriesHeader must be embedded (anonymously, as the first field) into
// your log struct. Its values are filled in by TimeSeries.Write().
type TimeSeriesHeader struct {
	StartTime  time.Time `json:"@timestamp"`
	EndTime    time.Time `json:"@end_time"`
	DurationMs int64     `json:"@duration_ms"`
}

// TimeSeries writes documents into, and searches documents from, the daily
// indices of one named time series.
type TimeSeries interface {
	Write(StartTime time.Time, EndTime time.Time, data interface{}) error
	Search(limit int) (docs interface{}, totalCount int, err error)
}

// timeSeries is the TimeSeries implementation created by writer.TimeSeries().
type timeSeries struct {
	w             *writer
	name          string          // index name prefix, without the date suffix
	dataType      reflect.Type    // struct type accepted by Write()
	fields        []dataField     // mapped data fields of dataType
	jsonIndexSpec []byte          // JSON encoded index spec (settings + mappings)
	createdDates  map[string]bool // daily index names already created by this instance

	searchResponseBodyType reflect.Type // SearchResponseBody with _source typed as dataType
}

// dataField describes one mapped field of the time series data struct.
type dataField struct {
	name    string // document field name (json tag name, else Go field name)
	index   []int  // reflect index of the field in the data struct
	mapping MappingProperty
}

// TimeSeries creates (or returns the previously created) time series to
// write e.g. api logs.
//
// parameters:
//
//	name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-logs"
//		the actual indices in openSearch will be called "<indexName>-<ccyymmdd>" e.g. "uafrica-v3-api-logs-20210102"
//	tmpl must be your log data struct, starting with an anonymous TimeSeriesHeader,
//		followed by public fields as:
//		Xxx string    `json:"<name>" search:"keyword|text|long|date|boolean|object"`
//		Xxx time.Time `json:"<name>"`	assumes type "date" for opensearch
//		Xxx int       `json:"<name>"`	assumes type "long" for opensearch, specify keyword if required
//
// Unexported fields and fields tagged `json:"-"` are not mapped.
// An error is returned for an invalid name, a tmpl that is not such a struct,
// or an unknown search tag value.
func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
	if !indexNameRegex.MatchString(name) {
		return nil, errors.Errorf("invalid index_name:\"%s\"", name)
	}

	// if already created, just return
	if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
		return existingTimeSeries, nil
	}

	// reflect.TypeOf(nil) is a nil Type, so tmpl == nil must be tested first
	structType := reflect.TypeOf(tmpl)
	if tmpl == nil || structType.Kind() != reflect.Struct {
		return nil, errors.Errorf("%T is not a struct", tmpl)
	}
	if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
		return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
	}

	ts := &timeSeries{
		w:            w,
		name:         name,
		dataType:     structType,
		fields:       []dataField{},
		createdDates: map[string]bool{},
	}

	// define the OpenSearch index mapping
	indexSpec := Index{
		Settings: Settings{
			Index: &SettingsIndex{
				NumberOfShards:   4,
				NumberOfReplicas: 0,
			},
		},
		Mappings: Mappings{
			Properties: map[string]MappingProperty{},
		},
	}

	// field 0 is the embedded TimeSeriesHeader: its fields are flattened in
	// the JSON document and are added to the spec below, so start at field 1
	for i := 1; i < structType.NumField(); i++ {
		structField := structType.Field(i)
		if structField.PkgPath != "" {
			// unexported fields are not marshalled by encoding/json
			logger.Debugf("Skip %s unexported field %s", structType.Name(), structField.Name)
			continue
		}
		jsonTag := structField.Tag.Get("json")
		if jsonTag == "-" {
			logger.Debugf("Skip %s field %s excluded from JSON", structType.Name(), structField.Name)
			continue
		}

		field := dataField{
			name:  structField.Name,
			index: structField.Index,
		}
		if jsonName := strings.SplitN(jsonTag, ",", 2)[0]; jsonName != "" {
			field.name = jsonName
		}

		mapping, ok := timeSeriesMapping(structField.Type, structField.Tag.Get("search"))
		if !ok {
			return nil, errors.Errorf("Unknown search:\"%s\" on timeSeries(%s) field(%s)", structField.Tag.Get("search"), name, structField.Name)
		}
		field.mapping = mapping

		// add to index spec and to list of fields
		indexSpec.Mappings.Properties[field.name] = field.mapping
		ts.fields = append(ts.fields, field)
	}

	// add header fields for all time series to the index spec
	indexSpec.Mappings.Properties["@timestamp"] = MappingProperty{Type: "date"}
	indexSpec.Mappings.Properties["@end_time"] = MappingProperty{Type: "date"}
	indexSpec.Mappings.Properties["@duration_ms"] = MappingProperty{Type: "long"}

	var err error
	ts.jsonIndexSpec, err = json.Marshal(indexSpec)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to marshal index spec")
	}

	// define search response type: similar to SearchResponseBody, but with
	// the hit documents (_source) typed as the time series data type
	// (original note on CloneType: copy type recursively, find index of
	// special field and replace when found)
	ts.searchResponseBodyType, err = reflection.CloneType(
		reflect.TypeOf(SearchResponseBody{}),
		map[string]reflect.Type{
			".hits.hits[]._source": ts.dataType,
		})
	if err != nil {
		return nil, errors.Wrapf(err, "failed to make search response type for time-series")
	}

	w.timeSeriesByName[name] = ts
	return ts, nil
}

// timeSeriesMapping determines the index mapping of one data field.
// A non-empty search tag overrides the default derived from the Go type;
// ok is false when the search tag value is not supported.
func timeSeriesMapping(fieldType reflect.Type, searchTag string) (mapping MappingProperty, ok bool) {
	// allow user to select the type with a search tag on the field
	switch searchTag {
	case "":
		// no override: derive from the field type below
	case "keyword", "text", "long", "date", "boolean":
		return MappingProperty{Type: searchTag}, true
	case "object":
		// stored but not indexed
		return MappingProperty{Type: "object", Enabled: false}, true
	default:
		return MappingProperty{}, false
	}

	// get default type of search value from field type
	switch fieldType.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return MappingProperty{Type: "long"}, true
	case reflect.Bool:
		return MappingProperty{Type: "boolean"}, true
	}
	if fieldType == reflect.TypeOf(time.Time{}) {
		return MappingProperty{Type: "date"}, true
	}
	return MappingProperty{Type: "text"}, true
}

// Write stores one document in the daily index selected by startTime.
//
// data must be a value (not a pointer) of the struct type specified as tmpl
// in writer.TimeSeries(). The embedded TimeSeriesHeader in data is
// overwritten with startTime, endTime and the duration between them; the
// caller's value is not modified because a copy is written.
func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error {
	if data == nil {
		return errors.Errorf("data:nil")
	}
	if t := reflect.TypeOf(data); t != ts.dataType {
		return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType.Name())
	}

	// get daily search index to write to, from start time
	indexName := ts.name + "-" + startTime.Format(timeSeriesDateLayout)
	if err := ts.ensureDailyIndex(indexName); err != nil {
		return err
	}

	// copy to set the header values (field 0 is the TimeSeriesHeader)
	doc := reflect.New(ts.dataType).Elem()
	doc.Set(reflect.ValueOf(data))
	doc.Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
		StartTime:  startTime,
		EndTime:    endTime,
		DurationMs: int64(endTime.Sub(startTime) / time.Millisecond),
	}))
	return ts.w.Write(indexName, doc.Interface())
}

// ensureDailyIndex sends the index spec for indexName once per timeSeries
// instance; a 409 Conflict response is accepted as "already exists".
func (ts *timeSeries) ensureDailyIndex(indexName string) error {
	if ts.createdDates[indexName] {
		return nil
	}

	// NOTE(review): api.Create looks like the document-create call (index
	// name + document id + body) rather than Indices.Create, so this may
	// store the spec as a document instead of applying it as the index
	// mapping - confirm against the opensearch-go API before changing.
	res, err := ts.w.api.Create(
		indexName, // index name
		indexName, // index name also used for document id
		bytes.NewReader(ts.jsonIndexSpec))
	if err != nil {
		return errors.Wrapf(err, "failed to create index(%s)", indexName)
	}
	if res.Body != nil {
		defer res.Body.Close()
	}

	switch res.StatusCode {
	case http.StatusOK, http.StatusCreated:
	case http.StatusConflict: // 409 = already exists
	default:
		return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
	}
	ts.createdDates[indexName] = true
	return nil
}

// DelOldTimeSeries deletes the daily indices of a time series that are older
// than the specified number of days.
//
// parameters:
//
//	indexName is index prefix before dash-date, e.g. "api-logs" then will look for "api-logs-<date>"
//	olderThanDays must be >= 0; with 0 nothing is deleted
//
// returns:
//
//	list of indices to delete, with err==nil if deleted successfully
func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) {
	if !indexNameRegex.MatchString(indexName) {
		return nil, errors.Errorf("invalid index_name:\"%s\"", indexName)
	}
	if olderThanDays < 0 {
		return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
	}
	if olderThanDays == 0 {
		return nil, nil
	}

	// make list of indices matching specified name e.g. "uafrica-v3-api-logs-*"
	res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
	if err != nil {
		return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName)
	}
	if res.Body != nil {
		defer res.Body.Close()
	}
	if res.StatusCode != http.StatusOK {
		return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
	}

	indices := map[string]indexInfo{}
	if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
		return nil, errors.Wrapf(err, "failed to read list of indices")
	}

	// calculate time N days ago
	// working in local time, assuming the server runs in same location as API user...
	// so that index rotates at midnight local time rather than midnight UTC
	t0 := time.Now()
	timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) // = midnight today in local time
	// AddDate (rather than Add(-24h*N)) keeps the threshold at local
	// midnight when a daylight saving change falls inside the period
	timeThreshold = timeThreshold.AddDate(0, 0, -olderThanDays) // = N days before that

	prefix := indexName + "-"
	indicesToDelete := []string{}
	for dailyIndexName, dailyIndexInfo := range indices {
		if !strings.HasPrefix(dailyIndexName, prefix) {
			// do not slice a name that does not have the expected prefix
			logger.Debugf("Ignore index(%s) without prefix(%s)", dailyIndexName, prefix)
			continue
		}
		dateStr := dailyIndexName[len(prefix):]
		date, err := time.ParseInLocation(timeSeriesDateLayout, dateStr, t0.Location())
		if err != nil {
			logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
			continue
		}
		if date.Before(timeThreshold) {
			logger.Debugf("Deleting index(%s).uuid(%s) older than %d days (before %s)...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, olderThanDays, timeThreshold)
			indicesToDelete = append(indicesToDelete, dailyIndexName)
		}
	}
	if len(indicesToDelete) == 0 {
		return indicesToDelete, nil
	}

	delRes, err := w.api.Indices.Delete(indicesToDelete)
	if err != nil {
		return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
	}
	if delRes.Body != nil {
		defer delRes.Body.Close()
	}
	// a transport-level success can still carry an HTTP error status
	if delRes.StatusCode != http.StatusOK {
		return indicesToDelete, errors.Errorf("failed to delete indices(%s): %v %s %s", indicesToDelete, delRes.StatusCode, delRes.Status(), delRes.String())
	}
	logger.Debugf("Deleted %d daily indices(%s) older than %d days", len(indicesToDelete), indicesToDelete, olderThanDays)
	return indicesToDelete, nil
}

// indexInfo is the value decoded for each index name in the response of the
// get-indices request made by DelOldTimeSeries().
type indexInfo struct {
	Aliases  map[string]interface{} `json:"aliases"`
	Mappings Mappings               `json:"mappings"`
	Settings IndexInfoSettings      `json:"settings"`
}

// IndexInfoSettings is the "settings" object of an index info.
type IndexInfoSettings struct {
	Index IndexSettings `json:"index"`
}

// IndexSettings is the "settings.index" object of an index info.
// All values are decoded as strings.
type IndexSettings struct {
	UUID             string `json:"uuid"`
	CreationDate     string `json:"creation_date"`
	NumberOfShards   string `json:"number_of_shards"`
	NumberOfReplicas string `json:"number_of_replicas"`
	ProviderName     string `json:"provided_name"` // e.g. "go-utils-audit-test-20211103"
}

// Search queries the daily indices of this time series ("<name>-*").
//
// NOTE(review): the query is still hard coded to match "GET" on the
// "http_method" field - the interface has no way yet to specify it.
//
// parameters:
//
//	limit is the max nr of documents to return, 0..1000 (0 omits the size
//	from the request)
//
// returns:
//
//	docs will be a slice of the TimeSeries data type, or nil when nothing matched
//	totalCount is the total nr of hits reported in the response
func (ts *timeSeries) Search(limit int) (docs interface{}, totalCount int, err error) {
	if limit < 0 {
		return nil, 0, errors.Errorf("limit=%d < 0", limit)
	}
	if limit > 1000 {
		return nil, 0, errors.Errorf("limit=%d > 1000", limit)
	}

	// example search request body for free text
	// {
	//   "size": 5,
	//   "query": {
	//     "multi_match": {
	//       "query": "miller",
	//       "fields": ["title^2", "director"]
	//     }
	//   }
	// }
	body := SearchRequestBody{
		Size: limit,
		Query: &SearchQuery{
			MultiMatch: &QueryMultiMatch{
				Query:  "GET",                   // keyword to find
				Fields: []string{"http_method"}, // , "title^2", "directory"},
			},
		},
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to marshal search request body")
	}

	search := opensearchapi.SearchRequest{
		// limit the search to the daily indices of this time series, else
		// documents of other indices are decoded into this data type
		Index: []string{ts.name + "-*"},
		Body:  bytes.NewReader(jsonBody),
	}
	searchResponse, err := search.Do(context.Background(), ts.w.client)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "failed to search documents")
	}
	if searchResponse.Body != nil {
		defer searchResponse.Body.Close()
	}
	if searchResponse.StatusCode != http.StatusOK {
		return nil, 0, errors.Errorf("Search failed with HTTP status %v", searchResponse.StatusCode)
	}

	// decode into the cloned response type that has _source typed as the data type
	resBodyPtrValue := reflect.New(ts.searchResponseBodyType)
	if err := json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
		return nil, 0, errors.Wrapf(err, "cannot decode search response body")
	}

	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get total nr of hits")
	}
	total, ok := hitsTotalValue.Interface().(int)
	if !ok {
		return nil, 0, errors.Errorf("total nr of hits is %T, expecting int", hitsTotalValue.Interface())
	}
	if total < 1 {
		return nil, 0, nil // no matches
	}

	items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source")
	if err != nil {
		return nil, 0, errors.Wrapf(err, "cannot get search response documents")
	}
	return items.Interface(), total, nil
}

// SearchRequestBody is the JSON body of a search request.
type SearchRequestBody struct {
	Size  int          `json:"size,omitempty"`
	Query *SearchQuery `json:"query,omitempty"`
}

// SearchQuery is the "query" object of a search request body.
type SearchQuery struct {
	MultiMatch *QueryMultiMatch `json:"multi_match,omitempty"`
}

// QueryMultiMatch is the "multi_match" object of a search query.
type QueryMultiMatch struct {
	Query  string   `json:"query"`
	Fields []string `json:"fields"`
}

// example of search response body:
//	{
//		"took":872,
//		"timed_out":false,
//		"_shards":{
//			"total":38,
//			"successful":38,
//			"skipped":0,
//			"failed":0
//		},
//		"hits":{
//			"total":{
//				"value":0,
//				"relation":"eq"
//			},
//			"max_score":null,
//			"hits":[
//				{
//					"_index": "go-utils-audit-test-20211030",
//					"_type": "_doc",
//					"_id": "Tj9l5XwBWRiAneoYazic",
//					"_score": 1.2039728,
//					"_source": {
//						"@timestamp": "2021-10-30T15:03:20.679481+02:00",
//						"@end_time": "2021-10-30T15:03:20.469481+02:00",
//						"@duration_ms": -210,
//						"test1": "6",
//						"test2": "ACC_00098",
//						"test3": 10,
//						"http": {
//							"method": "GET",
//							"path": "/accounts"
//						},
//						"http_method": "GET",
//						"http_path": "/accounts"
//					}
//				},
//			]
//		}
//	}

// SearchResponseBody is the template for the body of a search response.
// writer.TimeSeries() clones this type with ".hits.hits[]._source" set to
// the data type of the time series.
type SearchResponseBody struct {
	Took     int                  `json:"took"` // milliseconds
	TimedOut bool                 `json:"timed_out"`
	Shards   SearchResponseShards `json:"_shards"`
	Hits     SearchResponseHits   `json:"hits"`
}

// SearchResponseShards is the "_shards" object of a search response.
type SearchResponseShards struct {
	Total      int `json:"total"`
	Successful int `json:"successful"`
	Skipped    int `json:"skipped"`
	Failed     int `json:"failed"`
}

// SearchResponseHits is the "hits" object of a search response.
type SearchResponseHits struct {
	Total    SearchResponseHitsTotal `json:"total"`
	MaxScore *float64                `json:"max_score,omitempty"`
	Hits     []HitDoc                `json:"hits"`
}

// SearchResponseHitsTotal is the "hits.total" object of a search response.
type SearchResponseHitsTotal struct {
	Value    int    `json:"value"`    // e.g. 0 when no docs matched
	Relation string `json:"relation"` // e.g. "eq"
}

// HitDoc is one entry of "hits.hits" in a search response.
// It has no _source field: that is defined per time series on the cloned
// response type, see writer.TimeSeries().
type HitDoc struct {
	Index string  `json:"_index"` // name of index
	Type  string  `json:"_type"`  // _doc
	ID    string  `json:"_id"`
	Score float64 `json:"_score"`
	// Source map[string]interface{} `json:"_source"` //the document of itemType
}