// Package opensearch provides a Writer that indexes JSON documents into
// OpenSearch through the opensearch-go client.
package opensearch

import (
	"crypto/tls"
	"encoding/json"
	"net/http"
	"strings"
	"time"

	opensearch "github.com/opensearch-project/opensearch-go/v2"
	opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
)

type Writer struct {
	config              Config
	client              *opensearch.Client
	api                 *opensearchapi.API
	timeSeriesByName    map[string]TimeSeries
	documentStoreByName map[string]DocumentStore
Jan Semmelink's avatar
Jan Semmelink committed
}

func New(config Config) (Writer, error) {
Jan Semmelink's avatar
Jan Semmelink committed
	if err := config.Validate(); err != nil {
		return w, errors.Wrapf(err, "invalid config")
Jan Semmelink's avatar
Jan Semmelink committed
	}
		config:              config,
		timeSeriesByName:    map[string]TimeSeries{},
		documentStoreByName: map[string]DocumentStore{},
Jan Semmelink's avatar
Jan Semmelink committed
	}

Jan Semmelink's avatar
Jan Semmelink committed
	searchConfig := opensearch.Config{
Jan Semmelink's avatar
Jan Semmelink committed
		Transport: &http.Transport{
			TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
			ResponseHeaderTimeout: time.Second * 28, // timeout the request just before api gateway times out
Jan Semmelink's avatar
Jan Semmelink committed
		},
		Addresses: config.Addresses,
		Username:  config.Username,
		Password:  config.Password,
Jan Semmelink's avatar
Jan Semmelink committed
	}
	// Initialize the client with SSL/TLS enabled.
	var err error
	w.client, err = opensearch.NewClient(searchConfig)
Jan Semmelink's avatar
Jan Semmelink committed
	if err != nil {
		return w, errors.Wrapf(err, "cannot initialize opensearch connection")
Jan Semmelink's avatar
Jan Semmelink committed
	}
	// Print OpenSearch version information on console.
Francé Wilke's avatar
Francé Wilke committed
	logs.Info("Search client created with config: %+v", searchConfig)
Jan Semmelink's avatar
Jan Semmelink committed

	w.api = opensearchapi.New(w.client)
Jan Semmelink's avatar
Jan Semmelink committed
	return w, nil
}

func (writer Writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
Jan Semmelink's avatar
Jan Semmelink committed
	if writer.client == nil {
		return nil, errors.Errorf("writer closed")
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed
	jsonDocStr, ok := doc.(string)
	if !ok {
		jsonDoc, err := json.Marshal(doc)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to JSON encode document")
Jan Semmelink's avatar
Jan Semmelink committed
		}
		jsonDocStr = string(jsonDoc)
Jan Semmelink's avatar
Jan Semmelink committed
	}
	options := []func(*opensearchapi.IndexRequest){}
	if id != "" {
		options = append(options, writer.api.Index.WithDocumentID(id))
	}

	var indexResponse *opensearchapi.Response
	var err error
	defer func() {
		if indexResponse != nil {
			indexResponse.Body.Close()
		}
	}()
	indexResponse, err = writer.api.Index(
		indexName,
		strings.NewReader(jsonDocStr),
		options...,
	)
Jan Semmelink's avatar
Jan Semmelink committed
	if err != nil {
		return nil, errors.Wrapf(err, "failed to index document")
Jan Semmelink's avatar
Jan Semmelink committed
	}
	var res IndexResponse
	if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil {
		return nil, errors.Wrapf(err, "failed to decode JSON response")
Jan Semmelink's avatar
Jan Semmelink committed
	}
	// success example:
	// res = map[
Jan Semmelink's avatar
Jan Semmelink committed
	// 	_id:oZJZxXwBPnbPDFcjNpuO
	// 	_index:go-utils-audit-test
	// 	_primary_term:1
	// 	_seq_no:5
	// 	_shards:map[failed:0 successful:2 total:2]
	// 	_type:_doc
	// 	_version:1
	// 	result:created
	// ]
	// error example:
	// res = map[
Jan Semmelink's avatar
Jan Semmelink committed
	// error:map[
	// 	reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value
	// 	root_cause:[
	// 		map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception]
	//	]
	// 	type:mapper_parsing_exception]
	// 	status:400
Jan Semmelink's avatar
Jan Semmelink committed
	if res.Error != nil {
		return nil, errors.Errorf("failed to insert: %v", res.Error.Reason)
Jan Semmelink's avatar
Jan Semmelink committed
	}
Jan Semmelink's avatar
Jan Semmelink committed
}

// CreateResponse holds the optional "error" object of a JSON response.
// NOTE(review): not referenced in the visible part of this file; presumably
// used to decode create-style API responses — confirm against callers.
type CreateResponse struct {
	// Error is nil when the response has no "error" key.
	Error *Error `json:"error,omitempty"`
}

type IndexResponse struct {
	Error       *Error  `json:"error,omitempty"`
	Result      string  `json:"result,omitempty"` // e.g. "created"
Jan Semmelink's avatar
Jan Semmelink committed
	Index       string  `json:"_index,omitempty"`
	ID          string  `json:"_id,omitempty"`
	Version     int     `json:"_version,omitempty"`
	SeqNo       int     `json:"_seq_no,omitempty"`
	Shards      *Shards `json:"_shards,omitempty"`
	Type        string  `json:"_type,omitempty"`
	PrimaryTerm int     `json:"_primary_term,omitempty"`
}

// Shards mirrors the "_shards" object of an index response,
// e.g. {"total": 2, "successful": 2, "failed": 0}.
type Shards struct {
	Total      int `json:"total"`
	Successful int `json:"successful"`
	Failed     int `json:"failed"`
}

// Error mirrors the "error" object of an OpenSearch response body.
type Error struct {
	// Reason is the "reason" text; Write includes it in the error it returns.
	Reason string `json:"reason,omitempty"`
	// RootCause holds the entries of the "root_cause" array.
	RootCause []Cause `json:"root_cause,omitempty"`
}

// Cause is one entry of the "root_cause" array inside an Error.
type Cause struct {
	Reason string `json:"reason,omitempty"`
	Type   string `json:"type,omitempty"` // e.g. "mapper_parsing_exception"
}