Skip to content

Commit 6e8748e

Browse files
authored
Pipe: Optimized the default behavior in meta sync for retries to wait for other regions (apache#16740)
* partial * cfg * core * fix * fix
1 parent 7515568 commit 6e8748e

File tree

9 files changed

+119
-59
lines changed

9 files changed

+119
-59
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ public enum TSStatusCode {
291291
PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
292292
PIPE_PUSH_META_TIMEOUT(1813),
293293
PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
294+
PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION(1815),
294295

295296
// Subscription
296297
SUBSCRIPTION_VERSION_ERROR(1900),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -892,8 +892,8 @@ private long calculateAssignerMemory(final PipeParameters sourceParameters) {
892892
.getLeft()) {
893893
return 0;
894894
}
895-
return PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
896-
* PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
895+
return PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferSize()
896+
* PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()
897897
* Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
898898
} catch (final IllegalPathException e) {
899899
return 0;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,22 @@ public TSStatus visitInsertBase(
106106
} else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
107107
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
108108
.setMessage(context.getMessage());
109-
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
110-
&& (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
111-
&& config.isEnablePartialInsert())) {
112-
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
109+
} else if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
110+
return new TSStatus(
111+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
113112
.setMessage(context.getMessage());
113+
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
114+
if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
115+
&& config.isEnablePartialInsert()) {
116+
return new TSStatus(
117+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
118+
.setMessage(context.getMessage());
119+
}
120+
if (context.getMessage().contains("does not exist")) {
121+
return new TSStatus(
122+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
123+
.setMessage(context.getMessage());
124+
}
114125
}
115126
return visitStatement(insertBaseStatement, context);
116127
}
@@ -226,14 +237,24 @@ public TSStatus visitActivateTemplate(
226237
@Override
227238
public TSStatus visitBatchActivateTemplate(
228239
final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) {
240+
boolean userConflict = false;
229241
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
230242
for (final TSStatus status : context.getSubStatus()) {
231243
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
232244
&& status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
233245
return visitStatement(batchActivateTemplateStatement, context);
234246
}
247+
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
248+
&& context.isSetMessage()
249+
&& context.getMessage().contains("has not been set any template")) {
250+
userConflict = true;
251+
}
235252
}
236-
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
253+
return (userConflict
254+
? new TSStatus(
255+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
256+
: new TSStatus(
257+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
237258
.setMessage(context.getMessage());
238259
}
239260
return visitGeneralActivateTemplate(batchActivateTemplateStatement, context);
@@ -245,6 +266,12 @@ private TSStatus visitGeneralActivateTemplate(
245266
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
246267
.setMessage(context.getMessage());
247268
}
269+
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
270+
&& context.isSetMessage()
271+
&& context.getMessage().contains("has not been set any template")) {
272+
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
273+
.setMessage(context.getMessage());
274+
}
248275
return visitStatement(activateTemplateStatement, context);
249276
}
250277
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ public DisruptorQueue(
4949
final EventHandler<PipeRealtimeEvent> eventHandler,
5050
final Consumer<PipeRealtimeEvent> onAssignedHook) {
5151
final PipeConfig config = PipeConfig.getInstance();
52-
final int ringBufferSize = config.getPipeExtractorAssignerDisruptorRingBufferSize();
52+
final int ringBufferSize = config.getPipeSourceAssignerDisruptorRingBufferSize();
5353
final long ringBufferEntrySizeInBytes =
54-
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
54+
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
5555

5656
allocatedMemoryBlock =
5757
PipeDataNodeResourceManager.memory()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ public static PartialPath getDatabasePathByLevel(PartialPath path, int level)
7070
throw new IllegalPathException(
7171
path.getFullPath(), "it does not start with " + IoTDBConstant.PATH_ROOT);
7272
}
73-
String[] storageGroupNodes = new String[level + 1];
74-
System.arraycopy(nodeNames, 0, storageGroupNodes, 0, level + 1);
75-
return new PartialPath(storageGroupNodes);
73+
String[] databaseNodes = new String[level + 1];
74+
System.arraycopy(nodeNames, 0, databaseNodes, 0, level + 1);
75+
return new PartialPath(databaseNodes);
7676
}
7777

