Skip to content
Snippets Groups Projects
time_series.go 14.8 KiB
Newer Older
package opensearch
Jan Semmelink's avatar
Jan Semmelink committed

import (
	"bytes"
	"context"
Jan Semmelink's avatar
Jan Semmelink committed
	"encoding/json"
	"io/ioutil"
Jan Semmelink's avatar
Jan Semmelink committed
	"net/http"
	"reflect"
	"strings"
	"time"

	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
Jan Semmelink's avatar
Jan Semmelink committed
)

// Shared constants of the OpenSearch time series helpers.
const (
	// TimeFormat is the timestamp layout: RFC 3339 with a numeric zone offset.
	TimeFormat = "2006-01-02T15:04:05Z07:00"

	// Timeout is the timeout value sent with search requests.
	// API Gateway times out after 30s, so return before that.
	Timeout = "25s"
)
// TimeSeriesHeader holds the timing fields shared by every time series document.
// Embed it anonymously as the first field of your log struct: NewTimeSeries
// rejects templates that do not start with it, and Write overwrites it with the
// start/end times it is given.
type TimeSeriesHeader struct {
	StartTime  time.Time `json:"@timestamp"`   // start of the logged event
	EndTime    time.Time `json:"@end_time"`    // end of the logged event
	DurationMs int64     `json:"@duration_ms"` // EndTime minus StartTime, in milliseconds
}

type TimeSeries struct {
	w            *Writer
	name         string
	dataType     reflect.Type
	settings     Settings
	mappings     Mappings
	jsonSettings []byte
	jsonMappings []byte
	createdDates map[string]bool
Jan Semmelink's avatar
Jan Semmelink committed
}

// NewTimeSeries purpose:
//
//	create a time series to write e.g. api api_logs
//
Francé Wilke's avatar
Francé Wilke committed
// parameters:
Francé Wilke's avatar
Francé Wilke committed
//	name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-api_logs"
//		the actual indices in openSearch will be called "<indexName>-<ccyymmdd>" e.g. "uafrica-v3-api-api_logs-20210102"
Jan Semmelink's avatar
Jan Semmelink committed
//	tmpl must be your log data struct consisting of public fields as:
//		Xxx string `json:"<name>" search:"keyword|text|long|date"`	(can later add more types)
//		Xxx time.Time `json:"<name>"`								assumes type "date" for opensearch
//		Xxx int `json:"<name>"`										assumes type "long" for opensearch, specify keyword if required
func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
	ts := TimeSeries{}

Jan Semmelink's avatar
Jan Semmelink committed
	if !indexNameRegex.MatchString(name) {
		return ts, errors.Errorf("invalid index_name:\"%s\"", name)
	// if already created, just return
Jan Semmelink's avatar
Jan Semmelink committed
	if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
		return existingTimeSeries, nil
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed
	structType := reflect.TypeOf(tmpl)
	if tmpl == nil || structType.Kind() != reflect.Struct {
		return ts, errors.Errorf("%T is not a struct", tmpl)
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed
	if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
		return ts, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed

Jan Semmelink's avatar
Jan Semmelink committed
		w:            w,
		name:         name,
		dataType:     structType,
		createdDates: map[string]bool{},
Jan Semmelink's avatar
Jan Semmelink committed
	}

	// define the OpenSearch index mapping
	ts.settings = Settings{
		Index: &SettingsIndex{
			NumberOfShards:   4,
			NumberOfReplicas: 0,
			Mapping:          &Mapping{TotalFields{Limit: 2000}},
Jan Semmelink's avatar
Jan Semmelink committed
		},
	}

	if properties, err := structMappingProperties(structType); err != nil {
		return ts, errors.Wrapf(err, "cannot map struct %s", structType)
	} else {
		ts.mappings = Mappings{
			Properties: properties,
		}
	}

	var err error
	ts.jsonSettings, err = json.Marshal(ts.settings)
	if err != nil {
		return ts, errors.Wrapf(err, "failed to marshal index settings")
	}
	ts.jsonMappings, err = json.Marshal(ts.mappings)
	if err != nil {
		return ts, errors.Wrapf(err, "failed to marshal index mappings")
	}

	w.timeSeriesByName[name] = ts
	return ts, nil
}

