diff --git a/search/document_store.go b/search/document_store.go index 07278e1b0eaee9c528c2502a3d926c0cad7a87e4..7690f33c03a2dd9582ccdf8c5aeac7ad5b658759 100644 --- a/search/document_store.go +++ b/search/document_store.go @@ -15,15 +15,8 @@ import ( "gitlab.com/uafrica/go-utils/reflection" ) -type DocumentStore interface { - Write(id string, data interface{}) error - Search(query Query, limit int64) (ids []string, totalCount int, err error) - Get(id string) (doc interface{}, err error) - Delete(id string) error -} - -type documentStore struct { - w *writer +type DocumentStore struct { + w *Writer name string dataType reflect.Type settings Settings @@ -35,17 +28,18 @@ type documentStore struct { getResponseBodyType reflect.Type } -//purpose: +// NewDocumentStore purpose: // create a document store index to write e.g. orders then allow one to search them -//parameters: +// parameters: // name must be the complete openSearch index name e.g. "uafrica-v3-orders" // tmpl must be your document data struct consisting of public fields as: // Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) // Xxx time.Time `json:"<name>"` assumes type "date" for opensearch // Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required -func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, error) { +func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) { + ds := DocumentStore{} if !indexNameRegex.MatchString(name) { - return nil, errors.Errorf("invalid index_name:\"%s\"", name) + return ds, errors.Errorf("invalid index_name:\"%s\"", name) } //if already created, just return @@ -55,10 +49,10 @@ func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, er structType := reflect.TypeOf(tmpl) if tmpl == nil || structType.Kind() != reflect.Struct { - return nil, errors.Errorf("%T is not a struct", tmpl) + return ds, errors.Errorf("%T is not a struct", tmpl) } - ds := &documentStore{ + ds = DocumentStore{ w: w, name: name, dataType: structType, @@ -74,7 +68,7 @@ func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, er } if properties, err := structMappingProperties(structType); err != nil { - return nil, errors.Wrapf(err, "cannot map struct %s", structType) + return ds, errors.Wrapf(err, "cannot map struct %s", structType) } else { ds.mappings = Mappings{ Properties: properties, @@ -84,11 +78,11 @@ func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, er var err error ds.jsonSettings, err = json.Marshal(ds.settings) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal index settings") + return ds, errors.Wrapf(err, "failed to marshal index settings") } ds.jsonMappings, err = json.Marshal(ds.mappings) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal index mappings") + return ds, errors.Wrapf(err, "failed to marshal index mappings") } logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings)) @@ -100,7 +94,7 @@ func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, er ".hits.hits[]._source": ds.dataType, }) if err != nil { - return nil, errors.Wrapf(err, "failed to make search response type for document store") + return ds, errors.Wrapf(err, "failed to make search response type for document store") } //define get response type @@ -111,14 +105,14 @@ func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, er "._source": ds.dataType, }) if err != nil { - return nil, errors.Wrapf(err, "failed to make get response type for document store") + return ds, errors.Wrapf(err, "failed to make get response type for document store") } w.documentStoreByName[name] = ds return ds, nil } //data must be of type specified in Writer.TimeSeries(tmpl) -func (ds *documentStore) Write(id string, data interface{}) error { +func (ds *DocumentStore) Write(id string, data interface{}) error { if data == nil { return errors.Errorf("data:nil") } @@ -172,7 +166,7 @@ func (ds *documentStore) Write(id string, data interface{}) error { //Search //Return: // docs will be a slice of the DocumentStore data type -func (ds *documentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) { +func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) { if ds == nil { return nil, 0, errors.Errorf("document store == nil") } @@ -246,7 +240,7 @@ func (ds *documentStore) Search(query Query, limit int64) (ids []string, totalCo return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil } -func (ds *documentStore) Get(id string) (doc interface{}, err error) { +func (ds *DocumentStore) Get(id string) (doc interface{}, err error) { if ds == nil { return nil, errors.Errorf("document store == nil") } @@ -293,7 +287,7 @@ func (ds *documentStore) Get(id string) (doc interface{}, err error) { return source.Interface(), nil } -func (ds *documentStore) Delete(id string) (err error) { +func (ds *DocumentStore) Delete(id string) (err error) { if ds == nil { return errors.Errorf("document store == nil") } diff --git a/search/document_store_test.go b/search/document_store_test.go index 9e7726086f685e5cc3e9c31a1bb8904db9cc9c2c..fb7cd16436c2254f6123e9342cbfc65eb030b402 100644 --- a/search/document_store_test.go +++ b/search/document_store_test.go @@ -34,7 +34,7 @@ func testDocuments(t *testing.T, c search.Config) { } indexName := "go-utils-search-docs-test" - ds, err := a.DocumentStore(indexName, SearchOrder{}) + ds, err := a.NewDocumentStore(indexName, SearchOrder{}) if err != nil { t.Fatalf("failed to create document store: %+v", err) } diff --git a/search/search_test.go b/search/search_test.go index db25f0659c1ed6f400b7a2db0b1f0dba8129d305..9a3ee68440703dec1885ef15225eb0124fadd75f 100644 --- a/search/search_test.go +++ b/search/search_test.go @@ -33,7 +33,7 @@ func test(t *testing.T, c search.Config) { } indexName := "go-utils-audit-test" - ts, err := a.TimeSeries(indexName, testStruct{}) + ts, err := a.NewTimeSeries(indexName, testStruct{}) if err != nil { t.Fatalf("failed to create time series: %+v", err) } @@ -87,7 +87,7 @@ func test(t *testing.T, c search.Config) { // } //} - oldList, err := a.DelOldTimeSeries(indexName, 2) + oldList, err := ts.DelOldTimeSeries(2) if err != nil { t.Fatalf("failed to del old: %+v", err) } diff --git a/search/time_series.go b/search/time_series.go index e3542d00ff8dca62e5b6c8e6333c7ce19a68c179..7e46f3d6e61a8d2926cddb3bfe45d50aeaa53456 100644 --- a/search/time_series.go +++ b/search/time_series.go @@ -18,38 +18,15 @@ import ( const TimeFormat = "2006-01-02T15:04:05Z07:00" -//embed this into your log struct +// TimeSeriesHeader embed this into your log struct type TimeSeriesHeader struct { StartTime time.Time `json:"@timestamp"` EndTime time.Time `json:"@end_time"` DurationMs int64 `json:"@duration_ms"` } -type TimeSeries interface { - Write(StartTime time.Time, EndTime time.Time, data interface{}) error - - // Search returns docs indexed on OpenSearch document ID which cat be used in Get(id) - // The docs value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl) - // So you can safely type assert e.g. - // type myType struct {...} - // ts := search.TimeSeries(..., myType{}) - // docs,totalCount,err := ts.Search(...) - // if err == nil { - // for id,docValue := range docs { - // doc := docValue.(myType) - // ... - // } - // } - Search(query Query, searchQuery []map[string]string, limit int64, offset int64) (docs map[string]interface{}, totalCount int, err error) - - // Get takes the id returned in Search() - // The id is uuid assigned by OpenSearch when documents are added with Write(). - // The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl) - Get(id string) (interface{}, error) -} - -type timeSeries struct { - w *writer +type TimeSeries struct { + w *Writer name string dataType reflect.Type settings Settings @@ -62,7 +39,7 @@ type timeSeries struct { getResponseBodyType reflect.Type } -// TimeSeries purpose: +// NewTimeSeries purpose: // create a time series to write e.g. api api_logs // parameters: // name must be the openSearch index name prefix without the date, e.g. "uafrica-v3-api-api_logs" @@ -71,9 +48,11 @@ type timeSeries struct { // Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) // Xxx time.Time `json:"<name>"` assumes type "date" for opensearch // Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required -func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { +func (w *Writer) NewTimeSeries(name string, tmpl interface{}) (TimeSeries, error) { + ts := TimeSeries{} + if !indexNameRegex.MatchString(name) { - return nil, errors.Errorf("invalid index_name:\"%s\"", name) + return ts, errors.Errorf("invalid index_name:\"%s\"", name) } //if already created, just return @@ -83,13 +62,13 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { structType := reflect.TypeOf(tmpl) if tmpl == nil || structType.Kind() != reflect.Struct { - return nil, errors.Errorf("%T is not a struct", tmpl) + return ts, errors.Errorf("%T is not a struct", tmpl) } if structType.NumField() < 1 || !structType.Field(0).Anonymous || structType.Field(0).Type != reflect.TypeOf(TimeSeriesHeader{}) { - return nil, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl) + return ts, errors.Errorf("%T does not start with anonymous TimeSeriesHeader", tmpl) } - ts := &timeSeries{ + ts = TimeSeries{ w: w, name: name, dataType: structType, @@ -105,7 +84,7 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { } if properties, err := structMappingProperties(structType); err != nil { - return nil, errors.Wrapf(err, "cannot map struct %s", structType) + return ts, errors.Wrapf(err, "cannot map struct %s", structType) } else { ts.mappings = Mappings{ Properties: properties, @@ -115,11 +94,11 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { var err error ts.jsonSettings, err = json.Marshal(ts.settings) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal index settings") + return ts, errors.Wrapf(err, "failed to marshal index settings") } ts.jsonMappings, err = json.Marshal(ts.mappings) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal index mappings") + return ts, errors.Wrapf(err, "failed to marshal index mappings") } logs.Info("%s Index Mappings: %s", structType, string(ts.jsonMappings)) @@ -131,7 +110,7 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { ".hits.hits[]._source": ts.dataType, }) if err != nil { - return nil, errors.Wrapf(err, "failed to make search response type for time-series") + return ts, errors.Wrapf(err, "failed to make search response type for time-series") } // define get response type @@ -142,7 +121,7 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) { "._source": ts.dataType, }) if err != nil { - return nil, errors.Wrapf(err, "failed to make get response type for time-series") + return ts, errors.Wrapf(err, "failed to make get response type for time-series") } w.timeSeriesByName[name] = ts @@ -253,7 +232,7 @@ func structMappingProperties(structType reflect.Type) (map[string]MappingPropert } //data must be of type specified in Writer.TimeSeries(tmpl) -func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) error { +func (ts *TimeSeries) Write(startTime, endTime time.Time, data interface{}) error { if data == nil { return errors.Errorf("data:nil") } @@ -316,14 +295,11 @@ func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) erro } -//parameters: +// DelOldTimeSeries parameters: // indexName is index prefix before dash-date, e.g. "api-api_logs" then will look for "api-api_logs-<date>" -//returns +// returns // list of indices to delete with err==nil if deleted successfully -func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string, error) { - if !indexNameRegex.MatchString(indexName) { - return nil, errors.Errorf("invalid index_name:\"%s\"", indexName) - } +func (ts *TimeSeries) DelOldTimeSeries(olderThanDays int) ([]string, error) { if olderThanDays < 0 { return nil, errors.Errorf("invalid olderThanDays=%d < 0", olderThanDays) } @@ -332,14 +308,14 @@ func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string } //make list of indices matching specified name e.g. "uafrica-v3-api-api_logs-*" - res, err := w.api.Indices.Get([]string{indexName + "-*"}, w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"})) + res, err := ts.w.api.Indices.Get([]string{ts.name + "-*"}, ts.w.api.Indices.Get.WithHeader(map[string]string{"Accept": "application/json"})) if err != nil { - return nil, errors.Wrapf(err, "failed to list existing %s-* indices", indexName) + return nil, errors.Wrapf(err, "failed to list existing %s-* indices", ts.name) } switch res.StatusCode { case http.StatusOK: default: - return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) + return nil, errors.Errorf("failed to list existing %s-* indices: %v %s %s", ts.name, res.StatusCode, res.Status(), res.String()) } indices := map[string]indexInfo{} @@ -356,7 +332,7 @@ func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string indicesToDelete := []string{} for dailyIndexName, dailyIndexInfo := range indices { - dateStr := dailyIndexName[len(indexName)+1:] + dateStr := dailyIndexName[len(ts.name)+1:] if date, err := time.ParseInLocation("20060102", dateStr, t0.Location()); err != nil { logs.Info("Ignore index(%s) with invalid date(%s)", dailyIndexName, dateStr) } else { @@ -367,7 +343,7 @@ func (w *writer) DelOldTimeSeries(indexName string, olderThanDays int) ([]string } } if len(indicesToDelete) > 0 { - _, err := w.api.Indices.Delete(indicesToDelete) + _, err := ts.w.api.Indices.Delete(indicesToDelete) if err != nil { return indicesToDelete, errors.Wrapf(err, "failed to delete indices(%s)", indicesToDelete) } @@ -395,10 +371,20 @@ type IndexSettings struct { ProviderName string `json:"provided_name"` //e.g. "go-utils-audit-test-20211103" } -//Search -//Return: +// Search returns docs indexed on OpenSearch document ID which cat be used in Get(id) +// The docs value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl) +// So you can safely type assert e.g. +// type myType struct {...} +// ts := search.TimeSeries(..., myType{}) +// docs,totalCount,err := ts.Search(...) +// if err == nil { +// for id,docValue := range docs { +// doc := docValue.(myType) +// ... +// } +// } // docs will be a slice of the TimeSeries data type -func (ts *timeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (docs map[string]interface{}, totalCount int, err error) { +func (ts *TimeSeries) Search(query Query, sort []map[string]string, limit int64, offset int64) (docs map[string]interface{}, totalCount int, err error) { if ts == nil { return nil, 0, errors.Errorf("time series == nil") } @@ -483,8 +469,11 @@ func (ts *timeSeries) Search(query Query, sort []map[string]string, limit int64, return docs, hitsTotalValue.Interface().(int), nil } -func (ds *timeSeries) Get(id string) (doc interface{}, err error) { - if ds == nil { +// Get takes the id returned in Search() +// The id is uuid assigned by OpenSearch when documents are added with Write(). +// The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl) +func (ts *TimeSeries) Get(id string) (doc interface{}, err error) { + if ts == nil { return nil, errors.Errorf("document store == nil") } parts := strings.SplitN(id, "/", 2) @@ -493,7 +482,7 @@ func (ds *timeSeries) Get(id string) (doc interface{}, err error) { DocumentType: "_doc", DocumentID: parts[1], } - getResponse, err := get.Do(context.Background(), ds.w.client) + getResponse, err := get.Do(context.Background(), ts.w.client) if err != nil { err = errors.Wrapf(err, "failed to get document") return @@ -507,7 +496,7 @@ func (ds *timeSeries) Get(id string) (doc interface{}, err error) { return } - resBodyPtrValue := reflect.New(ds.getResponseBodyType) + resBodyPtrValue := reflect.New(ts.getResponseBodyType) if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { err = errors.Wrapf(err, "cannot decode get response body") return diff --git a/search/writer.go b/search/writer.go index ba47ed66b21707c2682a2c3512944adbbe2a80b4..7ddd5a2521a7472b631535ac59379848552ff970 100644 --- a/search/writer.go +++ b/search/writer.go @@ -12,17 +12,21 @@ import ( "gitlab.com/uafrica/go-utils/logs" ) -type Writer interface { - TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first unanymous field - DelOldTimeSeries(name string, olderThanDays int) ([]string, error) - DocumentStore(name string, tmpl interface{}) (DocumentStore, error) +type Writer struct { + config Config + client *opensearch.Client + api *opensearchapi.API + timeSeriesByName map[string]TimeSeries + documentStoreByName map[string]DocumentStore } func New(config Config) (Writer, error) { + w := Writer{} + if err := config.Validate(); err != nil { - return nil, errors.Wrapf(err, "invalid config") + return w, errors.Wrapf(err, "invalid config") } - w := &writer{ + w = Writer{ config: config, timeSeriesByName: map[string]TimeSeries{}, documentStoreByName: map[string]DocumentStore{}, @@ -40,7 +44,7 @@ func New(config Config) (Writer, error) { var err error w.client, err = opensearch.NewClient(searchConfig) if err != nil { - return nil, errors.Wrapf(err, "cannot initialize opensearch connection") + return w, errors.Wrapf(err, "cannot initialize opensearch connection") } // Print OpenSearch version information on console. logs.Info("Search client created with config: %+v", searchConfig) @@ -49,15 +53,7 @@ func New(config Config) (Writer, error) { return w, nil } -type writer struct { - config Config - client *opensearch.Client - api *opensearchapi.API - timeSeriesByName map[string]TimeSeries - documentStoreByName map[string]DocumentStore -} - -func (writer writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) { +func (writer Writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) { if writer.client == nil { return nil, errors.Errorf("writer closed") }