Skip to content

Commit fb26dd5

Browse files
committed
Upgrade lmax disruptor to 4.0.0
disruptor 4.0.0 folded LifecycleAware and SequenceReportingEventHandler into EventHandler interface
1 parent 8548291 commit fb26dd5

File tree

3 files changed

+34
-38
lines changed

3 files changed

+34
-38
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<logback-access.version>2.0.6</logback-access.version>
3030

3131
<!-- shaded runtime dependencies -->
32-
<disruptor.version>3.4.4</disruptor.version>
32+
<disruptor.version>4.0.0</disruptor.version>
3333

3434
<!-- test dependencies -->
3535
<assertj.version>3.27.6</assertj.version>

src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.ScheduledExecutorService;
4242
import java.util.concurrent.ScheduledFuture;
4343
import java.util.concurrent.ScheduledThreadPoolExecutor;
44+
import java.util.concurrent.ThreadFactory;
4445
import java.util.concurrent.TimeUnit;
4546

4647
import javax.net.SocketFactory;
@@ -68,7 +69,6 @@
6869
import ch.qos.logback.core.util.CloseUtil;
6970
import ch.qos.logback.core.util.Duration;
7071
import com.lmax.disruptor.EventHandler;
71-
import com.lmax.disruptor.LifecycleAware;
7272
import com.lmax.disruptor.RingBuffer;
7373

7474
/**
@@ -152,7 +152,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
152152
*
153153
* The interpretation of this list is up to the current {@link #connectionStrategy}.
154154
*/
155-
private List<InetSocketAddress> destinations = new ArrayList<>(2);
155+
private final List<InetSocketAddress> destinations = new ArrayList<>(2);
156156

157157
/**
158158
* When connected, this is the index into {@link #destinations}
@@ -220,6 +220,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
220220

221221
/**
222222
* Used to create client {@link Socket}s to which to communicate.
223+
* <p>
223224
*
224225
* If set prior to startup, it will be used.
225226
* <p>
@@ -246,7 +247,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
246247
* then the {@link #keepAliveMessage} will be sent to the socket in
247248
* order to keep the connection alive.
248249
*
249-
* When null (the default), no keepAlive messages will be sent.
250+
* <p>When null (the default), no keepAlive messages will be sent.</p>
250251
*/
251252
private Duration keepAliveDuration;
252253

