Skip to content

Commit 474e747

Browse files
authored
Pipe: Fix TSFile transfer blocking InsertNode sending (apache#15666)
* Pipe: Fix TSFile transfer blocking InsertNode sending * fix * fix * fix * fix * fix * fix * fix
1 parent 0eddc71 commit 474e747

File tree

7 files changed

+148
-17
lines changed

7 files changed

+148
-17
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
3131
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3232
import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
33+
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
3334
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
3435
import org.apache.iotdb.db.storageengine.StorageEngine;
3536
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -47,6 +48,7 @@
4748
import java.util.List;
4849
import java.util.Map;
4950
import java.util.TreeMap;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052

5153
public class PipeConnectorSubtaskManager {
5254

@@ -112,12 +114,17 @@ public synchronized String register(
112114
final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList =
113115
new ArrayList<>(connectorNum);
114116

117+
AtomicInteger counter = new AtomicInteger(0);
115118
// Shared pending queue for all subtasks
116119
final UnboundedBlockingPendingQueue<Event> pendingQueue =
117120
realTimeFirst
118121
? new PipeRealtimePriorityBlockingQueue()
119122
: new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());
120123

124+
if (realTimeFirst) {
125+
((PipeRealtimePriorityBlockingQueue) pendingQueue).setOfferTsFileCounter(counter);
126+
}
127+
121128
for (int connectorIndex = 0; connectorIndex < connectorNum; connectorIndex++) {
122129
final PipeConnector pipeConnector =
123130
isDataRegionConnector
@@ -128,6 +135,9 @@ public synchronized String register(
128135
// 1. Construct, validate and customize PipeConnector, and then handshake (create
129136
// connection) with the target
130137
try {
138+
if (pipeConnector instanceof IoTDBDataRegionAsyncConnector) {
139+
((IoTDBDataRegionAsyncConnector) pipeConnector).setTransferTsFileCounter(counter);
140+
}
131141
pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
132142
pipeConnector.customize(
133143
pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
4747

4848
private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0);
4949

50+
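// Initialized to a non-null default so that the poll methods never throw an NPE when no shared counter has been set via setOfferTsFileCounter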
51+
private AtomicInteger offerTsFileCounter = new AtomicInteger(0);
52+
5053
public PipeRealtimePriorityBlockingQueue() {
5154
super(new PipeDataRegionEventCounter());
5255
}
@@ -85,18 +88,22 @@ public Event directPoll() {
8588
Event event = null;
8689
final int pollHistoricalTsFileThreshold =
8790
PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
91+
final int realTimeQueueMaxWaitingTsFileSize =
92+
PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
8893

89-
if (pollTsFileCounter.get() >= PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
94+
if (pollTsFileCounter.get() >= PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
95+
&& offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
9096
event =
9197
pollHistoricalTsFileCounter.incrementAndGet() % pollHistoricalTsFileThreshold == 0
9298
? tsfileInsertEventDeque.pollFirst()
9399
: tsfileInsertEventDeque.pollLast();
94100
pollTsFileCounter.set(0);
95101
}
102+
96103
if (Objects.isNull(event)) {
97104
// Sequentially poll the first offered non-TsFileInsertionEvent
98105
event = super.directPoll();
99-
if (Objects.isNull(event)) {
106+
if (Objects.isNull(event) && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
100107
event =
101108
pollHistoricalTsFileCounter.incrementAndGet() % pollHistoricalTsFileThreshold == 0
102109
? tsfileInsertEventDeque.pollFirst()
@@ -126,8 +133,11 @@ public Event waitedPoll() {
126133
Event event = null;
127134
final int pollHistoricalTsFileThreshold =
128135
PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
136+
final int realTimeQueueMaxWaitingTsFileSize =
137+
PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
129138

130-
if (pollTsFileCounter.get() >= PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
139+
if (pollTsFileCounter.get() >= PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
140+
&& offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
131141
event =
132142
pollHistoricalTsFileCounter.incrementAndGet() % pollHistoricalTsFileThreshold == 0
133143
? tsfileInsertEventDeque.pollFirst()
@@ -149,7 +159,7 @@ public Event waitedPoll() {
149159
}
150160

151161
// If no event is available, block until an event is available
152-
if (Objects.isNull(event)) {
162+
if (Objects.isNull(event) && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
153163
event = super.waitedPoll();
154164
if (Objects.isNull(event)) {
155165
event =
@@ -233,4 +243,8 @@ public int size() {
233243
public int getTsFileInsertionEventCount() {
234244
return tsfileInsertEventDeque.size();
235245
}
246+
247+
public void setOfferTsFileCounter(AtomicInteger offerTsFileCounter) {
248+
this.offerTsFileCounter = offerTsFileCounter;
249+
}
236250
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ public IoTDBDataNodeAsyncClientManager(
8787
final boolean shouldReceiverConvertOnTypeMismatch,
8888
final String loadTsFileStrategy,
8989
final boolean validateTsFile,
90-
final boolean shouldMarkAsPipeRequest) {
90+
final boolean shouldMarkAsPipeRequest,
91+
final boolean isTSFileUsed) {
9192
super(
9293
endPoints,
9394
useLeaderCache,
@@ -102,12 +103,13 @@ public IoTDBDataNodeAsyncClientManager(
102103

103104
receiverAttributes =
104105
String.format(
105-
"%s-%s-%s-%s-%s",
106+
"%s-%s-%s-%s-%s-%s",
106107
Base64.getEncoder().encodeToString((username + ":" + password).getBytes()),
107108
shouldReceiverConvertOnTypeMismatch,
108109
loadTsFileStrategy,
109110
validateTsFile,
110-
shouldMarkAsPipeRequest);
111+
shouldMarkAsPipeRequest,
112+
isTSFileUsed);
111113
synchronized (IoTDBDataNodeAsyncClientManager.class) {
112114
if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
113115
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@
7474
import java.util.Map;
7575
import java.util.Objects;
7676
import java.util.concurrent.BlockingQueue;
77+
import java.util.concurrent.CompletableFuture;
7778
import java.util.concurrent.ConcurrentHashMap;
79+
import java.util.concurrent.ExecutorService;
80+
import java.util.concurrent.Executors;
7881
import java.util.concurrent.LinkedBlockingQueue;
7982
import java.util.concurrent.atomic.AtomicBoolean;
8083
import java.util.concurrent.atomic.AtomicInteger;
@@ -97,13 +100,21 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
97100
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
98101
"Exception occurred while sending to receiver %s:%s.";
99102

103+
private static final boolean isSplitTSFileBatchModeEnabled = true;
104+
private static final ExecutorService executor =
105+
Executors.newFixedThreadPool(PipeConfig.getInstance().getPipeAsyncConnectorMaxClientNumber());
106+
100107
private final IoTDBDataRegionSyncConnector syncConnector = new IoTDBDataRegionSyncConnector();
101108

102109
private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue<>();
103110
private final PipeDataRegionEventCounter retryEventQueueEventCounter =
104111
new PipeDataRegionEventCounter();
105112

106113
private IoTDBDataNodeAsyncClientManager clientManager;
114+
private IoTDBDataNodeAsyncClientManager transferTsFileClientManager;
115+
116+
// Initialized to a non-null default so that subclasses of this async connector never throw an NPE when no shared counter has been set
117+
public AtomicInteger transferTsFileCounter = new AtomicInteger(0);
107118

108119
private PipeTransferBatchReqBuilder tabletBatchBuilder;
109120

@@ -146,7 +157,23 @@ public void customize(
146157
shouldReceiverConvertOnTypeMismatch,
147158
loadTsFileStrategy,
148159
loadTsFileValidation,
149-
shouldMarkAsPipeRequest);
160+
shouldMarkAsPipeRequest,
161+
false);
162+
163+
transferTsFileClientManager =
164+
new IoTDBDataNodeAsyncClientManager(
165+
nodeUrls,
166+
parameters.getBooleanOrDefault(
167+
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY),
168+
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
169+
loadBalanceStrategy,
170+
username,
171+
password,
172+
shouldReceiverConvertOnTypeMismatch,
173+
loadTsFileStrategy,
174+
loadTsFileValidation,
175+
shouldMarkAsPipeRequest,
176+
isSplitTSFileBatchModeEnabled);
150177

151178
if (isTabletBatchModeEnabled) {
152179
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
@@ -390,14 +417,37 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
390417
}
391418
}
392419

393-
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) {
394-
AsyncPipeDataTransferServiceClient client = null;
395-
try {
396-
client = clientManager.borrowClient();
397-
pipeTransferTsFileHandler.transfer(clientManager, client);
398-
} catch (final Exception ex) {
399-
logOnClientException(client, ex);
400-
pipeTransferTsFileHandler.onError(ex);
420+
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
421+
throws Exception {
422+
transferTsFileCounter.incrementAndGet();
423+
CompletableFuture<Void> completableFuture =
424+
CompletableFuture.supplyAsync(
425+
() -> {
426+
AsyncPipeDataTransferServiceClient client = null;
427+
try {
428+
client = transferTsFileClientManager.borrowClient();
429+
pipeTransferTsFileHandler.transfer(clientManager, client);
430+
} catch (final Exception ex) {
431+
logOnClientException(client, ex);
432+
pipeTransferTsFileHandler.onError(ex);
433+
} finally {
434+
transferTsFileCounter.decrementAndGet();
435+
}
436+
return null;
437+
},
438+
executor);
439+
440+
if (PipeConfig.getInstance().isTransferTsFileSync()) {
441+
try {
442+
completableFuture.get();
443+
} catch (InterruptedException e) {
444+
Thread.currentThread().interrupt();
445+
LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
446+
throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
447+
} catch (Exception e) {
448+
LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
449+
throw e;
450+
}
401451
}
402452
}
403453

@@ -682,6 +732,10 @@ public synchronized void close() {
682732
if (clientManager != null) {
683733
clientManager.close();
684734
}
735+
736+
if (transferTsFileClientManager != null) {
737+
transferTsFileClientManager.close();
738+
}
685739
} catch (final Exception e) {
686740
LOGGER.warn("Failed to close client manager.", e);
687741
}
@@ -734,4 +788,8 @@ public void eliminateHandler(final PipeTransferTrackableHandler handler) {
734788
public boolean hasPendingHandlers() {
735789
return !pendingHandlers.isEmpty();
736790
}
791+
792+
public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
793+
this.transferTsFileCounter = transferTsFileCounter;
794+
}
737795
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ public class CommonConfig {
207207

208208
private int pipeRealTimeQueuePollTsFileThreshold = 10;
209209
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
210+
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
210211

211212
/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
212213
private int pipeSubtaskExecutorMaxThreadNum =
@@ -235,7 +236,7 @@ public class CommonConfig {
235236

236237
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
237238
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
238-
private int pipeConnectorReadFileBufferSize = 8388608;
239+
private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
239240
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
240241
private long pipeConnectorRetryIntervalMs = 1000L;
241242
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
@@ -310,6 +311,7 @@ public class CommonConfig {
310311
private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 0.1d;
311312
private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
312313
private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
314+
private boolean pipeTransferTsFileSync = false;
313315

314316
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes
315317
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
@@ -1335,6 +1337,20 @@ public void setPipeRealTimeQueuePollHistoricalTsFileThreshold(
13351337
pipeRealTimeQueuePollHistoricalTsFileThreshold);
13361338
}
13371339

1340+
public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
1341+
return pipeRealTimeQueueMaxWaitingTsFileSize;
1342+
}
1343+
1344+
public void setPipeRealTimeQueueMaxWaitingTsFileSize(int pipeRealTimeQueueMaxWaitingTsFileSize) {
1345+
if (this.pipeRealTimeQueueMaxWaitingTsFileSize == pipeRealTimeQueueMaxWaitingTsFileSize) {
1346+
return;
1347+
}
1348+
this.pipeRealTimeQueueMaxWaitingTsFileSize = pipeRealTimeQueueMaxWaitingTsFileSize;
1349+
logger.info(
1350+
"pipeRealTimeQueueMaxWaitingTsFileSize is set to {}.",
1351+
pipeRealTimeQueueMaxWaitingTsFileSize);
1352+
}
1353+
13381354
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
13391355
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
13401356
return;
@@ -1951,6 +1967,18 @@ public void setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
19511967
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
19521968
}
19531969

1970+
public boolean getPipeTransferTsFileSync() {
1971+
return pipeTransferTsFileSync;
1972+
}
1973+
1974+
public void setPipeTransferTsFileSync(boolean pipeTransferTsFileSync) {
1975+
if (this.pipeTransferTsFileSync == pipeTransferTsFileSync) {
1976+
return;
1977+
}
1978+
this.pipeTransferTsFileSync = pipeTransferTsFileSync;
1979+
logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
1980+
}
1981+
19541982
public double getPipeAllSinksRateLimitBytesPerSecond() {
19551983
return pipeAllSinksRateLimitBytesPerSecond;
19561984
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
101101
return Math.max(COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1);
102102
}
103103

104+
public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
105+
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
106+
}
107+
104108
/////////////////////////////// Subtask Executor ///////////////////////////////
105109

106110
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -255,6 +259,10 @@ public double getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()
255259
return COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
256260
}
257261

262+
public boolean isTransferTsFileSync() {
263+
return COMMON_CONFIG.getPipeTransferTsFileSync();
264+
}
265+
258266
/////////////////////////////// Meta Consistency ///////////////////////////////
259267

260268
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -520,6 +528,7 @@ public void printAllConfigs() {
520528
LOGGER.info(
521529
"PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage());
522530
LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());
531+
LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
523532

524533
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
525534
LOGGER.info(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
264264
properties.getProperty(
265265
"pipe_realtime_queue_poll_historical_tsfile_threshold",
266266
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
267+
config.setPipeRealTimeQueueMaxWaitingTsFileSize(
268+
Integer.parseInt(
269+
properties.getProperty(
270+
"pipe_realTime_queue_max_waiting_tsFile_size",
271+
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
267272
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
268273
Integer.parseInt(
269274
properties.getProperty(
@@ -526,6 +531,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
526531
"pipe_max_aligned_series_num_in_one_batch",
527532
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
528533

534+
config.setPipeTransferTsFileSync(
535+
Boolean.parseBoolean(
536+
properties.getProperty(
537+
"pipe_transfer_tsfile_sync", String.valueOf(config.getPipeTransferTsFileSync()))));
538+
529539
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
530540
Long.parseLong(
531541
properties.getProperty(

0 commit comments

Comments
 (0)