Select Git revision
writer.go 4.07 KiB
package search
import (
"crypto/tls"
"encoding/json"
opensearch "github.com/opensearch-project/opensearch-go"
opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
"net/http"
"strings"
"time"
)
// Writer bundles an OpenSearch client with the configuration it was built
// from and the registries of time series and document stores created on it.
// Construct it with New; the zero value has no client.
type Writer struct {
	// config is the validated configuration passed to New.
	config Config

	// client is the underlying OpenSearch connection and api is the
	// opensearchapi wrapper that New creates on top of it.
	client *opensearch.Client
	api    *opensearchapi.API

	// Registries that New initializes as empty maps.
	// NOTE(review): presumably keyed by the time series / document store
	// name — confirm against the code that populates them.
	timeSeriesByName    map[string]TimeSeries
	documentStoreByName map[string]DocumentStore
}
// New validates config and returns a Writer connected to the OpenSearch
// cluster at config.Addresses, authenticating with config.Username and
// config.Password.
//
// It returns an error when the config fails validation or the OpenSearch
// client cannot be created; the Writer returned alongside a non-nil error
// must not be used.
func New(config Config) (Writer, error) {
	w := Writer{}
	if err := config.Validate(); err != nil {
		return w, errors.Wrapf(err, "invalid config")
	}

	w = Writer{
		config:              config,
		timeSeriesByName:    map[string]TimeSeries{},
		documentStoreByName: map[string]DocumentStore{},
	}

	searchConfig := opensearch.Config{
		Transport: &http.Transport{
			// NOTE(review): InsecureSkipVerify disables verification of the
			// server certificate, so the connection is encrypted but not
			// authenticated. Consider making this configurable.
			TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
			ResponseHeaderTimeout: time.Second * 28, // timeout the request just before api gateway times out
		},
		Addresses: config.Addresses,
		Username:  config.Username,
		Password:  config.Password,
	}

	// Create the OpenSearch client using the transport configured above.
	var err error
	w.client, err = opensearch.NewClient(searchConfig)
	if err != nil {
		return w, errors.Wrapf(err, "cannot initialize opensearch connection")
	}

	// Log only the target addresses: dumping the whole searchConfig would
	// write the username and password to the logs in clear text.
	logs.Info("Search client created for addresses: %v", searchConfig.Addresses)

	w.api = opensearchapi.New(w.client)
	return w, nil
}
func (writer Writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
if writer.client == nil {
return nil, errors.Errorf("writer closed")
}
jsonDocStr, ok := doc.(string)
if !ok {
jsonDoc, err := json.Marshal(doc)
if err != nil {
return nil, errors.Wrapf(err, "failed to JSON encode document")
}
jsonDocStr = string(jsonDoc)
}
options := []func(*opensearchapi.IndexRequest){}