@@ -43,14 +43,6 @@ type ElasticsearchClient struct {
4343// It then creates a new NewBulkIndexer with the esClient
4444func NewElasticsearchClient (esURL string , esUser string , esPassword string , esCaCertPath string , esAllowInsecureTLS bool ) (* ElasticsearchClient , error ) {
4545
46- caCertBytes := []byte {}
47- if esCaCertPath != "" {
48- var err error
49- caCertBytes , err = os .ReadFile (esCaCertPath )
50- if err != nil {
51- return nil , fmt .Errorf ("failed to open Elasticsearch CA file: %v" , err )
52- }
53- }
5446 retryBackoff := backoff .NewExponentialBackOff ()
5547 cfg := elasticsearch.Config {
5648 Addresses : []string {esURL },
@@ -71,7 +63,14 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string, esCa
7163 InsecureSkipVerify : esAllowInsecureTLS ,
7264 },
7365 },
74- CACert : caCertBytes ,
66+ }
67+
68+ if esCaCertPath != "" {
69+ caCertBytes , err := os .ReadFile (esCaCertPath )
70+ if err != nil {
71+ return nil , fmt .Errorf ("failed to open Elasticsearch CA file: %v" , err )
72+ }
73+ cfg .CACert = caCertBytes
7574 }
7675
7776 if len (esUser ) != 0 && len (esPassword ) != 0 {
@@ -86,10 +85,13 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string, esCa
8685 bi , err := esutil .NewBulkIndexer (esutil.BulkIndexerConfig {
8786 Client : esClient , // The Elasticsearch client
8887 FlushBytes : 1000000 , // The flush threshold in bytes [1mb]
89- FlushInterval : 30 * time .Second , // The periodic flush interval [30 secs]
88+ FlushInterval : 10 * time .Second , // The periodic flush interval [30 secs]
89+ OnError : func (ctx context.Context , err error ) {
90+ log .Fatalf ("Error creating the indexer: %v" , err )
91+ },
9092 })
9193 if err != nil {
92- log .Fatalf ("Error creating the indexer: %s " , err )
94+ log .Fatalf ("Error creating the indexer: %v " , err )
9395 }
9496 alertCh := make (chan interface {}, 10000 )
9597 return & ElasticsearchClient {bulkIndexer : bi , esClient : esClient , alertCh : alertCh }, nil
0 commit comments