Skip to content

Commit d34054d

Browse files
authored
Pipe: Fix the problem that a cluster crash in the multi-cluster receiving end causes all synchronization to be suspended (apache#15962)
* Pipe: Fix the problem that a cluster crash in the multi-cluster receiving end causes all synchronization to be suspended
* add IsEndPointAlive Function
* add IsEndPointAlive Function
* add IsEndPointAlive Function
* add pipeCheckSyncAllClientLiveTimeIntervalMs config
* fix
* fix
* fix
* fix
1 parent f076824 commit d34054d

File tree

6 files changed

+95
-1
lines changed

6 files changed

+95
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
8585

8686
private volatile boolean isClosed = false;
8787

88+
private final Map<TEndPoint, Long> unhealthyEndPointMap = new ConcurrentHashMap<>();
89+
8890
public IoTDBDataNodeAsyncClientManager(
8991
final List<TEndPoint> endPoints,
9092
/* The following parameters are used locally. */
@@ -179,7 +181,7 @@ public AsyncPipeDataTransferServiceClient borrowClient(final String deviceId) th
179181

180182
public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint endPoint)
181183
throws Exception {
182-
if (!useLeaderCache || Objects.isNull(endPoint)) {
184+
if (!useLeaderCache || Objects.isNull(endPoint) || isUnhealthy(endPoint)) {
183185
return borrowClient();
184186
}
185187

@@ -313,10 +315,14 @@ public void onError(final Exception e) {
313315
waitHandshakeFinished(isHandshakeFinished);
314316
}
315317
if (exception.get() != null) {
318+
markUnhealthy(targetNodeUrl);
316319
throw new PipeConnectionException("Failed to handshake.", exception.get());
320+
} else {
321+
markHealthy(targetNodeUrl);
317322
}
318323
} catch (TException e) {
319324
client.resetMethodStateIfStopped();
325+
markUnhealthy(targetNodeUrl);
320326
throw e;
321327
} finally {
322328
if (isClosed) {
@@ -423,8 +429,14 @@ private class RoundRobinLoadBalancer implements LoadBalancer {
423429
@Override
424430
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
425431
final int clientSize = endPointList.size();
432+
long n = 0;
426433
while (true) {
427434
final TEndPoint targetNodeUrl = endPointList.get((int) (currentClientIndex++ % clientSize));
435+
if (isUnhealthy(targetNodeUrl) && n < clientSize) {
436+
n++;
437+
continue;
438+
}
439+
428440
final AsyncPipeDataTransferServiceClient client =
429441
endPoint2Client.borrowClient(targetNodeUrl);
430442
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -438,8 +450,15 @@ private class RandomLoadBalancer implements LoadBalancer {
438450
@Override
439451
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
440452
final int clientSize = endPointList.size();
453+
long n = 0;
454+
441455
while (true) {
442456
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize));
457+
if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
458+
n++;
459+
continue;
460+
}
461+
443462
final AsyncPipeDataTransferServiceClient client =
444463
endPoint2Client.borrowClient(targetNodeUrl);
445464
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -452,8 +471,15 @@ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
452471
private class PriorityLoadBalancer implements LoadBalancer {
453472
@Override
454473
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
474+
final int clientSize = endPointList.size();
475+
long n = 0;
455476
while (true) {
456477
for (final TEndPoint targetNodeUrl : endPointList) {
478+
if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
479+
n++;
480+
continue;
481+
}
482+
457483
final AsyncPipeDataTransferServiceClient client =
458484
endPoint2Client.borrowClient(targetNodeUrl);
459485
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -463,4 +489,25 @@ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
463489
}
464490
}
465491
}
492+
493+
private boolean isUnhealthy(TEndPoint endPoint) {
494+
Long downTime = unhealthyEndPointMap.get(endPoint);
495+
if (downTime == null) {
496+
return false;
497+
}
498+
if (System.currentTimeMillis() - downTime
499+
> PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
500+
markHealthy(endPoint);
501+
return false;
502+
}
503+
return true;
504+
}
505+
506+
private void markUnhealthy(TEndPoint endPoint) {
507+
unhealthyEndPointMap.put(endPoint, System.currentTimeMillis());
508+
}
509+
510+
private void markHealthy(TEndPoint endPoint) {
511+
unhealthyEndPointMap.remove(endPoint);
512+
}
466513
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
2727
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2828
import org.apache.iotdb.commons.utils.RetryUtils;
29+
import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
2930
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
3031
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
3132
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -623,4 +624,8 @@ public void close() {
623624

624625
super.close();
625626
}
627+
628+
public IoTDBDataNodeSyncClientManager getClientManager() {
629+
return clientManager;
630+
}
626631
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ public class CommonConfig {
325325
private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
326326
private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
327327
private boolean pipeTransferTsFileSync = false;
328+
private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 minutes
328329

329330
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes
330331
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
@@ -1887,6 +1888,21 @@ public void setPipeTransferTsFileSync(boolean pipeTransferTsFileSync) {
18871888
logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
18881889
}
18891890

1891+
public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
1892+
return pipeCheckAllSyncClientLiveTimeIntervalMs;
1893+
}
1894+
1895+
public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
1896+
long pipeCheckSyncAllClientLiveTimeIntervalMs) {
1897+
if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == pipeCheckSyncAllClientLiveTimeIntervalMs) {
1898+
return;
1899+
}
1900+
this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckSyncAllClientLiveTimeIntervalMs;
1901+
logger.info(
1902+
"pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
1903+
pipeCheckSyncAllClientLiveTimeIntervalMs);
1904+
}
1905+
18901906
public double getPipeSendTsFileRateLimitBytesPerSecond() {
18911907
return pipeSendTsFileRateLimitBytesPerSecond;
18921908
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,10 @@ public boolean isTransferTsFileSync() {
289289
return COMMON_CONFIG.getPipeTransferTsFileSync();
290290
}
291291

292+
public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
293+
return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
294+
}
295+
292296
/////////////////////////////// Meta Consistency ///////////////////////////////
293297

294298
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -512,6 +516,9 @@ public void printAllConfigs() {
512516
"PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountEMAAlpha());
513517
LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());
514518
LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
519+
LOGGER.info(
520+
"PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
521+
getPipeCheckAllSyncClientLiveTimeIntervalMs());
515522

516523
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
517524
LOGGER.info(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
494494
Boolean.parseBoolean(
495495
properties.getProperty(
496496
"pipe_transfer_tsfile_sync", String.valueOf(config.getPipeTransferTsFileSync()))));
497+
config.setPipeCheckAllSyncClientLiveTimeIntervalMs(
498+
Long.parseLong(
499+
properties.getProperty(
500+
"pipe_check_all_sync_client_live_time_interval_ms",
501+
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
497502

498503
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
499504
Long.parseLong(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
5959
new ConcurrentHashMap<>();
6060
private final Map<TEndPoint, String> endPoint2HandshakeErrorMessage = new ConcurrentHashMap<>();
6161

62+
private volatile long lastCheckClientStatusTimestamp = 0L;
63+
6264
private final LoadBalancer loadBalancer;
6365

6466
protected IoTDBSyncClientManager(
@@ -113,6 +115,17 @@ protected IoTDBSyncClientManager(
113115
}
114116

115117
public void checkClientStatusAndTryReconstructIfNecessary() {
118+
if (System.currentTimeMillis() - lastCheckClientStatusTimestamp
119+
< PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
120+
// Check whether any clients are available, if any client is available, return directly
121+
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
122+
endPoint2ClientAndStatus.values()) {
123+
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
124+
return;
125+
}
126+
}
127+
}
128+
116129
// Reconstruct all dead clients
117130
for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry :
118131
endPoint2ClientAndStatus.entrySet()) {
@@ -126,6 +139,7 @@ public void checkClientStatusAndTryReconstructIfNecessary() {
126139
// Check whether any clients are available
127140
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : endPoint2ClientAndStatus.values()) {
128141
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
142+
lastCheckClientStatusTimestamp = System.currentTimeMillis();
129143
return;
130144
}
131145
}

0 commit comments

Comments
 (0)