Skip to content
Snippets Groups Projects
Commit de384247 authored by Cornel Rautenbach's avatar Cornel Rautenbach
Browse files

Merge branch '22-search-return-unique-id' into 'main'

Resolve "Return unique id for search documents"

See merge request uafrica/go-utils!14
parents bf394e36 b52cf3d7
No related branches found
No related tags found
1 merge request!14Resolve "Return unique id for search documents"
......@@ -16,3 +16,25 @@ We use a document store and search to provide quick text searches from the API.
When a user is looking for an order, the API provides an end-point to search orders e.g. for "lcd screen", then the API does an OpenSearch query in the orders index, get N results and then read those orders from the orders table in the database (not OpenSearch) and return those results to the user.
We therefore use OpenSearch only for searching and returning a list of document ids, then read the documents from the database. A document is typically an "order" but also anything else that we need to do free text searches on.
## Testing
The dev sub-directory contains a docker-compose.yml that runs OpenSearch loccally for test programs.
Start it with:
```
cd dev
docker-compose up -d
```
Then run the go test programs in this directory...
E.g.:
```go test -v --run TestLocalWriter```
To work with this local instance from the command line:
```curl --insecure -uadmin:admin "https://localhost:9200/_cat/indices"```
If the test fail with index mapping error, you can delete the index before running the test, with the following command. It often happens when the code that generate the mapping changed and the existing index is incompatible with the new mapping:
```
curl --insecure -uadmin:admin -XDELETE "https://localhost:9200/go-utils-search-docs-test"
```
Some of the test programs also refer to the cloud instance created manually in V3, e.g. search_test.go TestDevWriter(). That can be updated or deleted as required.
......@@ -42,7 +42,7 @@ func test(t *testing.T, c search.Config) {
//write N records
methods := []string{"GET", "POST", "GET", "PATCH", "GET", "GET", "DELETE", "GET", "GET"} //more gets than others
paths := []string{"/users", "/orders", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates", "/accounts", "/shipment", "/rates"}
N := 100
N := 1
testTime := time.Now().Add(-time.Hour * time.Duration(N))
for i := 0; i < N; i++ {
testTime = testTime.Add(time.Duration(float64(rand.Intn(100)) / 60.0 * float64(time.Hour)))
......@@ -74,17 +74,19 @@ func test(t *testing.T, c search.Config) {
},
}
docs, totalCount, err := ts.Search(query, 10)
docsByIDMap, totalCount, err := ts.Search(query, 10)
if err != nil {
t.Errorf("failed to search: %+v", err)
} else {
if docsSlice, ok := docs.([]testStruct); ok {
t.Logf("search result total_count:%d with %d docs", totalCount, len(docsSlice))
if len(docsSlice) > 10 {
t.Errorf("got %d docs > max 10", len(docsSlice))
t.Logf("search result total_count:%d with %d docs", totalCount, len(docsByIDMap))
if len(docsByIDMap) > 10 {
t.Errorf("got %d docs > max 10", len(docsByIDMap))
}
for id, doc := range docsByIDMap {
t.Logf("id=%s doc=(%T)%+v", id, doc, doc)
if _, ok := doc.(testStruct); !ok {
t.Errorf("docs %T is not testStruct!", docsByIDMap)
}
} else {
t.Errorf("docs %T is not []testStruct!", docs)
}
}
......
......@@ -27,7 +27,25 @@ type TimeSeriesHeader struct {
type TimeSeries interface {
Write(StartTime time.Time, EndTime time.Time, data interface{}) error
Search(query Query, limit int64) (docs interface{}, totalCount int, err error)
//Search() returns docs indexed on OpenSearch document ID which cat be used in Get(id)
//The docs value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
//So you can safely type assert e.g.
// type myType struct {...}
// ts := search.TimeSeries(..., myType{})
// docs,totalCount,err := ts.Search(...)
// if err == nil {
// for id,docValue := range docs {
// doc := docValue.(myType)
// ...
// }
// }
Search(query Query, limit int64) (docs map[string]interface{}, totalCount int, err error)
//Get() takes the id returned in Search()
//The id is uuid assigned by OpenSearch when documents are added with Write().
//The document value type is the same as that of tmpl specified when you created the TimeSeries(..., tmpl)
Get(id string) (interface{}, error)
}
type timeSeries struct {
......@@ -41,6 +59,7 @@ type timeSeries struct {
createdDates map[string]bool
searchResponseBodyType reflect.Type
getResponseBodyType reflect.Type
}
//purpose:
......@@ -114,6 +133,18 @@ func (w *writer) TimeSeries(name string, tmpl interface{}) (TimeSeries, error) {
if err != nil {
return nil, errors.Wrapf(err, "failed to make search response type for time-series")
}
//define get response type
//similar to GetResponseBody
ts.getResponseBodyType, err = reflection.CloneType(
reflect.TypeOf(GetResponseBody{}),
map[string]reflect.Type{
"._source": ts.dataType,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to make get response type for time-series")
}
w.timeSeriesByName[name] = ts
return ts, nil
}
......@@ -360,7 +391,7 @@ type IndexSettings struct {
//Search
//Return:
// docs will be a slice of the TimeSeries data type
func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalCount int, err error) {
func (ts *timeSeries) Search(query Query, limit int64) (docs map[string]interface{}, totalCount int, err error) {
if ts == nil {
return nil, 0, errors.Errorf("time series == nil")
}
......@@ -425,10 +456,66 @@ func (ts *timeSeries) Search(query Query, limit int64) (docs interface{}, totalC
return nil, 0, nil //no matches
}
items, err := reflection.Get(resBodyPtrValue, ".hits.hits[]._source")
hits, err := reflection.Get(resBodyPtrValue, ".hits.hits[]")
if err != nil {
err = errors.Wrapf(err, "cannot get search response documents")
return
}
return items.Interface(), hitsTotalValue.Interface().(int), nil
docs = map[string]interface{}{}
for i := 0; i < hits.Len(); i++ {
hit := hits.Index(i)
index := hit.Field(0).Interface().(string) //HitDoc.Index
id := hit.Field(2).Interface().(string) //HitDoc.ID
docs[index+"/"+id] = hit.Field(4).Interface() //HitDoc.Source
}
return docs, hitsTotalValue.Interface().(int), nil
}
func (ds *timeSeries) Get(indexSlashDocumentID string) (doc interface{}, err error) {
if ds == nil {
return nil, errors.Errorf("document store == nil")
}
parts := strings.SplitN(indexSlashDocumentID, "/", 2)
get := opensearchapi.GetRequest{
Index: parts[0],
DocumentType: "_doc",
DocumentID: parts[1],
}
getResponse, err := get.Do(context.Background(), ds.w.client)
if err != nil {
err = errors.Wrapf(err, "failed to get document")
return
}
switch getResponse.StatusCode {
case http.StatusOK:
default:
resBody, _ := ioutil.ReadAll(getResponse.Body)
err = errors.Errorf("Get failed with HTTP status %v: %s", getResponse.StatusCode, string(resBody))
return
}
resBodyPtrValue := reflect.New(ds.getResponseBodyType)
if err = json.NewDecoder(getResponse.Body).Decode(resBodyPtrValue.Interface()); err != nil {
err = errors.Wrapf(err, "cannot decode get response body")
return
}
foundVar, err := reflection.Get(resBodyPtrValue, ".found")
if err != nil {
err = errors.Wrapf(err, "cannot get found value")
return
}
if found, ok := foundVar.Interface().(bool); !ok || !found {
return nil, nil //not found
}
//found
source, err := reflection.Get(resBodyPtrValue, "._source")
if err != nil {
err = errors.Wrapf(err, "cannot get document from get response")
return
}
return source.Interface(), nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment