Newer
Older
opensearch "github.com/opensearch-project/opensearch-go/v2"
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
type Writer struct {
config Config
client *opensearch.Client
api *opensearchapi.API
timeSeriesByName map[string]TimeSeries
documentStoreByName map[string]DocumentStore
return w, errors.Wrapf(err, "invalid config")
Jan Semmelink
committed
config: config,
timeSeriesByName: map[string]TimeSeries{},
documentStoreByName: map[string]DocumentStore{},
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ResponseHeaderTimeout: time.Second * 28, // timeout the request just before api gateway times out
},
Addresses: config.Addresses,
Username: config.Username,
Password: config.Password,
}
// Initialize the client with SSL/TLS enabled.
var err error
w.client, err = opensearch.NewClient(searchConfig)
return w, errors.Wrapf(err, "cannot initialize opensearch connection")
logs.Info("Search client created with config: %+v", searchConfig)
func (writer Writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
Jan Semmelink
committed
return nil, errors.Errorf("writer closed")
jsonDocStr, ok := doc.(string)
if !ok {
jsonDoc, err := json.Marshal(doc)
if err != nil {
Jan Semmelink
committed
return nil, errors.Wrapf(err, "failed to JSON encode document")
Jan Semmelink
committed
options := []func(*opensearchapi.IndexRequest){}
if id != "" {
options = append(options, writer.api.Index.WithDocumentID(id))
}
var indexResponse *opensearchapi.Response
var err error
defer func() {
if indexResponse != nil {
indexResponse.Body.Close()
}
}()
indexResponse, err = writer.api.Index(
indexName,
strings.NewReader(jsonDocStr),
options...,
)
Jan Semmelink
committed
return nil, errors.Wrapf(err, "failed to index document")
}
var res IndexResponse
if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil {
Jan Semmelink
committed
return nil, errors.Wrapf(err, "failed to decode JSON response")
// _id:oZJZxXwBPnbPDFcjNpuO
// _index:go-utils-audit-test
// _primary_term:1
// _seq_no:5
// _shards:map[failed:0 successful:2 total:2]
// _type:_doc
// _version:1
// result:created
// ]
// error:map[
// reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value
// root_cause:[
// map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception]
// ]
// type:mapper_parsing_exception]
// status:400
Jan Semmelink
committed
return nil, errors.Errorf("failed to insert: %v", res.Error.Reason)
Jan Semmelink
committed
return &res, nil
}
// CreateResponse captures the optional "error" object of a JSON response body.
// Error stays nil when the decoded body has no "error" key, and is left out
// again when a nil value is marshalled.
//
// NOTE(review): the request that produces this response is not visible in this
// part of the file — presumably an index/resource creation call; confirm
// against the caller.
type CreateResponse struct {
	Error *Error `json:"error,omitempty"`
}
// IndexResponse is the structure the index call's JSON response body is
// decoded into by Writer.Write.
//
// A successful response fills the underscore-prefixed metadata keys ("_id",
// "_index", "_version", "_seq_no", "_primary_term", "_shards", "_type") together
// with "result". A rejected document fills Error instead. Every field is
// tagged omitempty, so zero values and nil pointers are dropped when the
// value is marshalled back to JSON.
type IndexResponse struct {
	Error       *Error  `json:"error,omitempty"`
	Result      string  `json:"result,omitempty"` // e.g. "created"
	Index       string  `json:"_index,omitempty"`
	ID          string  `json:"_id,omitempty"`
	Version     int     `json:"_version,omitempty"`
	SeqNo       int     `json:"_seq_no,omitempty"`
	Shards      *Shards `json:"_shards,omitempty"`
	Type        string  `json:"_type,omitempty"`
	PrimaryTerm int     `json:"_primary_term,omitempty"`
}
// Shards holds the three counters of the "_shards" object in an index
// response: "total", "successful" and "failed". None of the fields use
// omitempty, so all three keys are always written when marshalling, even
// when a counter is zero.
type Shards struct {
	Total      int `json:"total"`
	Successful int `json:"successful"`
	Failed     int `json:"failed"`
}
// Error is the "error" object of a failed response.
//
// The sample payload recorded in this file has three keys at this level:
// "reason", "root_cause" and "type" (e.g. "mapper_parsing_exception"). Type
// is decoded so callers can branch on the error class instead of matching
// on the free-text Reason. It is appended as the last field and tagged
// omitempty, so existing keyed literals and the encoding of values that
// leave it empty are unaffected.
type Error struct {
	Reason    string  `json:"reason,omitempty"`
	RootCause []Cause `json:"root_cause,omitempty"`
	Type      string  `json:"type,omitempty"`
}

// Cause is one entry of an Error's "root_cause" array: a human-readable
// reason plus the exception type reported for it.
type Cause struct {
	Reason string `json:"reason,omitempty"`
	Type   string `json:"type,omitempty"`
}