Skip to content

Commit a429dfc

Browse files
authored
Pipe: Implementing DisruptorQueue (apache#16639)
1 parent 9b0e48f commit a429dfc

File tree

17 files changed

+1230
-24
lines changed

17 files changed

+1230
-24
lines changed

LICENSE

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,4 +329,14 @@ Apache Commons Collections is open source software licensed under the Apache Lic
329329
Project page: https://github.com/apache/commons-collections
330330
License: https://github.com/apache/commons-collections/blob/master/LICENSE.txt
331331

332+
--------------------------------------------------------------------------------
333+
334+
The following files include code modified from LMax Disruptor project.
335+
336+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/*
337+
338+
LMax Disruptor is open source software licensed under the Apache License 2.0 and supported by the Apache Software Foundation.
339+
Project page: https://github.com/LMAX-Exchange/disruptor
340+
License: https://github.com/LMAX-Exchange/disruptor/blob/master/LICENCE.txt
341+
332342
--------------------------------------------------------------------------------

dependencies.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
"com.google.guava:listenablefuture",
2828
"com.google.j2objc:j2objc-annotations",
2929
"com.h2database:h2-mvstore",
30-
"com.lmax:disruptor",
3130
"com.nimbusds:content-type",
3231
"com.nimbusds:lang-tag",
3332
"com.nimbusds:nimbus-jose-jwt",

iotdb-core/datanode/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,6 @@
284284
<groupId>com.google.guava</groupId>
285285
<artifactId>guava</artifactId>
286286
</dependency>
287-
<dependency>
288-
<groupId>com.lmax</groupId>
289-
<artifactId>disruptor</artifactId>
290-
</dependency>
291287
<dependency>
292288
<groupId>org.java-websocket</groupId>
293289
<artifactId>Java-WebSocket</artifactId>

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2929
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3030
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
31+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
3132
import org.apache.iotdb.db.utils.DateTimeUtils;
3233
import org.apache.iotdb.pipe.api.event.Event;
3334

34-
import com.lmax.disruptor.RingBuffer;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

@@ -200,7 +200,7 @@ public void onTransferred() {
200200

201201
/////////////////////////////// Queue size Reporting ///////////////////////////////
202202

203-
public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
203+
public void recordDisruptorSize(final RingBuffer ringBuffer) {
204204
if (shouldPrintMessage) {
205205
disruptorSize = ringBuffer.getBufferSize() - (int) ringBuffer.remainingCapacity();
206206
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,9 @@
2626
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
2727
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2828
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
29-
30-
import com.lmax.disruptor.BlockingWaitStrategy;
31-
import com.lmax.disruptor.EventHandler;
32-
import com.lmax.disruptor.RingBuffer;
33-
import com.lmax.disruptor.dsl.Disruptor;
34-
import com.lmax.disruptor.dsl.ProducerType;
29+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
30+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
31+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
3532

3633
import java.util.function.Consumer;
3734

@@ -68,9 +65,8 @@ public DisruptorQueue(
6865
32,
6966
Math.toIntExact(
7067
allocatedMemoryBlock.getMemoryUsageInBytes() / ringBufferEntrySizeInBytes)),
71-
THREAD_FACTORY,
72-
ProducerType.MULTI,
73-
new BlockingWaitStrategy());
68+
THREAD_FACTORY);
69+
7470
disruptor.handleEventsWith(
7571
(container, sequence, endOfBatch) -> {
7672
final PipeRealtimeEvent realtimeEvent = container.getEvent();
@@ -103,7 +99,7 @@ public boolean isClosed() {
10399

104100
private static class EventContainer {
105101

106-
private PipeRealtimeEvent event;
102+
private volatile PipeRealtimeEvent event;
107103

108104
private EventContainer() {}
109105

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
2121

22-
import com.lmax.disruptor.ExceptionHandler;
22+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.ExceptionHandler;
23+
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* Batch event processor for consuming events
27+
*
28+
* <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor)
29+
* and simplified for IoTDB's Pipe module (removed complex lifecycle management).
30+
*
31+
* <p>Core algorithm preserved from LMAX Disruptor:
32+
*
33+
* <ul>
34+
* <li>Batch processing loop
35+
* <li>Sequence tracking
36+
* <li>endOfBatch detection
37+
* </ul>
38+
*
39+
* @param <T> event type
40+
*/
41+
public final class BatchEventProcessor<T> implements Runnable {
42+
private static final Logger LOGGER = LoggerFactory.getLogger(BatchEventProcessor.class);
43+
44+
private final RingBuffer<T> ringBuffer;
45+
private final SequenceBarrier sequenceBarrier;
46+
private final EventHandler<? super T> eventHandler;
47+
private final Sequence sequence = new Sequence();
48+
private ExceptionHandler<? super T> exceptionHandler = new DefaultExceptionHandler<>();
49+
private volatile boolean running = true;
50+
51+
public BatchEventProcessor(
52+
RingBuffer<T> ringBuffer, SequenceBarrier barrier, EventHandler<? super T> eventHandler) {
53+
this.ringBuffer = ringBuffer;
54+
this.sequenceBarrier = barrier;
55+
this.eventHandler = eventHandler;
56+
}
57+
58+
public Sequence getSequence() {
59+
return sequence;
60+
}
61+
62+
public void setExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
63+
this.exceptionHandler = exceptionHandler;
64+
}
65+
66+
public void halt() {
67+
running = false;
68+
}
69+
70+
@Override
71+
public void run() {
72+
T event = null;
73+
long nextSequence = sequence.get() + 1L;
74+
75+
while (running) {
76+
try {
77+
// Wait for available sequence
78+
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
79+
80+
// Batch process all available events
81+
while (nextSequence <= availableSequence) {
82+
event = ringBuffer.get(nextSequence);
83+
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
84+
nextSequence++;
85+
}
86+
87+
// Update sequence
88+
sequence.set(availableSequence);
89+
90+
} catch (final InterruptedException ex) {
91+
Thread.currentThread().interrupt();
92+
LOGGER.info("Processor interrupted");
93+
break;
94+
} catch (final Throwable ex) {
95+
exceptionHandler.handleEventException(ex, nextSequence, event);
96+
sequence.set(nextSequence);
97+
nextSequence++;
98+
}
99+
}
100+
101+
LOGGER.info("Processor stopped");
102+
}
103+
104+
private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> {
105+
@Override
106+
public void handleEventException(Throwable ex, long sequence, T event) {
107+
LoggerFactory.getLogger(getClass()).error("Exception processing: {} {}", sequence, event, ex);
108+
}
109+
110+
@Override
111+
public void handleOnStartException(Throwable ex) {
112+
LoggerFactory.getLogger(getClass()).error("Exception during onStart()", ex);
113+
}
114+
115+
@Override
116+
public void handleOnShutdownException(Throwable ex) {
117+
LoggerFactory.getLogger(getClass()).error("Exception during onShutdown()", ex);
118+
}
119+
}
120+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.util.concurrent.ThreadFactory;
26+
27+
/**
28+
* Simplified Disruptor implementation for IoTDB Pipe
29+
*
30+
* <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor)
31+
* and simplified for IoTDB's specific use case in the Pipe module.
32+
*
33+
* <p>Key simplifications:
34+
*
35+
* <ul>
36+
* <li>Single event handler support (no complex dependency graphs)
37+
* <li>Simplified lifecycle management
38+
* <li>Removed wait strategies (using simple sleep-based waiting)
39+
* </ul>
40+
*
41+
* @param <T> event type
42+
*/
43+
public class Disruptor<T> {
44+
private static final Logger LOGGER = LoggerFactory.getLogger(Disruptor.class);
45+
46+
private final RingBuffer<T> ringBuffer;
47+
private final ThreadFactory threadFactory;
48+
private BatchEventProcessor<T> processor;
49+
private Thread processorThread;
50+
private ExceptionHandler<? super T> exceptionHandler;
51+
private volatile boolean started = false;
52+
53+
/**
54+
* Create a Disruptor instance
55+
*
56+
* @param eventFactory factory for creating pre-allocated events
57+
* @param ringBufferSize buffer size (must be power of 2)
58+
* @param threadFactory factory for creating consumer thread
59+
*/
60+
public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, ThreadFactory threadFactory) {
61+
this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, ringBufferSize);
62+
this.threadFactory = threadFactory;
63+
}
64+
65+
/**
66+
* Configure event handler for processing events
67+
*
68+
* <p>Creates a batch event processor that will run in its own thread
69+
*
70+
* @param handler event handler implementation
71+
* @return this instance for method chaining
72+
*/
73+
public Disruptor<T> handleEventsWith(final EventHandler<? super T> handler) {
74+
SequenceBarrier barrier = ringBuffer.newBarrier();
75+
processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);
76+
77+
if (exceptionHandler != null) {
78+
processor.setExceptionHandler(exceptionHandler);
79+
}
80+
81+
ringBuffer.addGatingSequences(processor.getSequence());
82+
return this;
83+
}
84+
85+
/**
86+
* Set exception handler for error handling
87+
*
88+
* @param exceptionHandler handler for processing exceptions
89+
*/
90+
public void setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
91+
this.exceptionHandler = exceptionHandler;
92+
if (processor != null) {
93+
processor.setExceptionHandler(exceptionHandler);
94+
}
95+
}
96+
97+
public RingBuffer<T> start() {
98+
if (started) {
99+
throw new IllegalStateException("Disruptor already started");
100+
}
101+
102+
if (processor == null) {
103+
throw new IllegalStateException("No event handler configured");
104+
}
105+
106+
processorThread = threadFactory.newThread(processor);
107+
processorThread.start();
108+
started = true;
109+
110+
LOGGER.info("Disruptor started with buffer size: {}", ringBuffer.getBufferSize());
111+
return ringBuffer;
112+
}
113+
114+
public void shutdown() {
115+
if (!started) {
116+
return;
117+
}
118+
119+
if (processor != null) {
120+
processor.halt();
121+
}
122+
123+
if (processorThread != null) {
124+
try {
125+
processorThread.join(5000);
126+
} catch (InterruptedException e) {
127+
Thread.currentThread().interrupt();
128+
LOGGER.warn("Interrupted waiting for processor to stop");
129+
}
130+
}
131+
132+
started = false;
133+
LOGGER.info("Disruptor shutdown completed");
134+
}
135+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
21+
22+
/**
23+
* Event factory for pre-allocating events in RingBuffer
24+
*
25+
* <p>This interface is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) and
26+
* adapted for IoTDB's Pipe module.
27+
*
28+
* @param <T> event type
29+
*/
30+
@FunctionalInterface
31+
public interface EventFactory<T> {
32+
/**
33+
* Create new event instance
34+
*
35+
* @return new event
36+
*/
37+
T newInstance();
38+
}

0 commit comments

Comments
 (0)