Select Git revision
writer.go 4.12 KiB
package search
import (
"crypto/tls"
"encoding/json"
"net/http"
"strings"
opensearch "github.com/opensearch-project/opensearch-go"
opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
"gitlab.com/uafrica/go-utils/errors"
"gitlab.com/uafrica/go-utils/logs"
)
type Writer interface {
TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first unanymous field
DelOldTimeSeries(name string, olderThanDays int) ([]string, error)
DocumentStore(name string, tmpl interface{}) (DocumentStore, error)
}
func New(config Config) (Writer, error) {
if err := config.Validate(); err != nil {
return nil, errors.Wrapf(err, "invalid config")
}
w := &writer{
config: config,
timeSeriesByName: map[string]TimeSeries{},
documentStoreByName: map[string]DocumentStore{},
}
searchConfig := opensearch.Config{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Addresses: config.Addresses,
Username: config.Username,
Password: config.Password,
}
// Initialize the client with SSL/TLS enabled.
var err error
w.client, err = opensearch.NewClient(searchConfig)
if err != nil {
return nil, errors.Wrapf(err, "cannot initialize opensearch connection")
}
// Print OpenSearch version information on console.
logs.Info("Search client created with config: %+v", searchConfig)
w.api = opensearchapi.New(w.client)
return w, nil
}
//implements audit.Auditor
type writer struct {
config Config
client *opensearch.Client
api *opensearchapi.API
timeSeriesByName map[string]TimeSeries
documentStoreByName map[string]DocumentStore
}
func (writer writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
if writer.client == nil {
return nil, errors.Errorf("writer closed")
}
jsonDocStr, ok := doc.(string)
if !ok {
jsonDoc, err := json.Marshal(doc)
if err != nil {
return nil, errors.Wrapf(err, "failed to JSON encode document")
}
jsonDocStr = string(jsonDoc)
}
options := []func(*opensearchapi.IndexRequest){}
if id != "" {
options = append(options, writer.api.Index.WithDocumentID(id))
}
indexResponse, err := writer.api.Index(
indexName,
strings.NewReader(jsonDocStr),
options...,
)
if err != nil {
return nil, errors.Wrapf(err, "failed to index document")
}
var res IndexResponse
if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil {
return nil, errors.Wrapf(err, "failed to decode JSON response")
}
//success example:
//res = map[
// _id:oZJZxXwBPnbPDFcjNpuO
// _index:go-utils-audit-test
// _primary_term:1
// _seq_no:5
// _shards:map[failed:0 successful:2 total:2]
// _type:_doc
// _version:1
// result:created
// ]
//error example:
//res = map[
// error:map[
// reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value
// root_cause:[
// map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception]
// ]
// type:mapper_parsing_exception]
// status:400
//]
if res.Error != nil {
return nil, errors.Errorf("failed to insert: %v", res.Error.Reason)
}
return &res, nil
}
type CreateResponse struct {
Error *Error `json:"error,omitempty"`
}
type IndexResponse struct {
Error *Error `json:"error,omitempty"`
Result string `json:"result,omitempty"` //e.g. "created"
Index string `json:"_index,omitempty"`
ID string `json:"_id,omitempty"`
Version int `json:"_version,omitempty"`
SeqNo int `json:"_seq_no,omitempty"`
Shards *Shards `json:"_shards,omitempty"`
Type string `json:"_type,omitempty"`
PrimaryTerm int `json:"_primary_term,omitempty"`
}
type Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Failed int `json:"failed"`
}
type Error struct {
Reason string `json:"reason,omitempty"`
RootCause []Cause `json:"root_cause,omitempty"`
}
type Cause struct {
Reason string `json:"reason,omitempty"`
Type string `json:"type,omitempty"`
}