@@ -17,6 +17,7 @@ package stackdriver
1717import (
1818 "fmt"
1919 "net/url"
20+ "reflect"
2021 "strconv"
2122 "time"
2223
@@ -39,6 +40,7 @@ type StackdriverSink struct {
3940 project string
4041 zone string
4142 stackdriverClient * sd_api.Service
43+ requestQueue chan * sd_api.CreateTimeSeriesRequest
4244}
4345
4446type metricMetadata struct {
@@ -138,7 +140,7 @@ func (sink *StackdriverSink) Name() string {
138140}
139141
140142func (sink * StackdriverSink ) Stop () {
141- // nothing needs to be done
143+ close ( sink . requestQueue )
142144}
143145
144146func (sink * StackdriverSink ) processMetrics (metricValues map [string ]core.MetricValue ,
@@ -176,7 +178,7 @@ func (sink *StackdriverSink) ExportData(dataBatch *core.DataBatch) {
176178 for _ , ts := range timeseries {
177179 req .TimeSeries = append (req .TimeSeries , ts )
178180 if len (req .TimeSeries ) >= maxTimeseriesPerRequest {
179- sink .sendRequest ( req )
181+ sink .requestQueue <- req
180182 req = getReq ()
181183 }
182184 }
@@ -188,14 +190,14 @@ func (sink *StackdriverSink) ExportData(dataBatch *core.DataBatch) {
188190 req .TimeSeries = append (req .TimeSeries , point )
189191 }
190192 if len (req .TimeSeries ) >= maxTimeseriesPerRequest {
191- sink .sendRequest ( req )
193+ sink .requestQueue <- req
192194 req = getReq ()
193195 }
194196 }
195197 }
196198
197199 if len (req .TimeSeries ) > 0 {
198- sink .sendRequest ( req )
200+ sink .requestQueue <- req
199201 }
200202}
201203
@@ -207,6 +209,20 @@ func CreateStackdriverSink(uri *url.URL) (core.DataSink, error) {
207209 return nil , fmt .Errorf ("Host should not be set for Stackdriver sink" )
208210 }
209211
212+ opts := uri .Query ()
213+ var (
214+ workers int
215+ err error
216+ )
217+ if len (opts ["workers" ]) >= 1 {
218+ workers , err = strconv .Atoi (opts ["workers" ][0 ])
219+ if err != nil {
220+ return nil , fmt .Errorf ("Number of workers should be an integer, found: %v" , opts ["workers" ][0 ])
221+ }
222+ } else {
223+ workers = 1
224+ }
225+
210226 if err := gce_util .EnsureOnGCE (); err != nil {
211227 return nil , err
212228 }
@@ -230,28 +246,53 @@ func CreateStackdriverSink(uri *url.URL) (core.DataSink, error) {
230246 return nil , err
231247 }
232248
249+ requestQueue := make (chan * sd_api.CreateTimeSeriesRequest )
250+
233251 sink := & StackdriverSink {
234252 project : projectId ,
235253 zone : zone ,
236254 stackdriverClient : stackdriverClient ,
255+ requestQueue : requestQueue ,
237256 }
238257
239258 // Register sink metrics
240259 prometheus .MustRegister (requestsSent )
241260 prometheus .MustRegister (timeseriesSent )
242261
243- glog .Infof ("Created Stackdriver sink" )
262+ // Launch Go routines responsible for sending requests
263+ for i := 0 ; i < workers ; i ++ {
264+ go sink .requestSender (sink .requestQueue )
265+ }
266+
267+ glog .Infof ("Created Stackdriver sink, number of workers sending requests to Stackdriver: %v" , workers )
244268
245269 return sink , nil
246270}
247271
272+ func (sink * StackdriverSink ) requestSender (queue chan * sd_api.CreateTimeSeriesRequest ) {
273+ for {
274+ select {
275+ case req , active := <- queue :
276+ if ! active {
277+ return
278+ }
279+ sink .sendRequest (req )
280+ }
281+ }
282+ }
283+
248284func (sink * StackdriverSink ) sendRequest (req * sd_api.CreateTimeSeriesRequest ) {
249285 empty , err := sink .stackdriverClient .Projects .TimeSeries .Create (fullProjectName (sink .project ), req ).Do ()
250286
251287 var responseCode int
252288 if err != nil {
253289 glog .Errorf ("Error while sending request to Stackdriver %v" , err )
254- responseCode = err .(* googleapi.Error ).Code
290+ switch reflect .Indirect (reflect .ValueOf (err )).Type () {
291+ case reflect .Indirect (reflect .ValueOf (& googleapi.Error {})).Type ():
292+ responseCode = err .(* googleapi.Error ).Code
293+ default :
294+ responseCode = - 1
295+ }
255296 } else {
256297 responseCode = empty .ServerResponse .HTTPStatusCode
257298 }
0 commit comments