@@ -3,9 +3,12 @@ package elasticsearch
33import (
44 "bytes"
55 "context"
6+ "crypto/tls"
67 "encoding/json"
78 "fmt"
89 "log"
10+ "net/http"
11+ "os"
912 "strings"
1013 "sync"
1114 "sync/atomic"
@@ -38,7 +41,8 @@ type ElasticsearchClient struct {
3841// NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL
3942// and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays.
4043// It then creates a new NewBulkIndexer with the esClient
41- func NewElasticsearchClient (esURL string , esUser string , esPassword string ) (* ElasticsearchClient , error ) {
44+ func NewElasticsearchClient (esURL string , esUser string , esPassword string , esCaCertPath string , esAllowInsecureTLS bool ) (* ElasticsearchClient , error ) {
45+
4246 retryBackoff := backoff .NewExponentialBackOff ()
4347 cfg := elasticsearch.Config {
4448 Addresses : []string {esURL },
@@ -54,6 +58,19 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*El
5458 return retryBackoff .NextBackOff ()
5559 },
5660 MaxRetries : 5 ,
61+ Transport : & http.Transport {
62+ TLSClientConfig : & tls.Config {
63+ InsecureSkipVerify : esAllowInsecureTLS ,
64+ },
65+ },
66+ }
67+
68+ if esCaCertPath != "" {
69+ caCertBytes , err := os .ReadFile (esCaCertPath )
70+ if err != nil {
71+ return nil , fmt .Errorf ("failed to open Elasticsearch CA file: %v" , err )
72+ }
73+ cfg .CACert = caCertBytes
5774 }
5875
5976 if len (esUser ) != 0 && len (esPassword ) != 0 {
@@ -68,10 +85,13 @@ func NewElasticsearchClient(esURL string, esUser string, esPassword string) (*El
6885 bi , err := esutil .NewBulkIndexer (esutil.BulkIndexerConfig {
6986 Client : esClient , // The Elasticsearch client
7087 FlushBytes : 1000000 , // The flush threshold in bytes [1mb]
71- FlushInterval : 30 * time .Second , // The periodic flush interval [30 secs]
88+ FlushInterval : 10 * time .Second , // The periodic flush interval [30 secs]
89+ OnError : func (ctx context.Context , err error ) {
90+ log .Fatalf ("Error creating the indexer: %v" , err )
91+ },
7292 })
7393 if err != nil {
74- log .Fatalf ("Error creating the indexer: %s " , err )
94+ log .Fatalf ("Error creating the indexer: %v " , err )
7595 }
7696 alertCh := make (chan interface {}, 10000 )
7797 return & ElasticsearchClient {bulkIndexer : bi , esClient : esClient , alertCh : alertCh }, nil
0 commit comments