Newer
Older
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
const (
	// TimeFormat is a Go reference-time layout; it is identical to time.RFC3339.
	TimeFormat = "2006-01-02T15:04:05Z07:00"

	// Timeout is 25 seconds: API Gateway gives up after 30s, so answer before that.
	Timeout = "25s"
)
// TimeSeriesHeader holds the timing fields of every time-series document.
// Embed it anonymously as the FIRST field of your log struct: NewTimeSeries
// rejects templates that do not start with it, and Write replaces it with a
// header built from the startTime/endTime arguments passed to Write.
type TimeSeriesHeader struct {
// StartTime is serialized under the "@timestamp" key.
StartTime time.Time `json:"@timestamp"`
EndTime time.Time `json:"@end_time"`
// DurationMs is presumably EndTime-StartTime in milliseconds; the line in
// Write that would set it is not visible here — TODO confirm.
DurationMs int64 `json:"@duration_ms"`
}
// TimeSeries writes documents of a single struct type into daily OpenSearch
// indices named "<name>-<yyyymmdd>". Create one with Writer.NewTimeSeries.
type TimeSeries struct {
w *Writer // writer whose client/api are used for every request
name string // index name prefix, without the "-<date>" suffix
dataType reflect.Type // struct type of the tmpl given to NewTimeSeries; Write only accepts this exact type
settings Settings
mappings Mappings
// JSON encodings of settings and mappings, marshalled once in NewTimeSeries
// and reused as request bodies whenever Write sets up a new daily index.
jsonSettings []byte
jsonMappings []byte
// createdDates records daily index names (e.g. "<name>-20210102"), not bare
// dates, that Write has already set up, so later writes skip those requests.
createdDates map[string]bool
// NOTE(review): the closing brace of this struct is missing from this copy of the file.
// NewTimeSeries creates a time series for writing documents such as API logs
// into daily OpenSearch indices, or returns the one already registered on w
// under the same name (tmpl is then not inspected again).
//
// name must be the OpenSearch index name prefix without the date, e.g. "uafrica-v3-api-api_logs".
// The actual indices in OpenSearch will be called "<name>-<yyyymmdd>", e.g. "uafrica-v3-api-api_logs-20210102".
//
// tmpl must be a value of your log data struct. Its first field must be an
// anonymous (embedded) TimeSeriesHeader; the remaining public fields look like:
//
//	Xxx string    `json:"<name>" search:"keyword|text|long|date"` (more types can be added later)
//	Xxx time.Time `json:"<name>"` mapped as type "date"
//	Xxx int       `json:"<name>"` mapped as type "long"; add a search tag (e.g. keyword) to override
//
// `search:"-"` leaves a field out of the index mapping, and map fields are only
// mapped when tagged `search:"flattened"`.
//
// The index settings built here specify 4 shards, 0 replicas and a total-fields
// limit of 2000.
func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
// NOTE(review): several lines of this function are missing from this copy of
// the file (closing braces, the condition before the first return, the line
// opening the TimeSeries literal and the end of the settings literal);
// comments here describe the visible lines only.
ts := TimeSeries{}
// NOTE(review): the condition guarding this return (presumably a name validity
// check) is missing; as shown, this return is unconditional.
return ts, errors.Errorf("invalid index_name:\"%s\"", name)
// A time series registered earlier under this name is returned as-is.
if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
return existingTimeSeries, nil
// reflect.TypeOf(nil) yields a nil Type; the tmpl == nil test short-circuits
// before structType.Kind() could be called on it.
structType := reflect.TypeOf(tmpl)
if tmpl == nil || structType.Kind() != reflect.Struct {
return ts, errors.Errorf("%T is not a struct", tmpl)
// Write sets field 0 to a TimeSeriesHeader value, so field 0 must be exactly that embedded type.
if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
return ts, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
w: w,
name: name,
dataType: structType,
createdDates: map[string]bool{},
// Settings sent when Write sets up each new daily index.
ts.settings = Settings{
Index: &SettingsIndex{
NumberOfShards: 4,
NumberOfReplicas: 0,
Mapping: &Mapping{TotalFields{Limit: 2000}},
// Field mappings are derived from the struct's fields and tags.
if properties, err := structMappingProperties(structType); err != nil {
return ts, errors.Wrapf(err, "cannot map struct %s", structType)
} else {
ts.mappings = Mappings{
Properties: properties,
}
}
// Marshal once here; Write reuses these bytes as request bodies.
var err error
ts.jsonSettings, err = json.Marshal(ts.settings)
if err != nil {
return ts, errors.Wrapf(err, "failed to marshal index settings")
}
ts.jsonMappings, err = json.Marshal(ts.mappings)
if err != nil {
return ts, errors.Wrapf(err, "failed to marshal index mappings")
}
// Register so later calls with the same name return this value.
w.timeSeriesByName[name] = ts
return ts, nil
}
// structMappingProperties returns OpenSearch mapping properties for the fields
// of structType, keyed by fieldName (apparently the json tag name; the line
// that assigns it is not visible in this copy).
//
// Defaults by Go kind: signed/unsigned integers -> "long", floats -> "float",
// time.Time -> "date", nested structs and slices -> sub-properties built
// recursively (a slice maps its element type). Fields of embedded structs are
// merged into the same level. Maps are only mapped when tagged
// `search:"flattened"`; other unrecognised kinds are skipped so that OpenSearch
// dynamic mapping applies. A `search` tag can change the mapping, `search:"-"`
// omits the field, and an unknown search tag value is returned as an error.
//
// NOTE(review): many lines of this function (closing braces, several case
// labels, the fieldName assignment and the switch on the search tag) are
// missing from this copy; comments describe the visible lines only.
func structMappingProperties(structType reflect.Type) (map[string]MappingProperty, error) {
properties := map[string]MappingProperty{}
for i := 0; i < structType.NumField(); i++ {
structField := structType.Field(i)
// fields of embedded (anonymous) structs are added at the same level
if structField.Anonymous && structField.Type.Kind() == reflect.Struct {
subFields, err := structMappingProperties(structField.Type)
if err != nil {
// NOTE(review): fieldName is used here before any visible assignment — confirm it is declared earlier in the loop.
return nil, errors.Wrapf(err, "failed to map embedded struct %s", fieldName)
}
for n, v := range subFields {
properties[n] = v
}
continue
// The field name comes from the json tag (the text before the first comma).
// NOTE(review): as shown, the "skip ... unnamed field" message is logged when a
// json name IS present; the branch that assigns fieldName and the else/continue
// for unnamed fields appear to be missing from this copy.
if jsonTags := strings.SplitN(structField.Tag.Get("json"), ",", 2); len(jsonTags) > 0 && jsonTags[0] != "" {
logs.Info("skip %s unnamed field %+v", structType, structField)
// get default type of opensearch value from field type
fieldMapping := MappingProperty{Type: "text"}
switch structField.Type.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
fieldMapping = MappingProperty{Type: "long"}
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
fieldMapping = MappingProperty{Type: "long"}
// NOTE(review): the "Jan Semmelink" / "committed" lines in this function are
// residue from the blame view this file was copied from, not Go code.
Jan Semmelink
committed
case reflect.Float32, reflect.Float64:
fieldMapping = MappingProperty{Type: "float"}
// NOTE(review): the case labels for the next two assignments (presumably
// reflect.Bool and reflect.String) are missing from this copy.
fieldMapping = MappingProperty{Type: "boolean"}
fieldMapping = MappingProperty{Type: "text"}
Jan Semmelink
committed
case reflect.Slice:
// do not indicate slice, just map slice items as sub-items
// NOTE(review): the recursive call begins with structType.NumField(), which
// panics for non-struct types, so a slice of non-structs (e.g. []string)
// would panic here unless a guard exists in lines missing from this copy.
Jan Semmelink
committed
subStructProperties, err := structMappingProperties(structField.Type.Elem())
if err != nil {
return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
}
fieldMapping = MappingProperty{
Properties: subStructProperties,
}
default:
// time.Time is a struct, so it is checked before the generic struct case.
if structField.Type == reflect.TypeOf(time.Now()) {
fieldMapping = MappingProperty{Type: "date"}
// Nested struct: its fields become sub-properties.
if structField.Type.Kind() == reflect.Struct {
subStructProperties, err := structMappingProperties(structField.Type)
if err != nil {
return nil, errors.Wrapf(err, "failed to map %s.%s", structType, structField.Name)
}
fieldMapping = MappingProperty{
Properties: subStructProperties,
}
} else if structField.Type.Kind() == reflect.Map {
// For maps, we only want to map if we specified "flattened" in the search tag
// If we did not specify the tag, we want to just continue without mapping
if structField.Tag.Get("search") != "flattened" {
continue
}
} else {
// fieldMapping = MappingProperty{Type: "text"}
// unknown value type... we do not specify mapping and let it use dynamic mapping
continue
}
// allow user to change that with a search tag on the field
// NOTE(review): the switch on structField.Tag.Get("search") and most of its
// case labels are missing here. Visible outcomes: "object" with enabled=false,
// "flat_object", "-" (skip the field), and (presumably the default case) an
// error for unknown tag values.
skip := false
fieldMapping.Type = "object"
fieldMapping.Enabled = false
// NOTE: This is only supported from OpenSearch 2.7 and onwards
fieldMapping.Type = "flat_object"
case "-":
// do not include in mapping
skip = true
return nil, errors.Errorf("Unknown search:\"%s\" on field(%s)", structField.Tag.Get("search"), structField.Name)
// add to index spec
if !skip {
properties[fieldName] = fieldMapping
}
// Write stores data as one document in the daily index "<ts.name>-<yyyymmdd>",
// where the date is startTime formatted as 20060102 in startTime's own location.
//
// data must have exactly the struct type of the tmpl given to NewTimeSeries
// (for example, a pointer to that struct is rejected). The first time this
// TimeSeries writes to a given daily index, it sends the index settings and
// then the mappings for it. HTTP 200, 201 and 409 (already exists) appear to be
// accepted for both requests. Before writing, the document's embedded
// TimeSeriesHeader is replaced with one built from startTime and endTime.
//
// NOTE(review): several lines of this function are missing from this copy
// (closing braces, the create call itself, the `if err != nil` checks, the
// `default:` labels of the status switches and part of the header literal);
// comments describe the visible lines only.
func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) error {
if data == nil {
return errors.Errorf("data:nil")
}
// Only the exact template type is accepted; the reflective copy below relies on it.
t := reflect.TypeOf(data)
if t != ts.dataType {
return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType)
// get daily opensearch index to write to, from start time
// indexName := ts.name + "-" + startTime.Format("20060102")
indexName := ts.name + "-" + startTime.Format("20060102")
// Set up each daily index only once per TimeSeries.
if _, ok := ts.createdDates[indexName]; !ok {
// create new index for this date - if not exists
// NOTE(review): the call these arguments belong to is missing. The argument
// shape (index, document id, body) resembles a document-create call; if that
// is what it is, jsonSettings is stored as a document instead of being applied
// as index settings — confirm against the full file.
indexName, // index name
indexName, // index name also used for document id
strings.NewReader(string(ts.jsonSettings)),
)
if res != nil {
res.Body.Close()
}
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: // 409 = already exists
// NOTE(review): as visible, res.Body is only closed on error paths; successful
// responses to this request and to the put-mapping request below are never
// closed, which can leak connections. Consider `defer res.Body.Close()` right
// after each error check.
if res != nil {
res.Body.Close()
}
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
// Apply the field mappings to the daily index.
res, err = opensearchapi.IndicesPutMappingRequest{
Index: []string{indexName},
Body: strings.NewReader(string(ts.jsonMappings)),
}.Do(context.Background(), ts.w.client)
if res != nil {
res.Body.Close()
}
// NOTE(review): this error comes from the put-mapping request, but the message says "create index".
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: // 409 = already exists
if res != nil {
res.Body.Close()
}
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
ts.createdDates[indexName] = true
}
// copy to set the header values
x := reflect.New(ts.dataType)
x.Elem().Set(reflect.ValueOf(data))
x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
StartTime: startTime,
EndTime: endTime,
// NOTE(review): the rest of this header literal (presumably DurationMs) is missing from this copy.
// The empty id presumably lets OpenSearch assign a document id — confirm in Writer.Write.
if _, err := ts.w.Write(indexName, "", x.Elem().Interface()); err != nil {
// NOTE(review): the next two lines are blame-view residue, not Go code.
Jan Semmelink
committed
return err
}
return nil
// DelOldTimeSeries deletes old daily indices of this time series.
//
// It lists the indices matching "<ts.name>-*" and parses the part after
// "<ts.name>-" as a yyyymmdd date in t0's location. It deletes every index
// dated before the threshold: midnight at the start of t0's calendar day,
// minus olderThanDays days. Indices whose suffix is not a valid date are
// logged and left alone.
//
// olderThanDays < 0 returns an error; olderThanDays == 0 deletes nothing and
// returns (nil, nil).
//
// Returns the list of indices selected for deletion, with err == nil if they
// were deleted successfully. On a delete error, the list is returned together
// with the error.
//
// NOTE(review): this copy is missing several lines: the definitions of t0 and
// indicesToDelete, the loop over the listed indices, the `if err != nil` after
// the list request, several closing braces and the final return.
func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) {
if olderThanDays < 0 {
return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
}
if olderThanDays == 0 {
return nil, nil
}
// make list of indices matching specified name e.g. "uafrica-v3-api-api_logs-*"
res, err := ts.w.api.Indices.Get([]string{ts.name + "-*"}, ts.w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
return nil, errors.Wrapf(err, "failed to list existing %s-* indices", ts.name)
}
// NOTE(review): res.Body is never closed in the visible lines; consider a defer after the error check.
switch res.StatusCode {
case http.StatusOK:
default:
return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", ts.name, res.StatusCode, res.Status(), res.String())
indices := map[string]indexInfo{}
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
return nil, errors.Wrapf(err, "failed to read list of indices")
// calculate time N days ago
// working in local time, assuming the server runs in same location as API user...
// so that index rotates at midnight local time rather than midnight UTC
timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) // = 00:00 on t0's calendar day in t0's location (start of today if t0 is "now"; t0's definition is not visible here)
timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays)) // = N days before that
// The matched names start with "<ts.name>-", so the remainder is the date suffix.
dateStr := dailyIndexName[len(ts.name)+1:]
if date, err := time.ParseInLocation("20060102", dateStr, t0.Location()); err != nil {
logs.Info("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
} else {
if date.Before(timeThreshold) {
indicesToDelete = append(indicesToDelete, dailyIndexName)
}
}
}
if len(indicesToDelete) > 0 {
// NOTE(review): the delete response is discarded: its status code is not
// checked, so an HTTP error status is likely reported as success, and its
// body is not closed.
_, err := ts.w.api.Indices.Delete(indicesToDelete)
if err != nil {
return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
}
// indexInfo is the per-index object in the response of the get-index API
// (ts.w.api.Indices.Get, i.e. GET /<index>), as decoded in DelOldTimeSeries
// into a map keyed by index name.
type indexInfo struct {
Aliases map[string]interface{} `json:"aliases"`
Mappings Mappings `json:"mappings"`
Settings IndexInfoSettings `json:"settings"`
}
// IndexInfoSettings is the "settings" object of an indexInfo.
type IndexInfoSettings struct {
Index IndexSettings `json:"index"`
}
// NOTE(review): the `type IndexSettings struct {` line and its closing brace are
// missing from this copy; the fields below belong to IndexSettings. They are
// presumably strings because OpenSearch reports these settings as JSON strings — confirm.
UUID string `json:"uuid"`
CreationDate string `json:"creation_date"`
NumberOfShards string `json:"number_of_shards"`
NumberOfReplicas string `json:"number_of_replicas"`
ProviderName string `json:"provided_name"` // e.g. "go-utils-audit-test-20211103"; the Go name says "Provider" but the JSON key is "provided_name"
// Search returns docs indexed on OpenSearch document ID which can be used in Get(id)
// The docs value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
// So you can safely type assert e.g.
// type myType struct {...}
// ts := opensearch.TimeSeries(..., myType{})
// docs,totalCount,err := ts.Search(...)
// if err == nil {
// for id,docValue := range docs {
// doc := docValue.(myType)
// ...
// }
// }
// docs will be a slice of the TimeSeries data type
//
// NOTE(review): the description and example above appear to predate the
// current signature, which returns (*SearchResponseHits, error) rather than
// (docs, totalCount, err) — confirm how individual hits map to Get(id).
//
// The request targets every daily index "<ts.name>-*". sort, limit (as size)
// and offset (as from) go into the request body; limit must be in 0..1000.
// allow_partial_search_results is set, so results from the shards that
// succeeded are returned even if some shards fail. A non-200 status is
// reported as an error that includes the response body.
func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (openSearchResult *SearchResponseHits, err error) {
// NOTE(review): the "Jan Semmelink" / "committed" lines in this function are
// blame-view residue, not Go code.
Jan Semmelink
committed
if ts == nil {
return nil, errors.Errorf("time series == nil")
Jan Semmelink
committed
}
if limit < 0 || limit > 1000 {
// NOTE(review): no return follows this assignment in the visible lines; confirm one exists.
err = errors.Errorf("limit=%d not 0..1000", limit)
// example opensearch request body for free text
// {
// "size": 5,
// "query": {
// "multi_match": {
// "query": "miller",
// "fields": ["title^2", "director"]
// }
// }
// }
body := SearchRequestBody{
Sort: sort,
Size: limit,
From: offset,
Query: query,
// NOTE(review): the Marshal error is discarded here.
jsonBody, _ := json.Marshal(body)
search := opensearchapi.SearchRequest{
Index: []string{ts.name + "-*"},
Body: bytes.NewReader(jsonBody),
trueVal := true
search.AllowPartialSearchResults = &trueVal
searchResponse, err := search.Do(context.Background(), ts.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to search documents")
return
}
// NOTE(review): searchResponse.Body is never closed in the visible lines; consider a defer after the error check.
switch searchResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(searchResponse.Body)
err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
bodyData, _ := ioutil.ReadAll(searchResponse.Body)
var response SearchResponseBody
err = json.Unmarshal(bodyData, &response)
if err != nil {
logs.Info("opensearch response body: %s", string(bodyData))
err = errors.Wrapf(err, "cannot decode opensearch response body")
return &response.Hits, nil
// Get takes the id returned in Search()
// The id is uuid assigned by OpenSearch when documents are added with Write().
// The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
func (ts *TimeSeries) Get(id string) (res *GetResponseBody, err error) {
return nil, errors.Errorf("time series == nil")
}
parts := strings.SplitN(id, "/", 2)
get := opensearchapi.GetRequest{
Index: parts[0],
DocumentID: parts[1],
getResponse, err := get.Do(context.Background(), ts.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to get document")
return
}
switch getResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(getResponse.Body)
err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
return
}
bodyData, _ := ioutil.ReadAll(getResponse.Body)
var response GetResponseBody
err = json.Unmarshal(bodyData, &response)
logs.Info("GET response body: %s", string(bodyData))
err = errors.Wrapf(err, "cannot unmarshal GET response body")