Skip to content
Snippets Groups Projects
Commit 6a0f9ef4 authored by Jan Semmelink's avatar Jan Semmelink
Browse files

Update search package

parent 95a53472
Branches
Tags
1 merge request: !5 "Add search client for OpenSearch to log and query API events"
This commit is part of merge request !5. Comments created here will be created in the context of that merge request.
......@@ -31,3 +31,7 @@ func (c *Config) Validate() error {
// indexNamePattern describes an acceptable index name: it starts with a
// lowercase letter, may continue with lowercase letters, digits and dashes,
// and never ends on a dash.
const indexNamePattern = `[a-z]([a-z0-9-]*[a-z0-9])*`

// indexNameRegex is indexNamePattern anchored so it must match the whole string.
var indexNameRegex = regexp.MustCompile(`^` + indexNamePattern + `$`)

// ValidIndexName reports whether s, taken as a whole, satisfies indexNamePattern.
func ValidIndexName(s string) bool {
	matched := indexNameRegex.MatchString(s)
	return matched
}
......@@ -11,15 +11,24 @@ import (
"gitlab.com/uafrica/go-utils/logger"
)
// TimeSeriesHeader carries the timing fields of a time series document.
// Embed it as the first, anonymous field of your log struct: TimeSeries()
// rejects templates that do not start with it, and timeSeries.Write() fills it
// in on the copy of the data that gets written.
type TimeSeriesHeader struct {
// StartTime is serialized as "@timestamp"; Write() also derives the daily index name from it.
StartTime time.Time `json:"@timestamp"`
// EndTime is serialized as "@end_time".
EndTime time.Time `json:"@end_time"`
// DurationMs is EndTime minus StartTime in whole milliseconds, as computed by Write().
DurationMs int64 `json:"@duration_ms"`
}
// TimeSeries writes documents of a single struct type into date-suffixed indices.
// Instances are obtained from Writer.TimeSeries().
type TimeSeries interface {
// Write stores data together with a header built from StartTime and EndTime.
// data must be of the struct type that was passed as tmpl when the time series was created.
Write(StartTime time.Time, EndTime time.Time, data interface{}) error
}
type timeSeries struct {
w *writer
indexName string
name string
dataType reflect.Type
fields []dataField
jsonIndexSpec []byte
createdDates map[string]bool
}
type dataField struct {
......@@ -28,25 +37,39 @@ type dataField struct {
mapping MappingProperty
}
//purpose:
// create a time series to write e.g. api logs
//the tmpl must be your log data struct consisting of public fields as:
//parameters:
// name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-logs"
// the actual indices in openSearch will be called "<indexName>-<ccyymmdd>" e.g. "uafrica-v3-api-logs-20210102"
// tmpl must be your log data struct consisting of public fields as:
// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types)
// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch
// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required
func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) {
if !indexNameRegex.MatchString(indexName) {
return nil, errors.Errorf("invalid index_name:\"%s\"", indexName)
func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
if !indexNameRegex.MatchString(name) {
return nil, errors.Errorf("invalid index_name:\"%s\"", name)
}
//if already created, just return
if existingTimeSeries, ok := w.timeSeriesByName[name]; ok {
return existingTimeSeries, nil
}
structType := reflect.TypeOf(tmpl)
if tmpl == nil || structType.Kind() != reflect.Struct {
return nil, errors.Errorf("%T is not a struct", tmpl)
}
if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) {
return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl)
}
ts := &timeSeries{
w: w,
indexName: indexName,
name: name,
dataType: structType,
fields: []dataField{},
createdDates: map[string]bool{},
}
//define the OpenSearch index mapping
......@@ -109,7 +132,7 @@ func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, err
case "object":
dataField.mapping = MappingProperty{Type: "boolean", Enabled: false}
default:
return nil, errors.Errorf("Unknown search:\"%s\" on index(%s) field(%s)", structField.Tag.Get("search"), indexName, structField.Name)
return nil, errors.Errorf("Unknown search:\"%s\" on timeSeries(%s) field(%s)", structField.Tag.Get("search"), name, structField.Name)
}
//add to index spec
......@@ -128,52 +151,129 @@ func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, err
indexSpec.Mappings.Properties[n] = p
}
//todo: find out what is significance of "@..." in the name - or just convention? Is user allowed to use it too?
//create the index if it does not already exist
jsonIndexSpec, err := json.Marshal(indexSpec)
var err error
ts.jsonIndexSpec, err = json.Marshal(indexSpec)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal index spec")
}
logger.Debugf("JSON Index Specification: %s", string(jsonIndexSpec))
res, err := w.api.Create(
w.timeSeriesByName[name] = ts
return ts, nil
}
//data must be of type specified in Writer.TimeSeries(tmpl)
func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error {
if data == nil {
return errors.Errorf("data:nil")
}
t := reflect.TypeOf(data)
if t != ts.dataType {
return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType.Name())
}
//get daily search index to write to, from start time
indexName := ts.name + "-" + startTime.Format("20060102")
if _, ok := ts.createdDates[indexName]; !ok {
//create new index for this date - if not exists
res, err := ts.w.api.Create(
indexName, //index name
indexName, //index name also used for document id
strings.NewReader(string(jsonIndexSpec)))
strings.NewReader(string(ts.jsonIndexSpec)))
if err != nil {
return nil, errors.Wrapf(err, "failed to create index(%s)", indexName)
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
return ts, nil
case http.StatusCreated:
return ts, nil
case http.StatusConflict: //409 = already exists
return ts, nil
default:
return nil, errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
ts.createdDates[indexName] = true
}
//data must be of type specified in Writer.TimeSeries(tmpl)
func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error {
t := reflect.TypeOf(data)
if t != ts.dataType {
return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.indexName, ts.dataType.Name())
//copy to set the header values
x := reflect.New(ts.dataType)
x.Elem().Set(reflect.ValueOf(data))
x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{
StartTime: startTime,
EndTime: endTime,
DurationMs: int64(endTime.Sub(startTime) / time.Millisecond),
}))
return ts.w.Write(indexName, x.Elem().Interface())
}
//store all the data struct field values into a map[string]interface{} that we can marshal
//which includes the header fields
searchDoc := map[string]interface{}{}
// DelOldTimeSeries deletes the daily indices of a time series that are older
// than the given number of days.
//
//parameters:
//	indexName is index prefix before dash-date, e.g. "api-logs" then will look for "api-logs-<date>"
//	olderThanDays is the number of calendar days, counted back from local midnight today,
//		that must be kept; 0 disables deletion and a negative value is rejected
//returns
//	list of indices selected for deletion, with err==nil only if they were deleted successfully
//	(nil list when olderThanDays is 0 or when validation/listing fails)
func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) {
	if !indexNameRegex.MatchString(indexName) {
		return nil, errors.Errorf("invalid index_name:\"%s\"", indexName)
	}
	if olderThanDays < 0 {
		return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays)
	}
	if olderThanDays == 0 {
		return nil, nil
	}

	//make list of indices matching specified name e.g. "uafrica-v3-api-logs-*"
	res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"}))
	if err != nil {
		return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName)
	}
	//always release the response body so the underlying connection can be reused
	if res.Body != nil {
		defer res.Body.Close()
	}
	if res.StatusCode != http.StatusOK {
		return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
	}

	indices := map[string]indexInfo{}
	if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
		return nil, errors.Wrapf(err, "failed to read list of indices")
	}

	//calculate the threshold: local midnight today, minus N calendar days
	//working in local time, assuming the server runs in same location as API user...
	//so that index rotates at midnight local time rather than midnight UTC
	//AddDate (rather than subtracting N*24h) keeps the threshold on a midnight
	//boundary even when a daylight-saving change falls inside the window
	t0 := time.Now()
	timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()).AddDate(0, 0, -olderThanDays)

	indicesToDelete := []string{}
	for dailyIndexName, dailyIndexInfo := range indices {
		//guard the slice below: a returned name shorter than "<indexName>-" cannot carry a date
		if len(dailyIndexName) <= len(indexName)+1 {
			logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, "")
			continue
		}
		dateStr := dailyIndexName[len(indexName)+1:]
		date, err := time.ParseInLocation("20060102", dateStr, t0.Location())
		if err != nil {
			logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr)
			continue
		}
		if date.Before(timeThreshold) {
			logger.Debugf("Deleting index(%s).uuid(%s).docsCount(%d) dated before %s...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, dailyIndexInfo.Settings.Index.DocsCount, timeThreshold)
			indicesToDelete = append(indicesToDelete, dailyIndexName)
		}
	}
	if len(indicesToDelete) == 0 {
		return indicesToDelete, nil
	}

	delRes, err := w.api.Indices.Delete(indicesToDelete)
	if err != nil {
		return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete)
	}
	if delRes.Body != nil {
		defer delRes.Body.Close()
	}
	//a transport-level success can still carry an HTTP error status; do not report that as deleted
	if delRes.StatusCode > 299 {
		return indicesToDelete, errors.Errorf("failed to delete indices(%s): %v %s %s", indicesToDelete, delRes.StatusCode, delRes.Status(), delRes.String())
	}
	logger.Debugf("Deleted %d daily indices(%s) older than %d days", len(indicesToDelete), indicesToDelete, olderThanDays)
	return indicesToDelete, nil
}
// indexInfo is the per-index entry decoded from the Indices.Get response in
// DelOldTimeSeries, where the response body is a JSON object keyed by index name.
// NOTE(review): this comment previously referred to "GET /_cat/indices", but the
// visible caller decodes the Indices.Get response - confirm which payload is intended.
type indexInfo struct {
Aliases map[string]interface{} `json:"aliases"`
Mappings Mappings `json:"mappings"`
Settings IndexInfoSettings `json:"settings"`
}
v := reflect.ValueOf(data)
for _, field := range ts.fields {
searchDoc[field.name] = v.FieldByIndex(field.index).Interface()
// IndexInfoSettings is the "settings" object of an indexInfo entry; it wraps the
// nested "index" settings.
type IndexInfoSettings struct {
Index IndexSettings `json:"index"`
}
//set header values
searchDoc["@timestamp"] = startTime
searchDoc["@end_time"] = endTime
searchDoc["@duration_ms"] = endTime.Sub(startTime) / time.Millisecond
return ts.w.Write(ts.indexName, searchDoc)
// IndexSettings holds the values read from the "index" settings object; they are
// only used for debug logging in DelOldTimeSeries.
type IndexSettings struct {
UUID string `json:"uuid"`
// NOTE(review): "docs.count" looks like a _cat/indices column name - confirm the
// index settings payload actually carries this key, otherwise DocsCount stays 0.
DocsCount int64 `json:"docs.count"`
}
......@@ -13,7 +13,8 @@ import (
)
// Writer is the handle returned by New(); it creates time series and deletes
// their old daily indices.
type Writer interface {
TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error)
TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first anonymous field
DelOldTimeSeries(name string, olderThanDays int) ([]string, error)
}
func New(config Config) (Writer, error) {
......@@ -22,6 +23,7 @@ func New(config Config) (Writer, error) {
}
w := &writer{
config: config,
timeSeriesByName: map[string]TimeSeries{},
}
// Initialize the client with SSL/TLS enabled.
......@@ -49,9 +51,10 @@ type writer struct {
config Config
client *opensearch.Client
api *opensearchapi.API
timeSeriesByName map[string]TimeSeries
}
func (writer writer) Write(indexName string, doc map[string]interface{}) error {
func (writer writer) Write(indexName string, doc interface{}) error {
if writer.client == nil {
return errors.Errorf("writer closed")
}
......
......@@ -3,6 +3,7 @@ package search_test
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
......@@ -11,8 +12,11 @@ import (
)
func TestLocalWriter(t *testing.T) {
test(t, search.Config{})
test(t, search.Config{
Addresses: []string{"https://localhost:9200"},
})
}
func TestDevWriter(t *testing.T) {
test(t, search.Config{
Addresses: []string{"https://search-uafrica-v3-api-logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > uafrica-v3-api-logs > General Information: Domain Endpoints
......@@ -29,7 +33,8 @@ func test(t *testing.T, c search.Config) {
t.Fatalf("failed to create writer: %+v", err)
}
ts, err := a.TimeSeries("go-utils-audit-test", testStruct{})
indexName := "go-utils-audit-test"
ts, err := a.TimeSeries(indexName, testStruct{})
if err != nil {
t.Fatalf("failed to create time series: %+v", err)
}
......@@ -38,15 +43,16 @@ func test(t *testing.T, c search.Config) {
methods := []string{"GET", "POST", "GET", "PATCH", "GET", "GET", "DELETE", "GET", "GET"} //more gets than others
paths := []string{"/users", "/orders", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates"}
N := 100
testTime := time.Now().Add(-time.Second * time.Duration(N))
testTime := time.Now().Add(-time.Hour * time.Duration(N))
for i := 0; i < N; i++ {
testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Second)))
testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Hour)))
method := methods[i%len(methods)]
path := paths[i%len(paths)]
if err := ts.Write(
testTime,
testTime.Add(-time.Duration(float64(time.Second)*(float64(rand.Intn(100))/100.0+0.1))),
testStruct{
TimeSeriesHeader: search.TimeSeriesHeader{},
Test1: fmt.Sprintf("%d", i+1), //1,2,3,...
Test2: fmt.Sprintf("ACC_%05d", 93+i%7), //ACC_00093..ACC00100
Test3: i%3 + 8, //8,9, or 10
......@@ -60,10 +66,23 @@ func test(t *testing.T, c search.Config) {
t.Fatalf("failed to add doc: %+v", err)
}
}
oldList, err := a.DelOldTimeSeries(indexName, 2)
if err != nil {
t.Fatalf("failed to del old: %+v", err)
}
sort.Slice(oldList, func(i, j int) bool { return oldList[i] < oldList[j] })
t.Logf("Deleted %d old series", len(oldList))
//indexes deleted depends on current time, so not verifying
// if len(oldList) != 2 || oldList[0] != "go-utils-audit-test-20211029" || oldList[1] != "go-utils-audit-test-20211030" {
// t.Fatalf("Did not delete expected indices")
// }
t.Logf("Done")
}
type testStruct struct {
search.TimeSeriesHeader
Test1 string `json:"test1"`
Test2 string `json:"test2"`
Test3 int `json:"test3"`
......@@ -76,3 +95,20 @@ type httpData struct {
Method string `json:"method" search:"keyword"`
Path string `json:"path" search:"keyword"`
}
// TestOlderThan checks the age-threshold arithmetic used when expiring daily
// indices: midnight of the current day in the given location, minus N days.
func TestOlderThan(t *testing.T) {
	//fixed SAST (UTC+2) zone, so the result does not depend on the zone of the machine running the test
	sast := time.FixedZone("SAST", 2*60*60)

	//time now for test (using a fixed value in SAST)
	t0, err := time.ParseInLocation("2006-01-02 15:04:05", "2021-10-20 14:15:16", sast)
	if err != nil {
		t.Fatalf("failed to parse test time: %+v", err)
	}
	t.Logf("t0=%s", t0)

	//threshold is 2 days older, applying at midnight in location SAST
	olderThanDays := 2
	t.Logf("n=%d", olderThanDays)
	t1 := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, sast)
	t.Logf("t1=%s", t1)
	t1 = t1.AddDate(0, 0, -olderThanDays)
	t.Logf("Threshold = %s", t1)

	//2021-10-20 14:15:16 -> midnight 2021-10-20 -> two days back = midnight 2021-10-18
	expected := time.Date(2021, time.October, 18, 0, 0, 0, 0, sast)
	if !t1.Equal(expected) {
		t.Fatalf("threshold=%s, expected %s", t1, expected)
	}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment