Newer
Older
Jan Semmelink
committed
import (
"bytes"
"context"
"encoding/json"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
Jan Semmelink
committed
"io/ioutil"
"net/http"
"reflect"
"strings"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
Jan Semmelink
committed
)
// DocumentStore describes one OpenSearch index holding documents of a single
// Go struct type. Values are obtained from Writer.NewDocumentStore.
type DocumentStore struct {
	w            *Writer      // writer whose api/client are used for every request
	name         string       // complete OpenSearch index name
	dataType     reflect.Type // struct type that Write accepts
	settings     Settings     // index settings
	mappings     Mappings     // index mappings built from dataType
	jsonSettings []byte       // settings marshalled to JSON
	jsonMappings []byte       // mappings marshalled to JSON
	created      bool         // set by Write once the index set-up requests succeeded
}
// NewDocumentStore purpose:
// create a document store index to write e.g. orders then allow one to opensearch them
Jan Semmelink
committed
// name must be the complete openSearch index name e.g. "uafrica-v3-orders"
// tmpl must be your document data struct consisting of public fields as:
// Xxx string `json:"<name>" search:"keyword|text|long|date"` (can later add more types)
// Xxx time.Time `json:"<name>"` assumes type "date" for opensearch
// Xxx int `json:"<name>"` assumes type "long" for opensearch, specify keyword if required
func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) {
ds := DocumentStore{}
Jan Semmelink
committed
if !indexNameRegex.MatchString(name) {
return ds, errors.Errorf("invalid index_name:\"%s\"", name)
Jan Semmelink
committed
}
// if already created, just return
Jan Semmelink
committed
if existingDocumentStore, ok := w.documentStoreByName[name]; ok {
return existingDocumentStore, nil
}
structType := reflect.TypeOf(tmpl)
if tmpl == nil || structType.Kind() != reflect.Struct {
return ds, errors.Errorf("%T is not a struct", tmpl)
Jan Semmelink
committed
}
ds = DocumentStore{
Jan Semmelink
committed
w: w,
name: name,
dataType: structType,
created: false,
}
// define the OpenSearch index mapping
Jan Semmelink
committed
ds.settings = Settings{
Index: &SettingsIndex{
NumberOfShards: 4,
NumberOfReplicas: 0,
},
}
if properties, err := structMappingProperties(structType); err != nil {
return ds, errors.Wrapf(err, "cannot map struct %s", structType)
Jan Semmelink
committed
} else {
ds.mappings = Mappings{
Properties: properties,
}
}
var err error
ds.jsonSettings, err = json.Marshal(ds.settings)
if err != nil {
return ds, errors.Wrapf(err, "failed to marshal index settings")
Jan Semmelink
committed
}
ds.jsonMappings, err = json.Marshal(ds.mappings)
if err != nil {
return ds, errors.Wrapf(err, "failed to marshal index mappings")
Jan Semmelink
committed
}
logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings))
Jan Semmelink
committed
w.documentStoreByName[name] = ds
return ds, nil
}
// data must be of type specified in Writer.TimeSeries(tmpl)
func (ds *DocumentStore) Write(id string, data interface{}) error {
Jan Semmelink
committed
if data == nil {
return errors.Errorf("data:nil")
}
t := reflect.TypeOf(data)
if t != ds.dataType {
return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType)
}
// get daily opensearch index to write to, from start time
Jan Semmelink
committed
indexName := ds.name // + "-" + startTime.Format("20060102")
if !ds.created {
res, err := ds.w.api.Create(
indexName, // index name
indexName, // document id
Jan Semmelink
committed
strings.NewReader(string(ds.jsonSettings)))
if res != nil {
res.Body.Close()
}
Jan Semmelink
committed
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: // 409 = already exists
Jan Semmelink
committed
default:
if res != nil {
res.Body.Close()
}
Jan Semmelink
committed
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
res, err = opensearchapi.IndicesPutMappingRequest{
Index: []string{indexName},
Body: strings.NewReader(string(ds.jsonMappings)),
}.Do(context.Background(), ds.w.client)
if err != nil {
if res != nil {
res.Body.Close()
}
Jan Semmelink
committed
return errors.Wrapf(err, "failed to create index(%s)", indexName)
}
switch res.StatusCode {
case http.StatusOK:
case http.StatusCreated:
case http.StatusConflict: // 409 = already exists
Jan Semmelink
committed
default:
if res != nil {
res.Body.Close()
}
Jan Semmelink
committed
return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
}
ds.created = true
}
if indexResponse, err := ds.w.Write(indexName, id, data); err != nil {
Jan Semmelink
committed
return err
} else {
logs.Info("IndexResponse: %+v", indexResponse)
Jan Semmelink
committed
}
return nil
}
// Search
// Return:
Jan Semmelink
committed
// docs will be a slice of the DocumentStore data type
func (ds *DocumentStore) Search(query Query, limit int64) (res *SearchResponseHits, err error) {
Jan Semmelink
committed
if ds == nil {
return nil, errors.Errorf("document store == nil")
Jan Semmelink
committed
}
if limit < 0 || limit > 1000 {
err = errors.Errorf("limit=%d not 0..1000", limit)
return
}
// example opensearch request body for free text
Jan Semmelink
committed
// {
// "size": 5,
// "query": {
// "multi_match": {
// "query": "miller",
// "fields": ["title^2", "director"]
// }
// }
// }
body := SearchRequestBody{
Size: limit,
Query: query,
Jan Semmelink
committed
}
Jan Semmelink
committed
jsonBody, _ := json.Marshal(body)
search := opensearchapi.SearchRequest{
Index: []string{ds.name},
Body: bytes.NewReader(jsonBody),
Jan Semmelink
committed
}
searchResponse, err := search.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to opensearch documents")
Jan Semmelink
committed
return
}
switch searchResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(searchResponse.Body)
err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
return
}
bodyData, _ := ioutil.ReadAll(searchResponse.Body)
var response SearchResponseBody
err = json.Unmarshal(bodyData, &response)
Jan Semmelink
committed
if err != nil {
logs.Info("opensearch response body: %s", string(bodyData))
err = errors.Wrapf(err, "cannot decode opensearch response body")
Jan Semmelink
committed
return
}
return &response.Hits, nil
Jan Semmelink
committed
}
func (ds *DocumentStore) Get(id string) (res *GetResponseBody, err error) {
Jan Semmelink
committed
if ds == nil {
return nil, errors.Errorf("document store == nil")
}
get := opensearchapi.GetRequest{
Index: ds.name,
DocumentID: id,
Jan Semmelink
committed
}
getResponse, err := get.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to get document")
return
}
switch getResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(getResponse.Body)
err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
return
}
bodyData, _ := ioutil.ReadAll(getResponse.Body)
logs.Info("Get Response Body: %s", string(bodyData))
Jan Semmelink
committed
var response GetResponseBody
err = json.Unmarshal(bodyData, &response)
Jan Semmelink
committed
if err != nil {
logs.Info("GET response body: %s", string(bodyData))
err = errors.Wrapf(err, "cannot unmarshal GET response body")
Jan Semmelink
committed
return
}
Jan Semmelink
committed
}
func (ds *DocumentStore) Delete(id string) (err error) {
if ds == nil {
return errors.Errorf("document store == nil")
}
var delResponse *opensearchapi.Response
defer func() {
if delResponse != nil && delResponse.Body != nil {
delResponse.Body.Close()
}
}()
del := opensearchapi.DeleteRequest{
Index: ds.name,
DocumentID: id,
delResponse, err = del.Do(context.Background(), ds.w.client)
return errors.Wrapf(err, "failed to del document")
}
switch delResponse.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
case http.StatusNoContent:
default:
resBody, _ := ioutil.ReadAll(delResponse.Body)
return errors.Errorf("Del failed with HTTP status %v: %s", delResponse.StatusCode, string(resBody))