From a4c71c057874b3781438f301b3f8857a5878f059 Mon Sep 17 00:00:00 2001
From: Jan Semmelink <jan@uafrica.com>
Date: Thu, 11 Nov 2021 15:32:41 +0200
Subject: [PATCH] Added DocumentStore() to search, which will be used in V3 to
 store orders in OpenSearch then do text searches on them

---
 search/document_store.go      | 285 ++++++++++++++++++++++++++++++++++
 search/document_store_test.go | 138 ++++++++++++++++
 search/opensearch_types.go    |  22 +++
 search/time_series.go         |  15 +-
 search/writer.go              |  34 ++--
 5 files changed, 476 insertions(+), 18 deletions(-)
 create mode 100644 search/document_store.go
 create mode 100644 search/document_store_test.go

diff --git a/search/document_store.go b/search/document_store.go
new file mode 100644
index 0000000..a8c8bd9
--- /dev/null
+++ b/search/document_store.go
@@ -0,0 +1,285 @@
+package search
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"reflect"
+	"strings"
+
+	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
+	"gitlab.com/uafrica/go-utils/errors"
+	"gitlab.com/uafrica/go-utils/logger"
+	"gitlab.com/uafrica/go-utils/reflection"
+)
+
+type DocumentStore interface {
+	Write(id string, data interface{}) error
+	Search(query Query, limit int64) (ids []string, totalCount int, err error)
+	Get(id string) (doc interface{}, err error)
+}
+
+type documentStore struct {
+	w                      *writer
+	name                   string
+	dataType               reflect.Type
+	settings               Settings
+	mappings               Mappings
+	jsonSettings           []byte
+	jsonMappings           []byte
+	created                bool
+	searchResponseBodyType reflect.Type
+	getResponseBodyType    reflect.Type
+}
+
+//purpose:
+//	create a document store index to write e.g. orders then allow one to search them
+//parameters:
+//	name must be the complete openSearch index name e.g. "uafrica-v3-orders"
+//	tmpl must be your document data struct consisting of public fields as:
+//		Xxx string `json:"<name>" search:"keyword|text|long|date"`	(can later add more types)
+//		Xxx time.Time `json:"<name>"`								assumes type "date" for opensearch
+//		Xxx int `json:"<name>"`										assumes type "long" for opensearch, specify keyword if required
+func (w *writer) DocumentStore(name string, tmpl interface{}) (DocumentStore, error) {
+	if !indexNameRegex.MatchString(name) {
+		return nil, errors.Errorf("invalid index_name:\"%s\"", name)
+	}
+
+	//if already created, just return
+	if existingDocumentStore, ok := w.documentStoreByName[name]; ok {
+		return existingDocumentStore, nil
+	}
+
+	structType := reflect.TypeOf(tmpl)
+	if tmpl == nil || structType.Kind() != reflect.Struct {
+		return nil, errors.Errorf("%T is not a struct", tmpl)
+	}
+
+	ds := &documentStore{
+		w:        w,
+		name:     name,
+		dataType: structType,
+		created:  false,
+	}
+
+	//define the OpenSearch index mapping
+	ds.settings = Settings{
+		Index: &SettingsIndex{
+			NumberOfShards:   4,
+			NumberOfReplicas: 0,
+		},
+	}
+
+	if properties, err := structMappingProperties(structType); err != nil {
+		return nil, errors.Wrapf(err, "cannot map struct %s", structType)
+	} else {
+		ds.mappings = Mappings{
+			Properties: properties,
+		}
+	}
+
+	var err error
+	ds.jsonSettings, err = json.Marshal(ds.settings)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to marshal index settings")
+	}
+	ds.jsonMappings, err = json.Marshal(ds.mappings)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to marshal index mappings")
+	}
+	logger.Debugf("%s Index Mappings: %s", structType, string(ds.jsonMappings))
+
+	//define search response type
+	//similar to SearchResponseBody
+	ds.searchResponseBodyType, err = reflection.CloneType(
+		reflect.TypeOf(SearchResponseBody{}),
+		map[string]reflect.Type{
+			".hits.hits[]._source": ds.dataType,
+		})
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to make search response type for document store")
+	}
+
+	//define get response type
+	//similar to GetResponseBody
+	ds.getResponseBodyType, err = reflection.CloneType(
+		reflect.TypeOf(GetResponseBody{}),
+		map[string]reflect.Type{
+			"._source": ds.dataType,
+		})
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to make get response type for document store")
+	}
+	w.documentStoreByName[name] = ds
+	return ds, nil
+}
+
+//data must be of type specified in Writer.TimeSeries(tmpl)
+func (ds *documentStore) Write(id string, data interface{}) error {
+	if data == nil {
+		return errors.Errorf("data:nil")
+	}
+	t := reflect.TypeOf(data)
+	if t != ds.dataType {
+		return errors.Errorf("cannot write %T into DocumentStore(%s), expecting %s", data, ds.name, ds.dataType)
+	}
+
+	//get daily search index to write to, from start time
+	indexName := ds.name // + "-" + startTime.Format("20060102")
+	if !ds.created {
+		res, err := ds.w.api.Create(
+			indexName, //index name
+			indexName, //document id
+			strings.NewReader(string(ds.jsonSettings)))
+		if err != nil {
+			return errors.Wrapf(err, "failed to create index(%s)", indexName)
+		}
+		switch res.StatusCode {
+		case http.StatusOK:
+		case http.StatusCreated:
+		case http.StatusConflict: //409 = already exists
+		default:
+			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
+		}
+
+		res, err = opensearchapi.IndicesPutMappingRequest{
+			Index: []string{indexName},
+			Body:  strings.NewReader(string(ds.jsonMappings)),
+		}.Do(context.Background(), ds.w.client)
+		if err != nil {
+			return errors.Wrapf(err, "failed to create index(%s)", indexName)
+		}
+		switch res.StatusCode {
+		case http.StatusOK:
+		case http.StatusCreated:
+		case http.StatusConflict: //409 = already exists
+		default:
+			return errors.Errorf("failed to create index(%s): %v %s %s", indexName, res.StatusCode, res.Status(), res.String())
+		}
+		ds.created = true
+	}
+	if res, err := ds.w.Write(indexName, id, data); err != nil {
+		return err
+	} else {
+		logger.Debugf("IndexResponse: %+v", res)
+	}
+	return nil
+}
+
+//Search
+//Return:
+//	docs will be a slice of the DocumentStore data type
+func (ds *documentStore) Search(query Query, limit int64) (ids []string, totalCount int, err error) {
+	if ds == nil {
+		return nil, 0, errors.Errorf("document store == nil")
+	}
+	if limit < 0 || limit > 1000 {
+		err = errors.Errorf("limit=%d not 0..1000", limit)
+		return
+	}
+
+	// example search request body for free text
+	// 	{
+	// 		"size": 5,
+	// 		"query": {
+	// 			"multi_match": {
+	// 				"query": "miller",
+	// 				"fields": ["title^2", "director"]
+	// 			}
+	//    	}
+	//  }
+	body := SearchRequestBody{
+		Size:  limit,
+		Query: query,
+	}
+	jsonBody, _ := json.Marshal(body)
+	search := opensearchapi.SearchRequest{
+		Body: bytes.NewReader(jsonBody),
+	}
+
+	searchResponse, err := search.Do(context.Background(), ds.w.client)
+	if err != nil {
+		err = errors.Wrapf(err, "failed to search documents")
+		return
+	}
+
+	switch searchResponse.StatusCode {
+	case http.StatusOK:
+	default:
+		resBody, _ := ioutil.ReadAll(searchResponse.Body)
+		err = errors.Errorf("Search failed with HTTP status %v: %s", searchResponse.StatusCode, string(resBody))
+		return
+	}
+
+	resBodyPtrValue := reflect.New(ds.searchResponseBodyType)
+	if err = json.NewDecoder(searchResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
+		err = errors.Wrapf(err, "cannot decode search response body")
+		return
+	}
+
+	hitsTotalValue, err := reflection.Get(resBodyPtrValue, ".hits.total.value")
+	if err != nil {
+		err = errors.Wrapf(err, "cannot get total nr of hits")
+		return
+	}
+	if hitsTotalValue.Interface().(int) < 1 {
+		return nil, 0, nil //no matches
+	}
+
+	foundIDs, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._id")
+	if err != nil {
+		err = errors.Wrapf(err, "cannot get search response documents")
+		return
+	}
+	logger.Errorf("items: (%T) %+v", foundIDs.Interface(), foundIDs.Interface())
+	return foundIDs.Interface().([]string), hitsTotalValue.Interface().(int), nil
+}
+
+func (ds *documentStore) Get(id string) (doc interface{}, err error) {
+	if ds == nil {
+		return nil, errors.Errorf("document store == nil")
+	}
+	get := opensearchapi.GetRequest{
+		Index:        ds.name,
+		DocumentType: "_doc",
+		DocumentID:   id,
+	}
+	getResponse, err := get.Do(context.Background(), ds.w.client)
+	if err != nil {
+		err = errors.Wrapf(err, "failed to get document")
+		return
+	}
+
+	switch getResponse.StatusCode {
+	case http.StatusOK:
+	default:
+		resBody, _ := ioutil.ReadAll(getResponse.Body)
+		err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
+		return
+	}
+
+	resBodyPtrValue := reflect.New(ds.getResponseBodyType)
+	if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
+		err = errors.Wrapf(err, "cannot decode get response body")
+		return
+	}
+
+	foundVar, err := reflection.Get(resBodyPtrValue, ".found")
+	if err != nil {
+		err = errors.Wrapf(err, "cannot get found value")
+		return
+	}
+	if found, ok := foundVar.Interface().(bool); !ok || !found {
+		return nil, nil //not found
+	}
+
+	//found
+	source, err := reflection.Get(resBodyPtrValue, "._source")
+	if err != nil {
+		err = errors.Wrapf(err, "cannot get document from get response")
+		return
+	}
+	return source.Interface(), nil
+}
diff --git a/search/document_store_test.go b/search/document_store_test.go
new file mode 100644
index 0000000..788d43a
--- /dev/null
+++ b/search/document_store_test.go
@@ -0,0 +1,138 @@
+package search_test
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/google/uuid"
+	"gitlab.com/uafrica/go-utils/logger"
+	"gitlab.com/uafrica/go-utils/search"
+)
+
+func TestLocalDocuments(t *testing.T) {
+	testDocuments(t, search.Config{
+		Addresses: []string{"https://localhost:9200"},
+	})
+}
+
+func TestDevDocuments(t *testing.T) {
+	testDocuments(t, search.Config{
+		Addresses: []string{"https://search-uafrica-v3-api-logs-fefgiypvmb3sg5wqohgsbqnzvq.af-south-1.es.amazonaws.com/"}, //from AWS Console OpenSearch Service > Domains > uafrica-v3-api-logs > General Information: Domain Endpoints
+		Username:  "uafrica",
+		Password:  "Aiz}a4ee",
+	})
+}
+
+func testDocuments(t *testing.T, c search.Config) {
+	logger.SetGlobalFormat(logger.NewConsole())
+	logger.SetGlobalLevel(logger.LevelDebug)
+	a, err := search.New(c)
+	if err != nil {
+		t.Fatalf("failed to create writer: %+v", err)
+	}
+
+	indexName := "go-utils-search-docs-test"
+	ds, err := a.DocumentStore(indexName, testDocument{})
+	if err != nil {
+		t.Fatalf("failed to create document store: %+v", err)
+	}
+
+	//write N documents
+	buyers := []string{"Joe", "Anne", "Griet", "Kobus", "Caleb", "Roger", "Susan", "Maria", "Sandra"}
+	sellers := []string{"Hannelie", "Angelica", "Louis", "Bertus", "Bongi", "Vusi", "Andrew", "Joseph"}
+	nextItem := 0
+	itemInfos := []testItemInfo{
+		{Name: "Samsung 17\" LCD Monitor", Cost: 0},
+		{Name: "Acer 15\" LED Monitor", Cost: 0},
+		{Name: "Apple M1 16\" MAC", Cost: 0},
+		{Name: "Red Dress size M", Cost: 0},
+		{Name: "Grey Shorts size 115cm", Cost: 0},
+		{Name: "Black Student Prince School Shoes Boys Size 8", Cost: 0},
+		{Name: "Black Student Prince School Shoes Boys Size 9", Cost: 0},
+		{Name: "Black Student Prince School Shoes Boys Size 10", Cost: 0},
+		{Name: "Black Student Prince School Shoes Boys Size 11", Cost: 0},
+		{Name: "Black Student Prince School Shoes Girst Size 6", Cost: 0},
+		{Name: "Black Student Prince School Shoes Girst Size 7", Cost: 0},
+		{Name: "Black Student Prince School Shoes Girst Size 8", Cost: 0},
+		{Name: "Faber Castell HB Pencil", Cost: 0},
+		{Name: "Faber Castell 2H Pencil", Cost: 0},
+		{Name: "Faber Castell 4H Pencil", Cost: 0},
+		{Name: "12 Colour Crayons", Cost: 0},
+		{Name: "Steadler Rubber", Cost: 0},
+	}
+	N := 100
+	for i := 0; i < N; i++ {
+		//make a random document
+		id := uuid.New().String()
+		doc := testDocument{
+			Buyer:  buyers[rand.Intn(len(buyers))],
+			Seller: sellers[rand.Intn(len(sellers))],
+			Items:  []docItem{},
+		}
+		for i := 0; i < rand.Intn(5)+1; i++ {
+			itemInfo := itemInfos[nextItem]
+			nextItem++
+			if nextItem >= len(itemInfos) {
+				nextItem = 0
+			}
+			item := docItem{
+				Description: itemInfo.Name,
+				UnitCost:    itemInfo.Cost,
+				Qty:         rand.Intn(3) + 1,
+			}
+			doc.Items = append(doc.Items, item)
+			doc.TotalCost += item.UnitCost * float64(item.Qty)
+			doc.TotalItems += item.Qty
+		}
+
+		//add to the search
+		if err := ds.Write(id, doc); err != nil {
+			t.Fatalf("failed to add doc: %+v", err)
+		}
+	}
+
+	//search some of the documents
+	query := search.Query{
+		MultiMatch: &search.QueryMultiMatch{
+			Query:  buyers[0],
+			Fields: []string{"buyer"},
+		},
+	}
+	ids, totalCount, err := ds.Search(query, 10)
+	if err != nil {
+		t.Errorf("failed to search: %+v", err)
+	} else {
+		t.Logf("search result total_count:%d with %d ids", totalCount, len(ids))
+		if len(ids) > 10 {
+			t.Errorf("got %d docs > max 10", len(ids))
+		}
+		for _, id := range ids {
+			if getDoc, err := ds.Get(id); err != nil {
+				t.Fatalf("failed to get: %+v", err)
+			} else {
+				t.Logf("   GOT: %s: %+v", id, getDoc)
+			}
+		}
+	}
+	t.Logf("Done")
+}
+
+type testDocument struct {
+	//UUID       string    `json:"uuid"`
+	Buyer      string    `json:"buyer" search:"keyword"`
+	Seller     string    `json:"seller" search:"keyword"`
+	Items      []docItem `json:"items"`
+	TotalCost  float64   `json:"total_cost"`
+	TotalItems int       `json:"total_items"`
+}
+
+type docItem struct {
+	Description string  `json:"description" search:"keyword"`
+	UnitCost    float64 `json:"unit_cost"`
+	Qty         int     `json:"qty"`
+}
+
+type testItemInfo struct {
+	Name string
+	Cost float64
+}
diff --git a/search/opensearch_types.go b/search/opensearch_types.go
index 2c127dd..aeb65b3 100644
--- a/search/opensearch_types.go
+++ b/search/opensearch_types.go
@@ -155,3 +155,25 @@ type HitDoc struct {
 	Score  float64                `json:"_score"`  //
 	Source map[string]interface{} `json:"_source"` //the document of itemType
 }
