package search import ( "bytes" "context" "encoding/json" "io/ioutil" "net/http" "reflect" "strings" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" "gitlab.com/uafrica/go-utils/reflection" ) type DocumentStore struct { w *Writer name string dataType reflect.Type settings Settings mappings Mappings jsonSettings []byte jsonMappings []byte created bool searchResponseBodyType reflect.Type getResponseBodyType reflect.Type } // NewDocumentStore purpose: // create a document store index to write e.g. orders then allow one to search them // parameters: // name must be the complete openSearch index name e.g. "uafrica-v3-orders" // tmpl must be your document data struct consisting of public fields as: // Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types) // Xxx time.Time `json:"<name>"` assumes type "date" for opensearch // Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) { ds := DocumentStore{} if !indexNameRegex.MatchString(name) { return ds, errors.Errorf("invalid index_name:\"%s\"", name) } //if already created, just return if existingDocumentStore, ok := w.documentStoreByName[name]; ok { return existingDocumentStore, nil } structType := reflect.TypeOf(tmpl) if tmpl == nil || structType.Kind() != reflect.Struct { return ds, errors.Errorf("%T is not a struct", tmpl) } ds = DocumentStore{ w: w, name: name, dataType: structType, created: false, } //define the OpenSearch index mapping ds.settings = Settings{ Index: &SettingsIndex{ NumberOfShards: 4, NumberOfReplicas: 0, }, } if properties, err := structMappingProperties(structType); err != nil { return ds, errors.Wrapf(err, "cannot map struct %s", structType) } else { ds.mappings = Mappings{ Properties: properties, } } var err error ds.jsonSettings, err = json.Marshal(ds.settings) if err != nil { return ds, errors.Wrapf(err, "failed to marshal index settings") } ds.jsonMappings, err = json.Marshal(ds.mappings) if err != nil { return ds, errors.Wrapf(err, "failed to marshal index mappings") } logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings)) //define search response type //similar to SearchResponseBody ds.searchResponseBodyType, err = reflection.CloneType( reflect.TypeOf(SearchResponseBody{}), map[string]reflect.Type{ ".hits.hits[]._source": ds.dataType, }) if err != nil { return ds, errors.Wrapf(err, "failed to make search response type for document store") } //define get response type //similar to GetResponseBody ds.getResponseBodyType, err = reflection.CloneType( reflect.TypeOf(GetResponseBody{}), map[string]reflect.Type{ "._source": ds.dataType, }) if err != nil { return ds, errors.Wrapf(err, "failed to make get response type for document store") } w.documentStoreByName[name] = ds return ds, nil } //data must be of type specified in Writer.TimeSeries(tmpl) func (ds *DocumentStore) Write(id string, data interface{}) error { if data == nil { return errors.Errorf("data:nil") } t := reflect.TypeOf(data) if t != ds.dataType { return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType) } //get daily search index to write to, from start time indexName := ds.name // + "-" + startTime.Format("20060102") if !ds.created { res, err := ds.w.api.Create( indexName, //index name indexName, //document id strings.NewReader(string(ds.jsonSettings))) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) } switch res.StatusCode { case http.StatusOK: case http.StatusCreated: case http.StatusConflict: //409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } res, err = opensearchapi.IndicesPutMappingRequest{ Index: []string{indexName}, Body: strings.NewReader(string(ds.jsonMappings)), }.Do(context.Background(), ds.w.client) if err != nil { return errors.Wrapf(err, "failed to create index(%s)", indexName) } switch res.StatusCode { case http.StatusOK: case http.StatusCreated: case http.StatusConflict: //409 = already exists default: return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String()) } ds.created = true } if res, err := ds.w.Write(indexName, id, data); err != nil { return err } else { logs.Info("IndexResponse: %+v", res) } return nil } //Search //Return: // docs will be a slice of the DocumentStore data type func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) { if ds == nil { return nil, 0, errors.Errorf("document store == nil") } if limit < 0 || limit > 1000 { err = errors.Errorf("limit=%d not 0..1000", limit) return } // example search request body for free text // { // "size": 5, // "query": { // "multi_match": { // "query": "miller", // "fields": ["title^2", "director"] // } // } // } body := SearchRequestBody{ Size: limit, Query: query, } jsonBody, _ := json.Marshal(body) search := opensearchapi.SearchRequest{ Index: []string{ds.name}, Body: bytes.NewReader(jsonBody), } searchResponse, err := search.Do(context.Background(), ds.w.client) if err != nil { err = errors.Wrapf(err, "failed to search documents") return } switch searchResponse.StatusCode { case http.StatusOK: default: resBody, _ := ioutil.ReadAll(searchResponse.Body) err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody)) return } bodyData, _ := ioutil.ReadAll(searchResponse.Body) logs.Info("Response Body: %s", string(bodyData)) resBodyPtrValue := reflect.New(ds.searchResponseBodyType) // if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { if err = json.Unmarshal(bodyData, resBodyPtrValue.Interface()); err != nil { err = errors.Wrapf(err, "cannot decode search response body") return } logs.Info("Response Parsed: %+v", resBodyPtrValue.Interface()) hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value") if err != nil { err = errors.Wrapf(err, "cannot get total nr of hits") return } if hitsTotalValue.Interface().(int) < 1 { return nil, 0, nil //no matches } foundIDs, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._id") if err != nil { err = errors.Wrapf(err, "cannot get search response documents") return } //logs.Errorf("items: (%T) %+v", foundIDs.Interface(), foundIDs.Interface()) return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil } func (ds *DocumentStore) Get(id string) (doc interface{}, err error) { if ds == nil { return nil, errors.Errorf("document store == nil") } get := opensearchapi.GetRequest{ Index: ds.name, DocumentType: "_doc", DocumentID: id, } getResponse, err := get.Do(context.Background(), ds.w.client) if err != nil { err = errors.Wrapf(err, "failed to get document") return } switch getResponse.StatusCode { case http.StatusOK: default: resBody, _ := ioutil.ReadAll(getResponse.Body) err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody)) return } resBodyPtrValue := reflect.New(ds.getResponseBodyType) if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil { err = errors.Wrapf(err, "cannot decode get response body") return } foundVar, err := reflection.Get(resBodyPtrValue, ".found") if err != nil { err = errors.Wrapf(err, "cannot get found value") return } if found, ok := foundVar.Interface().(bool); !ok || !found { return nil, nil //not found } //found source, err := reflection.Get(resBodyPtrValue, "._source") if err != nil { err = errors.Wrapf(err, "cannot get document from get response") return } return source.Interface(), nil } func (ds *DocumentStore) Delete(id string) (err error) { if ds == nil { return errors.Errorf("document store == nil") } del := opensearchapi.DeleteRequest{ Index: ds.name, DocumentType: "_doc", DocumentID: id, } delResponse, err := del.Do(context.Background(), ds.w.client) if err != nil { err = errors.Wrapf(err, "failed to del document") return } switch delResponse.StatusCode { case http.StatusOK: case http.StatusNotFound: case http.StatusNoContent: default: resBody, _ := ioutil.ReadAll(delResponse.Body) err = errors.Errorf("Del failed with HTTP status %v: %s", delResponse.StatusCode, string(resBody)) return } return nil }