@@ -20,9 +20,10 @@ import (
2020 "time"
2121
2222 "github.com/golang/glog"
23- "github.com/pborman/uuid"
2423
25- "gopkg.in/olivere/elastic.v3"
24+ "errors"
25+ elastic2 "gopkg.in/olivere/elastic.v3"
26+ elastic5 "gopkg.in/olivere/elastic.v5"
2627 "os"
2728)
2829
@@ -32,10 +33,9 @@ const (
3233)
3334
3435type ElasticSearchService struct {
35- EsClient * elastic.Client
36- bulkProcessor * elastic.BulkProcessor
37- baseIndex string
38- ClusterName string
36+ EsClient * esClient
37+ baseIndex string
38+ ClusterName string
3939}
4040
4141func (esSvc * ElasticSearchService ) Index (date time.Time ) string {
@@ -46,7 +46,7 @@ func (esSvc *ElasticSearchService) IndexAlias(date time.Time, typeName string) s
4646}
4747
4848func (esSvc * ElasticSearchService ) FlushData () error {
49- return esSvc .bulkProcessor . Flush ()
49+ return esSvc .EsClient . FlushBulk ()
5050}
5151
5252// SaveDataIntoES save metrics and events to ES by using ES client
@@ -58,44 +58,63 @@ func (esSvc *ElasticSearchService) SaveData(date time.Time, typeName string, sin
5858 indexName := esSvc .Index (date )
5959
6060 // Use the IndexExists service to check if a specified index exists.
61- exists , err := esSvc .EsClient .IndexExists (indexName ). Do ()
61+ exists , err := esSvc .EsClient .IndexExists (indexName )
6262 if err != nil {
6363 return err
6464 }
65+
6566 if ! exists {
6667 // Create a new index.
67- createIndex , err := esSvc .EsClient .CreateIndex (indexName ). BodyString ( mapping ). Do ( )
68+ createIndex , err := esSvc .EsClient .CreateIndex (indexName , mapping )
6869 if err != nil {
6970 return err
7071 }
71- if ! createIndex .Acknowledged {
72- return fmt .Errorf ("Failed to create Index in ES cluster: %s" , err )
72+
73+ ack := false
74+ switch i := createIndex .(type ) {
75+ case * elastic2.IndicesCreateResult :
76+ ack = i .Acknowledged
77+ case * elastic5.IndicesCreateResult :
78+ ack = i .Acknowledged
79+ }
80+ if ! ack {
81+ return errors .New ("Failed to acknoledge index creation" )
7382 }
7483 }
7584
76- aliases , err := esSvc .EsClient .Aliases (). Index ( indexName ). Do ( )
85+ aliases , err := esSvc .EsClient .GetAliases ( indexName )
7786 if err != nil {
7887 return err
7988 }
8089 aliasName := esSvc .IndexAlias (date , typeName )
81- if ! aliases .Indices [indexName ].HasAlias (aliasName ) {
82- createAlias , err := esSvc .EsClient .Alias ().Add (indexName , esSvc .IndexAlias (date , typeName )).Do ()
90+
91+ hasAlias := false
92+ switch a := aliases .(type ) {
93+ case * elastic2.AliasesResult :
94+ hasAlias = a .Indices [indexName ].HasAlias (aliasName )
95+ case * elastic5.AliasesResult :
96+ hasAlias = a .Indices [indexName ].HasAlias (aliasName )
97+ }
98+ if ! hasAlias {
99+ createAlias , err := esSvc .EsClient .AddAlias (indexName , esSvc .IndexAlias (date , typeName ))
83100 if err != nil {
84101 return err
85102 }
86- if ! createAlias .Acknowledged {
87- return fmt .Errorf ("Failed to create Index Alias in ES cluster: %s" , err )
103+
104+ ack := false
105+ switch i := createAlias .(type ) {
106+ case * elastic2.AliasResult :
107+ ack = i .Acknowledged
108+ case * elastic5.AliasResult :
109+ ack = i .Acknowledged
110+ }
111+ if ! ack {
112+ return errors .New ("Failed to acknoledge index alias creation" )
88113 }
89114 }
90115
91116 for _ , data := range sinkData {
92- indexID := uuid .NewUUID ()
93- req := elastic .NewBulkIndexRequest ().
94- Index (indexName ).
95- Type (typeName ).
96- Id (indexID .String ()).
97- Doc (data )
98- esSvc .bulkProcessor .Add (req )
117+ esSvc .EsClient .AddBulkReq (indexName , typeName , data )
99118 }
100119
101120 return nil
@@ -111,6 +130,14 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
111130 return nil , fmt .Errorf ("Failed to parser url's query string: %s" , err )
112131 }
113132
133+ version := 5
134+ if len (opts ["ver" ]) > 0 {
135+ version , err = strconv .Atoi (opts ["ver" ][0 ])
136+ if err != nil {
137+ return nil , fmt .Errorf ("Failed to parse URL's version value into an int: %v" , err )
138+ }
139+ }
140+
114141 esSvc .ClusterName = ESClusterName
115142 if len (opts ["cluster_name" ]) > 0 {
116143 esSvc .ClusterName = opts ["cluster_name" ][0 ]
@@ -122,45 +149,53 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
122149 esSvc .baseIndex = opts ["index" ][0 ]
123150 }
124151
152+ var startupFnsV5 []elastic5.ClientOptionFunc
153+ var startupFnsV2 []elastic2.ClientOptionFunc
154+
125155 // Set the URL endpoints of the ES's nodes. Notice that when sniffing is
126156 // enabled, these URLs are used to initially sniff the cluster on startup.
127- var startupFns []elastic.ClientOptionFunc
128157 if len (opts ["nodes" ]) > 0 {
129- startupFns = append (startupFns , elastic .SetURL (opts ["nodes" ]... ))
158+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetURL (opts ["nodes" ]... ))
159+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetURL (opts ["nodes" ]... ))
130160 } else if uri .Scheme != "" && uri .Host != "" {
131- startupFns = append (startupFns , elastic .SetURL (uri .Scheme + "://" + uri .Host ))
161+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetURL (uri .Scheme + "://" + uri .Host ))
162+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetURL (uri .Scheme + "://" + uri .Host ))
132163 } else {
133- return nil , fmt . Errorf ("There is no node assigned for connecting ES cluster" )
164+ return nil , errors . New ("There is no node assigned for connecting ES cluster" )
134165 }
135166
136167 // If the ES cluster needs authentication, the username and secret
137168 // should be set in sink config.Else, set the Authenticate flag to false
138169 if len (opts ["esUserName" ]) > 0 && len (opts ["esUserSecret" ]) > 0 {
139- startupFns = append (startupFns , elastic .SetBasicAuth (opts ["esUserName" ][0 ], opts ["esUserSecret" ][0 ]))
170+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetBasicAuth (opts ["esUserName" ][0 ], opts ["esUserSecret" ][0 ]))
171+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetBasicAuth (opts ["esUserName" ][0 ], opts ["esUserSecret" ][0 ]))
140172 }
141173
142174 if len (opts ["maxRetries" ]) > 0 {
143175 maxRetries , err := strconv .Atoi (opts ["maxRetries" ][0 ])
144176 if err != nil {
145- return nil , fmt . Errorf ("Failed to parse URL's maxRetries value into an int" )
177+ return nil , errors . New ("Failed to parse URL's maxRetries value into an int" )
146178 }
147- startupFns = append (startupFns , elastic .SetMaxRetries (maxRetries ))
179+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetMaxRetries (maxRetries ))
180+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetMaxRetries (maxRetries ))
148181 }
149182
150183 if len (opts ["healthCheck" ]) > 0 {
151184 healthCheck , err := strconv .ParseBool (opts ["healthCheck" ][0 ])
152185 if err != nil {
153- return nil , fmt . Errorf ("Failed to parse URL's healthCheck value into a bool" )
186+ return nil , errors . New ("Failed to parse URL's healthCheck value into a bool" )
154187 }
155- startupFns = append (startupFns , elastic .SetHealthcheck (healthCheck ))
188+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetHealthcheck (healthCheck ))
189+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetHealthcheck (healthCheck ))
156190 }
157191
158192 if len (opts ["startupHealthcheckTimeout" ]) > 0 {
159193 timeout , err := time .ParseDuration (opts ["startupHealthcheckTimeout" ][0 ] + "s" )
160194 if err != nil {
161195 return nil , fmt .Errorf ("Failed to parse URL's startupHealthcheckTimeout: %s" , err .Error ())
162196 }
163- startupFns = append (startupFns , elastic .SetHealthcheckTimeoutStartup (timeout ))
197+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetHealthcheckTimeoutStartup (timeout ))
198+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetHealthcheckTimeoutStartup (timeout ))
164199 }
165200
166201 if os .Getenv ("AWS_ACCESS_KEY_ID" ) != "" || os .Getenv ("AWS_ACCESS_KEY" ) != "" ||
@@ -172,58 +207,45 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
172207 return nil , err
173208 }
174209
175- startupFns = append (startupFns , elastic .SetHttpClient (awsClient ), elastic .SetSniff (false ))
210+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetHttpClient (awsClient ), elastic2 .SetSniff (false ))
211+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetHttpClient (awsClient ), elastic5 .SetSniff (false ))
176212 } else {
177213 if len (opts ["sniff" ]) > 0 {
178214 sniff , err := strconv .ParseBool (opts ["sniff" ][0 ])
179215 if err != nil {
180- return nil , fmt . Errorf ("Failed to parse URL's sniff value into a bool" )
216+ return nil , errors . New ("Failed to parse URL's sniff value into a bool" )
181217 }
182- startupFns = append (startupFns , elastic .SetSniff (sniff ))
218+ startupFnsV2 = append (startupFnsV2 , elastic2 .SetSniff (sniff ))
219+ startupFnsV5 = append (startupFnsV5 , elastic5 .SetSniff (sniff ))
183220 }
184221 }
185222
186- esSvc .EsClient , err = elastic .NewClient (startupFns ... )
187- if err != nil {
188- return nil , fmt .Errorf ("Failed to create ElasticSearch client: %v" , err )
189- }
190-
191223 bulkWorkers := 5
192224 if len (opts ["bulkWorkers" ]) > 0 {
193225 bulkWorkers , err = strconv .Atoi (opts ["bulkWorkers" ][0 ])
194226 if err != nil {
195- return nil , fmt . Errorf ("Failed to parse URL's bulkWorkers value into an int" )
227+ return nil , errors . New ("Failed to parse URL's bulkWorkers value into an int" )
196228 }
197229 }
198- esSvc .bulkProcessor , err = esSvc .EsClient .BulkProcessor ().
199- Name ("ElasticSearchWorker" ).
200- Workers (bulkWorkers ).
201- After (bulkAfterCB ).
202- BulkActions (1000 ). // commit if # requests >= 1000
203- BulkSize (2 << 20 ). // commit if size of requests >= 2 MB
204- FlushInterval (10 * time .Second ). // commit every 10s
205- Do ()
230+
231+ pipeline := ""
232+ if len (opts ["pipeline" ]) > 0 {
233+ pipeline = opts ["pipeline" ][0 ]
234+ }
235+
236+ switch version {
237+ case 2 :
238+ esSvc .EsClient , err = newEsClientV2 (startupFnsV2 , bulkWorkers )
239+ case 5 :
240+ esSvc .EsClient , err = newEsClientV5 (startupFnsV5 , bulkWorkers , pipeline )
241+ default :
242+ return nil , UnsupportedVersion {}
243+ }
206244 if err != nil {
207- return nil , fmt .Errorf ("Failed to an ElasticSearch Bulk Processor : %v" , err )
245+ return nil , fmt .Errorf ("Failed to create ElasticSearch client : %v" , err )
208246 }
209247
210248 glog .V (2 ).Infof ("ElasticSearch sink configure successfully" )
211249
212250 return & esSvc , nil
213251}
214-
215- func bulkAfterCB (executionId int64 , requests []elastic.BulkableRequest , response * elastic.BulkResponse , err error ) {
216- if err != nil {
217- glog .Warningf ("Failed to execute bulk operation to ElasticSearch: %v" , err )
218- }
219-
220- if response .Errors {
221- for _ , list := range response .Items {
222- for name , itm := range list {
223- if itm .Error != nil {
224- glog .V (3 ).Infof ("Failed to execute bulk operation to ElasticSearch on %s: %v" , name , itm .Error )
225- }
226- }
227- }
228- }
229- }
0 commit comments