Skip to content

Commit 517903e

Browse files
committed
Restore support for Disruptor 3.x
1 parent c621bf4 commit 517903e

File tree

8 files changed

+152
-59
lines changed

8 files changed

+152
-59
lines changed

log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerConfigDisruptor.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.lmax.disruptor.ExceptionHandler;
2323
import com.lmax.disruptor.RingBuffer;
2424
import com.lmax.disruptor.Sequence;
25+
import com.lmax.disruptor.SequenceReportingEventHandler;
2526
import com.lmax.disruptor.TimeoutException;
2627
import com.lmax.disruptor.WaitStrategy;
2728
import com.lmax.disruptor.dsl.Disruptor;
@@ -40,6 +41,7 @@
4041
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
4142
import org.apache.logging.log4j.core.util.Throwables;
4243
import org.apache.logging.log4j.message.ReusableMessage;
44+
import org.apache.logging.log4j.util.LoaderUtil;
4345

4446
/**
4547
* Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
@@ -97,7 +99,9 @@ private static class Log4jEventWrapperHandler implements EventHandler<Log4jEvent
9799
private Sequence sequenceCallback;
98100
private int counter;
99101

100-
@Override
102+
/*
103+
* Overrides a method from Disruptor 4.x. Do not remove.
104+
*/
101105
public void setSequenceCallback(final Sequence sequenceCallback) {
102106
this.sequenceCallback = sequenceCallback;
103107
}
@@ -124,6 +128,12 @@ private void notifyIntermediateProgress(final long sequence) {
124128
}
125129
}
126130

131+
/**
132+
* A version of Log4jEventWrapperHandler for LMAX Disruptor 3.x.
133+
*/
134+
private static final class Log4jEventWrapperHandler3 extends Log4jEventWrapperHandler
135+
implements SequenceReportingEventHandler<Log4jEventWrapper> {}
136+
127137
/**
128138
* Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
129139
* RingBuffer.
@@ -155,6 +165,16 @@ private void notifyIntermediateProgress(final long sequence) {
155165
ringBufferElement.loggerConfig = loggerConfig;
156166
};
157167

168+
private Log4jEventWrapperHandler createEventHandler() {
169+
try {
170+
return LoaderUtil.newInstanceOf(
171+
"org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler3");
172+
} catch (final ReflectiveOperationException | LinkageError e) {
173+
LOGGER.debug("LMAX Disruptor 3.x is missing, trying version 4.x.", e);
174+
}
175+
return new Log4jEventWrapperHandler();
176+
}
177+
158178
private int ringBufferSize;
159179
private AsyncQueueFullPolicy asyncQueueFullPolicy;
160180
private Boolean mutable = Boolean.FALSE;
@@ -220,7 +240,7 @@ public Thread newThread(final Runnable r) {
220240
final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
221241
disruptor.setDefaultExceptionHandler(errorHandler);
222242

223-
final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
243+
final Log4jEventWrapperHandler[] handlers = {createEventHandler()};
224244
disruptor.handleEventsWith(handlers);
225245

226246
LOGGER.debug(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/AsyncLoggerDisruptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public Thread newThread(final Runnable r) {
122122
final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
123123
disruptor.setDefaultExceptionHandler(errorHandler);
124124

125-
final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
125+
final RingBufferLogEventHandler4[] handlers = {RingBufferLogEventHandler4.create()};
126126
disruptor.handleEventsWith(handlers);
127127

128128
LOGGER.debug(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/DefaultAsyncWaitStrategyFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,19 @@ static WaitStrategy createDefaultWaitStrategy(final String propertyName) {
7474
LOGGER.trace(
7575
"DefaultAsyncWaitStrategyFactory creating TimeoutBlockingWaitStrategy(timeout={}, unit=MILLIS)",
7676
timeoutMillis);
77+
try {
78+
// Check for the v 4.x version of the strategy, the version in 3.x is not garbage-free.
79+
if (DisruptorUtil.DISRUPTOR_MAJOR_VERSION == 4) {
80+
return (WaitStrategy) Class.forName("com.lmax.disruptor.TimeoutBlockingWaitStrategy")
81+
.getConstructor(long.class, TimeUnit.class)
82+
.newInstance(timeoutMillis, TimeUnit.MILLISECONDS);
83+
}
84+
} catch (final ReflectiveOperationException | LinkageError e) {
85+
LOGGER.debug(
86+
"DefaultAsyncWaitStrategyFactory failed to load 'com.lmax.disruptor.TimeoutBlockingWaitStrategy', using '{}' instead.",
87+
TimeoutBlockingWaitStrategy.class.getName());
88+
}
89+
// Use our version
7790
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
7891
}
7992

log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ final class DisruptorUtil {
5050
static final boolean ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL = PropertiesUtil.getProperties()
5151
.getBooleanProperty("AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull", true);
5252

53+
static final int DISRUPTOR_MAJOR_VERSION =
54+
LoaderUtil.isClassAvailable("com.lmax.disruptor.SequenceReportingEventHandler") ? 3 : 4;
55+
5356
private DisruptorUtil() {}
5457

5558
static WaitStrategy createWaitStrategy(

log4j-core/src/main/java/org/apache/logging/log4j/core/async/RingBufferLogEventHandler.java

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,68 +16,21 @@
1616
*/
1717
package org.apache.logging.log4j.core.async;
1818

