Skip to content
This repository was archived by the owner on May 27, 2022. It is now read-only.

Commit f4007f3

Browse files
Gérald QUINTANA (gquintana)
authored and committed
Merge ProducerSupplier with DeliveryStrategy
1 parent c8f43e0 commit f4007f3

21 files changed

+481
-377
lines changed

src/example/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
2525

2626
<!-- use async delivery. the application threads are not blocked by logging -->
27-
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
27+
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.LateAsyncDeliveryStrategy" />
2828

2929
<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
3030
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->

src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import ch.qos.logback.core.Appender;
55
import ch.qos.logback.core.spi.AppenderAttachableImpl;
66
import com.github.danielwegener.logback.kafka.delivery.FailedDeliveryCallback;
7-
import org.apache.kafka.clients.producer.Producer;
7+
import org.apache.kafka.clients.producer.KafkaProducer;
88
import org.apache.kafka.clients.producer.ProducerConfig;
99
import org.apache.kafka.clients.producer.ProducerRecord;
1010
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -15,7 +15,7 @@
1515
/**
1616
* @since 0.0.1
1717
*/
18-
public class KafkaAppender<E> extends KafkaAppenderConfig<E> {
18+
public class KafkaAppender<E> extends KafkaAppenderConfig<E, byte[], byte[]> {
1919

2020
/**
2121
* Kafka clients uses this prefix for its slf4j logging.
@@ -57,15 +57,15 @@ public void start() {
5757
partition = null;
5858
}
5959

60-
producerSupplier.start(this, producerConfig);
60+
deliveryStrategy.start(producerConfig, failedDeliveryCallback);
6161

6262
super.start();
6363
}
6464

6565
@Override
6666
public void stop() {
6767
super.stop();
68-
producerSupplier.stop();
68+
deliveryStrategy.stop();
6969
}
7070

7171
@Override
@@ -112,12 +112,7 @@ protected void append(E e) {
112112

113113
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, partition, timestamp, key, payload);
114114

115-
final Producer<byte[], byte[]> producer = producerSupplier.get();
116-
if (producer != null) {
117-
deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
118-
} else {
119-
failedDeliveryCallback.onFailedDelivery(e, null);
120-
}
115+
deliveryStrategy.send(record, e);
121116
}
122117

123118
protected Long getTimestamp(E e) {

src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderConfig.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,32 @@
33
import ch.qos.logback.core.UnsynchronizedAppenderBase;
44
import ch.qos.logback.core.encoder.Encoder;
55
import ch.qos.logback.core.spi.AppenderAttachable;
6-
import com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy;
76
import com.github.danielwegener.logback.kafka.delivery.DeliveryStrategy;
7+
import com.github.danielwegener.logback.kafka.delivery.LateAsyncDeliveryStrategy;
88
import com.github.danielwegener.logback.kafka.keying.KeyingStrategy;
99
import com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy;
10-
import com.github.danielwegener.logback.kafka.producer.LazyProducerSupplier;
11-
import com.github.danielwegener.logback.kafka.producer.ProducerSupplier;
1210

1311
import java.util.HashMap;
1412
import java.util.Map;
1513

16-
import static org.apache.kafka.clients.producer.ProducerConfig.*;
14+
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
1715

1816
/**
1917
* @since 0.0.1
2018
*/
21-
public abstract class KafkaAppenderConfig<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {
19+
public abstract class KafkaAppenderConfig<E, K, V> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {
2220

2321
protected String topic = null;
2422

2523
protected Encoder<E> encoder = null;
2624
protected KeyingStrategy<? super E> keyingStrategy = null;
27-
protected DeliveryStrategy deliveryStrategy;
25+
protected DeliveryStrategy<E, K, V> deliveryStrategy;
2826

2927
protected Integer partition = null;
3028

3129
protected boolean appendTimestamp = true;
3230

3331
protected Map<String,Object> producerConfig = new HashMap<String, Object>();
34-
protected ProducerSupplier<byte[], byte[]> producerSupplier = new LazyProducerSupplier();
3532

3633
protected boolean checkPrerequisites() {
3734
boolean errorFree = true;
@@ -58,8 +55,8 @@ protected boolean checkPrerequisites() {
5855
}
5956

6057
if (deliveryStrategy == null) {
61-
addInfo("No explicit deliveryStrategy set for the appender named [\""+name+"\"]. Using default asynchronous strategy.");
62-
deliveryStrategy = new AsynchronousDeliveryStrategy();
58+
addInfo("No explicit deliveryStrategy set for the appender named [\""+name+"\"]. Using default asynchronous lazy strategy.");
59+
deliveryStrategy = new LateAsyncDeliveryStrategy();
6360
}
6461

6562
return errorFree;
@@ -107,7 +104,4 @@ public void setAppendTimestamp(boolean appendTimestamp) {
107104
this.appendTimestamp = appendTimestamp;
108105
}
109106

110-
public void setProducerSupplier(ProducerSupplier<byte[], byte[]> producerSupplier) {
111-
this.producerSupplier = producerSupplier;
112-
}
113107
}

src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.github.danielwegener.logback.kafka.delivery;
2+
3+
import ch.qos.logback.core.spi.ContextAwareBase;
4+
import org.apache.kafka.clients.producer.BufferExhaustedException;
5+
import org.apache.kafka.clients.producer.Callback;
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.Producer;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.apache.kafka.clients.producer.RecordMetadata;
10+
import org.apache.kafka.common.KafkaException;
11+
import org.apache.kafka.common.errors.TimeoutException;
12+
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.concurrent.Future;
16+
17+
/**
18+
* Base implementation of {@link DeliveryStrategy}
19+
* @since 0.0.1
20+
*/
21+
public abstract class BaseDeliveryStrategy<E, K, V> extends ContextAwareBase implements DeliveryStrategy<E, K, V> {
22+
private HashMap<String, Object> producerConfig;
23+
protected FailedDeliveryCallback<E> failedDeliveryCallback;
24+
25+
@Override
26+
public void start(Map<String, Object> producerConfig, FailedDeliveryCallback<E> failedDeliveryCallback) {
27+
this.producerConfig = new HashMap<>(producerConfig);
28+
this.failedDeliveryCallback = failedDeliveryCallback;
29+
}
30+
31+
/**
32+
* Instantiates the producer with its config
33+
* @return Created producer
34+
* @throws KafkaException Producer initialization failed
35+
*/
36+
protected Producer<K, V> createProducer() throws KafkaException{
37+
return new KafkaProducer<>(new HashMap<>(producerConfig));
38+
}
39+
40+
/**
41+
* Get producer
42+
* @return Producer or null if producer is not initialized
43+
*/
44+
protected abstract Producer<K, V> getProducer();
45+
/**
46+
* Stop the producer
47+
*/
48+
public void stopProducer() {
49+
if (getProducer() != null) {
50+
try {
51+
getProducer().close();
52+
} catch (KafkaException e) {
53+
addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
54+
}
55+
}
56+
}
57+
58+
/**
59+
* Send a record using the producer.
60+
* In case of failure, the event is sent to the {@link #failedDeliveryCallback}
61+
* @param record Record to send
62+
* @param event Event to send to failedDeliveryCallback in case of failure
63+
* @return Future returned by the Kafka producer
64+
* @throws KafkaException Record sending failed
65+
*/
66+
protected Future<RecordMetadata> doSend(ProducerRecord<K, V> record, final E event) throws KafkaException {
67+
try {
68+
return getProducer().send(record, new Callback() {
69+
@Override
70+
public void onCompletion(RecordMetadata metadata, Exception exception) {
71+
if (exception != null) {
72+
failedDeliveryCallback.onFailedDelivery(event, exception);
73+
}
74+
}
75+
});
76+
} catch (BufferExhaustedException | TimeoutException e) {
77+
failedDeliveryCallback.onFailedDelivery(event, e);
78+
return null;
79+
}
80+
}
81+
82+
}

src/main/java/com/github/danielwegener/logback/kafka/delivery/BlockingDeliveryStrategy.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package com.github.danielwegener.logback.kafka.delivery;
22

3-
import ch.qos.logback.core.spi.ContextAwareBase;
4-
import org.apache.kafka.clients.producer.BufferExhaustedException;
53
import org.apache.kafka.clients.producer.Producer;
64
import org.apache.kafka.clients.producer.ProducerRecord;
75
import org.apache.kafka.clients.producer.RecordMetadata;
86

7+
import java.util.Map;
98
import java.util.concurrent.CancellationException;
109
import java.util.concurrent.ExecutionException;
1110
import java.util.concurrent.Future;
@@ -16,27 +15,45 @@
1615
* DeliveryStrategy that waits on the producer if the output buffer is full.
1716
* The wait timeout is configurable with {@link BlockingDeliveryStrategy#setTimeout(long)}
1817
* @since 0.0.1
19-
* @deprecated Use {@link AsynchronousDeliveryStrategy} instead.
18+
* @deprecated Use {@link BaseDeliveryStrategy} instead.
2019
*/
2120
@Deprecated
22-
public class BlockingDeliveryStrategy extends ContextAwareBase implements DeliveryStrategy {
21+
public class BlockingDeliveryStrategy<E, K, V> extends BaseDeliveryStrategy<E, K, V> {
2322

2423
private long timeout = 0L;
24+
private Producer<K,V> producer;
2525

2626
@Override
27-
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, E event, FailedDeliveryCallback<E> failureCallback) {
28-
try {
27+
public void start(Map<String, Object> producerConfig, FailedDeliveryCallback<E> failedDeliveryCallback) {
28+
super.start(producerConfig, failedDeliveryCallback);
29+
producer = createProducer();
30+
}
2931

30-
final Future<RecordMetadata> future = producer.send(record);
31-
if (timeout > 0L) future.get(timeout, TimeUnit.MILLISECONDS);
32-
else if (timeout == 0) future.get();
33-
return true;
32+
@Override
33+
public void send(ProducerRecord<K, V> record, E event) {
34+
try {
35+
final Future<RecordMetadata> future = doSend(record, event);
36+
if (future == null) {
37+
return;
38+
}
39+
if (timeout > 0L) future.get(timeout, TimeUnit.MILLISECONDS);
40+
else if (timeout == 0) future.get();
3441
}
35-
catch (InterruptedException e) { return false; }
36-
catch (BufferExhaustedException | ExecutionException | CancellationException | TimeoutException e) {
37-
failureCallback.onFailedDelivery(event, e);
42+
catch (InterruptedException e) { }
43+
catch (ExecutionException | CancellationException | TimeoutException e) {
44+
failedDeliveryCallback.onFailedDelivery(event, e);
3845
}
39-
return false;
46+
}
47+
48+
@Override
49+
public Producer<K, V> getProducer() {
50+
return producer;
51+
}
52+
53+
@Override
54+
public void stop() {
55+
stopProducer();
56+
producer = null;
4057
}
4158

4259
public long getTimeout() {
Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,37 @@
11
package com.github.danielwegener.logback.kafka.delivery;
22

3+
import ch.qos.logback.core.spi.ContextAware;
34
import org.apache.kafka.clients.producer.Producer;
45
import org.apache.kafka.clients.producer.ProducerRecord;
56

7+
import java.util.Map;
8+
69
/**
710
* Interface for DeliveryStrategies.
11+
*
12+
* @param <E> the type of the logging event.
13+
* @param <K> the key type of a persisted log message.
14+
* @param <V> the value type of a persisted log message.
815
* @since 0.0.1
916
*/
10-
public interface DeliveryStrategy {
17+
public interface DeliveryStrategy<E, K, V> {
18+
/**
19+
* Initialize the strategy and create Kafka producer if needed
20+
* @param producerConfig Kafka producer config map
21+
* @param failedDeliveryCallback a callback that handles messages that could not be delivered with best-effort.
22+
*/
23+
void start(Map<String, Object> producerConfig, FailedDeliveryCallback<E> failedDeliveryCallback);
1124

1225
/**
1326
* Sends a message to a kafka producer and somehow deals with failures.
1427
*
15-
* @param producer the backing kafka producer
16-
* @param record the prepared kafka message (ready to ship)
17-
* @param event the originating logging event
18-
* @param failedDeliveryCallback a callback that handles messages that could not be delivered with best-effort.
19-
* @param <K> the key type of a persisted log message.
20-
* @param <V> the value type of a persisted log message.
21-
* @param <E> the type of the logging event.
22-
* @return {@code true} if the message could be sent successfully, {@code false} otherwise.
28+
* @param record the prepared kafka message (ready to ship)
29+
* @param event the originating logging event
2330
*/
24-
<K,V,E> boolean send(Producer<K,V> producer, ProducerRecord<K, V> record, E event, FailedDeliveryCallback<E> failedDeliveryCallback);
31+
void send(ProducerRecord<K, V> record, E event);
2532

33+
/**
34+
* Stop this strategy and close the Kafka producer if needed, freeing resources
35+
*/
36+
void stop();
2637
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.github.danielwegener.logback.kafka.delivery;
2+
3+
import org.apache.kafka.clients.producer.Producer;
4+
import org.apache.kafka.clients.producer.ProducerRecord;
5+
6+
import java.util.Map;
7+
8+
/**
9+
* {@link DeliveryStrategy} which initializes the Kafka producer on startup
10+
* and sends messages asynchronously
11+
*/
12+
public class EarlyAsyncDeliveryStrategy<E,K,V> extends BaseDeliveryStrategy<E,K,V> {
13+
private Producer<K, V> producer;
14+
15+
@Override
16+
public void start(Map<String, Object> producerConfig, FailedDeliveryCallback<E> failedDeliveryCallback) {
17+
super.start(producerConfig, failedDeliveryCallback);
18+
producer = createProducer();
19+
}
20+
21+
@Override
22+
public Producer<K, V> getProducer() {
23+
return producer;
24+
}
25+
26+
@Override
27+
public void send(ProducerRecord<K, V> record, E event) {
28+
doSend(record, event);
29+
}
30+
31+
@Override
32+
public void stop() {
33+
stopProducer();
34+
producer = null;
35+
}
36+
}

0 commit comments

Comments
 (0)