func structMappingProperties(structType reflect.Type) (map[string]MappingProperty, error) {
	properties := map[string]MappingProperty{}
Jan Semmelink's avatar
Jan Semmelink committed
	for i := 0; i < structType.NumField(); i++ {
		structField := structType.Field(i)

		fieldName := structField.Name

		// fields of embedded (anonymous) structs are added at the same level
		if structField.Anonymous && structField.Type.Kind() == reflect.Struct {
			subFields, err := structMappingProperties(structField.Type)
			if err != nil {
				return nil, errors.Wrapf(err, "failed to map embedded struct %s", fieldName)
			}
			for n, v := range subFields {
				properties[n] = v
			}
			continue
Jan Semmelink's avatar
Jan Semmelink committed
		}
Jan Semmelink's avatar
Jan Semmelink committed
		if jsonTags := strings.SplitN(structField.Tag.Get("json"), ",", 2); len(jsonTags) > 0 && jsonTags[0] != "" {
			fieldName = jsonTags[0]
Jan Semmelink's avatar
Jan Semmelink committed
		}
		if fieldName == "" {
			logs.Info("skip %s unnamed field %+v", structType, structField)
Jan Semmelink's avatar
Jan Semmelink committed
			continue
		}

		// get default type of opensearch value from field type
		fieldMapping := MappingProperty{Type: "text"}
Jan Semmelink's avatar
Jan Semmelink committed
		switch structField.Type.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			fieldMapping = MappingProperty{Type: "long"}
Jan Semmelink's avatar
Jan Semmelink committed
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			fieldMapping = MappingProperty{Type: "long"}
		case reflect.Float32, reflect.Float64:
			fieldMapping = MappingProperty{Type: "float"}
Jan Semmelink's avatar
Jan Semmelink committed
		case reflect.Bool:
			fieldMapping = MappingProperty{Type: "boolean"}
Jan Semmelink's avatar
Jan Semmelink committed
		case reflect.String:
			fieldMapping = MappingProperty{Type: "text"}
			// do not indicate slice, just map slice items as sub-items
			subStructProperties, err := structMappingProperties(structField.Type.Elem())
			if err != nil {
				return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
			}
			fieldMapping = MappingProperty{
				Properties: subStructProperties,
			}

Jan Semmelink's avatar
Jan Semmelink committed
		default:
			if structField.Type == reflect.TypeOf(time.Now()) {
				fieldMapping = MappingProperty{Type: "date"}
Jan Semmelink's avatar
Jan Semmelink committed
			} else {
				if structField.Type.Kind() == reflect.Struct {
					subStructProperties, err := structMappingProperties(structField.Type)
					if err != nil {
						return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
					}
					fieldMapping = MappingProperty{
						Properties: subStructProperties,
					}
				} else if structField.Type.Kind() == reflect.Map {
					// For maps, we only want to map if we specified "flattened" in the search tag
					// If we did not specify the tag, we want to just continue without mapping
					if structField.Tag.Get("search") != "flattened" {
						continue
					}
				} else {
					// fieldMapping = MappingProperty{Type: "text"}
					// unknown value type... we do not specify mapping and let it use dynamic mapping
					continue
				}
Francé Wilke's avatar
Francé Wilke committed
		// allow user to change that with a search tag on the field
		skip := false
Jan Semmelink's avatar
Jan Semmelink committed
		switch structField.Tag.Get("search") {
		case "":
Francé Wilke's avatar
Francé Wilke committed
			// no change
Jan Semmelink's avatar
Jan Semmelink committed
		case "keyword":
			fieldMapping.Type = "keyword"
Jan Semmelink's avatar
Jan Semmelink committed
		case "long":
			fieldMapping.Type = "long"
Jan Semmelink's avatar
Jan Semmelink committed
		case "date":
			fieldMapping.Type = "data"
Jan Semmelink's avatar
Jan Semmelink committed
		case "boolean":
			fieldMapping.Type = "boolean"
Jan Semmelink's avatar
Jan Semmelink committed
		case "object":
			fieldMapping.Type = "object"
			fieldMapping.Enabled = false
			// NOTE: This is only supported from OpenSearch 2.7 and onwards
			fieldMapping.Type = "flat_object"
Francé Wilke's avatar
Francé Wilke committed
		case "-":
			// do not include in mapping
			skip = true
Jan Semmelink's avatar
Jan Semmelink committed
		default:
			return nil, errors.Errorf("Unknown search:\"%s\" on field(%s)", structField.Tag.Get("search"), structField.Name)
Jan Semmelink's avatar
Jan Semmelink committed
		}

Francé Wilke's avatar
Francé Wilke committed
		// add to index spec
		if !skip {
			properties[fieldName] = fieldMapping
		}

	return properties, nil
// data must be of type specified in Writer.TimeSeries(tmpl)
func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) error {
Jan Semmelink's avatar
Jan Semmelink committed
	if data == nil {
		return errors.Errorf("data:nil")
	}
	t := reflect.TypeOf(data)
	if t != ts.dataType {
		return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType)
	// get daily opensearch index to write to, from start time
	// indexName := ts.name + "-" + startTime.Format("20060102")
Jan Semmelink's avatar
Jan Semmelink committed
	indexName := ts.name + "-" + startTime.Format("20060102")
	if _, ok := ts.createdDates[indexName]; !ok {
		// create new index for this date - if not exists
Jan Semmelink's avatar
Jan Semmelink committed
		res, err := ts.w.api.Create(
			indexName, // index name
			indexName, // index name also used for document id
			strings.NewReader(string(ts.jsonSettings)),
		)
		if err != nil {
			if res != nil {
				res.Body.Close()
			}
			return errors.Wrapf(err, "failed to create index(%s)", indexName)
		}
		switch res.StatusCode {
		case http.StatusOK:
		case http.StatusCreated:
		case http.StatusConflict: // 409 = already exists
			if res != nil {
				res.Body.Close()
			}
			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
		}

		res, err = opensearchapi.IndicesPutMappingRequest{
			Index: []string{indexName},
			Body:  strings.NewReader(string(ts.jsonMappings)),
		}.Do(context.Background(), ts.w.client)
Jan Semmelink's avatar
Jan Semmelink committed
		if err != nil {
			if res != nil {
				res.Body.Close()
			}
Jan Semmelink's avatar
Jan Semmelink committed
			return errors.Wrapf(err, "failed to create index(%s)", indexName)
		}
		switch res.StatusCode {
		case http.StatusOK:
		case http.StatusCreated:
		case http.StatusConflict: // 409 = already exists
Jan Semmelink's avatar
Jan Semmelink committed
		default:
			if res != nil {
				res.Body.Close()
			}
Jan Semmelink's avatar
Jan Semmelink committed
			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
		}
		ts.createdDates[indexName] = true
	}

	// copy to set the header values
Jan Semmelink's avatar
Jan Semmelink committed
	x := reflect.New(ts.dataType)
	x.Elem().Set(reflect.ValueOf(data))
	x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
		StartTime:  startTime,
		EndTime:    endTime,
Jan Semmelink's avatar
Jan Semmelink committed
		DurationMs: endTime.Sub(startTime).Milliseconds(),
Jan Semmelink's avatar
Jan Semmelink committed
	}))
