Skip to content

Commit 1ca6002

Browse files
authored
[To dev/1.3] Pipe: Do not use the fork join pool in TerminateEvent #16113 (#16114)
* fix * optimize
1 parent fc6913b commit 1ca6002

File tree

4 files changed

+52
-8
lines changed

4 files changed

+52
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
133133
0L,
134134
TimeUnit.SECONDS,
135135
new ArrayBlockingQueue<>(
136-
IoTDBDescriptor.getInstance().getConfig().getSchemaThreadCount()),
136+
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
137137
new IoTThreadFactory(ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName()),
138138
ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName(),
139139
new ThreadPoolExecutor.CallerRunsPolicy());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.terminate;
2121

22+
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
23+
import org.apache.iotdb.commons.concurrent.ThreadName;
24+
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
2225
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2326
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2427
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2528
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2629
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
30+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2731
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
2832
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
2933
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3034

31-
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ArrayBlockingQueue;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.TimeUnit;
3238

3339
/**
3440
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
@@ -40,13 +46,29 @@ public class PipeTerminateEvent extends EnrichedEvent {
4046

4147
private final int dataRegionId;
4248

49+
private final boolean shouldMark;
50+
51+
// Do not use call run policy to avoid deadlock
52+
private static final ExecutorService terminateExecutor =
53+
new WrappedThreadPoolExecutor(
54+
0,
55+
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
56+
0L,
57+
TimeUnit.SECONDS,
58+
new ArrayBlockingQueue<>(
59+
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
60+
new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
61+
ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
62+
4363
public PipeTerminateEvent(
4464
final String pipeName,
4565
final long creationTime,
4666
final PipeTaskMeta pipeTaskMeta,
47-
final int dataRegionId) {
67+
final int dataRegionId,
68+
final boolean shouldMark) {
4869
super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
4970
this.dataRegionId = dataRegionId;
71+
this.shouldMark = shouldMark;
5072
}
5173

5274
@Override
@@ -74,7 +96,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
7496
final long endTime) {
7597
// Should record PipeTaskMeta, for the terminateEvent shall report progress to
7698
// notify the pipeTask it's completed.
77-
return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
99+
return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId, shouldMark);
78100
}
79101

80102
@Override
@@ -95,13 +117,16 @@ public boolean mayEventPathsOverlappedWithPattern() {
95117
@Override
96118
public void reportProgress() {
97119
// To avoid deadlock
98-
CompletableFuture.runAsync(
99-
() -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
120+
if (shouldMark) {
121+
terminateExecutor.submit(
122+
() -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
123+
}
100124
}
101125

102126
@Override
103127
public String toString() {
104-
return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId)
128+
return String.format(
129+
"PipeTerminateEvent{dataRegionId=%s, shouldMark=%s}", dataRegionId, shouldMark)
105130
+ " - "
106131
+ super.toString();
107132
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@
7878
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
7979
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
8080
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
81+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
82+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
83+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
84+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
8185
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
8286
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_KEY;
8387
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_START_TIME_KEY;
@@ -87,6 +91,7 @@
8791
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
8892
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
8993
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
94+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
9095
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
9196
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY;
9297

@@ -117,6 +122,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
117122
private boolean shouldExtractInsertion;
118123
private boolean shouldTransferModFile; // Whether to transfer mods
119124

125+
private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
120126
private boolean isTerminateSignalSent = false;
121127

122128
private boolean isForwardingPipeRequests;
@@ -283,6 +289,13 @@ public void customize(
283289
|| // Should extract deletion
284290
listeningOptionPair.getRight());
285291

292+
final String extractorModeValue =
293+
parameters.getStringOrDefault(
294+
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
295+
shouldTerminatePipeOnAllHistoricalEventsConsumed =
296+
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
297+
|| extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
298+
286299
isForwardingPipeRequests =
287300
parameters.getBooleanOrDefault(
288301
Arrays.asList(
@@ -528,7 +541,12 @@ public synchronized Event supply() {
528541

529542
if (resource == null) {
530543
final PipeTerminateEvent terminateEvent =
531-
new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
544+
new PipeTerminateEvent(
545+
pipeName,
546+
creationTime,
547+
pipeTaskMeta,
548+
dataRegionId,
549+
shouldTerminatePipeOnAllHistoricalEventsConsumed);
532550
if (!terminateEvent.increaseReferenceCount(
533551
PipeHistoricalDataRegionTsFileSource.class.getName())) {
534552
LOGGER.warn(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public enum ThreadName {
146146
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
147147
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
148148
PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
149+
PIPE_TERMINATE_EXECUTION_POOL("Pipe-Terminate-Execution-Pool"),
149150
LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
150151
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
151152
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),

0 commit comments

Comments
 (0)