Skip to content

Commit d3105b5

Browse files
committed
Implement elasticsearch bulk indexing
Signed-off-by: Markus Blaschke <mblaschke82@gmail.com>
1 parent a95e5c7 commit d3105b5

File tree

3 files changed

+74
-20
lines changed

3 files changed

+74
-20
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Configuration
1919
| `PAGERDUTY_MAX_CONNECTIONS` | `4` | Maximum numbers of HTTP connections to PagerDuty API |
2020
| `ELASTICSEARCH_ADDRESS` | none, required | ElasticSearch cluster addresses (multiple) |
2121
| `ELASTICSEARCH_INDEX` | `pagerduty` | Name of ElasticSearch index |
22+
| `ELASTICSEARCH_BATCH_COUNT` | `50` | Number of documents which should be indexed in one request |
2223
| `ELASTICSEARCH_RETRY_COUNT` | `5` | ElasticSearch request retry count |
2324
| `ELASTICSEARCH_RETRY_DELAY` | `5s` | ElasticSearch request delay for reach retry |
2425

exporter.go

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"bytes"
5-
"context"
65
"encoding/json"
76
"fmt"
87
"github.com/PagerDuty/go-pagerduty"
@@ -18,8 +17,8 @@ type (
1817
PagerdutyElasticsearchExporter struct {
1918
scrapeTime *time.Duration
2019

21-
elasticSearchClient *elasticsearch.Client
22-
elasticsearchIndexName string
20+
elasticSearchClient *elasticsearch.Client
21+
elasticsearchIndexName string
2322
elasticsearchBatchCount int
2423
elasticsearchRetryCount int
2524
elasticsearchRetryDelay time.Duration
@@ -30,21 +29,21 @@ type (
3029
prometheus struct {
3130
incident *prometheus.CounterVec
3231
incidentLogEntry *prometheus.CounterVec
33-
esRequestTotal *prometheus.CounterVec
34-
esRequestRetries *prometheus.CounterVec
35-
duration *prometheus.GaugeVec
32+
esRequestTotal *prometheus.CounterVec
33+
esRequestRetries *prometheus.CounterVec
34+
duration *prometheus.GaugeVec
3635
}
3736
}
3837

3938
ElasticsearchIncident struct {
40-
DocumentID string `json:"_id,omitempty"`
39+
DocumentID string `json:"_id,omitempty"`
4140
Timestamp string `json:"@timestamp,omitempty"`
4241
IncidentId string `json:"@incident,omitempty"`
4342
*pagerduty.Incident
4443
}
4544

4645
ElasticsearchIncidentLog struct {
47-
DocumentID string `json:"_id,omitempty"`
46+
DocumentID string `json:"_id,omitempty"`
4847
Timestamp string `json:"@timestamp,omitempty"`
4948
IncidentId string `json:"@incident,omitempty"`
5049
*pagerduty.LogEntry
@@ -130,6 +129,10 @@ func (e *PagerdutyElasticsearchExporter) ConnectElasticsearch(cfg elasticsearch.
130129
e.elasticsearchIndexName = indexName
131130
}
132131

132+
func (e *PagerdutyElasticsearchExporter) SetElasticsearchBatchCount(batchCount int) {
133+
e.elasticsearchBatchCount = batchCount
134+
}
135+
133136
func (e *PagerdutyElasticsearchExporter) SetElasticsearchRetry(retryCount int, retryDelay time.Duration) {
134137
e.elasticsearchRetryCount = retryCount
135138
e.elasticsearchRetryDelay = retryDelay
@@ -168,8 +171,19 @@ func (e *PagerdutyElasticsearchExporter) runScrape() {
168171
wgProcess.Add(1)
169172
go func() {
170173
defer wgProcess.Done()
174+
175+
bulkIndexRequests := []*esapi.IndexRequest{}
171176
for esIndexRequest := range esIndexRequestChannel {
172-
e.doESIndexRequest(esIndexRequest)
177+
bulkIndexRequests = append(bulkIndexRequests, esIndexRequest)
178+
179+
if len(bulkIndexRequests) >= e.elasticsearchBatchCount {
180+
e.doESIndexRequestBulk(bulkIndexRequests)
181+
bulkIndexRequests = []*esapi.IndexRequest{}
182+
}
183+
}
184+
185+
if len(bulkIndexRequests) >= 1 {
186+
e.doESIndexRequestBulk(bulkIndexRequests)
173187
}
174188
}()
175189

@@ -245,16 +259,53 @@ func (e *PagerdutyElasticsearchExporter) indexIncidentLogEntry(incident pagerdut
245259
callback <- &req
246260
}
247261

248-
func (e *PagerdutyElasticsearchExporter) doESIndexRequest(req *esapi.IndexRequest) {
262+
type (
263+
BulkMetaIndex struct {
264+
Index BulkMetaIndexIndex `json:"index,omitempty"`
265+
}
266+
267+
BulkMetaIndexIndex struct {
268+
Id string `json:"_id,omitempty"`
269+
Type string `json:"_type,omitempty"`
270+
Index string `json:"_index,omitempty"`
271+
}
272+
)
273+
274+
func (e *PagerdutyElasticsearchExporter) doESIndexRequestBulk(bulkRequests []*esapi.IndexRequest) {
275+
var buf bytes.Buffer
276+
newline := []byte("\n")
277+
249278
var err error
250-
var res *esapi.Response
279+
var resp *esapi.Response
251280

252281
for i := 0; i < e.elasticsearchRetryCount; i++ {
253-
e.prometheus.esRequestTotal.WithLabelValues().Inc()
282+
for _, indexRequest := range bulkRequests {
283+
// generate bulk index action line
284+
meta := BulkMetaIndex{
285+
Index: BulkMetaIndexIndex{
286+
Id: indexRequest.DocumentID,
287+
Type: indexRequest.DocumentType,
288+
Index: indexRequest.Index,
289+
},
290+
}
291+
metaJson, _ := json.Marshal(meta)
292+
293+
// generate document line
294+
document := new(bytes.Buffer)
295+
document.ReadFrom(indexRequest.Body)
296+
297+
// generate index line
298+
buf.Grow(len(metaJson) + len(newline) + document.Len() + len(newline))
299+
buf.Write(metaJson)
300+
buf.Write(newline)
301+
buf.Write(document.Bytes())
302+
buf.Write(newline)
303+
}
254304

255-
res, err = req.Do(context.Background(), e.elasticSearchClient)
256-
if err == nil {
257-
res.Body.Close()
305+
e.prometheus.esRequestTotal.WithLabelValues().Inc()
306+
resp, err = e.elasticSearchClient.Bulk(bytes.NewReader(buf.Bytes()))
307+
if err == nil && resp.StatusCode == http.StatusOK {
308+
resp.Body.Close()
258309

259310
// success
260311
return

main.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,16 @@ var opts struct {
3636
ScrapeTime time.Duration `long:"scrape-time" env:"SCRAPE_TIME" description:"Scrape time (time.duration)" default:"5m"`
3737

3838
// PagerDuty settings
39-
PagerDutyAuthToken string `long:"pagerduty.authtoken" env:"PAGERDUTY_AUTH_TOKEN" description:"PagerDuty auth token" required:"true"`
40-
PagerDutySince time.Duration `long:"pagerduty.date-range" env:"PAGERDUTY_DATE_RANGE" description:"PagerDuty date range" default:"168h"`
41-
PagerDutyMaxConnections int `long:"pagerduty.max-connections" env:"PAGERDUTY_MAX_CONNECTIONS" description:"Maximum numbers of TCP connections to PagerDuty API (concurrency)" default:"4"`
39+
PagerDutyAuthToken string `long:"pagerduty.authtoken" env:"PAGERDUTY_AUTH_TOKEN" description:"PagerDuty auth token" required:"true"`
40+
PagerDutySince time.Duration `long:"pagerduty.date-range" env:"PAGERDUTY_DATE_RANGE" description:"PagerDuty date range" default:"168h"`
41+
PagerDutyMaxConnections int `long:"pagerduty.max-connections" env:"PAGERDUTY_MAX_CONNECTIONS" description:"Maximum numbers of TCP connections to PagerDuty API (concurrency)" default:"4"`
4242

4343
// ElasticSearch settings
4444
ElasticsearchAddresses []string `long:"elasticsearch.address" env:"ELASTICSEARCH_ADDRESS" delim:" " description:"ElasticSearch urls" required:"true"`
4545
ElasticsearchIndex string `long:"elasticsearch.index" env:"ELASTICSEARCH_INDEX" description:"ElasticSearch index name" default:"pagerduty"`
46-
ElasticsearchRetryCount int `long:"elasticsearch.retry-count" env:"ELASTICSEARCH_RETRY_COUNT" description:"ElasticSearch request retry count" default:"5"`
47-
ElasticsearchRetryDelay time.Duration `long:"elasticsearch.retry-delay" env:"ELASTICSEARCH_RETRY_DELAY" description:"ElasticSearch request delay for reach retry" default:"5s"`
46+
ElasticsearchBatchCount int `long:"elasticsearch.batch-count" env:"ELASTICSEARCH_BATCH_COUNT" description:"Number of documents which should be indexed in one request" default:"50"`
47+
ElasticsearchRetryCount int `long:"elasticsearch.retry-count" env:"ELASTICSEARCH_RETRY_COUNT" description:"ElasticSearch request retry count" default:"5"`
48+
ElasticsearchRetryDelay time.Duration `long:"elasticsearch.retry-delay" env:"ELASTICSEARCH_RETRY_DELAY" description:"ElasticSearch request delay for reach retry" default:"5s"`
4849
}
4950

5051
func main() {
@@ -87,6 +88,7 @@ func main() {
8788
Addresses: opts.ElasticsearchAddresses,
8889
}
8990
exporter.ConnectElasticsearch(cfg, opts.ElasticsearchIndex)
91+
exporter.SetElasticsearchBatchCount(opts.ElasticsearchBatchCount)
9092
exporter.SetElasticsearchRetry(opts.ElasticsearchRetryCount, opts.ElasticsearchRetryDelay)
9193
exporter.Run()
9294

0 commit comments

Comments
 (0)