+
+//Get Response Body Example:
+// {
+// 	"_index": "go-utils-search-docs-test",
+// 	"_type": "_doc",
+// 	"_id": "836c6443-5b0e-489b-aa0f-712ebed96841",
+// 	"_version": 1,
+// 	"_seq_no": 6,
+// 	"_primary_term": 1,
+// 	"found": true,
+// 	"_source": { ... }
+//  }
+type GetResponseBody struct {
+	Index       string                 `json:"_index"` //name of index
+	Type        string                 `json:"_type"`  //_doc
+	ID          string                 `json:"_id"`
+	Version     int                    `json:"_version"`
+	SeqNo       int                    `json:"_seq_no"`
+	PrimaryTerm int                    `json:"_primary_term"`
+	Found       bool                   `json:"found"`
+	Source      map[string]interface{} `json:"_source"` //the document of itemType
+}
diff --git a/search/time_series.go b/search/time_series.go
index 8f766ea..294dab7 100644
--- a/search/time_series.go
+++ b/search/time_series.go
@@ -114,9 +114,6 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to make search response type for time-series")
 	}
-
-	//new package: copy type recursively, find index of special field and replace when found....
-
 	w.timeSeriesByName[name] = ts
 	return ts, nil
 }
@@ -241,7 +238,6 @@ func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) erro
 		if err != nil {
 			return errors.Wrapf(err, "failed to create index(%s)", indexName)
 		}