@@ -288,15 +289,15 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
288289
/**
289290
* Event handler responsible for performing the TCP transmission.
290291
*/
291-
private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware {
292+
private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>> {
292293

293294
/**
294295
* Max number of consecutive failed connection attempts for which
295296
* logback status messages will be logged.
296297
*
297-
* After this many failed attempts, reconnection will still
298+
* <p>After this many failed attempts, reconnection will still
298299
* be attempted, but failures will not be logged again
299-
* (until after the connection is successful, and then fails again.)
300+
* (until after the connection is successful, and then fails again.)</p>
300301
*/
301302
private static final int MAX_REPEAT_CONNECTION_ERROR_LOG = 5;
302303

@@ -372,11 +373,11 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
372373
* after the calculated {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
373374
* from the last sent event using {@link TcpSendingEventHandler#scheduleKeepAlive(long)}.
374375
*
375-
* When the keepAlive event is processed by the event handler,
376+
* <p>When the keepAlive event is processed by the event handler,
376377
* if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
377378
* has elapsed since the last event was sent,
378379
* then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage}
379-
* to the socket OutputStream.
380+
* to the socket OutputStream.</p>
380381
*
381382
*/
382383
private class KeepAliveRunnable implements Runnable {
@@ -418,8 +419,8 @@ public void run() {
418419
* Keeps reading the {@link ReaderCallable#inputStream} until the
419420
* end of the stream is reached.
420421
*
421-
* This helps pro-actively detect server-side socket disconnections,
422-
* specifically in the case of Amazon's Elastic Load Balancers (ELB).
422+
* <p>This helps proactively detect server-side socket disconnections,
423+
* specifically in the case of Amazon's Elastic Load Balancers (ELB).</p>
423424
*/
424425
private class ReaderCallable implements Callable<Void> {
425426

@@ -710,8 +711,8 @@ private synchronized void reopenSocket() {
710711
* Repeatedly tries to open a socket until it is successful,
711712
* or the hander is stopped, or the handler thread is interrupted.
712713
*
713-
* If the socket is non-null when this method returns,
714-
* then it should be able to be used to send.
714+
* <p>If the socket is non-null when this method returns,
715+
* then it should be able to be used to send.</p>
715716
*/
716717
private synchronized void openSocket() {
717718
int errorCount = 0;
@@ -946,7 +947,6 @@ private synchronized void unscheduleWriteTimeout() {
946947
/**
947948
* Wrap exceptions thrown by {@link Encoder}
948949
*/
949-
@SuppressWarnings("serial")
950950
private static class EncoderException extends Exception {
951951
EncoderException(Throwable cause) {
952952
super(cause);
@@ -1278,8 +1278,8 @@ public Duration getInitialSendDelay() {
12781278
/**
12791279
* Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
12801280
*
1281-
* When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy},
1282-
* this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
1281+
* <p>When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy},
1282+
* this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.</p>
12831283
*
12841284
* @see PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)
12851285
* @param secondaryConnectionTTL the TTL of a connection when connected to a secondary destination
@@ -1320,7 +1320,7 @@ public Duration getSecondaryConnectionTTL() {
13201320
/**
13211321
* Set the connection timeout when establishing a connection to a remote destination.
13221322
*
1323-
* Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults".
1323+
* <p>Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults".</p>
13241324
*
13251325
* @param connectionTimeout connection timeout
13261326
*/
@@ -1408,7 +1408,7 @@ public Duration getKeepAliveDuration() {
14081408
* then the {@link #keepAliveMessage} will be sent to the socket in
14091409
* order to keep the connection alive.
14101410
*
1411-
* When {@code null}, zero or negative, no keepAlive messages will be sent.
1411+
* <p>When {@code null}, zero or negative, no keepAlive messages will be sent.</p>
14121412
*
14131413
* @param keepAliveDuration duration between consecutive keep alive messages
14141414
*/
@@ -1424,15 +1424,15 @@ public String getKeepAliveMessage() {
14241424
* Message to send for keeping the connection alive
14251425
* if {@link #keepAliveDuration} is non-null and strictly positive.
14261426
*
1427-
* The following values have special meaning:
1427+
* <p>The following values have special meaning:</p>
14281428
* <ul>
14291429
* <li>{@code null} or empty string = no keep alive.</li>
14301430
* <li>"{@code SYSTEM}" = operating system new line (default).</li>
14311431
* <li>"{@code UNIX}" = unix line ending (\n).</li>
14321432
* <li>"{@code WINDOWS}" = windows line ending (\r\n).</li>
14331433
* </ul>
1434-
* <p>
1435-
* Any other value will be used as-is.
1434+
*
1435+
* <p>Any other value will be used as-is.</p>
14361436
*
14371437
* @param keepAliveMessage the keep alive message
14381438
*/
@@ -1473,8 +1473,8 @@ public void setKeepAliveCharset(Charset keepAliveCharset) {
14731473
* Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
14741474
* <p>
14751475
*
1476-
* If you change the {@link #threadFactory}, then this
1477-
* value may not be honored.
1476+
* If you change the {@link #setThreadFactory(ThreadFactory) threadFactory},
1477+
* then this value may not be honored.
14781478
* <p>
14791479
*
14801480
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.

src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,9 @@
4545
import com.lmax.disruptor.EventHandler;
4646
import com.lmax.disruptor.EventTranslatorOneArg;
4747
import com.lmax.disruptor.ExceptionHandler;
48-
import com.lmax.disruptor.LifecycleAware;
4948
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
5049
import com.lmax.disruptor.RingBuffer;
5150
import com.lmax.disruptor.Sequence;
52-
import com.lmax.disruptor.SequenceReportingEventHandler;
5351
import com.lmax.disruptor.SleepingWaitStrategy;
5452
import com.lmax.disruptor.WaitStrategy;
5553
import com.lmax.disruptor.dsl.Disruptor;
@@ -86,6 +84,7 @@
8684
* needing to explicitly shut down the appender.
8785
* Note that in this case, it is possible for appended log events to not
8886
* be handled (if the child thread has not had a chance to process them yet).
87+
* <p>
8988
*
9089
* By setting {@link #setDaemon(boolean)} to false, you can change this behavior.
9190
* When false, child threads created by this appender will not be daemon threads,
@@ -133,7 +132,7 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
133132

134133
/**
135134
* The {@link WaitStrategy} to used by the RingBuffer
136-
* when pulling events to be processed by {@link #eventHandler}.
135+
* when pulling events to be processed by {@link #createEventHandler() event handler}.
137136
* <p>
138137
* By default, a {@link BlockingWaitStrategy} is used, which is the most
139138
* CPU conservative, but results in a higher latency.
@@ -214,7 +213,7 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
214213
* Defines what happens when there is an exception during
215214
* {@link RingBuffer} processing.
216215
*/
217-
private ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();
216+
private final ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();
218217

219218
/**
220219
* Consecutive number of dropped events.
@@ -323,7 +322,7 @@ public void translateTo(LogEvent<Event> logEvent, long sequence, Event event) {
323322
* Defines what happens when there is an exception during
324323
* {@link RingBuffer} processing.
325324
*
326-
* Currently, just logs to the logback context.
325+
* <p>Currently, just logs to the logback context.</p>
327326
*/
328327
private class LogEventExceptionHandler implements ExceptionHandler<LogEvent<Event>> {
329328

@@ -347,7 +346,7 @@ public void handleOnShutdownException(Throwable ex) {
347346
* Clears the event after a delegate event handler has processed the event,
348347
* so that the event can be garbage collected.
349348
*/
350-
private static class EventClearingEventHandler<Event> implements SequenceReportingEventHandler<LogEvent<Event>>, LifecycleAware {
349+
private static class EventClearingEventHandler<Event> implements EventHandler<LogEvent<Event>> {
351350

352351
private final EventHandler<LogEvent<Event>> delegate;
353352
private Sequence sequenceCallback;
@@ -378,16 +377,13 @@ public void onEvent(LogEvent<Event> event, long sequence, boolean endOfBatch) th
378377

379378
@Override
380379
public void onStart() {
381-
if (delegate instanceof LifecycleAware) {
382-
((LifecycleAware) delegate).onStart();
383-
}
380+
delegate.onStart();
384381
}
385382

386383
@Override
387384
public void onShutdown() {
388-
if (delegate instanceof LifecycleAware) {
389-
((LifecycleAware) delegate).onShutdown();
390-
}
385+
delegate.onShutdown();
386+
391387
}
392388

393389
@Override
@@ -610,7 +606,7 @@ protected String calculateThreadName() {
610606
}
611607

612608
protected List<Object> getThreadNameFormatParams() {
613-
return Arrays.<Object>asList(
609+
return Arrays.asList(
614610
getName(),
615611
threadNumber.incrementAndGet());
616612
}
@@ -722,8 +718,8 @@ public ProducerType getProducerType() {
722718
* The {@link ProducerType} to use to configure the Disruptor.
723719
* By default this is {@link ProducerType#MULTI}.
724720
*
725-
* Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only
726-
* one thread will ever be appending to this appender.
721+
* <p>Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only
722+
* one thread will ever be appending to this appender.</p>
727723
*
728724
* <p>WARNING: unexpected behavior may occur if this parameter is set to {@link ProducerType#SINGLE}
729725
* and multiple threads are appending to this appender.

0 commit comments

Comments
 (0)