package search import ( "crypto/tls" "encoding/json" opensearch "github.com/opensearch-project/opensearch-go" opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" "gitlab.com/uafrica/go-utils/errors" "gitlab.com/uafrica/go-utils/logs" "net/http" "strings" "time" ) type Writer struct { config Config client *opensearch.Client api *opensearchapi.API timeSeriesByName map[string]TimeSeries documentStoreByName map[string]DocumentStore } func New(config Config) (Writer, error) { w := Writer{} if err := config.Validate(); err != nil { return w, errors.Wrapf(err, "invalid config") } w = Writer{ config: config, timeSeriesByName: map[string]TimeSeries{}, documentStoreByName: map[string]DocumentStore{}, } searchConfig := opensearch.Config{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, ResponseHeaderTimeout: time.Second * 28, // timeout the request just before api gateway times out }, Addresses: config.Addresses, Username: config.Username, Password: config.Password, } // Initialize the client with SSL/TLS enabled. var err error w.client, err = opensearch.NewClient(searchConfig) if err != nil { return w, errors.Wrapf(err, "cannot initialize opensearch connection") } // Print OpenSearch version information on console. logs.Info("Search client created with config: %+v", searchConfig) w.api = opensearchapi.New(w.client) return w, nil } func (writer Writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) { if writer.client == nil { return nil, errors.Errorf("writer closed") } jsonDocStr, ok := doc.(string) if !ok { jsonDoc, err := json.Marshal(doc) if err != nil { return nil, errors.Wrapf(err, "failed to JSON encode document") } jsonDocStr = string(jsonDoc) } options := []func(*opensearchapi.IndexRequest){} if id != "" { options = append(options, writer.api.Index.WithDocumentID(id)) } indexResponse, err := writer.api.Index( indexName, strings.NewReader(jsonDocStr), options..., ) if err != nil { return nil, errors.Wrapf(err, "failed to index document") } var res IndexResponse if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil { return nil, errors.Wrapf(err, "failed to decode JSON response") } // success example: // res = map[ // _id:oZJZxXwBPnbPDFcjNpuO // _index:go-utils-audit-test // _primary_term:1 // _seq_no:5 // _shards:map[failed:0 successful:2 total:2] // _type:_doc // _version:1 // result:created // ] // error example: // res = map[ // error:map[ // reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value // root_cause:[ // map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception] // ] // type:mapper_parsing_exception] // status:400 // ] if res.Error != nil { return nil, errors.Errorf("failed to insert: %v", res.Error.Reason) } return &res, nil } type CreateResponse struct { Error *Error `json:"error,omitempty"` } type IndexResponse struct { Error *Error `json:"error,omitempty"` Result string `json:"result,omitempty"` // e.g. "created" Index string `json:"_index,omitempty"` ID string `json:"_id,omitempty"` Version int `json:"_version,omitempty"` SeqNo int `json:"_seq_no,omitempty"` Shards *Shards `json:"_shards,omitempty"` Type string `json:"_type,omitempty"` PrimaryTerm int `json:"_primary_term,omitempty"` } type Shards struct { Total int `json:"total"` Successful int `json:"successful"` Failed int `json:"failed"` } type Error struct { Reason string `json:"reason,omitempty"` RootCause []Cause `json:"root_cause,omitempty"` } type Cause struct { Reason string `json:"reason,omitempty"` Type string `json:"type,omitempty"` }