// Web-viewer residue (not part of the Go source): "Skip to content", "Snippets Groups Projects", "Newer Older".
// File: document_store.go (7.19 KiB)
package opensearch
	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
// DocumentStore describes one OpenSearch index that documents of a single
// struct type are written to and searched in.
type DocumentStore struct {
	// w is the Writer whose api/client are used for every request made by this store.
	w            *Writer
	// name is used as the OpenSearch index name in Write, Search, Get and Delete.
	name         string
	// dataType is the type that Write compares reflect.TypeOf(data) against.
	dataType     reflect.Type
	// settings holds the index settings built in NewDocumentStore.
	settings     Settings
	// mappings holds the index mappings built in NewDocumentStore.
	mappings     Mappings
	// jsonSettings is the JSON encoding of settings; Write sends it when creating the index.
	jsonSettings []byte
	// jsonMappings is the JSON encoding of mappings; Write sends it in the put-mapping request.
	jsonMappings []byte
	// created is set to true by Write after the create and put-mapping requests succeeded.
	created      bool
// NewDocumentStore creates a document store index description used to write
// documents (e.g. orders) and later search them.
//
// name must be the complete OpenSearch index name, e.g. "uafrica-v3-orders".
// If a store with that name is already registered in w.documentStoreByName,
// that existing store is returned unchanged.
//
// tmpl must be a value of your document data struct (reflect kind Struct, so a
// pointer to a struct is rejected), consisting of public fields such as:
//
//	Xxx string    `json:"<name>" search:"keyword|text|long|date"` (more types can be added later)
//	Xxx time.Time `json:"<name>"`                                 assumes type "date" for OpenSearch
//	Xxx int       `json:"<name>"`                                 assumes type "long" for OpenSearch, specify keyword if required
//
// An error is returned for an invalid index name, for a tmpl that is nil or
// not a struct, when structMappingProperties fails, or when the settings or
// mappings cannot be marshalled to JSON.
func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) {
	ds := DocumentStore{}
		// NOTE(review): the condition guarding this return is not visible in this view of the file.
		return ds, errors.Errorf("invalid index_name:\"%s\"", name)
	// if already created, just return
	if existingDocumentStore, ok := w.documentStoreByName[name]; ok {
		return existingDocumentStore, nil
	}

	structType := reflect.TypeOf(tmpl)
	// tmpl == nil is tested first so that Kind() is never called on the nil
	// reflect.Type that reflect.TypeOf returns for a nil interface.
	if tmpl == nil || structType.Kind() != reflect.Struct {
		return ds, errors.Errorf("%T is not a struct", tmpl)
	// define the OpenSearch index settings: fixed at 4 shards and 0 replicas
	ds.settings = Settings{
		Index: &SettingsIndex{
			NumberOfShards:   4,
			NumberOfReplicas: 0,
		},
	}

	// derive the index mapping properties from the struct type
	if properties, err := structMappingProperties(structType); err != nil {
		return ds, errors.Wrapf(err, "cannot map struct %s", structType)
	} else {
		ds.mappings = Mappings{
			Properties: properties,
		}
	}

	// keep JSON encodings of the settings and mappings on the store;
	// Write sends them when it creates the index
	var err error
	ds.jsonSettings, err = json.Marshal(ds.settings)
	if err != nil {
		return ds, errors.Wrapf(err, "failed to marshal index settings")
	}
	ds.jsonMappings, err = json.Marshal(ds.mappings)
	if err != nil {
		return ds, errors.Wrapf(err, "failed to marshal index mappings")
// Write stores data as a document in this store's index.
//
// data must be non-nil and its reflect type must equal ds.dataType
// (presumably the struct type given as tmpl to NewDocumentStore - the
// assignment is not visible here; the original comment referred to
// Writer.TimeSeries(tmpl)).
//
// While ds.created is false, Write first issues a create request carrying the
// JSON index settings, followed by a put-mapping request carrying the JSON
// mappings, and sets ds.created once both were accepted.
func (ds *DocumentStore) Write(id string, data interface{}) error {
	if data == nil {
		return errors.Errorf("data:nil")
	}
	t := reflect.TypeOf(data)
	if t != ds.dataType {
		return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType)
	}

	// The index name is the store name as-is; the daily-suffixed variant is commented out.
	indexName := ds.name // + "-" + startTime.Format("20060102")
	if !ds.created {
		// NOTE(review): indexName is passed both as the index name and as the
		// document id, with the settings JSON as body - confirm this is the
		// intended way to create the index.
		res, err := ds.w.api.Create(
			indexName, // index name
			indexName, // document id
			strings.NewReader(string(ds.jsonSettings)))
		if err != nil {
			if res != nil {
				res.Body.Close()
			}
			return errors.Wrapf(err, "failed to create index(%s)", indexName)
		}
		// NOTE(review): as shown, 200 and 201 are empty cases (accepted) and the
		// 409 case is the one that returns the error, although its comment says
		// "already exists". Confirm whether a `default:` line is missing here.
		// NOTE(review): res.Body is not closed on the accepted paths visible here.
		switch res.StatusCode {
		case http.StatusOK:
		case http.StatusCreated:
		case http.StatusConflict: // 409 = already exists
			// res was already dereferenced above, so this nil check cannot fail here.
			if res != nil {
				res.Body.Close()
			}
			// NOTE(review): res.String() is called after the body was closed;
			// presumably it reads the body - verify the message is not empty.
			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
		}

		// Apply the field mappings to the index.
		res, err = opensearchapi.IndicesPutMappingRequest{
			Index: []string{indexName},
			Body:  strings.NewReader(string(ds.jsonMappings)),
		}.Do(context.Background(), ds.w.client)
		if err != nil {
			if res != nil {
				res.Body.Close()
			}
			// NOTE(review): this is the put-mapping step but the message says "create index".
			return errors.Wrapf(err, "failed to create index(%s)", indexName)
		}
		// Same status handling as for the create request above (same review notes apply).
		switch res.StatusCode {
		case http.StatusOK:
		case http.StatusCreated:
		case http.StatusConflict: // 409 = already exists
			if res != nil {
				res.Body.Close()
			}
			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
		}
		// Remember that the index was set up so later writes skip both requests.
		ds.created = true
	}
	// Web-viewer residue (not part of the Go source): "Francé Wilke's avatar", "Francé Wilke committed".
	if _, err := ds.w.Write(indexName, id, data); err != nil {
//	docs will be a slice of the DocumentStore data type
func (ds *DocumentStore) Search(query Query, limit int64) (res *SearchResponseHits, err error) {
		return nil, errors.Errorf("document store == nil")
	}
	if limit < 0 || limit > 1000 {
		err = errors.Errorf("limit=%d not 0..1000", limit)
		return
	}

	// example opensearch request body for free text
	// 	{
	// 		"size": 5,
	// 		"query": {
	// 			"multi_match": {
	// 				"query": "miller",
	// 				"fields": ["title^2", "director"]
	// 			}
	//    	}
	//  }
	body := SearchRequestBody{
		Size:    limit,
		Query:   query,
		Timeout: Timeout,
	jsonBody, _ := json.Marshal(body)
	search := opensearchapi.SearchRequest{
		Index: []string{ds.name},
		Body:  bytes.NewReader(jsonBody),
	}

	searchResponse, err := search.Do(context.Background(), ds.w.client)
	if err != nil {
		err = errors.Wrapf(err, "failed to opensearch documents")
		return
	}

	switch searchResponse.StatusCode {
	case http.StatusOK:
	default:
		resBody, _ := ioutil.ReadAll(searchResponse.Body)
		err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
		return
	}

	bodyData, _ := ioutil.ReadAll(searchResponse.Body)

	var response SearchResponseBody
	err = json.Unmarshal(bodyData, &response)
		logs.Info("opensearch response body: %s", string(bodyData))
		err = errors.Wrapf(err, "cannot decode opensearch response body")
	return &response.Hits, nil
func (ds *DocumentStore) Get(id string) (res *GetResponseBody, err error) {
	if ds == nil {
		return nil, errors.Errorf("document store == nil")
	}
	get := opensearchapi.GetRequest{
		Index:      ds.name,
		DocumentID: id,
	}
	getResponse, err := get.Do(context.Background(), ds.w.client)
	if err != nil {
		err = errors.Wrapf(err, "failed to get document")
		return
	}

	switch getResponse.StatusCode {
	case http.StatusOK:
	default:
		resBody, _ := ioutil.ReadAll(getResponse.Body)
		err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
		return
	}

	bodyData, _ := ioutil.ReadAll(getResponse.Body)
	var response GetResponseBody
	err = json.Unmarshal(bodyData, &response)
		logs.Info("GET response body: %s", string(bodyData))
		err = errors.Wrapf(err, "cannot unmarshal GET response body")
	return &response, nil
// Delete issues a delete request for the document with the given id in this
// document store's index.
//
// It returns an error when ds is nil. Responses with status 200 OK,
// 404 Not Found and 204 No Content are accepted; any other status is turned
// into an error that includes the response body.
func (ds *DocumentStore) Delete(id string) (err error) {
	if ds == nil {
		return errors.Errorf("document store == nil")
	}

	// Declared before the defer so the deferred func sees the response once it is assigned.
	var delResponse *opensearchapi.Response

	// Close the response body when the function returns, if a response was received.
	defer func() {
		if delResponse != nil && delResponse.Body != nil {
			delResponse.Body.Close()
		}
	}()

	del := opensearchapi.DeleteRequest{
		Index:      ds.name,
		DocumentID: id,
	delResponse, err = del.Do(context.Background(), ds.w.client)
		// NOTE(review): the guard around this return is not visible in this view;
		// presumably it is `if err != nil`.
		return errors.Wrapf(err, "failed to del document")
	}

	// Empty cases do not fall through in Go: 200, 404 and 204 are all accepted,
	// so deleting a document that does not exist is not reported as an error.
	switch delResponse.StatusCode {
	case http.StatusOK:
	case http.StatusNotFound:
	case http.StatusNoContent:
	default:
		// The read error is ignored; the body is only used for the error message.
		resBody, _ := ioutil.ReadAll(delResponse.Body)
		return errors.Errorf("Del failed with HTTP status %v: %s", delResponse.StatusCode, string(resBody))