Skip to content

Commit 403cf64

Browse files
authored
[To dev/1.3] Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732) (#16759)
* Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732) * Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. * update (cherry picked from commit a6191d9) * update * update
1 parent f0cdfe0 commit 403cf64

File tree

8 files changed

+75
-89
lines changed

8 files changed

+75
-89
lines changed

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,7 @@ private void initClient() {
295295
this.client =
296296
new IoTDBSyncClient(
297297
new ThriftClientProperty.Builder()
298-
.setConnectionTimeoutMs(
299-
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
298+
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
300299
.setRpcThriftCompressionEnabled(
301300
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
302301
.build(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
208208
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
209209
return new IoTDBSyncClient(
210210
new ThriftClientProperty.Builder()
211-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
211+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
212212
.setRpcThriftCompressionEnabled(
213213
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
214214
.build(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void onError(final Exception e) {
300300
PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
301301
Boolean.toString(shouldMarkAsPipeRequest));
302302

303-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
303+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
304304
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
305305
waitHandshakeFinished(isHandshakeFinished);
306306

@@ -319,7 +319,7 @@ public void onError(final Exception e) {
319319
resp.set(null);
320320
exception.set(null);
321321

322-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
322+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
323323
client.pipeTransfer(
324324
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
325325
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -519,14 +519,11 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
519519
if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
520520
|| (!forced
521521
&& retryEventQueueEventCounter.getTabletInsertionEventCount()
522-
< PipeConfig.getInstance()
523-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
522+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
524523
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
525-
< PipeConfig.getInstance()
526-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
524+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
527525
&& retryEventQueue.size() + retryTsFileQueue.size()
528-
< PipeConfig.getInstance()
529-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
526+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
530527
return;
531528
}
532529

@@ -584,14 +581,11 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
584581
if (System.currentTimeMillis() - retryStartTime
585582
> PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) {
586583
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
587-
< PipeConfig.getInstance()
588-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
584+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
589585
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
590-
< PipeConfig.getInstance()
591-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
586+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
592587
&& retryEventQueue.size() + retryTsFileQueue.size()
593-
< PipeConfig.getInstance()
594-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
588+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
595589
return;
596590
}
597591

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -259,16 +259,16 @@ public class CommonConfig {
259259
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
260260
private long pipeSourceMatcherCacheSize = 1024;
261261

262-
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
262+
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
263263
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
264264
private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
265265
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
266266
private long pipeConnectorRetryIntervalMs = 1000L;
267267
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
268268

269-
private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
270-
private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
271-
private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
269+
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
270+
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
271+
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
272272
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
273273
private int pipeAsyncConnectorSelectorNumber =
274274
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -988,22 +988,21 @@ public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
988988
logger.info("pipeExtractorMatcherCacheSize is set to {}.", pipeSourceMatcherCacheSize);
989989
}
990990

991-
public int getPipeConnectorHandshakeTimeoutMs() {
992-
return pipeConnectorHandshakeTimeoutMs;
991+
public int getPipeSinkHandshakeTimeoutMs() {
992+
return pipeSinkHandshakeTimeoutMs;
993993
}
994994

995-
public void setPipeConnectorHandshakeTimeoutMs(long pipeConnectorHandshakeTimeoutMs) {
996-
final int fPipeConnectorHandshakeTimeoutMs = this.pipeConnectorHandshakeTimeoutMs;
995+
public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
996+
final int fPipeConnectorHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs;
997997
try {
998-
this.pipeConnectorHandshakeTimeoutMs = Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
998+
this.pipeSinkHandshakeTimeoutMs = Math.toIntExact(pipeSinkHandshakeTimeoutMs);
999999
} catch (ArithmeticException e) {
1000-
this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
1000+
this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
10011001
logger.warn(
10021002
"Given pipe connector handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
10031003
} finally {
1004-
if (fPipeConnectorHandshakeTimeoutMs != this.pipeConnectorHandshakeTimeoutMs) {
1005-
logger.info(
1006-
"pipeConnectorHandshakeTimeoutMs is set to {}.", fPipeConnectorHandshakeTimeoutMs);
1004+
if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) {
1005+
logger.info("pipeSinkHandshakeTimeoutMs is set to {}.", this.pipeSinkHandshakeTimeoutMs);
10071006
}
10081007
}
10091008
}
@@ -1072,55 +1071,54 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
10721071
return pipeConnectorRPCThriftCompressionEnabled;
10731072
}
10741073

1075-
public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
1076-
int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
1077-
if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
1078-
== pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
1074+
public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
1075+
int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
1076+
if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
1077+
== pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
10791078
return;
10801079
}
1081-
this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
1082-
pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
1080+
this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
1081+
pipeAsyncSinkForcedRetryTsFileEventQueueSize;
10831082
logger.info(
1084-
"pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to {}.",
1085-
pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
1083+
"pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
1084+
pipeAsyncSinkForcedRetryTsFileEventQueueSize);
10861085
}
10871086

1088-
public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
1089-
return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
1087+
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
1088+
return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
10901089
}
10911090

1092-
public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
1093-
int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
1094-
if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
1095-
== pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
1091+
public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
1092+
int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
1093+
if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
1094+
== pipeAsyncSinkForcedRetryTabletEventQueueSize) {
10961095
return;
10971096
}
1098-
this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
1099-
pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
1097+
this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
1098+
pipeAsyncSinkForcedRetryTabletEventQueueSize;
11001099
logger.info(
1101-
"pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to {}.",
1102-
pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
1100+
"pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
1101+
pipeAsyncSinkForcedRetryTabletEventQueueSize);
11031102
}
11041103

1105-
public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
1106-
return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
1104+
public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
1105+
return pipeAsyncSinkForcedRetryTabletEventQueueSize;
11071106
}
11081107

1109-
public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
1110-
int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
1111-
if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
1112-
== pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
1108+
public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
1109+
int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
1110+
if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
1111+
== pipeAsyncSinkForcedRetryTotalEventQueueSize) {
11131112
return;
11141113
}
1115-
this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
1116-
pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
1114+
this.pipeAsyncSinkForcedRetryTotalEventQueueSize = pipeAsyncSinkForcedRetryTotalEventQueueSize;
11171115
logger.info(
1118-
"pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to {}.",
1119-
pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
1116+
"pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
1117+
pipeAsyncSinkForcedRetryTotalEventQueueSize);
11201118
}
11211119

1122-
public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
1123-
return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
1120+
public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
1121+
return pipeAsyncSinkForcedRetryTotalEventQueueSize;
11241122
}
11251123

11261124
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ public long getPipeSourceMatcherCacheSize() {
159159

160160
/////////////////////////////// Connector ///////////////////////////////
161161

162-
public int getPipeConnectorHandshakeTimeoutMs() {
163-
return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
162+
public int getPipeSinkHandshakeTimeoutMs() {
163+
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
164164
}
165165

166166
public int getPipeConnectorTransferTimeoutMs() {
@@ -183,16 +183,16 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
183183
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
184184
}
185185

186-
public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
187-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
186+
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
187+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
188188
}
189189

190-
public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
191-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
190+
public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
191+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
192192
}
193193

194-
public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
195-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
194+
public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
195+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
196196
}
197197

198198
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -493,7 +493,7 @@ public void printAllConfigs() {
493493
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
494494
LOGGER.info("PipeSourceMatcherCacheSize: {}", getPipeSourceMatcherCacheSize());
495495

496-
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeConnectorHandshakeTimeoutMs());
496+
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeSinkHandshakeTimeoutMs());
497497
LOGGER.info("PipeConnectorTransferTimeoutMs: {}", getPipeConnectorTransferTimeoutMs());
498498
LOGGER.info("PipeConnectorReadFileBufferSize: {}", getPipeConnectorReadFileBufferSize());
499499
LOGGER.info(
@@ -540,15 +540,14 @@ public void printAllConfigs() {
540540
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
541541

542542
LOGGER.info(
543-
"PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
544-
getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
543+
"PipeAsyncSinkForcedRetryTsFileEventQueueSize: {}",
544+
getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
545545
LOGGER.info(
546-
"PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
547-
getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
546+
"PipeAsyncSinkForcedRetryTabletEventQueueSize: {}",
547+
getPipeAsyncSinkForcedRetryTabletEventQueueSize());
548548
LOGGER.info(
549-
"PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
550-
getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
551-
549+
"PipeAsyncSinkForcedRetryTotalEventQueueSize: {}",
550+
getPipeAsyncSinkForcedRetryTotalEventQueueSize());
552551
LOGGER.info(
553552
"PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
554553
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -323,13 +323,13 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
323323
"pipe_extractor_matcher_cache_size",
324324
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
325325

326-
config.setPipeConnectorHandshakeTimeoutMs(
326+
config.setPipeSinkHandshakeTimeoutMs(
327327
Long.parseLong(
328328
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
329329
.orElse(
330330
properties.getProperty(
331331
"pipe_connector_handshake_timeout_ms",
332-
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
332+
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
333333
config.setPipeConnectorReadFileBufferSize(
334334
Integer.parseInt(
335335
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
@@ -368,36 +368,32 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
368368
"pipe_async_connector_max_retry_execution_time_ms_per_call",
369369
String.valueOf(
370370
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
371-
config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
371+
config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
372372
Integer.parseInt(
373373
Optional.ofNullable(
374374
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
375375
.orElse(
376376
properties.getProperty(
377377
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
378378
String.valueOf(
379-
config
380-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
381-
config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
379+
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
380+
config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
382381
Integer.parseInt(
383382
Optional.ofNullable(
384383
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
385384
.orElse(
386385
properties.getProperty(
387386
"pipe_async_connector_forced_retry_tablet_event_queue_size",
388387
String.valueOf(
389-
config
390-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
391-
config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
388+
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
389+
config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
392390
Integer.parseInt(
393391
Optional.ofNullable(
394392
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
395393
.orElse(
396394
properties.getProperty(
397395
"pipe_async_connector_forced_retry_total_event_queue_size",
398-
String.valueOf(
399-
config
400-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
396+
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
401397
config.setRateLimiterHotReloadCheckIntervalMs(
402398
Integer.parseInt(
403399
properties.getProperty(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private boolean initClientAndStatus(
193193
clientAndStatus.setLeft(
194194
new IoTDBSyncClient(
195195
new ThriftClientProperty.Builder()
196-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
196+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
197197
.setRpcThriftCompressionEnabled(
198198
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
199199
.build(),

0 commit comments

Comments
 (0)