Skip to content
Snippets Groups Projects
Select Git revision
  • a30cb2dcee71faecd6188ab78baca929ced4eb34
  • main default protected
  • trading_hours
  • refactor_trading_hours
  • audit_cleaning_cater_for_non_struct_fields
  • remove-info-logs
  • sl-refactor
  • 18-use-scan-for-param-values
  • 17-order-search-results
  • 4-simplify-framework-2
  • 1-http-error
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
  • v1.278.0
31 results

document_store.go

Blame
  • document_store.go 8.86 KiB
    package search
    
    import (
    	"bytes"
    	"context"
    	"encoding/json"
    	"io/ioutil"
    	"net/http"
    	"reflect"
    	"strings"
    
    	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logs"
    	"gitlab.com/uafrica/go-utils/reflection"
    )
    
    type DocumentStore struct {
    	w                      *Writer
    	name                   string
    	dataType               reflect.Type
    	settings               Settings
    	mappings               Mappings
    	jsonSettings           []byte
    	jsonMappings           []byte
    	created                bool
    	searchResponseBodyType reflect.Type
    	getResponseBodyType    reflect.Type
    }
    
    // NewDocumentStore purpose:
    //	create a document store index to write e.g. orders then allow one to search them
    // parameters:
    //	name must be the complete openSearch index name e.g. "uafrica-v3-orders"
    //	tmpl must be your document data struct consisting of public fields as:
    //		Xxx string `json:"<name>" search:"keyword|text|long|date"`	(can later add more types)
    //		Xxx time.Time `json:"<name>"`								assumes type "date" for opensearch
    //		Xxx int `json:"<name>"`										assumes type "long" for opensearch, specify keyword if required
    func (w *Writer) NewDocumentStore(name string, tmpl interface{}) (DocumentStore, error) {
    	ds := DocumentStore{}
    	if !indexNameRegex.MatchString(name) {
    		return ds, errors.Errorf("invalid index_name:\"%s\"", name)
    	}
    
    	//if already created, just return
    	if existingDocumentStore, ok := w.documentStoreByName[name]; ok {
    		return existingDocumentStore, nil
    	}
    
    	structType := reflect.TypeOf(tmpl)
    	if tmpl == nil || structType.Kind() != reflect.Struct {
    		return ds, errors.Errorf("%T is not a struct", tmpl)
    	}
    
    	ds = DocumentStore{
    		w:        w,
    		name:     name,
    		dataType: structType,
    		created:  false,
    	}
    
    	//define the OpenSearch index mapping
    	ds.settings = Settings{
    		Index: &SettingsIndex{
    			NumberOfShards:   4,
    			NumberOfReplicas: 0,
    		},
    	}
    
    	if properties, err := structMappingProperties(structType); err != nil {
    		return ds, errors.Wrapf(err, "cannot map struct %s", structType)
    	} else {
    		ds.mappings = Mappings{
    			Properties: properties,
    		}
    	}
    
    	var err error
    	ds.jsonSettings, err = json.Marshal(ds.settings)
    	if err != nil {
    		return ds, errors.Wrapf(err, "failed to marshal index settings")
    	}
    	ds.jsonMappings, err = json.Marshal(ds.mappings)
    	if err != nil {
    		return ds, errors.Wrapf(err, "failed to marshal index mappings")
    	}
    	logs.Info("%s Index Mappings: %s", structType, string(ds.jsonMappings))
    
    	//define search response type
    	//similar to SearchResponseBody
    	ds.searchResponseBodyType, err = reflection.CloneType(
    		reflect.TypeOf(SearchResponseBody{}),
    		map[string]reflect.Type{
    			".hits.hits[]._source": ds.dataType,
    		})
    	if err != nil {
    		return ds, errors.Wrapf(err, "failed to make search response type for document store")
    	}
    
    	//define get response type
    	//similar to GetResponseBody
    	ds.getResponseBodyType, err = reflection.CloneType(
    		reflect.TypeOf(GetResponseBody{}),
    		map[string]reflect.Type{
    			"._source": ds.dataType,
    		})
    	if err != nil {
    		return ds, errors.Wrapf(err, "failed to make get response type for document store")
    	}
    	w.documentStoreByName[name] = ds
    	return ds, nil
    }
    
    //data must be of type specified in Writer.TimeSeries(tmpl)
    func (ds *DocumentStore) Write(id string, data interface{}) error {
    	if data == nil {
    		return errors.Errorf("data:nil")
    	}
    	t := reflect.TypeOf(data)
    	if t != ds.dataType {
    		return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType)
    	}
    
    	//get daily search index to write to, from start time
    	indexName := ds.name // + "-" + startTime.Format("20060102")
    	if !ds.created {
    		res, err := ds.w.api.Create(
    			indexName, //index name
    			indexName, //document id
    			strings.NewReader(string(ds.jsonSettings)))
    		if err != nil {
    			return errors.Wrapf(err, "failed to create index(%s)", indexName)
    		}
    		switch res.StatusCode {
    		case http.StatusOK:
    		case http.StatusCreated:
    		case http.StatusConflict: //409 = already exists
    		default:
    			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
    		}
    
    		res, err = opensearchapi.IndicesPutMappingRequest{
    			Index: []string{indexName},
    			Body:  strings.NewReader(string(ds.jsonMappings)),
    		}.Do(context.Background(), ds.w.client)
    		if err != nil {
    			return errors.Wrapf(err, "failed to create index(%s)", indexName)
    		}
    		switch res.StatusCode {
    		case http.StatusOK:
    		case http.StatusCreated:
    		case http.StatusConflict: //409 = already exists
    		default:
    			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
    		}
    		ds.created = true
    	}
    	if res, err := ds.w.Write(indexName, id, data); err != nil {
    		return err
    	} else {
    		logs.Info("IndexResponse: %+v", res)
    	}
    	return nil
    }
    
    //Search
    //Return:
    //	docs will be a slice of the DocumentStore data type
    func (ds *DocumentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) {
    	if ds == nil {
    		return nil, 0, errors.Errorf("document store == nil")
    	}
    	if limit < 0 || limit > 1000 {
    		err = errors.Errorf("limit=%d not 0..1000", limit)
    		return
    	}
    
    	// example search request body for free text
    	// 	{
    	// 		"size": 5,
    	// 		"query": {
    	// 			"multi_match": {
    	// 				"query": "miller",
    	// 				"fields": ["title^2", "director"]
    	// 			}
    	//    	}
    	//  }
    	body := SearchRequestBody{
    		Size:  limit,
    		Query: query,
    	}
    
    	jsonBody, _ := json.Marshal(body)
    	search := opensearchapi.SearchRequest{
    		Index: []string{ds.name},
    		Body:  bytes.NewReader(jsonBody),
    	}
    
    	searchResponse, err := search.Do(context.Background(), ds.w.client)
    	if err != nil {
    		err = errors.Wrapf(err, "failed to search documents")
    		return
    	}
    
    	switch searchResponse.StatusCode {
    	case http.StatusOK:
    	default:
    		resBody, _ := ioutil.ReadAll(searchResponse.Body)
    		err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
    		return
    	}
    
    	bodyData, _ := ioutil.ReadAll(searchResponse.Body)
    	logs.Info("Response Body: %s", string(bodyData))
    
    	resBodyPtrValue := reflect.New(ds.searchResponseBodyType)
    	// if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
    	if err = json.Unmarshal(bodyData, resBodyPtrValue.Interface()); err != nil {
    		err = errors.Wrapf(err, "cannot decode search response body")
    		return
    	}
    
    	logs.Info("Response Parsed: %+v", resBodyPtrValue.Interface())
    
    	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
    	if err != nil {
    		err = errors.Wrapf(err, "cannot get total nr of hits")
    		return
    	}
    	if hitsTotalValue.Interface().(int) < 1 {
    		return nil, 0, nil //no matches
    	}
    
    	foundIDs, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._id")
    	if err != nil {
    		err = errors.Wrapf(err, "cannot get search response documents")
    		return
    	}
    	//logs.Errorf("items: (%T) %+v", foundIDs.Interface(), foundIDs.Interface())
    	return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil
    }
    
    func (ds *DocumentStore) Get(id string) (doc interface{}, err error) {
    	if ds == nil {
    		return nil, errors.Errorf("document store == nil")
    	}
    	get := opensearchapi.GetRequest{
    		Index:        ds.name,
    		DocumentType: "_doc",
    		DocumentID:   id,
    	}
    	getResponse, err := get.Do(context.Background(), ds.w.client)
    	if err != nil {
    		err = errors.Wrapf(err, "failed to get document")
    		return
    	}
    
    	switch getResponse.StatusCode {
    	case http.StatusOK:
    	default:
    		resBody, _ := ioutil.ReadAll(getResponse.Body)
    		err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
    		return
    	}
    
    	resBodyPtrValue := reflect.New(ds.getResponseBodyType)
    	if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
    		err = errors.Wrapf(err, "cannot decode get response body")
    		return
    	}
    
    	foundVar, err := reflection.Get(resBodyPtrValue, ".found")
    	if err != nil {
    		err = errors.Wrapf(err, "cannot get found value")
    		return
    	}
    	if found, ok := foundVar.Interface().(bool); !ok || !found {
    		return nil, nil //not found
    	}
    
    	//found
    	source, err := reflection.Get(resBodyPtrValue, "._source")
    	if err != nil {
    		err = errors.Wrapf(err, "cannot get document from get response")
    		return
    	}
    	return source.Interface(), nil
    }
    
    func (ds *DocumentStore) Delete(id string) (err error) {
    	if ds == nil {
    		return errors.Errorf("document store == nil")
    	}
    	del := opensearchapi.DeleteRequest{
    		Index:        ds.name,
    		DocumentType: "_doc",
    		DocumentID:   id,
    	}
    	delResponse, err := del.Do(context.Background(), ds.w.client)
    	if err != nil {
    		err = errors.Wrapf(err, "failed to del document")
    		return
    	}
    
    	switch delResponse.StatusCode {
    	case http.StatusOK:
    	case http.StatusNotFound:
    	case http.StatusNoContent:
    	default:
    		resBody, _ := ioutil.ReadAll(delResponse.Body)
    		err = errors.Errorf("Del failed with HTTP status %v: %s", delResponse.StatusCode, string(resBody))
    		return
    	}
    	return nil
    }