Skip to content
Snippets Groups Projects
Select Git revision
  • 3a2e23b80410e65d8a6a27282fcbca1bd74d5f13
  • main default protected
  • trading_hours
  • refactor_trading_hours
  • audit_cleaning_cater_for_non_struct_fields
  • remove-info-logs
  • sl-refactor
  • 18-use-scan-for-param-values
  • 17-order-search-results
  • 4-simplify-framework-2
  • 1-http-error
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
  • v1.278.0
31 results

time_series.go

Blame
  • time_series.go 13.51 KiB
    package search
    
    import (
    	"bytes"
    	"context"
    	"encoding/json"
    	"io/ioutil"
    	"net/http"
    	"reflect"
    	"strings"
    	"time"
    
    	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logger"
    	"gitlab.com/uafrica/go-utils/reflection"
    )
    
    //TimeFormat is an RFC 3339 layout with a numeric time zone offset,
    //e.g. "2021-11-03T14:05:06+02:00" (UTC is written as "+00:00").
    //In a Go layout the offset must be written as "-07:00": "+07:00" is not a
    //layout element and would be printed literally for every time.
    const TimeFormat = "2006-01-02T15:04:05-07:00"
    
    //TimeSeriesHeader must be embedded (anonymously) as the FIRST field of every
    //time series document struct, e.g.
    //	type ApiLog struct {
    //		search.TimeSeriesHeader
    //		Method string `json:"method" search:"keyword"`
    //	}
    //Writer.TimeSeries() rejects structs that do not start with it, and
    //TimeSeries.Write() overwrites it with the start/end times passed to Write().
    type TimeSeriesHeader struct {
    	//startTime given to Write(), serialized as "@timestamp"
    	StartTime  time.Time `json:"@timestamp"`
    	//endTime given to Write()
    	EndTime    time.Time `json:"@end_time"`
    	//EndTime - StartTime in milliseconds, computed by Write()
    	DurationMs int64     `json:"@duration_ms"`
    }
    
    //TimeSeries writes documents into daily OpenSearch indices and searches them.
    //Obtain one from Writer.TimeSeries().
    type TimeSeries interface {
    	//Write stores data, which must be of the struct type given to Writer.TimeSeries(),
    	//in the daily index of startTime, with its TimeSeriesHeader set from startTime and endTime.
    	Write(startTime time.Time, endTime time.Time, data interface{}) error

    	//Search returns up to limit (0..1000) documents matching query as a slice of the
    	//time series struct type, together with the total nr of matching documents.
    	Search(query Query, limit int64) (docs interface{}, totalCount int, err error)
    }
    
    //timeSeries implements TimeSeries for one index name prefix and one document struct type.
    //It is created (and cached by name) in writer.TimeSeries().
    type timeSeries struct {
    	//writer that created this time series, used for its OpenSearch api, client and Write()
    	w            *writer
    	//index name prefix; documents go into daily indices "<name>-<ccyymmdd>"
    	name         string
    	//struct type of the documents (the type of tmpl given to writer.TimeSeries)
    	dataType     reflect.Type
    	//index settings and mappings derived from dataType
    	settings     Settings
    	mappings     Mappings
    	//JSON encoding of settings and mappings, sent to OpenSearch when a daily index is created
    	jsonSettings []byte
    	jsonMappings []byte
    	//daily index names already created by this instance (values are always true)
    	//NOTE(review): plain map without a lock - concurrent Write() calls would race; confirm callers serialize writes
    	createdDates map[string]bool

    	//copy of SearchResponseBody with .hits.hits[]._source of type dataType, used to decode Search() responses
    	searchResponseBodyType reflect.Type
    }
    
    //TimeSeries creates (or returns the already created) time series used to write
    //documents such as api logs into daily OpenSearch indices.
    //parameters:
    //	name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-logs"
    //		the actual indices in openSearch will be called "<name>-<ccyymmdd>" e.g. "uafrica-v3-api-logs-20210102"
    //	tmpl must be a (non-pointer) value of your log data struct, starting with an anonymous
    //	TimeSeriesHeader, followed by public fields such as:
    //		Xxx string `json:"<name>" search:"<type>"`	mapped as "text" unless the search tag overrides it
    //		Xxx time.Time `json:"<name>"`				mapped as "date"
    //		Xxx int `json:"<name>"`						mapped as "long", specify e.g. search:"keyword" if required
    //		see structMappingProperties for the supported search tag values
    //returns:
    //	the same TimeSeries when called again with the same name and struct type,
    //	an error when name is already used for a different struct type
    func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
    	if !indexNameRegex.MatchString(name) {
    		return nil, errors.Errorf("invalid index_name:\"%s\"", name)
    	}

    	//validate tmpl before using the cache, so an invalid tmpl is always rejected
    	structType := reflect.TypeOf(tmpl)
    	if tmpl == nil || structType.Kind() != reflect.Struct {
    		return nil, errors.Errorf("%T is not a struct", tmpl)
    	}
    	if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
    		return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
    	}

    	//if already created, just return it - but only for the same struct type,
    	//otherwise every Write() into it would fail with a type mismatch
    	if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
    		if existing, isTS := interface{}(existingTimeSeries).(*timeSeries); isTS && existing.dataType != structType {
    			return nil, errors.Errorf("time series(%s) already exists for %s, cannot use it for %s", name, existing.dataType, structType)
    		}
    		return existingTimeSeries, nil
    	}

    	ts := &timeSeries{
    		w:            w,
    		name:         name,
    		dataType:     structType,
    		createdDates: map[string]bool{},
    	}

    	//define the OpenSearch index settings
    	ts.settings = Settings{
    		Index: &SettingsIndex{
    			NumberOfShards:   4,
    			NumberOfReplicas: 0,
    		},
    	}

    	//define the OpenSearch index mapping from the struct fields
    	properties, err := structMappingProperties(structType)
    	if err != nil {
    		return nil, errors.Wrapf(err, "cannot map struct %s", structType)
    	}
    	ts.mappings = Mappings{
    		Properties: properties,
    	}

    	ts.jsonSettings, err = json.Marshal(ts.settings)
    	if err != nil {
    		return nil, errors.Wrapf(err, "failed to marshal index settings")
    	}
    	ts.jsonMappings, err = json.Marshal(ts.mappings)
    	if err != nil {
    		return nil, errors.Wrapf(err, "failed to marshal index mappings")
    	}
    	logger.Infof("%s Index Mappings: %s", structType, string(ts.jsonMappings))

    	//define search response type:
    	//similar to SearchResponseBody, but with documents decoded into the time series struct type
    	ts.searchResponseBodyType, err = reflection.CloneType(
    		reflect.TypeOf(SearchResponseBody{}),
    		map[string]reflect.Type{
    			".hits.hits[]._source": ts.dataType,
    		})
    	if err != nil {
    		return nil, errors.Wrapf(err, "failed to make search response type for time-series")
    	}
    	w.timeSeriesByName[name] = ts
    	return ts, nil
    }
    
    //structMappingProperties builds the OpenSearch mapping properties for the fields
    //of structType, keyed by JSON field name.
    //Default mapping types are derived from the Go field type:
    //	int*, uint*		"long"
    //	float*			"float"
    //	bool			"boolean"
    //	string			"text"
    //	time.Time		"date"
    //	other struct	object with nested properties
    //	[]T				mapped like T (OpenSearch has no array type, any field may hold a list of values)
    //Fields of any other type (pointers, maps, interfaces, []byte, ...) are not mapped,
    //so OpenSearch falls back to dynamic mapping for them.
    //A search:"keyword|text|long|date|boolean|object" tag overrides the default type,
    //any other search tag value is an error.
    //Fields of embedded (anonymous) structs are added at the same level.
    //Unexported fields and fields tagged json:"-" are skipped, as encoding/json does.
    func structMappingProperties(structType reflect.Type) (map[string]MappingProperty, error) {
    	if structType.Kind() != reflect.Struct {
    		return nil, errors.Errorf("%s is not a struct", structType)
    	}
    	timeType := reflect.TypeOf(time.Time{})

    	properties := map[string]MappingProperty{}
    	for i := 0; i < structType.NumField(); i++ {
    		structField := structType.Field(i)
    		fieldName := structField.Name

    		//fields of embedded (anonymous) structs are added at the same level
    		if structField.Anonymous && structField.Type.Kind() == reflect.Struct {
    			subFields, err := structMappingProperties(structField.Type)
    			if err != nil {
    				return nil, errors.Wrapf(err, "failed to map embedded struct %s", fieldName)
    			}
    			for n, v := range subFields {
    				properties[n] = v
    			}
    			continue
    		}

    		//unexported fields and json:"-" fields never appear in the JSON document
    		jsonTag := structField.Tag.Get("json")
    		if structField.PkgPath != "" || jsonTag == "-" {
    			continue
    		}
    		if jsonName := strings.SplitN(jsonTag, ",", 2)[0]; jsonName != "" {
    			fieldName = jsonName
    		}

    		//map a slice by the type of its items
    		fieldType := structField.Type
    		if fieldType.Kind() == reflect.Slice {
    			if fieldType.Elem().Kind() == reflect.Uint8 {
    				//[]byte is encoded as a base64 string: leave it to dynamic mapping
    				continue
    			}
    			fieldType = fieldType.Elem()
    		}

    		//get default type of search value from field type
    		var fieldMapping MappingProperty
    		switch fieldType.Kind() {
    		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
    			reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
    			fieldMapping = MappingProperty{Type: "long"}
    		case reflect.Float32, reflect.Float64:
    			fieldMapping = MappingProperty{Type: "float"}
    		case reflect.Bool:
    			fieldMapping = MappingProperty{Type: "boolean"}
    		case reflect.String:
    			fieldMapping = MappingProperty{Type: "text"}
    		case reflect.Struct:
    			if fieldType == timeType {
    				fieldMapping = MappingProperty{Type: "date"}
    			} else {
    				subStructProperties, err := structMappingProperties(fieldType)
    				if err != nil {
    					return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
    				}
    				fieldMapping = MappingProperty{
    					Properties: subStructProperties,
    				}
    			}
    		default:
    			//unknown value type... we do not specify mapping and let it use dynamic mapping
    			continue
    		}

    		//allow user to change that with a search tag on the field
    		switch searchTag := structField.Tag.Get("search"); searchTag {
    		case "":
    			//no change
    		case "keyword", "text", "long", "date", "boolean":
    			fieldMapping.Type = searchTag
    		case "object":
    			//NOTE(review): if MappingProperty.Enabled is tagged omitempty, false is never sent - confirm
    			fieldMapping.Type = "object"
    			fieldMapping.Enabled = false
    		default:
    			return nil, errors.Errorf("Unknown search:\"%s\" on field(%s)", searchTag, structField.Name)
    		}

    		//add to index spec
    		properties[fieldName] = fieldMapping
    	}
    	return properties, nil
    }
    
    //Write stores data as a document in the daily index "<name>-<ccyymmdd>" of startTime,
    //first creating that index if this time series has not done so yet.
    //data must be of type specified in Writer.TimeSeries(tmpl); its TimeSeriesHeader
    //is overwritten with startTime, endTime and the duration between them.
    func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error {
    	if data == nil {
    		return errors.Errorf("data:nil")
    	}
    	if t := reflect.TypeOf(data); t != ts.dataType {
    		return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType)
    	}

    	//get daily search index to write to, from start time
    	indexName := ts.name + "-" + startTime.Format("20060102")
    	if _, ok := ts.createdDates[indexName]; !ok {
    		if err := ts.ensureDailyIndex(indexName); err != nil {
    			return err
    		}
    		ts.createdDates[indexName] = true
    	}

    	//data is not addressable, so copy it into a new value to set the header
    	doc := reflect.New(ts.dataType).Elem()
    	doc.Set(reflect.ValueOf(data))
    	doc.Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
    		StartTime:  startTime,
    		EndTime:    endTime,
    		DurationMs: endTime.Sub(startTime).Milliseconds(),
    	}))
    	res, err := ts.w.Write(indexName, "", doc.Interface())
    	if err != nil {
    		return err
    	}
    	logger.Debugf("IndexResponse: %+v", res)
    	return nil
    }

    //ensureDailyIndex creates indexName with the settings and mappings of this time series.
    //If the index already exists (created by another process, or auto-created earlier),
    //its mappings are updated instead.
    func (ts *timeSeries) ensureDailyIndex(indexName string) error {
    	//NOTE(review): assumes jsonSettings is the index settings object (e.g. {"index":{...}})
    	//and jsonMappings is {"properties":{...}} as accepted by the put mapping API - confirm
    	indexBody, err := json.Marshal(map[string]json.RawMessage{
    		"settings": ts.jsonSettings,
    		"mappings": ts.jsonMappings,
    	})
    	if err != nil {
    		return errors.Wrapf(err, "failed to marshal index(%s) definition", indexName)
    	}

    	createRes, err := opensearchapi.IndicesCreateRequest{
    		Index: indexName,
    		Body:  bytes.NewReader(indexBody),
    	}.Do(context.Background(), ts.w.client)
    	if err != nil {
    		return errors.Wrapf(err, "failed to create index(%s)", indexName)
    	}
    	defer createRes.Body.Close()
    	switch {
    	case createRes.StatusCode == http.StatusOK, createRes.StatusCode == http.StatusCreated:
    		return nil //new index created with our settings and mappings
    	case createRes.StatusCode == http.StatusBadRequest && strings.Contains(createRes.String(), "resource_already_exists_exception"):
    		//index already exists: make sure it has our mappings below
    	default:
    		return errors.Errorf("failed to create index(%s): %v %s %s", indexName, createRes.StatusCode, createRes.Status(), createRes.String())
    	}

    	mapRes, err := opensearchapi.IndicesPutMappingRequest{
    		Index: []string{indexName},
    		Body:  bytes.NewReader(ts.jsonMappings),
    	}.Do(context.Background(), ts.w.client)
    	if err != nil {
    		return errors.Wrapf(err, "failed to update mappings of index(%s)", indexName)
    	}
    	defer mapRes.Body.Close()
    	switch mapRes.StatusCode {
    	case http.StatusOK, http.StatusCreated, http.StatusConflict:
    		return nil
    	default:
    		return errors.Errorf("failed to update mappings of index(%s): %v %s %s", indexName, mapRes.StatusCode, mapRes.Status(), mapRes.String())
    	}
    }
    
    //DelOldTimeSeries deletes the daily indices "<indexName>-<ccyymmdd>" dated more than
    //olderThanDays days before today (in local time).
    //parameters:
    //		indexName is index prefix before dash-date, e.g. "api-logs" then will look for "api-logs-<date>"
    //		olderThanDays must be >= 0, 0 deletes nothing
    //returns
    //		list of indices selected for deletion, with err==nil if they were deleted successfully
    func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) {
    	if !indexNameRegex.MatchString(indexName) {
    		return nil, errors.Errorf("invalid index_name:\"%s\"", indexName)
    	}
    	if olderThanDays < 0 {
    		return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
    	}
    	if olderThanDays == 0 {
    		return nil, nil
    	}

    	//make list of indices matching specified name e.g. "uafrica-v3-api-logs-*"
    	res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
    	if err != nil {
    		return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName)
    	}
    	defer res.Body.Close()
    	if res.StatusCode != http.StatusOK {
    		return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
    	}

    	indices := map[string]indexInfo{}
    	if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
    		return nil, errors.Wrapf(err, "failed to read list of indices")
    	}

    	//calculate the threshold N days ago
    	//working in local time, assuming the server runs in same location as API user...
    	//so that index rotates at midnight local time rather than midnight UTC
    	//AddDate() keeps the threshold at midnight across daylight saving changes,
    	//which subtracting N*24h does not
    	t0 := time.Now()
    	todayMidnight := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location())
    	timeThreshold := todayMidnight.AddDate(0, 0, -olderThanDays)

    	prefix := indexName + "-"
    	indicesToDelete := []string{}
    	for dailyIndexName, dailyIndexInfo := range indices {
    		//names without the prefix are left whole and fail to parse below
    		dateStr := strings.TrimPrefix(dailyIndexName, prefix)
    		date, err := time.ParseInLocation("20060102", dateStr, t0.Location())
    		if err != nil {
    			logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
    			continue
    		}
    		if date.Before(timeThreshold) {
    			logger.Debugf("Deleting index(%s).uuid(%s) dated before %s...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, timeThreshold)
    			indicesToDelete = append(indicesToDelete, dailyIndexName)
    		}
    	}
    	if len(indicesToDelete) > 0 {
    		delRes, err := w.api.Indices.Delete(indicesToDelete)
    		if err != nil {
    			return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
    		}
    		defer delRes.Body.Close()
    		if delRes.StatusCode != http.StatusOK {
    			return indicesToDelete, errors.Errorf("failed to delete indices(%s): %v %s %s", indicesToDelete, delRes.StatusCode, delRes.Status(), delRes.String())
    		}
    		logger.Debugf("Deleted %d daily indices(%s) older than %d days", len(indicesToDelete), indicesToDelete, olderThanDays)
    	}
    	return indicesToDelete, nil
    }
    
    //indexInfo is the entry for one index in the response of the get index API
    //(GET /<index>, called via Indices.Get in DelOldTimeSeries), which returns
    //a JSON object keyed by index name
    type indexInfo struct {
    	Aliases  map[string]interface{} `json:"aliases"`
    	Mappings Mappings               `json:"mappings"`
    	Settings IndexInfoSettings      `json:"settings"`
    }
    
    //IndexInfoSettings is the "settings" object of an index in the get index API response
    type IndexInfoSettings struct {
    	Index IndexSettings `json:"index"`
    }
    
    //IndexSettings is the "settings.index" object of an index in the get index API response.
    //The numeric values are decoded as strings here.
    type IndexSettings struct {
    	UUID             string `json:"uuid"`
    	CreationDate     string `json:"creation_date"`
    	NumberOfShards   string `json:"number_of_shards"`
    	NumberOfReplicas string `json:"number_of_replicas"`
    	//NOTE(review): field name says "Provider" but it holds "provided_name"; kept for compatibility
    	ProviderName     string `json:"provided_name"` //e.g. "go-utils-audit-test-20211103"
    }
    
    //Search runs query over all daily indices of this time series ("<name>-*").
    //parameters:
    //	limit is the max nr of documents to return, 0..1000
    //Return:
    //	docs will be a slice of the TimeSeries data type (nil when nothing matched)
    //	totalCount is the total nr of matching documents reported by OpenSearch (may exceed limit)
    func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) {
    	if ts == nil {
    		return nil, 0, errors.Errorf("time series == nil")
    	}
    	if limit < 0 || limit > 1000 {
    		return nil, 0, errors.Errorf("limit=%d not 0..1000", limit)
    	}

    	// example search request body for free text
    	// 	{
    	// 		"size": 5,
    	// 		"query": {
    	// 			"multi_match": {
    	// 				"query": "miller",
    	// 				"fields": ["title^2", "director"]
    	// 			}
    	//    	}
    	//  }
    	body := SearchRequestBody{
    		Size:  limit,
    		Query: query,
    	}
    	jsonBody, err := json.Marshal(body)
    	if err != nil {
    		return nil, 0, errors.Wrapf(err, "failed to marshal search request")
    	}

    	searchResponse, err := opensearchapi.SearchRequest{
    		Index: []string{ts.name + "-*"},
    		Body:  bytes.NewReader(jsonBody),
    	}.Do(context.Background(), ts.w.client)
    	if err != nil {
    		return nil, 0, errors.Wrapf(err, "failed to search documents")
    	}
    	defer searchResponse.Body.Close()

    	if searchResponse.StatusCode != http.StatusOK {
    		//best effort: the body is only used to explain the failure
    		resBody, _ := ioutil.ReadAll(searchResponse.Body)
    		return nil, 0, errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
    	}

    	resBodyPtrValue := reflect.New(ts.searchResponseBodyType)
    	if err := json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
    		return nil, 0, errors.Wrapf(err, "cannot decode search response body")
    	}

    	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
    	if err != nil {
    		return nil, 0, errors.Wrapf(err, "cannot get total nr of hits")
    	}
    	total, ok := hitsTotalValue.Interface().(int)
    	if !ok {
    		return nil, 0, errors.Errorf("unexpected type %T for total nr of hits", hitsTotalValue.Interface())
    	}
    	if total < 1 {
    		return nil, 0, nil //no matches
    	}

    	items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source")
    	if err != nil {
    		return nil, 0, errors.Wrapf(err, "cannot get search response documents")
    	}
    	return items.Interface(), total, nil
    }