19-
import com.lmax.disruptor.EventHandler;
20-
import com.lmax.disruptor.Sequence;
19+
import com.lmax.disruptor.LifecycleAware;
20+
import com.lmax.disruptor.SequenceReportingEventHandler;
2121

2222
/**
2323
* This event handler gets passed messages from the RingBuffer as they become
2424
* available. Processing of these messages is done in a separate thread,
2525
* controlled by the {@code Executor} passed to the {@code Disruptor}
2626
* constructor.
2727
*/
28-
public class RingBufferLogEventHandler implements EventHandler<RingBufferLogEvent> {
29-
30-
private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
31-
private Sequence sequenceCallback;
32-
private int counter;
33-
private long threadId = -1;
34-
35-
@Override
36-
public void setSequenceCallback(final Sequence sequenceCallback) {
37-
this.sequenceCallback = sequenceCallback;
38-
}
39-
40-
@Override
41-
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
42-
throws Exception {
43-
try {
44-
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
45-
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
46-
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
47-
// events. The exception that occurred during translation will have already been propagated.
48-
if (event.isPopulated()) {
49-
event.execute(endOfBatch);
50-
}
51-
} finally {
52-
event.clear();
53-
// notify the BatchEventProcessor that the sequence has progressed.
54-
// Without this callback the sequence would not be progressed
55-
// until the batch has completely finished.
56-
notifyCallback(sequence);
57-
}
58-
}
59-
60-
private void notifyCallback(final long sequence) {
61-
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
62-
sequenceCallback.set(sequence);
63-
counter = 0;
64-
}
65-
}
28+
public class RingBufferLogEventHandler extends RingBufferLogEventHandler4
29+
implements SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {
6630

6731
/**
68-
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
69-
* yet.
70-
* @return the thread ID of the background consumer thread, or {@code -1}
32+
* @deprecated Use the {@link RingBufferLogEventHandler4#create()} factory method instead.
7133
*/
72-
public long getThreadId() {
73-
return threadId;
74-
}
75-
76-
@Override
77-
public void onStart() {
78-
threadId = Thread.currentThread().getId();
79-
}
80-
81-
@Override
82-
public void onShutdown() {}
34+
@Deprecated
35+
public RingBufferLogEventHandler() {}
8336
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.logging.log4j.core.async;
18+
19+
import com.lmax.disruptor.EventHandler;
20+
import com.lmax.disruptor.Sequence;
21+
import org.apache.logging.log4j.status.StatusLogger;
22+
import org.apache.logging.log4j.util.LoaderUtil;
23+
24+
/**
25+
* This event handler gets passed messages from the RingBuffer as they become
26+
* available. Processing of these messages is done in a separate thread,
27+
* controlled by the {@code Executor} passed to the {@code Disruptor}
28+
* constructor.
29+
*/
30+
class RingBufferLogEventHandler4 implements EventHandler<RingBufferLogEvent> {
31+
32+
private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
33+
private Sequence sequenceCallback;
34+
private int counter;
35+
private long threadId = -1;
36+
37+
/**
38+
* Returns the appropriate {@link EventHandler} for the version of LMAX Disruptor used.
39+
*/
40+
public static RingBufferLogEventHandler4 create() {
41+
try {
42+
return LoaderUtil.newInstanceOf("org.apache.logging.log4j.core.async.RingBufferLogEventHandler");
43+
} catch (final ReflectiveOperationException | LinkageError e) {
44+
StatusLogger.getLogger().debug("LMAX Disruptor 3.x is missing, trying version 4.x.", e);
45+
}
46+
return new RingBufferLogEventHandler4();
47+
}
48+
49+
/*
50+
* Overrides a method from Disruptor 4.x. Do not remove.
51+
*/
52+
public void setSequenceCallback(final Sequence sequenceCallback) {
53+
this.sequenceCallback = sequenceCallback;
54+
}
55+
56+
@Override
57+
public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch)
58+
throws Exception {
59+
try {
60+
// RingBufferLogEvents are populated by an EventTranslator. If an exception is thrown during event
61+
// translation, the event may not be fully populated, but Disruptor requires that the associated sequence
62+
// still be published since a slot has already been claimed in the ring buffer. Ignore any such unpopulated
63+
// events. The exception that occurred during translation will have already been propagated.
64+
if (event.isPopulated()) {
65+
event.execute(endOfBatch);
66+
}
67+
} finally {
68+
event.clear();
69+
// notify the BatchEventProcessor that the sequence has progressed.
70+
// Without this callback the sequence would not be progressed
71+
// until the batch has completely finished.
72+
notifyCallback(sequence);
73+
}
74+
}
75+
76+
private void notifyCallback(final long sequence) {
77+
if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
78+
sequenceCallback.set(sequence);
79+
counter = 0;
80+
}
81+
}
82+
83+
/**
84+
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
85+
* yet.
86+
*
87+
* @return the thread ID of the background consumer thread, or {@code -1}
88+
*/
89+
public long getThreadId() {
90+
return threadId;
91+
}
92+
93+
/*
94+
* Overrides a method from Disruptor 4.x. Do not remove.
95+
*/
96+
public void onStart() {
97+
threadId = Thread.currentThread().getId();
98+
}
99+
100+
/*
101+
* Overrides a method from Disruptor 4.x. Do not remove.
102+
*/
103+
public void onShutdown() {}
104+
}

log4j-core/src/main/java/org/apache/logging/log4j/core/async/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* Provides Asynchronous Logger classes and interfaces for low-latency logging.
1919
*/
2020
@Export
21-
@Version("2.21.0")
21+
@Version("2.23.0")
2222
package org.apache.logging.log4j.core.async;
2323

2424
import org.osgi.annotation.bundle.Export;

log4j-parent/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
<commons-logging.version>1.3.0</commons-logging.version>
8080
<!-- `com.conversantmedia:disruptor` version 1.2.16 requires Java 9: -->
8181
<conversant.disruptor.version>1.2.15</conversant.disruptor.version>
82-
<disruptor.version>4.0.0</disruptor.version>
82+
<disruptor.version>3.4.4</disruptor.version>
8383
<elasticsearch-java.version>8.11.2</elasticsearch-java.version>
8484
<embedded-ldap.version>0.9.0</embedded-ldap.version>
8585
<felix.version>7.0.5</felix.version>

0 commit comments

Comments
 (0)