diff --git a/config/struct.go b/config/struct.go index 90d0b30aba2865e17488f742ef618569c01e02c7..f52c44b08c5a2e3d962248d6b7b031256cc0929a 100644 --- a/config/struct.go +++ b/config/struct.go @@ -24,7 +24,15 @@ func Load(prefix string, configPtr interface{}) error { return errors.Errorf("%T is not &struct", configPtr) } v := reflect.ValueOf(configPtr) - return load(prefix, t.Elem(), v.Elem()) + if err := load(prefix, t.Elem(), v.Elem()); err != nil { + return errors.Wrapf(err, "failed to load config with prefix %s", prefix) + } + if validator, ok := configPtr.(Validator); ok { + if err := validator.Validate(); err != nil { + return errors.Wrapf(err, "invalid config with prefix %s", prefix) + } + } + return nil } type nameValue struct { diff --git a/search/config.go b/search/config.go index 0adce93969418955ec242a33714c69d035dfad83..1ae4c1974d40987d06945aaabeb9089e998cba90 100644 --- a/search/config.go +++ b/search/config.go @@ -31,3 +31,7 @@ func (c *Config) Validate() error { const indexNamePattern = `[a-z]([a-z0-9-]*[a-z0-9])*` var indexNameRegex = regexp.MustCompile("^" + indexNamePattern + "$") + +func ValidIndexName(s string) bool { + return indexNameRegex.MatchString(s) +} diff --git a/search/time_series.go b/search/time_series.go index 28e01790e4e8c252f41f940b00f8d41084858bc1..1c3e47fbd88ce25f76e7feaf958bd14305c0a6c7 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -11,15 +11,24 @@ import ( "gitlab.com/uafrica/go-utils/logger" ) +//embed this into your log struct +type TimeSeriesHeader struct { + StartTime time.Time `json:"@timestamp"` + EndTime time.Time `json:"@end_time"` + DurationMs int64 `json:"@duration_ms"` +} + type TimeSeries interface { Write(StartTime time.Time, EndTime time.Time, data interface{}) error } type timeSeries struct { - w *writer - indexName string - dataType reflect.Type - fields []dataField + w *writer + name string + dataType reflect.Type + fields []dataField + jsonIndexSpec []byte + createdDates map[string]bool } type dataField struct { @@ -28,25 +37,39 @@ type dataField struct { mapping MappingProperty } -//create a time series to write e.g. api logs -//the tmpl must be your log data struct consisting of public fields as: -// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) -// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch -// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required -func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) { - if !indexNameRegex.MatchString(indexName) { - return nil, errors.Errorf("invalid index_name:\"%s\"", indexName) +//purpose: +// create a time series to write e.g. api logs +//parameters: +// name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-logs" +// the actual indices in openSearch will be called "<indexName>-<ccyymmdd>" e.g. "uafrica-v3-api-logs-20210102" +// tmpl must be your log data struct consisting of public fields as: +// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) +// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch +// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required +func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { + if !indexNameRegex.MatchString(name) { + return nil, errors.Errorf("invalid index_name:\"%s\"", name) + } + + //if already created, just return + if existingTimeSeries, ok := w.timeSeriesByName[name]; ok { + return existingTimeSeries, nil } + structType := reflect.TypeOf(tmpl) if tmpl == nil || structType.Kind() != reflect.Struct { return nil, errors.Errorf("%T is not a struct", tmpl) } + if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) { + return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl) + } ts := &timeSeries{ - w: w, - indexName: indexName, - dataType: structType, - fields: []dataField{}, + w: w, + name: name, + dataType: structType, + fields: []dataField{}, + createdDates: map[string]bool{}, } //define the OpenSearch index mapping @@ -109,7 +132,7 @@ func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, err case "object": dataField.mapping = MappingProperty{Type: "boolean", Enabled: false} default: - return nil, errors.Errorf("Unknown search:\"%s\" on index(%s) field(%s)", structField.Tag.Get("search"), indexName, structField.Name) + return nil, errors.Errorf("Unknown search:\"%s\" on timeSeries(%s) field(%s)", structField.Tag.Get("search"), name, structField.Name) } //add to index spec @@ -128,52 +151,129 @@ func (w *writer) TimeSeries(indexName string, tmpl interface{}) (TimeSeries, err indexSpec.Mappings.Properties[n] = p } - //todo: find out what is significance of "@..." in the name - or just convention? Is user allowed to use it too? - - //create the index if it does not already exist - jsonIndexSpec, err := json.Marshal(indexSpec) + var err error + ts.jsonIndexSpec, err = json.Marshal(indexSpec) if err != nil { return nil, errors.Wrapf(err, "failed to marshal index spec") } - logger.Debugf("JSON Index Specification: %s", string(jsonIndexSpec)) - res, err := w.api.Create( - indexName, //index name - indexName, //index name also used for document id - strings.NewReader(string(jsonIndexSpec))) + w.timeSeriesByName[name] = ts + return ts, nil +} + +//data must be of type specified in Writer.TimeSeries(tmpl) +func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error { + if data == nil { + return errors.Errorf("data:nil") + } + t := reflect.TypeOf(data) + if t != ts.dataType { + return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.name, ts.dataType.Name()) + } + + //get daily search index to write to, from start time + indexName := ts.name + "-" + startTime.Format("20060102") + if _, ok := ts.createdDates[indexName]; !ok { + //create new index for this date - if not exists + res, err := ts.w.api.Create( + indexName, //index name + indexName, //index name also used for document id + strings.NewReader(string(ts.jsonIndexSpec))) + if err != nil { + return errors.Wrapf(err, "failed to create index(%s)", indexName) + } + switch res.StatusCode { + case http.StatusOK: + case http.StatusCreated: + case http.StatusConflict: //409 = already exists + default: + return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) + } + ts.createdDates[indexName] = true + } + + //copy to set the header values + x := reflect.New(ts.dataType) + x.Elem().Set(reflect.ValueOf(data)) + x.Elem().Field(0).Set(reflect.ValueOf(TimeSeriesHeader{ + StartTime: startTime, + EndTime: endTime, + DurationMs: int64(endTime.Sub(startTime) / time.Millisecond), + })) + return ts.w.Write(indexName, x.Elem().Interface()) +} + +//parameters: +// indexName is index prefix before dash-date, e.g. "api-logs" then will look for "api-logs-<date>" +//returns +// list of indices to delete with err==nil if deleted successfully +func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) { + if !indexNameRegex.MatchString(indexName) { + return nil, errors.Errorf("invalid index_name:\"%s\"", indexName) + } + if olderThanDays < 0 { + return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays) + } + if olderThanDays == 0 { + return nil, nil + } + + //make list of indices matching specified name e.g. "uafrica-v3-api-logs-*" + res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"})) if err != nil { - return nil, errors.Wrapf(err, "failed to create index(%s)", indexName) + return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName) } switch res.StatusCode { case http.StatusOK: - return ts, nil - case http.StatusCreated: - return ts, nil - case http.StatusConflict: //409 = already exists - return ts, nil default: - return nil, errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) + return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } -} -//data must be of type specified in Writer.TimeSeries(tmpl) -func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error { - t := reflect.TypeOf(data) - if t != ts.dataType { - return errors.Errorf("cannot write %T into TimeSeries(%s), expecting %s", data, ts.indexName, ts.dataType.Name()) + indices := map[string]indexInfo{} + if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { + return nil, errors.Wrapf(err, "failed to read list of indices") } - //store all the data struct field values into a map[string]interface{} that we can marshal - //which includes the header fields - searchDoc := map[string]interface{}{} + //calculate time N days ago + //working in local time, assuming the server runs in same location as API user... + //so that index rotates at midnight local time rather than midnight UTC + t0 := time.Now() + timeThreshold := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, t0.Location()) //= midnight yesterday in local time + timeThreshold = timeThreshold.Add(-time.Hour * 24 * time.Duration(olderThanDays)) //= N days before that - v := reflect.ValueOf(data) - for _, field := range ts.fields { - searchDoc[field.name] = v.FieldByIndex(field.index).Interface() + indicesToDelete := []string{} + for dailyIndexName, dailyIndexInfo := range indices { + dateStr := dailyIndexName[len(indexName)+1:] + if date, err := time.ParseInLocation("20060102", dateStr, t0.Location()); err != nil { + logger.Debugf("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr) + } else { + if date.Before(timeThreshold) { + logger.Debugf("Deleting index(%s).uuid(%s).docsCount(%d) older than %s days...", dailyIndexName, dailyIndexInfo.Settings.Index.UUID, dailyIndexInfo.Settings.Index.DocsCount, timeThreshold) + indicesToDelete = append(indicesToDelete, dailyIndexName) + } + } + } + if len(indicesToDelete) > 0 { + _, err := w.api.Indices.Delete(indicesToDelete) + if err != nil { + return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete) + } + logger.Debugf("Deleted %d daily indices(%s) older than %d days", len(indicesToDelete), indicesToDelete, olderThanDays) } + return indicesToDelete, nil +} + +//output from GET /_cat/indices for each index in the list +type indexInfo struct { + Aliases map[string]interface{} `json:"aliases"` + Mappings Mappings `json:"mappings"` + Settings IndexInfoSettings `json:"settings"` +} + +type IndexInfoSettings struct { + Index IndexSettings `json:"index"` +} - //set header values - searchDoc["@timestamp"] = startTime - searchDoc["@end_time"] = endTime - searchDoc["@duration_ms"] = endTime.Sub(startTime) / time.Millisecond - return ts.w.Write(ts.indexName, searchDoc) +type IndexSettings struct { + UUID string `json:"uuid"` + DocsCount int64 `json:"docs.count"` } diff --git a/search/writer.go b/search/writer.go index a36e05f153d36c880cec5060b801b4a90738a354..efcd6bc09b85287d379cfbfa5b943e5a219c3178 100644 --- a/search/writer.go +++ b/search/writer.go @@ -13,7 +13,8 @@ import ( ) type Writer interface { - TimeSeries(indexName string, tmpl interface{}) (TimeSeries, error) + TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first unanymous field + DelOldTimeSeries(name string, olderThanDays int) ([]string, error) } func New(config Config) (Writer, error) { @@ -21,7 +22,8 @@ func New(config Config) (Writer, error) { return nil, errors.Wrapf(err, "invalid config") } w := &writer{ - config: config, + config: config, + timeSeriesByName: map[string]TimeSeries{}, } // Initialize the client with SSL/TLS enabled. @@ -46,12 +48,13 @@ func New(config Config) (Writer, error) { //implements audit.Auditor type writer struct { - config Config - client *opensearch.Client - api *opensearchapi.API + config Config + client *opensearch.Client + api *opensearchapi.API + timeSeriesByName map[string]TimeSeries } -func (writer writer) Write(indexName string, doc map[string]interface{}) error { +func (writer writer) Write(indexName string, doc interface{}) error { if writer.client == nil { return errors.Errorf("writer closed") } diff --git a/search/writer_test.go b/search/writer_test.go index f09757a28ccbd5e56b188c33d8fe4bf5976ddef5..f85d7a350ca12d6bb0c11aaeb93b979a97d4a8ae 100644 --- a/search/writer_test.go +++ b/search/writer_test.go @@ -3,6 +3,7 @@ package search_test import ( "fmt" "math/rand" + "sort" "testing" "time" @@ -11,8 +12,11 @@ import ( ) func TestLocalWriter(t *testing.T) { - test(t, search.Config{}) + test(t, search.Config{ + Addresses: []string{"https://localhost:9200"}, + }) } + func TestDevWriter(t *testing.T) { test(t, search.Config{ Addresses: []string{"https://search-uafrica-v3-api-logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > uafrica-v3-api-logs > General Information: Domain Endpoints @@ -29,7 +33,8 @@ func test(t *testing.T, c search.Config) { t.Fatalf("failed to create writer: %+v", err) } - ts, err := a.TimeSeries("go-utils-audit-test", testStruct{}) + indexName := "go-utils-audit-test" + ts, err := a.TimeSeries(indexName, testStruct{}) if err != nil { t.Fatalf("failed to create time series: %+v", err) } @@ -38,18 +43,19 @@ func test(t *testing.T, c search.Config) { methods := []string{"GET", "POST", "GET", "PATCH", "GET", "GET", "DELETE", "GET", "GET"} //more gets than others paths := []string{"/users", "/orders", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates"} N := 100 - testTime := time.Now().Add(-time.Second * time.Duration(N)) + testTime := time.Now().Add(-time.Hour * time.Duration(N)) for i := 0; i < N; i++ { - testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Second))) + testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Hour))) method := methods[i%len(methods)] path := paths[i%len(paths)] if err := ts.Write( testTime, testTime.Add(-time.Duration(float64(time.Second)*(float64(rand.Intn(100))/100.0+0.1))), testStruct{ - Test1: fmt.Sprintf("%d", i+1), //1,2,3,... - Test2: fmt.Sprintf("ACC_%05d", 93+i%7), //ACC_00093..ACC00100 - Test3: i%3 + 8, //8,9, or 10 + TimeSeriesHeader: search.TimeSeriesHeader{}, + Test1: fmt.Sprintf("%d", i+1), //1,2,3,... + Test2: fmt.Sprintf("ACC_%05d", 93+i%7), //ACC_00093..ACC00100 + Test3: i%3 + 8, //8,9, or 10 HTTP: httpData{ Method: method, Path: path, @@ -60,10 +66,23 @@ func test(t *testing.T, c search.Config) { t.Fatalf("failed to add doc: %+v", err) } } + + oldList, err := a.DelOldTimeSeries(indexName, 2) + if err != nil { + t.Fatalf("failed to del old: %+v", err) + } + sort.Slice(oldList, func(i, j int) bool { return oldList[i] < oldList[j] }) + + t.Logf("Deleted %d old series", len(oldList)) + //indexes deleted depends on current time, so not verifying + // if len(oldList) != 2 || oldList[0] != "go-utils-audit-test-20211029" || oldList[1] != "go-utils-audit-test-20211030" { + // t.Fatalf("Did not delete expected indices") + // } t.Logf("Done") } type testStruct struct { + search.TimeSeriesHeader Test1 string `json:"test1"` Test2 string `json:"test2"` Test3 int `json:"test3"` @@ -76,3 +95,20 @@ type httpData struct { Method string `json:"method" search:"keyword"` Path string `json:"path" search:"keyword"` } + +func TestOlderThan(t *testing.T) { + local := time.Now().Location() + + //time now for test (using a fixed value in SAST) + t0, _ := time.ParseInLocation("2006-01-02 15:04:05", "2021-10-20 14:15:16", local) + t.Logf("t0=%s", t0) + + //threshold is 2 days older, applying at midnight in location SAST + olderThanDays := 2 + t.Logf("n=%d", olderThanDays) + + t1 := time.Date(t0.Year(), t0.Month(), t0.Day(), 0, 0, 0, 0, local) + t.Logf("t1=%s", t1) + t1 = t1.Add(-time.Hour * 24 * time.Duration(olderThanDays)) + t.Logf("Threshold = %s", t1) +}