
Commit 840ad11

joeyjacksoncwbriones authored and committed
Kafka record list (#162)
* kafka source init
* kafka consumer wrapper and listener
* unit test and checkstyle
* exception handling
* unit tests for kafka source
* interfaces and wrappers for kafka source
* removed kafka dep
* integration tests for kafka source
* config deserialization
* Fixed deserializer
* Fixed deserializer
* styling
* kafka docker
* pipeline config example
* style
* error checking
* error checking
* integration test kafka source from config
* style
* added parser to kafka source
* example pipeline
* Fail integration test on send fail to kafka server
* requested changes
* requested changes
* configurable backoff time for kafka source
* fixed conf deserializer
* concurrent parsing workers
* multi worker unit test
* queue holds record values instead of records
* style
* instrument init
* todo
* mock observer for multithreaded testing
* configurable buffer queue size
* moved fill queue integration test to unit test
* style
* ensure queue fills in queue filled test
* refactor kafka source constructors
* style
* fix injector in integration tests
* instrumentation testing init
* unit tests for instrumentation counter
* unit test gauge metric
* more instrumentation metrics
* remove prinln
* new metric names
* metrics unit tests
* requested changes
* nonnull annotate
* Make kafka source parser output list of Records
1 parent 702b5fe commit 840ad11
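
The net effect of this commit is a generics change: KafkaSource<T, V> becomes KafkaSource<V>, and its parser must now be a Parser<List<Record>, V>, so a single consumed Kafka value may yield zero or more Records, each of which is delivered to observers individually. The sketch below illustrates only the new parser contract; it is not code from this commit, the class name is hypothetical, and the record-construction step is left as a placeholder since the real parsers (such as the StringToRecordParser test helper) are not shown in this diff.

import com.arpnetworking.metrics.common.parsers.Parser;
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
import com.arpnetworking.metrics.mad.model.Record;

import java.util.Collections;
import java.util.List;

// Sketch only: the shape a parser must have after this change, for V = String.
public final class ExampleListRecordParser implements Parser<List<Record>, String> {
    @Override
    public List<Record> parse(final String value) throws ParsingException {
        if (value == null || value.isEmpty()) {
            // Returning an empty list is a valid outcome; the source then notifies no observers.
            return Collections.emptyList();
        }
        // A real implementation would deserialize `value` into one or more Record instances here;
        // that construction is application-specific and omitted from this sketch.
        return Collections.emptyList();
    }
}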

File tree

5 files changed (+124, -83 lines)


src/main/java/com/arpnetworking/metrics/common/sources/KafkaSource.java

Lines changed: 25 additions & 23 deletions
@@ -23,6 +23,7 @@
 import com.arpnetworking.metrics.common.parsers.Parser;
 import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
 import com.arpnetworking.metrics.incubator.PeriodicMetrics;
+import com.arpnetworking.metrics.mad.model.Record;
 import com.arpnetworking.steno.LogValueMapFactory;
 import com.arpnetworking.steno.Logger;
 import com.arpnetworking.steno.LoggerFactory;
@@ -37,6 +38,7 @@
 import org.apache.kafka.common.KafkaException;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -49,19 +51,18 @@
  * Produce instances of {@link com.arpnetworking.metrics.mad.model.Record} from the values of entries
  * from a Kafka topic. The key from the entries gets discarded
  *
- * @param <T> the type of data created by the source
  * @param <V> the type of data of value in kafka {@code ConsumerRecords}
  *
  * @author Joey Jackson (jjackson at dropbox dot com)
  */
-public final class KafkaSource<T, V> extends BaseSource {
+public final class KafkaSource<V> extends BaseSource {
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);
 
     private final Consumer<?, V> _consumer;
     private final RunnableConsumer _runnableConsumer;
     private final ExecutorService _consumerExecutor;
     private final ExecutorService _parserExecutor;
-    private final Parser<T, V> _parser;
+    private final Parser<List<Record>, V> _parser;
     private final Logger _logger;
     private final Duration _shutdownAwaitTime;
     private final Duration _backoffTime;
@@ -135,21 +136,21 @@ public String toString() {
     }
 
     @SuppressWarnings("unused")
-    private KafkaSource(final Builder<T, V> builder) {
+    private KafkaSource(final Builder<V> builder) {
         this(builder, LOGGER, new ArrayBlockingQueue<>(builder._bufferSize));
     }
 
     // NOTE: Package private for testing
-    /* package private */ KafkaSource(final Builder<T, V> builder, final Logger logger) {
+    /* package private */ KafkaSource(final Builder<V> builder, final Logger logger) {
         this(builder, logger, new ArrayBlockingQueue<>(builder._bufferSize));
     }
 
     // NOTE: Package private for testing
-    /* package private */ KafkaSource(final Builder<T, V> builder, final BlockingQueue<V> buffer) {
+    /* package private */ KafkaSource(final Builder<V> builder, final BlockingQueue<V> buffer) {
         this(builder, LOGGER, buffer);
     }
 
-    private KafkaSource(final Builder<T, V> builder, final Logger logger, final BlockingQueue<V> buffer) {
+    private KafkaSource(final Builder<V> builder, final Logger logger, final BlockingQueue<V> buffer) {
         super(builder);
         _consumer = builder._consumer;
         _parser = builder._parser;
@@ -183,10 +184,10 @@ public void run() {
                 final V value = _buffer.poll();
                 _periodicMetrics.recordGauge(_queueSizeGaugeMetricName, _buffer.size());
                 if (value != null) {
-                    final T record;
+                    final List<Record> records;
                     try {
                         final Stopwatch parsingTimer = Stopwatch.createStarted();
-                        record = _parser.parse(value);
+                        records = _parser.parse(value);
                         parsingTimer.stop();
                         _periodicMetrics.recordTimer(_parsingTimeMetricName,
                                 parsingTimer.elapsed(TimeUnit.NANOSECONDS), Optional.of(Units.NANOSECOND));
@@ -198,8 +199,10 @@ record = _parser.parse(value);
                                 .log();
                         continue;
                     }
-                    KafkaSource.this.notify(record);
-                    _currentRecordsProcessedCount.getAndIncrement();
+                    for (final Record record : records) {
+                        KafkaSource.this.notify(record);
+                        _currentRecordsProcessedCount.getAndIncrement();
+                    }
                 } else {
                     // Queue is empty
                     try {
@@ -289,12 +292,11 @@ private void backoff(final Throwable throwable) {
     /**
      * Builder pattern class for {@link KafkaSource}.
      *
-     * @param <T> the type of data created by the source
      * @param <V> the type of data of value in kafka {@code ConsumerRecords}
      *
      * @author Joey Jackson (jjackson at dropbox dot com)
      */
-    public static final class Builder<T, V> extends BaseSource.Builder<Builder<T, V>, KafkaSource<T, V>> {
+    public static final class Builder<V> extends BaseSource.Builder<Builder<V>, KafkaSource<V>> {
 
         /**
          * Public constructor.
@@ -309,7 +311,7 @@ public Builder() {
          * @param consumer The {@code Consumer}.
          * @return This instance of {@link KafkaSource.Builder}.
          */
-        public Builder<T, V> setConsumer(final Consumer<?, V> consumer) {
+        public Builder<V> setConsumer(final Consumer<?, V> consumer) {
             _consumer = consumer;
             return this;
         }
@@ -320,7 +322,7 @@ public Builder<T, V> setConsumer(final Consumer<?, V> consumer) {
          * @param value The {@link Parser}.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setParser(final Parser<T, V> value) {
+        public Builder<V> setParser(final Parser<List<Record>, V> value) {
             _parser = value;
             return this;
         }
@@ -331,7 +333,7 @@ public Builder<T, V> setParser(final Parser<T, V> value) {
          * @param pollTime The {@code Duration} of each poll call made by the {@code KafkaConsumer}.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setPollTime(final Duration pollTime) {
+        public Builder<V> setPollTime(final Duration pollTime) {
             _pollTime = pollTime;
             return this;
         }
@@ -344,7 +346,7 @@ public Builder<T, V> setPollTime(final Duration pollTime) {
          * the {@link RunnableConsumer} thread.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setShutdownAwaitTime(final Duration shutdownAwaitTime) {
+        public Builder<V> setShutdownAwaitTime(final Duration shutdownAwaitTime) {
             _shutdownAwaitTime = shutdownAwaitTime;
             return this;
         }
@@ -357,7 +359,7 @@ public Builder<T, V> setShutdownAwaitTime(final Duration shutdownAwaitTime) {
          * an operation on exception.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setBackoffTime(final Duration backoffTime) {
+        public Builder<V> setBackoffTime(final Duration backoffTime) {
             _backoffTime = backoffTime;
             return this;
         }
@@ -369,7 +371,7 @@ public Builder<T, V> setBackoffTime(final Duration backoffTime) {
          * @param numWorkerThreads The number of parsing worker threads.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setNumWorkerThreads(final Integer numWorkerThreads) {
+        public Builder<V> setNumWorkerThreads(final Integer numWorkerThreads) {
             _numWorkerThreads = numWorkerThreads;
             return this;
         }
@@ -381,7 +383,7 @@ public Builder<T, V> setNumWorkerThreads(final Integer numWorkerThreads) {
          * @param bufferSize The size of the buffer.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setBufferSize(final Integer bufferSize) {
+        public Builder<V> setBufferSize(final Integer bufferSize) {
             _bufferSize = bufferSize;
             return this;
         }
@@ -392,20 +394,20 @@ public Builder<T, V> setBufferSize(final Integer bufferSize) {
          * @param periodicMetrics The {@code PeriodicMetrics} for the {@link KafkaSource}.
          * @return This instance of {@link KafkaSource.Builder}.
         */
-        public Builder<T, V> setPeriodicMetrics(final PeriodicMetrics periodicMetrics) {
+        public Builder<V> setPeriodicMetrics(final PeriodicMetrics periodicMetrics) {
             _periodicMetrics = periodicMetrics;
             return this;
         }
 
         @Override
-        protected Builder<T, V> self() {
+        protected Builder<V> self() {
             return this;
         }
 
         @NotNull
         private Consumer<?, V> _consumer;
         @NotNull
-        private Parser<T, V> _parser;
+        private Parser<List<Record>, V> _parser;
         @NotNull
         @CheckWith(value = PositiveDuration.class, message = "Poll duration must be positive.")
         private Duration _pollTime;
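
With the Builder reduced to a single type parameter, wiring a source looks like the integration tests in KafkaIT.java below. The following is a minimal construction sketch, not code from this commit: the class and method names are hypothetical, StringToRecordParser is the test helper (any Parser<List<Record>, String> would do), the Consumer and PeriodicMetrics are assumed to be supplied by the caller, and build() is assumed to be inherited from the BaseSource builder.

import com.arpnetworking.metrics.common.sources.KafkaSource;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.test.StringToRecordParser;

import org.apache.kafka.clients.consumer.Consumer;

import java.time.Duration;

// Sketch only: constructing a KafkaSource<String> after the generics change.
public final class KafkaSourceWiringSketch {
    public static KafkaSource<String> newSource(
            final Consumer<?, String> consumer,          // already subscribed to the desired topic
            final PeriodicMetrics periodicMetrics) {
        return new KafkaSource.Builder<String>()
                .setName("KafkaSourceExample")
                .setParser(new StringToRecordParser())   // any Parser<List<Record>, String>
                .setConsumer(consumer)
                .setPollTime(Duration.ofSeconds(1))
                .setPeriodicMetrics(periodicMetrics)
                .build();
    }
}

The testKafkaSourceFromConfig case in the next file shows the equivalent JSON-driven wiring, where the parser is selected by its fully qualified class name.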

src/test/java/com/arpnetworking/metrics/common/integration/KafkaIT.java

Lines changed: 19 additions & 15 deletions
@@ -20,7 +20,7 @@
 import com.arpnetworking.metrics.common.kafka.ConsumerDeserializer;
 import com.arpnetworking.metrics.common.sources.KafkaSource;
 import com.arpnetworking.metrics.incubator.PeriodicMetrics;
-import com.arpnetworking.test.StringParser;
+import com.arpnetworking.test.StringToRecordParser;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
@@ -84,7 +84,7 @@ public class KafkaIT {
     private Map<String, Object> _consumerProps;
     private KafkaConsumer<Integer, String> _consumer;
     private String _topicName;
-    private KafkaSource<String, String> _source;
+    private KafkaSource<String> _source;
     private List<ProducerRecord<Integer, String>> _producerRecords;
     private PeriodicMetrics _periodicMetrics;
 
@@ -109,9 +109,9 @@ public void testKafkaSourceSingleObserver() {
         // Create Kafka Source
         _consumer = new KafkaConsumer<>(_consumerProps);
         _consumer.subscribe(Collections.singletonList(_topicName));
-        _source = new KafkaSource.Builder<String, String>()
+        _source = new KafkaSource.Builder<String>()
                 .setName("KafkaSource")
-                .setParser(new StringParser())
+                .setParser(new StringToRecordParser())
                 .setConsumer(_consumer)
                 .setPollTime(POLL_DURATION)
                 .setPeriodicMetrics(_periodicMetrics)
@@ -122,7 +122,8 @@ public void testKafkaSourceSingleObserver() {
         _source.attach(observer);
         _source.start();
         for (ProducerRecord<Integer, String> expected : _producerRecords) {
-            Mockito.verify(observer, Mockito.timeout(TIMEOUT)).notify(_source, expected.value());
+            Mockito.verify(observer, Mockito.timeout(TIMEOUT)).notify(_source,
+                    StringToRecordParser.recordWithID(expected.value()));
         }
     }
 
@@ -131,9 +132,9 @@ public void testKafkaSourceMultipleObservers() {
         // Create Kafka Source
         _consumer = new KafkaConsumer<>(_consumerProps);
         _consumer.subscribe(Collections.singletonList(_topicName));
-        _source = new KafkaSource.Builder<String, String>()
+        _source = new KafkaSource.Builder<String>()
                 .setName("KafkaSource")
-                .setParser(new StringParser())
+                .setParser(new StringToRecordParser())
                 .setConsumer(_consumer)
                 .setPollTime(POLL_DURATION)
                 .setPeriodicMetrics(_periodicMetrics)
@@ -146,8 +147,10 @@ public void testKafkaSourceMultipleObservers() {
         _source.attach(observer2);
         _source.start();
         for (ProducerRecord<Integer, String> expected : _producerRecords) {
-            Mockito.verify(observer1, Mockito.timeout(TIMEOUT)).notify(_source, expected.value());
-            Mockito.verify(observer2, Mockito.timeout(TIMEOUT)).notify(_source, expected.value());
+            Mockito.verify(observer1, Mockito.timeout(TIMEOUT)).notify(_source,
+                    StringToRecordParser.recordWithID(expected.value()));
+            Mockito.verify(observer2, Mockito.timeout(TIMEOUT)).notify(_source,
+                    StringToRecordParser.recordWithID(expected.value()));
         }
     }
 
@@ -171,7 +174,7 @@ public void testKafkaSourceFromConfig() throws IOException {
                 + "\n }"
                 + "\n },"
                 + "\n \"parser\":{"
-                + "\n \"type\":\"com.arpnetworking.test.StringParser\""
+                + "\n \"type\":\"com.arpnetworking.test.StringToRecordParser\""
                 + "\n },"
                 + "\n \"backoffTime\":\"PT1S\""
                 + "\n}";
@@ -204,7 +207,8 @@ protected void configure() {
         _source.attach(observer);
         _source.start();
         for (ProducerRecord<Integer, String> expected : _producerRecords) {
-            Mockito.verify(observer, Mockito.timeout(TIMEOUT)).notify(_source, expected.value());
+            Mockito.verify(observer, Mockito.timeout(TIMEOUT)).notify(_source,
+                    StringToRecordParser.recordWithID(expected.value()));
         }
     }
 
@@ -213,9 +217,9 @@ public void testKafkaSourceMultipleWorkerThreads() {
        // Create Kafka Source
         _consumer = new KafkaConsumer<>(_consumerProps);
         _consumer.subscribe(Collections.singletonList(_topicName));
-        _source = new KafkaSource.Builder<String, String>()
+        _source = new KafkaSource.Builder<String>()
                 .setName("KafkaSource")
-                .setParser(new StringParser())
+                .setParser(new StringToRecordParser())
                 .setConsumer(_consumer)
                 .setPollTime(POLL_DURATION)
                 .setNumWorkerThreads(4)
@@ -231,7 +235,7 @@ public void testKafkaSourceMultipleWorkerThreads() {
         Mockito.verify(observer, Mockito.timeout(TIMEOUT).times(NUM_RECORDS)).notify(Mockito.any(), captor.capture());
         Assert.assertEquals(
                 _producerRecords.stream().map(ProducerRecord::value).sorted().collect(Collectors.toList()),
-                captor.getAllValues().stream().sorted().collect(Collectors.toList())
+                captor.getAllValues().stream().map(StringToRecordParser::recordID).sorted().collect(Collectors.toList())
         );
     }
 
@@ -318,5 +322,5 @@ private void setupKafka() {
      *
      * @author Joey Jackson (jjackson at dropbox dot com)
      */
-    private static class KafkaSourceStringType extends TypeReference<KafkaSource<String, String>> {}
+    private static class KafkaSourceStringType extends TypeReference<KafkaSource<String>> {}
 }

0 commit comments
