@@ -89,9 +89,11 @@ def dr_cb(self, err, msg):
8989 else :
9090 self .dr_cnt += 1
9191 self .dd_incr ("producer.drok" , 1 )
92+ self .dd_gauge ("producer.latency" , msg .latency (),
93+ tags = ["partition:{}" .format (msg .partition ())])
9294 if (self .dr_cnt % self .disprate ) == 0 :
93- self .logger .debug ("producer: delivered message to {} [{}] at offset {}" .format (
94- msg .topic (), msg .partition (), msg .offset ()))
95+ self .logger .debug ("producer: delivered message to {} [{}] at offset {} in {}s " .format (
96+ msg .topic (), msg .partition (), msg .offset (), msg . latency () ))
9597
9698 def produce_record (self ):
9799 """ Asynchronously produce a single record, but block and
@@ -236,7 +238,8 @@ def consumer_run(self):
236238 txtime = headers .get ('time' , None )
237239 if txtime is not None :
238240 latency = time .time () - float (txtime )
239- self .dd_gauge ("consumer.e2e_latency" , latency )
241+ self .dd_gauge ("consumer.e2e_latency" , latency ,
242+ tags = ["partition:{}" .format (msg .partition ())])
240243 else :
241244 latency = None
242245
@@ -310,6 +313,25 @@ def producer_error_cb(self, err):
310313 self .producer_error_cb_cnt += 1
311314 self .dd_incr ("producer.errorcb" , 1 )
312315
def rtt_stats(self, d):
    """ Extract broker rtt statistics from the stats dict in @param d

    For each broker entry in d['brokers'] that lists at least one
    partition under 'toppars', emit the "broker.rtt.p99" and
    "broker.rtt.avg" gauges (raw value divided by 1,000,000).
    Each gauge is tagged with the broker's node id, a comma-separated
    list of its partitions, and the client type taken from d['type'].

    Brokers whose 'toppars' entry is missing, None or empty are
    skipped: they lead no partitions, so they would otherwise be
    reported with an empty "partitions:" tag.
    A stats dict without a 'brokers' entry emits nothing.
    """

    # NOTE(review): the /1000000.0 presumably converts microseconds to
    # seconds - confirm against the librdkafka statistics documentation.
    scale = 1000000.0

    # Get leader RTT stats
    for broker in (d.get('brokers') or {}).values():
        toppars = broker.get('toppars')
        if not toppars:
            continue

        parts = ','.join(str(tp['partition']) for tp in toppars.values())

        tags = ["broker:{}".format(broker['nodeid']),
                "partitions:{}".format(parts),
                "type:{}".format(d['type'])]

        rtt = broker['rtt']
        # Keep the original emit order: p99 first, then avg.
        for field in ('p99', 'avg'):
            self.dd_gauge("broker.rtt." + field,
                          float(rtt[field]) / scale, tags=tags)
334+
313335 def stats_cb (self , json_str ):
314336 """ Common statistics callback. """
315337 d = json .loads (json_str )
@@ -332,6 +354,8 @@ def stats_cb(self, json_str):
332354 if (self .stats_cnt [d ['type' ]] % 11 ) == 0 :
333355 self .logger .info ("{} raw stats: {}" .format (d ['name' ], json_str ))
334356
357+ self .rtt_stats (d )
358+
335359 # Sample the producer queue length
336360 if d ['type' ] == 'producer' :
337361 self .dd_gauge ("producer.outq" , len (self .producer ))
@@ -451,9 +475,10 @@ def dd_incr(self, metric_name, incrval):
451475 """ Increment datadog metric counter by incrval """
452476 self .dd .increment (self .DD_PFX + metric_name , incrval , host = self .hostname )
453477
def dd_gauge(self, metric_name, val, tags=None):
    """ Report val as a datadog gauge, optionally tagged.

    The metric is sent under self.DD_PFX + metric_name with host set
    to self.hostname; tags is forwarded to the client unchanged.
    """
    full_name = self.DD_PFX + metric_name
    self.dd.gauge(full_name, val, tags=tags, host=self.hostname)
457482
458483 def calc_rusage_deltas (self , curr , prev , elapsed ):
459484 """ Calculate deltas between previous and current resource usage """
0 commit comments