
Commit 9130e64

joeyjackson authored and vjkoskela committed
Kafka source poll timeout instead of backoff (#164)
* kafka source init
* kafka consumer wrapper and listener
* unit test and checkstyle
* exception handling
* unit tests for kafka source
* interfaces and wrappers for kafka source
* removed kafka dep
* integration tests for kafka source
* config deserialization
* Fixed deserializer
* Fixed deserializer
* styling
* kafka docker
* pipeline config example
* style
* error checking
* error checking
* integration test kafka source from config
* style
* added parser to kafka source
* example pipeline
* Fail integration test on send fail to kafka server
* requested changes
* requested changes
* configurable backoff time for kafka source
* fixed conf deserializer
* concurrent parsing workers
* multi worker unit test
* queue holds record values instead of records
* style
* instrument init
* todo
* mock observer for multithreaded testing
* configurable buffer queue size
* moved fill queue integration test to unit test
* style
* ensure queue fills in queue filled test
* refactor kafka source constructors
* style
* fix injector in integration tests
* instrumentation testing init
* unit tests for instrumentation counter
* unit test gauge metric
* more instrumentation metrics
* remove prinln
* new metric names
* metrics unit tests
* requested changes
* nonnull annotate
* Better kafka source worker polling
1 parent: 964c02e · commit: 9130e64

File tree

1 file changed (+26, -29 lines)


src/main/java/com/arpnetworking/metrics/common/sources/KafkaSource.java

Lines changed: 26 additions & 29 deletions
@@ -181,36 +181,33 @@ private class ParsingWorker implements Runnable {
         @Override
         public void run() {
             while (_isRunning || !_buffer.isEmpty()) { // Empty the queue before stopping the workers
-                final V value = _buffer.poll();
-                _periodicMetrics.recordGauge(_queueSizeGaugeMetricName, _buffer.size());
-                if (value != null) {
-                    final List<Record> records;
-                    try {
-                        final Stopwatch parsingTimer = Stopwatch.createStarted();
-                        records = _parser.parse(value);
-                        parsingTimer.stop();
-                        _periodicMetrics.recordTimer(_parsingTimeMetricName,
-                                parsingTimer.elapsed(TimeUnit.NANOSECONDS), Optional.of(Units.NANOSECOND));
-                    } catch (final ParsingException e) {
-                        _periodicMetrics.recordCounter(_parsingExceptionCountMetricName, 1);
-                        _logger.error()
-                                .setMessage("Failed to parse data")
-                                .setThrowable(e)
-                                .log();
-                        continue;
-                    }
-                    for (final Record record : records) {
-                        KafkaSource.this.notify(record);
-                        _currentRecordsProcessedCount.getAndIncrement();
-                    }
-                } else {
-                    // Queue is empty
-                    try {
-                        Thread.sleep(_backoffTime.toMillis());
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        stop();
+                try {
+                    final V value = _buffer.poll(_backoffTime.toMillis(), TimeUnit.MILLISECONDS);
+                    _periodicMetrics.recordGauge(_queueSizeGaugeMetricName, _buffer.size());
+                    if (value != null) {
+                        final List<Record> records;
+                        try {
+                            final Stopwatch parsingTimer = Stopwatch.createStarted();
+                            records = _parser.parse(value);
+                            parsingTimer.stop();
+                            _periodicMetrics.recordTimer(_parsingTimeMetricName,
+                                    parsingTimer.elapsed(TimeUnit.NANOSECONDS), Optional.of(Units.NANOSECOND));
+                        } catch (final ParsingException e) {
+                            _periodicMetrics.recordCounter(_parsingExceptionCountMetricName, 1);
+                            _logger.error()
+                                    .setMessage("Failed to parse data")
+                                    .setThrowable(e)
+                                    .log();
+                            continue;
+                        }
+                        for (final Record record : records) {
+                            KafkaSource.this.notify(record);
+                            _currentRecordsProcessedCount.getAndIncrement();
+                        }
                     }
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    stop();
                 }
             }
         }
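The change replaces the old idiom (non-blocking `poll()` plus a fixed `Thread.sleep` backoff when the queue is empty) with `BlockingQueue.poll(timeout, unit)`, which blocks until an element arrives or the timeout elapses, so the worker wakes immediately when data is available instead of sleeping through a full backoff interval. A minimal standalone sketch of that behavior, assuming a `LinkedBlockingQueue` and an illustrative 100 ms timeout (the names here are not from `KafkaSource`):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class PollTimeoutDemo {

    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

        // Timed poll on an empty queue: blocks for up to the timeout, then
        // returns null (the sleep-based version returned null immediately
        // and then slept the full backoff regardless of arrivals).
        final long start = System.nanoTime();
        final String miss = buffer.poll(100, TimeUnit.MILLISECONDS);
        final long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Timed poll on a non-empty queue: returns immediately.
        buffer.add("record");
        final String hit = buffer.poll(100, TimeUnit.MILLISECONDS);

        System.out.println(miss == null);         // timed out empty-handed
        System.out.println(waitedMs >= 95);       // waited roughly the full timeout
        System.out.println("record".equals(hit)); // delivered without delay
    }
}
```

This is also why the `InterruptedException` handling moves from the inner sleep branch to a `try`/`catch` around the whole loop body: the timed `poll` itself can now throw it, and the worker responds the same way as before, by re-interrupting the thread and calling `stop()`.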