7878
/**

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ public class CommonConfig {
214214
private int pipeSubtaskExecutorMaxThreadNum =
215215
Math.max(5, Runtime.getRuntime().availableProcessors() / 2);
216216

217+
private boolean pipeRetryLocallyForParallelOrUserConflict = true;
218+
217219
private int pipeDataStructureTabletRowSize = 2048;
218220
private int pipeDataStructureTabletSizeInBytes = 2097152;
219221
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3;
@@ -256,8 +258,8 @@ public class CommonConfig {
256258

257259
private long pipeMaxWaitFinishTime = 10 * 1000;
258260

259-
private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
260-
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
261+
private int pipeSourceAssignerDisruptorRingBufferSize = 128;
262+
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
261263
private long pipeSourceMatcherCacheSize = 1024;
262264

263265
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
@@ -996,38 +998,37 @@ public void setPipeTotalFloatingMemoryProportion(double pipeTotalFloatingMemoryP
996998
"pipeTotalFloatingMemoryProportion is set to {}.", pipeTotalFloatingMemoryProportion);
997999
}
9981000

999-
public int getPipeExtractorAssignerDisruptorRingBufferSize() {
1000-
return pipeExtractorAssignerDisruptorRingBufferSize;
1001+
public int getPipeSourceAssignerDisruptorRingBufferSize() {
1002+
return pipeSourceAssignerDisruptorRingBufferSize;
10011003
}
10021004

1003-
public void setPipeExtractorAssignerDisruptorRingBufferSize(
1004-
int pipeExtractorAssignerDisruptorRingBufferSize) {
1005-
if (this.pipeExtractorAssignerDisruptorRingBufferSize
1006-
== pipeExtractorAssignerDisruptorRingBufferSize) {
1005+
public void setPipeSourceAssignerDisruptorRingBufferSize(
1006+
int pipeSourceAssignerDisruptorRingBufferSize) {
1007+
if (this.pipeSourceAssignerDisruptorRingBufferSize
1008+
== pipeSourceAssignerDisruptorRingBufferSize) {
10071009
return;
10081010
}
1009-
this.pipeExtractorAssignerDisruptorRingBufferSize =
1010-
pipeExtractorAssignerDisruptorRingBufferSize;
1011+
this.pipeSourceAssignerDisruptorRingBufferSize = pipeSourceAssignerDisruptorRingBufferSize;
10111012
logger.info(
1012-
"pipeExtractorAssignerDisruptorRingBufferSize is set to {}.",
1013-
pipeExtractorAssignerDisruptorRingBufferSize);
1013+
"pipeSourceAssignerDisruptorRingBufferSize is set to {}.",
1014+
pipeSourceAssignerDisruptorRingBufferSize);
10141015
}
10151016

1016-
public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
1017-
return pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes;
1017+
public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
1018+
return pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes;
10181019
}
10191020

1020-
public void setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(
1021-
long pipeExtractorAssignerDisruptorRingBufferEntrySize) {
1022-
if (pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes
1023-
== pipeExtractorAssignerDisruptorRingBufferEntrySize) {
1021+
public void setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(
1022+
long pipeSourceAssignerDisruptorRingBufferEntrySize) {
1023+
if (pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes
1024+
== pipeSourceAssignerDisruptorRingBufferEntrySize) {
10241025
return;
10251026
}
1026-
this.pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes =
1027-
pipeExtractorAssignerDisruptorRingBufferEntrySize;
1027+
this.pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes =
1028+
pipeSourceAssignerDisruptorRingBufferEntrySize;
10281029
logger.info(
1029-
"pipeExtractorAssignerDisruptorRingBufferEntrySize is set to {}.",
1030-
pipeExtractorAssignerDisruptorRingBufferEntrySize);
1030+
"pipeSourceAssignerDisruptorRingBufferEntrySize is set to {}.",
1031+
pipeSourceAssignerDisruptorRingBufferEntrySize);
10311032
}
10321033

10331034
public long getPipeSourceMatcherCacheSize() {
@@ -1400,6 +1401,21 @@ public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadN
14001401
logger.info("pipeSubtaskExecutorMaxThreadNum is set to {}.", pipeSubtaskExecutorMaxThreadNum);
14011402
}
14021403

1404+
public boolean isPipeRetryLocallyForParallelOrUserConflict() {
1405+
return pipeRetryLocallyForParallelOrUserConflict;
1406+
}
1407+
1408+
public void setPipeRetryLocallyForParallelOrUserConflict(
1409+
boolean pipeRetryLocallyForParallelOrUserConflict) {
1410+
if (this.pipeRetryLocallyForParallelOrUserConflict
1411+
== pipeRetryLocallyForParallelOrUserConflict) {
1412+
return;
1413+
}
1414+
this.pipeRetryLocallyForParallelOrUserConflict = pipeRetryLocallyForParallelOrUserConflict;
1415+
logger.info(
1416+
"pipeRetryLocallyForParallelOrUserConflict is set to {}.", pipeSubtaskExecutorMaxThreadNum);
1417+
}
1418+
14031419
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
14041420
return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
14051421
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ public long getPipeMaxWaitFinishTime() {
143143
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
144144
}
145145

146-
/////////////////////////////// Extractor ///////////////////////////////
146+
/////////////////////////////// Source ///////////////////////////////
147147

148-
public int getPipeExtractorAssignerDisruptorRingBufferSize() {
149-
return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
148+
public int getPipeSourceAssignerDisruptorRingBufferSize() {
149+
return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferSize();
150150
}
151151

152-
public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
153-
return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
152+
public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
153+
return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
154154
}
155155

156156
public long getPipeSourceMatcherCacheSize() {
@@ -337,8 +337,8 @@ public long getPipePeriodicalLogMinIntervalSeconds() {
337337
return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
338338
}
339339

340-
public long getPipeLoggerCacheMaxSizeInBytes() {
341-
return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
340+
public boolean isPipeRetryLocallyForParallelOrUserConflict() {
341+
return COMMON_CONFIG.isPipeRetryLocallyForParallelOrUserConflict();
342342
}
343343

344344
/////////////////////////////// Logger ///////////////////////////////
@@ -359,6 +359,10 @@ public int getPipeTsFilePinMaxLogIntervalRounds() {
359359
return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
360360
}
361361

362+
public long getPipeLoggerCacheMaxSizeInBytes() {
363+
return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
364+
}
365+
362366
/////////////////////////////// Memory ///////////////////////////////
363367

364368
public boolean getPipeMemoryManagementEnabled() {
@@ -482,12 +486,12 @@ public void printAllConfigs() {
482486
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
483487

484488
LOGGER.info(
485-
"PipeExtractorAssignerDisruptorRingBufferSize: {}",
486-
getPipeExtractorAssignerDisruptorRingBufferSize());
489+
"PipeSourceAssignerDisruptorRingBufferSize: {}",
490+
getPipeSourceAssignerDisruptorRingBufferSize());
487491
LOGGER.info(
488-
"PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
489-
getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
490-
LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeSourceMatcherCacheSize());
492+
"PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}",
493+
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
494+
LOGGER.info("PipeSourceMatcherCacheSize: {}", getPipeSourceMatcherCacheSize());
491495

492496
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeConnectorHandshakeTimeoutMs());
493497
LOGGER.info("PipeConnectorTransferTimeoutMs: {}", getPipeConnectorTransferTimeoutMs());
@@ -592,12 +596,15 @@ public void printAllConfigs() {
592596
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
593597
LOGGER.info(
594598
"PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds());
595-
LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", getPipeLoggerCacheMaxSizeInBytes());
599+
LOGGER.info(
600+
"PipeRetryLocallyForParallelOrUserConflict: {}",
601+
isPipeRetryLocallyForParallelOrUserConflict());
596602

597603
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
598604
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
599605
LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", getPipeTsFilePinMaxLogNumPerRound());
600606
LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds());
607+
LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", getPipeLoggerCacheMaxSizeInBytes());
601608

602609
LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled());
603610
LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,17 +290,21 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
290290
properties.getProperty(
291291
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
292292
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
293+
config.setPipeRetryLocallyForParallelOrUserConflict(
294+
Boolean.parseBoolean(
295+
properties.getProperty(
296+
"pipe_retry_locally_for_user_conflict",
297+
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
293298

294-
config.setPipeExtractorAssignerDisruptorRingBufferSize(
299+
config.setPipeSourceAssignerDisruptorRingBufferSize(
295300
Integer.parseInt(
296301
Optional.ofNullable(
297302
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
298303
.orElse(
299304
properties.getProperty(
300305
"pipe_extractor_assigner_disruptor_ring_buffer_size",
301-
String.valueOf(
302-
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
303-
config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
306+
String.valueOf(config.getPipeSourceAssignerDisruptorRingBufferSize())))));
307+
config.setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
304308
Integer.parseInt(
305309
Optional.ofNullable(
306310
properties.getProperty(
@@ -309,8 +313,7 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
309313
properties.getProperty(
310314
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
311315
String.valueOf(
312-
config
313-
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
316+
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes())))));
314317

315318
config.setPipeSourceMatcherCacheSize(
316319
Integer.parseInt(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
2525
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
2626
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
27+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2728
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
2829
import org.apache.iotdb.commons.utils.RetryUtils;
2930
import org.apache.iotdb.pipe.api.event.Event;
@@ -131,6 +132,7 @@ public void handle(
131132
}
132133

133134
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
135+
case 1815: // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION
134136
if (!isRetryAllowedWhenConflictOccurs) {
135137
LOGGER.warn(
136138
"User conflict exception: will be ignored because retry is not allowed. event: {}. status: {}",
@@ -165,12 +167,16 @@ public void handle(
165167
+ " seconds",
166168
status);
167169
exceptionEventHasBeenRetried.set(true);
168-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
169-
exceptionMessage,
170-
(int)
171-
Math.max(
172-
PipeSubtask.MAX_RETRY_TIMES,
173-
Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1)));
170+
throw status.getCode() == 1815
171+
&& PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
172+
? new PipeNonReportException(exceptionMessage)
173+
: new PipeRuntimeSinkRetryTimesConfigurableException(
174+
exceptionMessage,
175+
(int)
176+
Math.max(
177+
PipeSubtask.MAX_RETRY_TIMES,
178+
Math.min(
179+
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1)));
174180
}
175181

176182
case 803: // NO_PERMISSION

0 commit comments

Comments
 (0)