16
16
package com .arpnetworking .metrics .common .sources ;
17
17
18
18
import com .arpnetworking .logback .annotations .LogValue ;
19
+ import com .arpnetworking .metrics .Units ;
19
20
import com .arpnetworking .metrics .common .kafka .ConsumerListener ;
20
21
import com .arpnetworking .metrics .common .kafka .RunnableConsumer ;
21
22
import com .arpnetworking .metrics .common .kafka .RunnableConsumerImpl ;
22
23
import com .arpnetworking .metrics .common .parsers .Parser ;
23
24
import com .arpnetworking .metrics .common .parsers .exceptions .ParsingException ;
25
+ import com .arpnetworking .metrics .incubator .PeriodicMetrics ;
24
26
import com .arpnetworking .steno .LogValueMapFactory ;
25
27
import com .arpnetworking .steno .Logger ;
26
28
import com .arpnetworking .steno .LoggerFactory ;
29
+ import com .fasterxml .jackson .annotation .JacksonInject ;
30
+ import com .google .common .base .Stopwatch ;
27
31
import net .sf .oval .constraint .CheckWith ;
28
32
import net .sf .oval .constraint .CheckWithCheck ;
29
33
import net .sf .oval .constraint .Min ;
33
37
import org .apache .kafka .common .KafkaException ;
34
38
35
39
import java .time .Duration ;
40
+ import java .util .Optional ;
36
41
import java .util .concurrent .ArrayBlockingQueue ;
37
42
import java .util .concurrent .BlockingQueue ;
38
43
import java .util .concurrent .ExecutorService ;
39
44
import java .util .concurrent .Executors ;
40
45
import java .util .concurrent .TimeUnit ;
46
+ import java .util .concurrent .atomic .AtomicLong ;
41
47
42
48
/**
43
49
* Produce instances of {@link com.arpnetworking.metrics.mad.model.Record} from the values of entries
49
55
* @author Joey Jackson (jjackson at dropbox dot com)
50
56
*/
51
57
public final class KafkaSource <T , V > extends BaseSource {
52
-
53
58
private static final Logger LOGGER = LoggerFactory .getLogger (KafkaSource .class );
54
59
55
60
private final Consumer <?, V > _consumer ;
@@ -63,6 +68,18 @@ public final class KafkaSource<T, V> extends BaseSource {
63
68
private final Integer _numWorkerThreads ;
64
69
private final BlockingQueue <V > _buffer ;
65
70
private final ParsingWorker _parsingWorker = new ParsingWorker ();
71
+ private final PeriodicMetrics _periodicMetrics ;
72
+ private final AtomicLong _currentRecordsProcessedCount = new AtomicLong (0 );
73
+ private final AtomicLong _currentRecordsIngestedCount = new AtomicLong (0 );
74
+ private final String _parsingTimeMetricName = "sources/kafka/" + getMetricSafeName () + "/parsing_time" ;
75
+ // CHECKSTYLE.OFF: VisibilityModifierCheck - Package private for use in testing
76
+ final String _recordsInCountMetricName = "sources/kafka/" + getMetricSafeName () + "/records_in" ;
77
+ final String _recordsOutCountMetricName = "sources/kafka/" + getMetricSafeName () + "/records_out" ;
78
+ final String _parsingExceptionCountMetricName = "sources/kafka/" + getMetricSafeName () + "/parsing_exceptions" ;
79
+ final String _kafkaExceptionCountMetricName = "sources/kafka/" + getMetricSafeName () + "/kafka_exceptions" ;
80
+ final String _consumerExceptionCountMetricName = "sources/kafka/" + getMetricSafeName () + "/consumer_exceptions" ;
81
+ final String _queueSizeGaugeMetricName = "sources/kafka/" + getMetricSafeName () + "/queue_size" ;
82
+ // CHECKSTYLE.ON: VisibilityModifierCheck
66
83
67
84
@ Override
68
85
public void start () {
@@ -146,6 +163,13 @@ private KafkaSource(final Builder<T, V> builder, final Logger logger, final Bloc
146
163
_parserExecutor = Executors .newFixedThreadPool (_numWorkerThreads );
147
164
_shutdownAwaitTime = builder ._shutdownAwaitTime ;
148
165
_backoffTime = builder ._backoffTime ;
166
+ _periodicMetrics = builder ._periodicMetrics ;
167
+ _periodicMetrics .registerPolledMetric (periodicMetrics ->
168
+ periodicMetrics .recordCounter (_recordsOutCountMetricName ,
169
+ _currentRecordsProcessedCount .getAndSet (0 )));
170
+ _periodicMetrics .registerPolledMetric (periodicMetrics ->
171
+ periodicMetrics .recordCounter (_recordsInCountMetricName ,
172
+ _currentRecordsIngestedCount .getAndSet (0 )));
149
173
_logger = logger ;
150
174
_buffer = buffer ;
151
175
}
@@ -157,18 +181,25 @@ private class ParsingWorker implements Runnable {
157
181
public void run () {
158
182
while (_isRunning || !_buffer .isEmpty ()) { // Empty the queue before stopping the workers
159
183
final V value = _buffer .poll ();
184
+ _periodicMetrics .recordGauge (_queueSizeGaugeMetricName , _buffer .size ());
160
185
if (value != null ) {
161
186
final T record ;
162
187
try {
188
+ final Stopwatch parsingTimer = Stopwatch .createStarted ();
163
189
record = _parser .parse (value );
190
+ parsingTimer .stop ();
191
+ _periodicMetrics .recordTimer (_parsingTimeMetricName ,
192
+ parsingTimer .elapsed (TimeUnit .NANOSECONDS ), Optional .of (Units .NANOSECOND ));
164
193
} catch (final ParsingException e ) {
194
+ _periodicMetrics .recordCounter (_parsingExceptionCountMetricName , 1 );
165
195
_logger .error ()
166
196
.setMessage ("Failed to parse data" )
167
197
.setThrowable (e )
168
198
.log ();
169
- return ;
199
+ continue ;
170
200
}
171
201
KafkaSource .this .notify (record );
202
+ _currentRecordsProcessedCount .getAndIncrement ();
172
203
} else {
173
204
// Queue is empty
174
205
try {
@@ -192,6 +223,8 @@ private class LogConsumerListener implements ConsumerListener<V> {
192
223
public void handle (final ConsumerRecord <?, V > consumerRecord ) {
193
224
try {
194
225
_buffer .put (consumerRecord .value ());
226
+ _currentRecordsIngestedCount .getAndIncrement ();
227
+ _periodicMetrics .recordGauge (_queueSizeGaugeMetricName , _buffer .size ());
195
228
} catch (final InterruptedException e ) {
196
229
_logger .info ()
197
230
.setMessage ("Consumer thread interrupted" )
@@ -215,6 +248,7 @@ public void handle(final Throwable throwable) {
215
248
.log ();
216
249
_runnableConsumer .stop ();
217
250
} else if (throwable instanceof KafkaException ) {
251
+ _periodicMetrics .recordCounter (_kafkaExceptionCountMetricName , 1 );
218
252
_logger .error ()
219
253
.setMessage ("Consumer received Kafka Exception" )
220
254
.addData ("source" , KafkaSource .this )
@@ -223,6 +257,7 @@ public void handle(final Throwable throwable) {
223
257
.log ();
224
258
backoff (throwable );
225
259
} else {
260
+ _periodicMetrics .recordCounter (_consumerExceptionCountMetricName , 1 );
226
261
_logger .error ()
227
262
.setMessage ("Consumer thread error" )
228
263
.addData ("source" , KafkaSource .this )
@@ -351,6 +386,17 @@ public Builder<T, V> setBufferSize(final Integer bufferSize) {
351
386
return this ;
352
387
}
353
388
389
+ /**
390
+ * Sets {@code PeriodicMetrics} for instrumentation of {@link KafkaSource}.
391
+ *
392
+ * @param periodicMetrics The {@code PeriodicMetrics} for the {@link KafkaSource}.
393
+ * @return This instance of {@link KafkaSource.Builder}.
394
+ */
395
+ public Builder <T , V > setPeriodicMetrics (final PeriodicMetrics periodicMetrics ) {
396
+ _periodicMetrics = periodicMetrics ;
397
+ return this ;
398
+ }
399
+
354
400
@ Override
355
401
protected Builder <T , V > self () {
356
402
return this ;
@@ -375,6 +421,9 @@ protected Builder<T, V> self() {
375
421
@ NotNull
376
422
@ Min (1 )
377
423
private Integer _bufferSize = 1000 ;
424
+ @ JacksonInject
425
+ @ NotNull
426
+ private PeriodicMetrics _periodicMetrics ;
378
427
379
428
private static class PositiveDuration implements CheckWithCheck .SimpleCheck {
380
429
@ Override
0 commit comments