Francé Wilke's avatar
Francé Wilke committed
	if _, err := ts.w.Write(indexName, "", x.Elem().Interface()); err != nil {
// DelOldTimeSeries parameters:
//
//	indexName is index prefix before dash-date, e.g. "api-api_logs" then will look for "api-api_logs-<date>"
//
//
//	list of indices to delete with err==nil if deleted successfully
func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) {
Jan Semmelink's avatar
Jan Semmelink committed
	if olderThanDays < 0 {
		return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
	}
	if olderThanDays == 0 {
		return nil, nil
	}

	// make list of indices matching specified name e.g. "uafrica-v3-api-api_logs-*"
	res, err := ts.w.api.Indices.Get([]string{ts.name + "-*"}, ts.w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
Jan Semmelink's avatar
Jan Semmelink committed
	if err != nil {
		return nil, errors.Wrapf(err, "failed to list existing %s-* indices", ts.name)
Jan Semmelink's avatar
Jan Semmelink committed
	}
	switch res.StatusCode {
	case http.StatusOK:
	default:
		return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", ts.name, res.StatusCode, res.Status(), res.String())
Jan Semmelink's avatar
Jan Semmelink committed
	}

Jan Semmelink's avatar
Jan Semmelink committed
	indices := map[string]indexInfo{}
	if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
		return nil, errors.Wrapf(err, "failed to read list of indices")
Jan Semmelink's avatar
Jan Semmelink committed
	}

	// calculate time N days ago
	// working in local time, assuming the server runs in same location as API user...
	// so that index rotates at midnight local time rather than midnight UTC
Jan Semmelink's avatar
Jan Semmelink committed
	t0 := time.Now()
	timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) // = midnight yesterday in local time
	timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays))      // = N days before that
Jan Semmelink's avatar
Jan Semmelink committed

Jan Semmelink's avatar
Jan Semmelink committed
	indicesToDelete := []string{}
