Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit c9bc904

Browse files
AlmogBakuDirectXMan12
authored andcommitted
ElasticSearch: Add supports for Ignest pipelines
This commit adds support for ElasticSearch ingest pipelines. This only works on ES5+.
1 parent 15d2226 commit c9bc904

File tree

4 files changed

+43
-6
lines changed

4 files changed

+43
-6
lines changed

common/elasticsearch/elasticsearch.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,16 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
228228
}
229229
}
230230

231+
pipeline := ""
232+
if len(opts["pipeline"]) > 0 {
233+
pipeline = opts["pipeline"][0]
234+
}
235+
231236
switch version {
232237
case 2:
233238
esSvc.EsClient, err = newEsClientV2(startupFnsV2, bulkWorkers)
234239
case 5:
235-
esSvc.EsClient, err = newEsClientV5(startupFnsV5, bulkWorkers)
240+
esSvc.EsClient, err = newEsClientV5(startupFnsV5, bulkWorkers, pipeline)
236241
default:
237242
return nil, UnsupportedVersion{}
238243
}

common/elasticsearch/elasticsearch_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,32 @@ func TestCreateElasticSearchServiceForDefaultClusterName(t *testing.T) {
146146
}
147147
}
148148

149+
func TestCreateElasticSearchServiceWithIngestPipeline(t *testing.T) {
150+
const pipeline = "test"
151+
152+
esURI := "?nodes=https://foo.com:20468&nodes=https://bar.com:20468&" +
153+
"pipeline=" + pipeline + "&" +
154+
"sniff=false&healthCheck=false"
155+
156+
url, err := url.Parse(esURI)
157+
if err != nil {
158+
t.Fatalf("Error when parsing URL: %s", err.Error())
159+
}
160+
161+
esSvc, err := CreateElasticSearchService(url)
162+
if err != nil {
163+
t.Fatalf("Error when creating config: %s", err.Error())
164+
}
165+
166+
if esSvc.EsClient.pipeline != pipeline {
167+
t.Fatalf("Ingest pipline is not equal. Expected: %s, Got: %s", pipeline, esSvc.EsClient.pipeline)
168+
}
169+
170+
if esSvc.EsClient.version < 5 {
171+
t.Fatalf("ElasticSearch client not using version 5+ (required for pipeline). Got: %d", esSvc.EsClient.version)
172+
}
173+
}
174+
149175
func TestCreateElasticSearchServiceSingleDnsEntrypointV5(t *testing.T) {
150176
clusterName := "sandbox"
151177
esURI := fmt.Sprintf("https://foo.com:9200?"+

common/elasticsearch/esVersionManager.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ type esClient struct {
3535
clientV5 *elastic5.Client
3636
bulkProcessorV2 *elastic2.BulkProcessor
3737
bulkProcessorV5 *elastic5.BulkProcessor
38+
pipeline string
3839
}
3940

4041
func NewMockClient() *esClient {
4142
return &esClient{}
4243
}
43-
func newEsClientV5(startupFns []elastic5.ClientOptionFunc, bulkWorkers int) (*esClient, error) {
44+
func newEsClientV5(startupFns []elastic5.ClientOptionFunc, bulkWorkers int, pipeline string) (*esClient, error) {
4445
client, err := elastic5.NewClient(startupFns...)
4546
if err != nil {
4647
return nil, fmt.Errorf("Failed to an ElasticSearch Client: %v", err)
@@ -56,7 +57,7 @@ func newEsClientV5(startupFns []elastic5.ClientOptionFunc, bulkWorkers int) (*es
5657
if err != nil {
5758
return nil, fmt.Errorf("Failed to an ElasticSearch Bulk Processor: %v", err)
5859
}
59-
return &esClient{version: 5, clientV5: client, bulkProcessorV5: bps}, nil
60+
return &esClient{version: 5, clientV5: client, bulkProcessorV5: bps, pipeline: pipeline}, nil
6061
}
6162
func newEsClientV2(startupFns []elastic2.ClientOptionFunc, bulkWorkers int) (*esClient, error) {
6263
client, err := elastic2.NewClient(startupFns...)
@@ -131,11 +132,16 @@ func (es *esClient) AddBulkReq(index, typeName string, data interface{}) error {
131132
Doc(data))
132133
return nil
133134
case 5:
134-
es.bulkProcessorV5.Add(elastic5.NewBulkIndexRequest().
135+
req := elastic5.NewBulkIndexRequest().
135136
Index(index).
136137
Type(typeName).
137138
Id(uuid.NewUUID().String()).
138-
Doc(data))
139+
Doc(data)
140+
if es.pipeline != "" {
141+
req.Pipeline(es.pipeline)
142+
}
143+
144+
es.bulkProcessorV5.Add(req)
139145
return nil
140146
default:
141147
return UnsupportedVersion{}

docs/sink-configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ Besides this, the following options can be set in query string:
197197
* `ver` - ElasticSearch cluster version, can be either `2` or `5`. The default is `5`
198198
* `bulkWorkers` - number of workers for bulk processing. Default value is `5`.
199199
* `cluster_name` - cluster name for different Kubernetes clusters. Default value is `default`.
200-
200+
* `pipeline` - (optional; >ES5) Ingest Pipeline to process the documents. The default is disabled(empty value)
201201

202202
#### AWS Integration
203203
In order to use AWS Managed Elastic we need to use one of the following methods:

0 commit comments

Comments
 (0)