Select Git revision
document_store.go
-
Francé Wilke authoredFrancé Wilke authored
document_store.go 8.86 KiB
package search
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"
"reflect"
"strings"
opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
"gitlab.com/uafrica/go-utils/reflection"
)
type DocumentStore struct {
w *Writer
name string
dataType reflect.Type
settings Settings
mappings Mappings
jsonSettings []byte
jsonMappings []byte
created bool
searchResponseBodyType reflect.Type
getResponseBodyType reflect.Type
}
// NewDocumentStore purpose:
// create a document store index to write e.g. orders then allow one to search them
// parameters:
// name must be the complete openSearch index name e.g. "uafrica-v3-orders"
// tmpl must be your document data struct consisting of public fields as:
// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types)
// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch
// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required
func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) {
ds := DocumentStore{}
if !indexNameRegex.MatchString(name) {
return ds, errors.Errorf("invalid index_name:\"%s\"", name)
}
//if already created, just return
if existingDocumentStore, ok := w.documentStoreByName[name]; ok {
return existingDocumentStore, nil
}
structType := reflect.TypeOf(tmpl)
if tmpl == nil || structType.Kind() != reflect.Struct {
return ds, errors.Errorf("%T is not a struct", tmpl)
}
ds = DocumentStore{
w: w,
name: name,
dataType: structType,
created: false,
}
//define the OpenSearch index mapping
ds.settings = Settings{
Index: &SettingsIndex{
NumberOfShards: 4,
NumberOfReplicas: 0,
},
}
if properties, err := structMappingProperties(structType); err != nil {
return ds, errors.Wrapf(err, "cannot map struct %s", structType)
} else {
ds.mappings = Mappings{
Properties: properties,
}
}
var err error
ds.jsonSettings, err = json.Marshal(ds.settings)
if err != nil {
return ds, errors.Wrapf(err, "failed to marshal index settings")
}
ds.jsonMappings, err = json.Marshal(ds.mappings)
if err != nil {
return ds, errors.Wrapf(err, "failed to marshal index mappings")
}
logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings))
//define search response type
//similar to SearchResponseBody
ds.searchResponseBodyType, err = reflection.CloneType(
reflect.TypeOf(SearchResponseBody{}),
map[string]reflect.Type{
".hits.hits[]._source": ds.dataType,
})
if err != nil {
return ds, errors.Wrapf(err, "failed to make search response type for document store")
}
//define get response type
//similar to GetResponseBody
ds.getResponseBodyType, err = reflection.CloneType(
reflect.TypeOf(GetResponseBody{}),
map[string]reflect.Type{
"._source": ds.dataType,
})
if err != nil {
return ds, errors.Wrapf(err, "failed to make get response type for document store")
}
w.documentStoreByName[name] = ds
return ds, nil
}
//data must be of type specified in Writer.TimeSeries(tmpl)
func (ds *DocumentStore) Write(id string, data interface{}) error {
if data == nil {
return errors.Errorf("data:nil")
}
t := reflect.TypeOf(data)
if t != ds.dataType {
return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType)
}
//get daily search index to write to, from start time
indexName := ds.name // + "-" + startTime.Format("20060102")
if !ds.created {
res, err := ds.w.api.Create(
indexName, //index name
indexName, //document id
strings.NewReader(string(ds.jsonSettings)))
if err != nil {
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: //409 = already exists
default:
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
res, err = opensearchapi.IndicesPutMappingRequest{
Index: []string{indexName},
Body: strings.NewReader(string(ds.jsonMappings)),
}.Do(context.Background(), ds.w.client)
if err != nil {
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: //409 = already exists
default:
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
ds.created = true
}
if res, err := ds.w.Write(indexName, id, data); err != nil {
return err
} else {
logs.Info("IndexResponse: %+v", res)
}
return nil
}
//Search
//Return:
// docs will be a slice of the DocumentStore data type
func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) {
if ds == nil {
return nil, 0, errors.Errorf("document store == nil")
}
if limit < 0 || limit > 1000 {
err = errors.Errorf("limit=%d not 0..1000", limit)
return
}
// example search request body for free text
// {
// "size": 5,
// "query": {
// "multi_match": {
// "query": "miller",
// "fields": ["title^2", "director"]
// }
// }
// }
body := SearchRequestBody{
Size: limit,
Query: query,
}
jsonBody, _ := json.Marshal(body)
search := opensearchapi.SearchRequest{
Index: []string{ds.name},
Body: bytes.NewReader(jsonBody),
}
searchResponse, err := search.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to search documents")
return
}
switch searchResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(searchResponse.Body)
err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
return
}
bodyData, _ := ioutil.ReadAll(searchResponse.Body)
logs.Info("Response Body: %s", string(bodyData))
resBodyPtrValue := reflect.New(ds.searchResponseBodyType)
// if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
if err = json.Unmarshal(bodyData, resBodyPtrValue.Interface()); err != nil {
err = errors.Wrapf(err, "cannot decode search response body")
return
}
logs.Info("Response Parsed: %+v", resBodyPtrValue.Interface())
hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
if err != nil {
err = errors.Wrapf(err, "cannot get total nr of hits")
return
}
if hitsTotalValue.Interface().(int) < 1 {
return nil, 0, nil //no matches
}
foundIDs, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._id")
if err != nil {
err = errors.Wrapf(err, "cannot get search response documents")
return
}
//logs.Errorf("items: (%T) %+v", foundIDs.Interface(), foundIDs.Interface())
return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil
}
func (ds *DocumentStore) Get(id string) (doc interface{}, err error) {
if ds == nil {
return nil, errors.Errorf("document store == nil")
}
get := opensearchapi.GetRequest{
Index: ds.name,
DocumentType: "_doc",
DocumentID: id,
}
getResponse, err := get.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to get document")
return
}
switch getResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(getResponse.Body)
err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
return
}
resBodyPtrValue := reflect.New(ds.getResponseBodyType)
if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
err = errors.Wrapf(err, "cannot decode get response body")
return
}
foundVar, err := reflection.Get(resBodyPtrValue, ".found")
if err != nil {
err = errors.Wrapf(err, "cannot get found value")
return
}
if found, ok := foundVar.Interface().(bool); !ok || !found {
return nil, nil //not found
}
//found
source, err := reflection.Get(resBodyPtrValue, "._source")
if err != nil {
err = errors.Wrapf(err, "cannot get document from get response")
return
}
return source.Interface(), nil
}
func (ds *DocumentStore) Delete(id string) (err error) {
if ds == nil {
return errors.Errorf("document store == nil")
}
del := opensearchapi.DeleteRequest{
Index: ds.name,
DocumentType: "_doc",
DocumentID: id,
}
delResponse, err := del.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to del document")
return
}
switch delResponse.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
case http.StatusNoContent:
default:
resBody, _ := ioutil.ReadAll(delResponse.Body)
err = errors.Errorf("Del failed with HTTP status %v: %s", delResponse.StatusCode, string(resBody))
return
}
return nil
}