Skip to content
Snippets Groups Projects
Select Git revision
  • d14e15cc870310f1fcd461bbd2f7eb2eaa934cbf
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

writer.go

Blame
  • writer.go 4.12 KiB
    package search
    
    import (
    	"crypto/tls"
    	"encoding/json"
    	"net/http"
    	"strings"
    
    	opensearch "github.com/opensearch-project/opensearch-go"
    	opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi"
    	"gitlab.com/uafrica/go-utils/errors"
    	"gitlab.com/uafrica/go-utils/logs"
    )
    
    type Writer interface {
    	TimeSeries(name string, tmpl interface{}) (TimeSeries, error) //tmpl must embed TimeSeriesHeader as first unanymous field
    	DelOldTimeSeries(name string, olderThanDays int) ([]string, error)
    	DocumentStore(name string, tmpl interface{}) (DocumentStore, error)
    }
    
    func New(config Config) (Writer, error) {
    	if err := config.Validate(); err != nil {
    		return nil, errors.Wrapf(err, "invalid config")
    	}
    	w := &writer{
    		config:              config,
    		timeSeriesByName:    map[string]TimeSeries{},
    		documentStoreByName: map[string]DocumentStore{},
    	}
    
    	searchConfig := opensearch.Config{
    		Transport: &http.Transport{
    			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
    		},
    		Addresses: config.Addresses,
    		Username:  config.Username,
    		Password:  config.Password,
    	}
    	// Initialize the client with SSL/TLS enabled.
    	var err error
    	w.client, err = opensearch.NewClient(searchConfig)
    	if err != nil {
    		return nil, errors.Wrapf(err, "cannot initialize opensearch connection")
    	}
    	// Print OpenSearch version information on console.
    	logs.Info("Search client created with config: %+v", searchConfig)
    
    	w.api = opensearchapi.New(w.client)
    	return w, nil
    }
    
    //implements audit.Auditor
    type writer struct {
    	config              Config
    	client              *opensearch.Client
    	api                 *opensearchapi.API
    	timeSeriesByName    map[string]TimeSeries
    	documentStoreByName map[string]DocumentStore
    }
    
    func (writer writer) Write(indexName string, id string, doc interface{}) (*IndexResponse, error) {
    	if writer.client == nil {
    		return nil, errors.Errorf("writer closed")
    	}
    	jsonDocStr, ok := doc.(string)
    	if !ok {
    		jsonDoc, err := json.Marshal(doc)
    		if err != nil {
    			return nil, errors.Wrapf(err, "failed to JSON encode document")
    		}
    		jsonDocStr = string(jsonDoc)
    	}
    	options := []func(*opensearchapi.IndexRequest){}
    	if id != "" {
    		options = append(options, writer.api.Index.WithDocumentID(id))
    	}
    	indexResponse, err := writer.api.Index(
    		indexName,
    		strings.NewReader(jsonDocStr),
    		options...,
    	)
    	if err != nil {
    		return nil, errors.Wrapf(err, "failed to index document")
    	}
    	var res IndexResponse
    	if err := json.NewDecoder(indexResponse.Body).Decode(&res); err != nil {
    		return nil, errors.Wrapf(err, "failed to decode JSON response")
    	}
    	//success example:
    	//res = map[
    	// 	_id:oZJZxXwBPnbPDFcjNpuO
    	// 	_index:go-utils-audit-test
    	// 	_primary_term:1
    	// 	_seq_no:5
    	// 	_shards:map[failed:0 successful:2 total:2]
    	// 	_type:_doc
    	// 	_version:1
    	// 	result:created
    	// ]
    	//error example:
    	//res = map[
    	// error:map[
    	// 	reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value
    	// 	root_cause:[
    	// 		map[reason:object mapping for [response.Body] tried to parse field [Body] as object, but found a concrete value type:mapper_parsing_exception]
    	//	]
    	// 	type:mapper_parsing_exception]
    	// 	status:400
    	//]
    	if res.Error != nil {
    		return nil, errors.Errorf("failed to insert: %v", res.Error.Reason)
    	}
    	return &res, nil
    }
    
    type CreateResponse struct {
    	Error *Error `json:"error,omitempty"`
    }
    
    type IndexResponse struct {
    	Error       *Error  `json:"error,omitempty"`
    	Result      string  `json:"result,omitempty"` //e.g. "created"
    	Index       string  `json:"_index,omitempty"`
    	ID          string  `json:"_id,omitempty"`
    	Version     int     `json:"_version,omitempty"`
    	SeqNo       int     `json:"_seq_no,omitempty"`
    	Shards      *Shards `json:"_shards,omitempty"`
    	Type        string  `json:"_type,omitempty"`
    	PrimaryTerm int     `json:"_primary_term,omitempty"`
    }
    
    type Shards struct {
    	Total      int `json:"total"`
    	Successful int `json:"successful"`
    	Failed     int `json:"failed"`
    }
    
    type Error struct {
    	Reason    string  `json:"reason,omitempty"`
    	RootCause []Cause `json:"root_cause,omitempty"`
    }
    
    type Cause struct {
    	Reason string `json:"reason,omitempty"`
    	Type   string `json:"type,omitempty"`
    }