
Commit 1b558bc

joeyjackson authored and speezepearson committed
Kafka concurrent (#159)
* kafka source init
* kafka consumer wrapper and listener
* unit test and checkstyle
* exception handling
* unit tests for kafka source
* interfaces and wrappers for kafka source
* removed kafka dep
* integration tests for kafka source
* config deserialization
* Fixed deserializer
* Fixed deserializer
* styling
* kafka docker
* pipeline config example
* style
* error checking
* error checking
* integration test kafka source from config
* style
* added parser to kafka source
* example pipeline
* Fail integration test on send fail to kafka server
* requested changes
* requested changes
* configurable backoff time for kafka source
* fixed conf deserializer
* concurrent parsing workers
* multi worker unit test
* queue holds record values instead of records
* style
* mock observer for multithreaded testing
* configurable buffer queue size
* moved fill queue integration test to unit test
* style
* ensure queue fills in queue filled test
* refactor kafka source constructors
1 parent fcb58a0 commit 1b558bc
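
The headline change is concurrent parsing: values consumed from Kafka are now handed to a bounded buffer and parsed by a configurable pool of worker threads instead of on the consumer thread. A minimal usage sketch of the new builder knobs, adapted from the integration test in this commit; the consumer and observer setup are assumed:

    // Sketch only: assumes a configured org.apache.kafka.clients.consumer.Consumer<?, String>
    // named "consumer" and an Observer implementation named "observer".
    final KafkaSource<String, String> source = new KafkaSource.Builder<String, String>()
            .setName("KafkaSource")
            .setParser(new StringParser())           // parses each Kafka record value
            .setConsumer(consumer)
            .setPollTime(Duration.ofSeconds(1))
            .setNumWorkerThreads(4)                  // new: parallel parsing workers (default 1)
            .setBufferSize(1000)                     // new: bounded value buffer (default 1000)
            .setBackoffTime(Duration.ofSeconds(1))   // wait used when the buffer is empty
            .build();
    source.attach(observer);
    source.start();
    // ... later ...
    source.stop();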

4 files changed: 262 additions, 60 deletions


src/main/java/com/arpnetworking/metrics/common/kafka/RunnableConsumerImpl.java

Lines changed: 0 additions & 1 deletion
@@ -73,7 +73,6 @@ public void run() {
                 _listener.handle(e);
             }
         }
-        _consumer.close();
     }
 
     @Override
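
The consumer is no longer closed by the polling loop itself. Ownership of the close moves to KafkaSource.stop() in the next file, which closes the consumer in a finally block only after the consumer executor has been asked to terminate, so the connection is released exactly once regardless of how the wait ends. A condensed sketch of the new order, using the field names from the diff below:

    _runnableConsumer.stop();            // ask the poll loop to exit
    _consumerExecutor.shutdown();
    try {
        _consumerExecutor.awaitTermination(_shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS);
    } catch (final InterruptedException e) {
        // logged in the real code
    } finally {
        _consumer.close();               // always release the consumer
    }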

src/main/java/com/arpnetworking/metrics/common/sources/KafkaSource.java

Lines changed: 116 additions & 15 deletions
@@ -26,18 +26,21 @@
 import com.arpnetworking.steno.LoggerFactory;
 import net.sf.oval.constraint.CheckWith;
 import net.sf.oval.constraint.CheckWithCheck;
+import net.sf.oval.constraint.Min;
 import net.sf.oval.constraint.NotNull;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.KafkaException;
 
 import java.time.Duration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Produce instances of {@code Record} from the values of entries
+ * Produce instances of {@link com.arpnetworking.metrics.mad.model.Record} from the values of entries
  * from a Kafka topic. The key from the entries gets discarded
  *
  * @param <T> the type of data created by the source
@@ -52,28 +55,47 @@ public final class KafkaSource<T, V> extends BaseSource {
     private final Consumer<?, V> _consumer;
     private final RunnableConsumer _runnableConsumer;
     private final ExecutorService _consumerExecutor;
+    private final ExecutorService _parserExecutor;
     private final Parser<T, V> _parser;
     private final Logger _logger;
     private final Duration _shutdownAwaitTime;
     private final Duration _backoffTime;
+    private final Integer _numWorkerThreads;
+    private final BlockingQueue<V> _buffer;
+    private final ParsingWorker _parsingWorker = new ParsingWorker();
 
     @Override
     public void start() {
         _consumerExecutor.execute(_runnableConsumer);
+        for (int i = 0; i < _numWorkerThreads; i++) {
+            _parserExecutor.execute(_parsingWorker);
+        }
     }
 
     @Override
     public void stop() {
         _runnableConsumer.stop();
-
         _consumerExecutor.shutdown();
         try {
             _consumerExecutor.awaitTermination(_shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS);
         } catch (final InterruptedException e) {
-            LOGGER.warn()
+            _logger.warn()
                     .setMessage("Unable to shutdown kafka consumer executor")
                     .setThrowable(e)
                     .log();
+        } finally {
+            _consumer.close();
+        }
+
+        _parsingWorker.stop();
+        _parserExecutor.shutdown();
+        try {
+            _parserExecutor.awaitTermination(_shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            _logger.warn()
+                    .setMessage("Unable to shutdown parsing worker executor")
+                    .setThrowable(e)
+                    .log();
         }
     }
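
Note the shutdown order above: the consumer executor is stopped first so no new values enter the buffer, and only then are the parsing workers told to stop, which lets them drain whatever is still queued. A common hardening of this pattern, not part of this commit, is to force termination when the graceful wait times out; a sketch with an assumed executor and timeout:

    executor.shutdown();
    try {
        if (!executor.awaitTermination(shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS)) {
            executor.shutdownNow();      // interrupt tasks that are still running
        }
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        executor.shutdownNow();
    }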

@@ -97,39 +119,88 @@ public String toString() {
 
     @SuppressWarnings("unused")
     private KafkaSource(final Builder<T, V> builder) {
-        this(builder, LOGGER);
+        this(builder, LOGGER, new ArrayBlockingQueue<>(builder._bufferSize));
     }
 
+    // NOTE: Package private for testing
     /* package private */ KafkaSource(final Builder<T, V> builder, final Logger logger) {
+        this(builder, logger, new ArrayBlockingQueue<>(builder._bufferSize));
+    }
+
+    // NOTE: Package private for testing
+    /* package private */ KafkaSource(final Builder<T, V> builder, final BlockingQueue<V> buffer) {
+        this(builder, LOGGER, buffer);
+    }
+
+    private KafkaSource(final Builder<T, V> builder, final Logger logger, final BlockingQueue<V> buffer) {
         super(builder);
-        _logger = logger;
         _consumer = builder._consumer;
         _parser = builder._parser;
         _runnableConsumer = new RunnableConsumerImpl.Builder<V>()
                 .setConsumer(builder._consumer)
                 .setListener(new LogConsumerListener())
                 .setPollTime(builder._pollTime)
                 .build();
+        _numWorkerThreads = builder._numWorkerThreads;
         _consumerExecutor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "KafkaConsumer"));
+        _parserExecutor = Executors.newFixedThreadPool(_numWorkerThreads);
         _shutdownAwaitTime = builder._shutdownAwaitTime;
         _backoffTime = builder._backoffTime;
+        _logger = logger;
+        _buffer = buffer;
+    }
+
+    private class ParsingWorker implements Runnable {
+        private volatile boolean _isRunning = true;
+
+        @Override
+        public void run() {
+            while (_isRunning || !_buffer.isEmpty()) { // Empty the queue before stopping the workers
+                final V value = _buffer.poll();
+                if (value != null) {
+                    final T record;
+                    try {
+                        record = _parser.parse(value);
+                    } catch (final ParsingException e) {
+                        _logger.error()
+                                .setMessage("Failed to parse data")
+                                .setThrowable(e)
+                                .log();
+                        return;
+                    }
+                    KafkaSource.this.notify(record);
+                } else {
+                    // Queue is empty
+                    try {
+                        Thread.sleep(_backoffTime.toMillis());
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        stop();
+                    }
+                }
+            }
+        }
+
+        public void stop() {
+            _isRunning = false;
+        }
     }
 
     private class LogConsumerListener implements ConsumerListener<V> {
 
         @Override
         public void handle(final ConsumerRecord<?, V> consumerRecord) {
-            final T record;
             try {
-                record = _parser.parse(consumerRecord.value());
-            } catch (final ParsingException e) {
-                _logger.error()
-                        .setMessage("Failed to parse data")
+                _buffer.put(consumerRecord.value());
+            } catch (final InterruptedException e) {
+                _logger.info()
+                        .setMessage("Consumer thread interrupted")
+                        .addData("source", KafkaSource.this)
+                        .addData("action", "stopping")
                         .setThrowable(e)
                         .log();
-                return;
+                _runnableConsumer.stop();
             }
-            KafkaSource.this.notify(record);
         }
 
         @Override
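
The moving parts above (a bounded handoff queue, workers that poll with a backoff sleep, and a loop condition that drains the queue before exiting) can be demonstrated without Kafka. A minimal, self-contained sketch of the same pattern; all names are illustrative, not project code:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public final class DrainingWorkerSketch {
        public static void main(final String[] args) throws InterruptedException {
            final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1000);
            final AtomicBoolean running = new AtomicBoolean(true);
            final long backoffMillis = 100;
    
            final Runnable worker = () -> {
                // Same loop shape as ParsingWorker: keep working while running,
                // then drain whatever is still buffered before exiting.
                while (running.get() || !buffer.isEmpty()) {
                    final String value = buffer.poll();
                    if (value != null) {
                        System.out.println("parsed: " + value); // stand-in for _parser.parse(value)
                    } else {
                        try {
                            Thread.sleep(backoffMillis);        // back off while the buffer is empty
                        } catch (final InterruptedException e) {
                            Thread.currentThread().interrupt();
                            running.set(false);
                        }
                    }
                }
            };
    
            buffer.put("a"); // producer side: put(...) blocks when the buffer is full
            buffer.put("b");
            final Thread thread = new Thread(worker, "ParsingWorker");
            thread.start();
            running.set(false); // stop requested; the worker still drains "a" and "b"
            thread.join();
        }
    }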
@@ -181,7 +252,7 @@ private void backoff(final Throwable throwable) {
     }
 
     /**
-     * Builder pattern class for {@code KafkaSource}.
+     * Builder pattern class for {@link KafkaSource}.
      *
      * @param <T> the type of data created by the source
      * @param <V> the type of data of value in kafka {@code ConsumerRecords}
@@ -231,11 +302,11 @@ public Builder<T, V> setPollTime(final Duration pollTime) {
         }
 
         /**
-         * Sets the amount of time the {@link KafkaSource} will wait to shutdown the {@code RunnableConsumer} thread.
+         * Sets the amount of time the {@link KafkaSource} will wait to shutdown the {@link RunnableConsumer} thread.
          * Default is 10 seconds. Cannot be null or negative.
         *
          * @param shutdownAwaitTime The {@code Duration} the {@link KafkaSource} will wait to shutdown
-         * the {@code RunnableConsumer} thread.
+         * the {@link RunnableConsumer} thread.
          * @return This instance of {@link KafkaSource.Builder}.
          */
         public Builder<T, V> setShutdownAwaitTime(final Duration shutdownAwaitTime) {
@@ -256,6 +327,30 @@ public Builder<T, V> setBackoffTime(final Duration backoffTime) {
             return this;
         }
 
+        /**
+         * Sets the number of threads that will parse {@code ConsumerRecord}s from the {@code Consumer}.
+         * Default is 1. Must be greater than or equal to 1.
+         *
+         * @param numWorkerThreads The number of parsing worker threads.
+         * @return This instance of {@link KafkaSource.Builder}.
+         */
+        public Builder<T, V> setNumWorkerThreads(final Integer numWorkerThreads) {
+            _numWorkerThreads = numWorkerThreads;
+            return this;
+        }
+
+        /**
+         * Sets the size of the buffer to hold {@code ConsumerRecord}s from the {@code Consumer} before they are parsed.
+         * Default is 1000. Must be greater than or equal to 1.
+         *
+         * @param bufferSize The size of the buffer.
+         * @return This instance of {@link KafkaSource.Builder}.
+         */
+        public Builder<T, V> setBufferSize(final Integer bufferSize) {
+            _bufferSize = bufferSize;
+            return this;
+        }
+
         @Override
         protected Builder<T, V> self() {
             return this;
@@ -274,6 +369,12 @@ protected Builder<T, V> self() {
         @NotNull
         @CheckWith(value = PositiveDuration.class, message = "Backoff time must be positive.")
         private Duration _backoffTime = Duration.ofSeconds(1);
+        @NotNull
+        @Min(1)
+        private Integer _numWorkerThreads = 1;
+        @NotNull
+        @Min(1)
+        private Integer _bufferSize = 1000;
 
         private static class PositiveDuration implements CheckWithCheck.SimpleCheck {
             @Override
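
The constructor refactoring is what makes the buffer observable in tests: the package-private constructors accept an externally supplied BlockingQueue, so a unit test can inject a small one and watch it fill (the "ensure queue fills" test from the commit message). A hypothetical sketch; the mock parser and consumer are assumptions:

    // Hypothetical test-side sketch: inject a capacity-2 buffer through the
    // package-private constructor; builder setters follow the diff above.
    final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);
    final KafkaSource<String, String> source = new KafkaSource<>(
            new KafkaSource.Builder<String, String>()
                    .setName("QueueFillTest")
                    .setParser(parser)       // assumed mock Parser<String, String>
                    .setConsumer(consumer)   // assumed mock Consumer<?, String>
                    .setPollTime(Duration.ofMillis(50)),
            buffer);
    // Drive the consumer, then assert the buffer filled, e.g.:
    // Assert.assertEquals(2, buffer.size());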

src/test/java/com/arpnetworking/metrics/common/integration/KafkaIT.java

Lines changed: 46 additions & 14 deletions
@@ -43,6 +43,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -56,6 +57,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * Tests to check integration with a kafka topic.
@@ -69,7 +71,8 @@ public class KafkaIT {
     private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.IntegerDeserializer";
     private static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
     private static final Duration POLL_DURATION = Duration.ofSeconds(1);
-    private static final int TIMEOUT = 5000;
+    private static final int TIMEOUT = 10000;
+    private static final int NUM_RECORDS = 500;
 
     private Map<String, Object> _consumerProps;
     private KafkaConsumer<Integer, String> _consumer;
@@ -82,19 +85,7 @@ public void setUp() throws TimeoutException {
         // Create kafka topic
         _topicName = createTopicName();
         createTopic(_topicName);
-
-        // Create and send producer records
-        _producerRecords = createProducerRecords(_topicName, 3);
-        sendRecords(_producerRecords);
-
-        // Create consumer props
-        _consumerProps = Maps.newHashMap();
-        _consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
-        _consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        _consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
-        _consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
-        _consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        _consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group" + _topicName);
+        setupKafka();
     }
 
     @After
@@ -189,6 +180,32 @@ public void testKafkaSourceFromConfig() throws IOException {
         }
     }
 
+    @Test
+    public void testKafkaSourceMultipleWorkerThreads() {
+        // Create Kafka Source
+        _consumer = new KafkaConsumer<>(_consumerProps);
+        _consumer.subscribe(Collections.singletonList(_topicName));
+        _source = new KafkaSource.Builder<String, String>()
+                .setName("KafkaSource")
+                .setParser(new StringParser())
+                .setConsumer(_consumer)
+                .setPollTime(POLL_DURATION)
+                .setNumWorkerThreads(4)
+                .build();
+
+        // Observe records
+        final Observer observer = Mockito.mock(Observer.class);
+        _source.attach(observer);
+        _source.start();
+
+        final ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        Mockito.verify(observer, Mockito.timeout(TIMEOUT).times(NUM_RECORDS)).notify(Mockito.any(), captor.capture());
+        Assert.assertEquals(
+                _producerRecords.stream().map(ProducerRecord::value).sorted().collect(Collectors.toList()),
+                captor.getAllValues().stream().sorted().collect(Collectors.toList())
+        );
+    }
+
     private String createTopicName() {
         return "topic_" + UUID.randomUUID().toString();
     }
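
With four workers calling notify() concurrently, arrival order is nondeterministic, so the test compares sorted copies of the produced and observed values rather than arrival order; Mockito.timeout(TIMEOUT) makes the verification wait up to TIMEOUT milliseconds for all NUM_RECORDS notifications instead of racing the workers. The comparison idiom in isolation:

    // Illustration only: equality up to ordering, as in the assertion above.
    final List<String> expected = Stream.of("b", "a", "c").sorted().collect(Collectors.toList());
    final List<String> actual = Stream.of("c", "a", "b").sorted().collect(Collectors.toList());
    Assert.assertEquals(expected, actual); // passes regardless of arrival order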
@@ -252,6 +269,21 @@ private void sendRecords(final List<ProducerRecord<Integer, String>> records) {
         producer.close();
     }
 
+    private void setupKafka() {
+        // Create and send producer records
+        _producerRecords = createProducerRecords(_topicName, NUM_RECORDS);
+        sendRecords(_producerRecords);
+
+        // Create consumer props
+        _consumerProps = Maps.newHashMap();
+        _consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+        _consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        _consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+        _consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+        _consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        _consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group" + _topicName);
+    }
+
     /**
      * Class needed for generic object mapping with {@code ObjectMapper}.
      *