-
 		switch res.StatusCode {
 		case http.StatusOK:
 		case http.StatusCreated:
@@ -260,7 +256,13 @@ func (ts *timeSeries) Write(startTime, endTime time.Time, data interface{}) erro
 		EndTime:    endTime,
 		DurationMs: endTime.Sub(startTime).Milliseconds(),
 	}))
-	return ts.w.Write(indexName, x.Elem().Interface())
+	if res, err := ts.w.Write(indexName, "", x.Elem().Interface()); err != nil {
+		return err
+	} else {
+		logger.Debugf("IndexResponse: %+v", res)
+	}
+	return nil
+
 }
 
 //parameters:
@@ -346,6 +348,9 @@ type IndexSettings struct {
 //Return:
 //	docs will be a slice of the TimeSeries data type
 func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) {
+	if ts == nil {
+		return nil, 0, errors.Errorf("time series == nil")
+	}
 	if limit < 0 || limit > 1000 {
 		err = errors.Errorf("limit=%d not 0..1000", limit)
 		return
diff --git a/search/writer.go b/search/writer.go
index 5347c2b..201accb 100644
--- a/search/writer.go
+++ b/search/writer.go
@@ -16,6 +16,7 @@ import (
 type Writer interface {
 	TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first unanymous field
 	DelOldTimeSeries(name string, olderThanDays int) ([]string, error)
+	DocumentStore(name string, tmpl interface{}) (DocumentStore, error)
 }
 
 func New(config Config) (Writer, error) {
@@ -23,8 +24,9 @@ func New(config Config) (Writer, error) {
 		return nil, errors.Wrapf(err, "invalid config")
 	}
 	w := &writer{
-		config:           config,
-		timeSeriesByName: map[string]TimeSeries{},
+		config:              config,
+		timeSeriesByName:    map[string]TimeSeries{},
+		documentStoreByName: map[string]DocumentStore{},
 	}
 
 	searchConfig := opensearch.Config{
@@ -50,34 +52,40 @@ func New(config Config) (Writer, error) {
 
 //implements audit.Auditor
 type writer struct {
-	config           Config
-	client           *opensearch.Client
-	api              *opensearchapi.API
-	timeSeriesByName map[string]TimeSeries
+	config              Config
+	client              *opensearch.Client
+	api                 *opensearchapi.API
+	timeSeriesByName    map[string]TimeSeries
+	documentStoreByName map[string]DocumentStore
 }
 
-func (writer writer) Write(indexName string, doc interface{}) error {
+func (writer writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
 	if writer.client == nil {
-		return errors.Errorf("writer closed")
+		return nil, errors.Errorf("writer closed")
 	}
 	jsonDocStr, ok := doc.(string)
 	if !ok {
 		jsonDoc, err := json.Marshal(doc)
 		if err != nil {
-			return errors.Wrapf(err, "failed to JSON encode document")
+			return nil, errors.Wrapf(err, "failed to JSON encode document")
 		}
 		jsonDocStr = string(jsonDoc)
 	}
+	options := []func(*opensearchapi.IndexRequest){}
+	if id != "" {
+		options = append(options, writer.api.Index.WithDocumentID(id))
+	}
 	indexResponse, err := writer.api.Index(
 		indexName,
 		strings.NewReader(jsonDocStr),
+		options...,
 	)
 	if err != nil {
-		return errors.Wrapf(err, "failed to index document")
+		return nil, errors.Wrapf(err, "failed to index document")
 	}
 	var res IndexResponse
 	if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil {
-		return errors.Wrapf(err, "failed to decode JSON response")
+		return nil, errors.Wrapf(err, "failed to decode JSON response")
 	}
 	//success example:
 	//res = map[
@@ -101,9 +109,9 @@ func (writer writer) Write(indexName string, doc interface{}) error {
 	// 	status:400
 	//]
 	if res.Error != nil {
-		return errors.Errorf("failed to insert: %v", res.Error.Reason)
+		return nil, errors.Errorf("failed to insert: %v", res.Error.Reason)
 	}
-	return nil
+	return &res, nil
 }
 
 func (writer writer) Search() ([]interface{}, error) {
-- 
GitLab