Francé Wilke's avatar
Francé Wilke committed
	for dailyIndexName, _ := range indices {
		dateStr := dailyIndexName[len(ts.name)+1:]
Jan Semmelink's avatar
Jan Semmelink committed
		if date, err := time.ParseInLocation("20060102", dateStr, t0.Location()); err != nil {
Francé Wilke's avatar
Francé Wilke committed
			logs.Info("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
Jan Semmelink's avatar
Jan Semmelink committed
		} else {
			if date.Before(timeThreshold) {
				indicesToDelete = append(indicesToDelete, dailyIndexName)
			}
		}
	}
	if len(indicesToDelete) > 0 {
		_, err := ts.w.api.Indices.Delete(indicesToDelete)
Jan Semmelink's avatar
Jan Semmelink committed
		if err != nil {
			return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
		}
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed
	return indicesToDelete, nil
}

// output from GET /_cat/indices for each index in the list
Jan Semmelink's avatar
Jan Semmelink committed
type indexInfo struct {
	Aliases  map[string]interface{} `json:"aliases"`
	Mappings Mappings               `json:"mappings"`
	Settings IndexInfoSettings      `json:"settings"`
}

// IndexInfoSettings is the "settings" object reported for an index.
type IndexInfoSettings struct {
	// Index holds the values found under the "index" key.
	Index IndexSettings `json:"index"`
}
Jan Semmelink's avatar
Jan Semmelink committed

Jan Semmelink's avatar
Jan Semmelink committed
// IndexSettings holds the "index" settings reported for an index.
// All values are decoded as JSON strings, including the numeric ones.
type IndexSettings struct {
	UUID             string `json:"uuid"`
	CreationDate     string `json:"creation_date"`
	NumberOfShards   string `json:"number_of_shards"`
	NumberOfReplicas string `json:"number_of_replicas"`
	ProviderName     string `json:"provided_name"` // e.g. "go-utils-audit-test-20211103"
}
// Search returns docs indexed on OpenSearch document ID which cat be used in Get(id)
// The docs value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
// So you can safely type assert e.g.
//		type myType struct {...}
//		ts := opensearch.TimeSeries(..., myType{})
//		docs,totalCount,err := ts.Search(...)
//		if err == nil {
//			for id,docValue := range docs {
//				doc := docValue.(myType)
//				...
//			}
//		}
//	docs will be a slice of the TimeSeries data type
func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (openSearchResult *SearchResponseHits, err error) {
		return nil, errors.Errorf("time series == nil")
	if limit < 0 || limit > 1000 {
		err = errors.Errorf("limit=%d not 0..1000", limit)
	// example opensearch request body for free text
	// 	{
	// 		"size": 5,
	// 		"query": {
	// 			"multi_match": {
	// 				"query": "miller",
	// 				"fields": ["title^2", "director"]
	// 			}
	//    	}
	//  }
	body := SearchRequestBody{
		Sort:    sort,
		Size:    limit,
		From:    offset,
		Query:   query,
		Timeout: Timeout, // this doesn't work
	jsonBody, _ := json.Marshal(body)
	search := opensearchapi.SearchRequest{
		Index: []string{ts.name + "-*"},
		Body:  bytes.NewReader(jsonBody),
	trueVal := true
	search.AllowPartialSearchResults = &trueVal

	searchResponse, err := search.Do(context.Background(), ts.w.client)
	if err != nil {
		err = errors.Wrapf(err, "failed to search documents")
		return
	}

	switch searchResponse.StatusCode {
	case http.StatusOK:
	default:
		resBody, _ := ioutil.ReadAll(searchResponse.Body)
		err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
	bodyData, _ := ioutil.ReadAll(searchResponse.Body)

	var response SearchResponseBody
	err = json.Unmarshal(bodyData, &response)
	if err != nil {
		logs.Info("opensearch response body: %s", string(bodyData))
		err = errors.Wrapf(err, "cannot decode opensearch response body")
	return &response.Hits, nil
// Get takes the id returned in Search()
// The id is uuid assigned by OpenSearch when documents are added with Write().
// The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
func (ts *TimeSeries) Get(id string) (res *GetResponseBody, err error) {
		return nil, errors.Errorf("time series == nil")
Francé Wilke's avatar
Francé Wilke committed
	}
	parts := strings.SplitN(id, "/", 2)
	get := opensearchapi.GetRequest{
		Index:      parts[0],
		DocumentID: parts[1],
Francé Wilke's avatar
Francé Wilke committed
	}
	getResponse, err := get.Do(context.Background(), ts.w.client)
Francé Wilke's avatar
Francé Wilke committed
	if err != nil {
		err = errors.Wrapf(err, "failed to get document")
		return
	}

	switch getResponse.StatusCode {
	case http.StatusOK:
	default:
		resBody, _ := ioutil.ReadAll(getResponse.Body)
		err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
		return
	}

	bodyData, _ := ioutil.ReadAll(getResponse.Body)
	var response GetResponseBody
	err = json.Unmarshal(bodyData, &response)
Francé Wilke's avatar
Francé Wilke committed
	if err != nil {
		logs.Info("GET response body: %s", string(bodyData))
		err = errors.Wrapf(err, "cannot unmarshal GET response body")
	return &response, nil