Skip to content

Commit 4525d07

Browse files
authored
Pipe: Adjusted the waiting time of temporary unavailable exceptions (apache#16798)
* fff * fix * fix * fix * fix * f
1 parent 03b60d1 commit 4525d07

File tree

7 files changed

+154
-2
lines changed

7 files changed

+154
-2
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
package org.apache.iotdb.confignode.manager.pipe.agent.task;
2121

22+
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
2223
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2324
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2425
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2526
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2627
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
28+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2729
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
2830
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
2931
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -188,7 +190,9 @@ protected boolean executeOnce() throws Exception {
188190
PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID);
189191
}
190192
decreaseReferenceCountAndReleaseLastEvent(event, true);
191-
193+
sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
194+
} catch (final PipeNonReportException e) {
195+
sleep4NonReportException();
192196
} catch (final PipeException e) {
193197
setLastExceptionEvent(event);
194198
if (!isClosed.get()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2424
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2525
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
26+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2627
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2728
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
2829
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -132,8 +133,9 @@ protected boolean executeOnce() {
132133
}
133134

134135
decreaseReferenceCountAndReleaseLastEvent(event, true);
136+
sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
135137
} catch (final PipeNonReportException e) {
136-
// Ignore, go directly next round
138+
sleep4NonReportException();
137139
} catch (final PipeException e) {
138140
if (!isClosed.get()) {
139141
setLastExceptionEvent(event);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ public class CommonConfig {
254254
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
255255
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
256256

257+
private volatile long pipeSinkSubtaskSleepIntervalInitMs = 250L;
258+
private volatile long pipeSinkSubtaskSleepIntervalMaxMs = 1000L;
259+
257260
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
258261

259262
private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -1415,6 +1418,32 @@ public void setPipeRetryLocallyForParallelOrUserConflict(
14151418
"pipeRetryLocallyForParallelOrUserConflict is set to {}.", pipeSubtaskExecutorMaxThreadNum);
14161419
}
14171420

1421+
public long getPipeSinkSubtaskSleepIntervalInitMs() {
1422+
return pipeSinkSubtaskSleepIntervalInitMs;
1423+
}
1424+
1425+
public void setPipeSinkSubtaskSleepIntervalInitMs(long pipeSinkSubtaskSleepIntervalInitMs) {
1426+
if (this.pipeSinkSubtaskSleepIntervalInitMs == pipeSinkSubtaskSleepIntervalInitMs) {
1427+
return;
1428+
}
1429+
this.pipeSinkSubtaskSleepIntervalInitMs = pipeSinkSubtaskSleepIntervalInitMs;
1430+
logger.info(
1431+
"pipeSinkSubtaskSleepIntervalInitMs is set to {}.", pipeSinkSubtaskSleepIntervalInitMs);
1432+
}
1433+
1434+
public long getPipeSinkSubtaskSleepIntervalMaxMs() {
1435+
return pipeSinkSubtaskSleepIntervalMaxMs;
1436+
}
1437+
1438+
public void setPipeSinkSubtaskSleepIntervalMaxMs(long pipeSinkSubtaskSleepIntervalMaxMs) {
1439+
if (this.pipeSinkSubtaskSleepIntervalMaxMs == pipeSinkSubtaskSleepIntervalMaxMs) {
1440+
return;
1441+
}
1442+
this.pipeSinkSubtaskSleepIntervalMaxMs = pipeSinkSubtaskSleepIntervalMaxMs;
1443+
logger.info(
1444+
"pipeSinkSubtaskSleepIntervalMaxMs is set to {}.", pipeSinkSubtaskSleepIntervalMaxMs);
1445+
}
1446+
14181447
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
14191448
return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
14201449
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
5353
@SuppressWarnings("java:S3077")
5454
protected volatile Event lastExceptionEvent;
5555

56+
protected long sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
57+
5658
protected PipeAbstractSinkSubtask(
5759
final String taskID, final long creationTime, final PipeConnector outputPipeSink) {
5860
super(taskID, creationTime);
@@ -248,4 +250,15 @@ protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() {
248250
lastExceptionEvent = null;
249251
}
250252
}
253+
254+
public void sleep4NonReportException() {
255+
if (sleepInterval < PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs()) {
256+
sleepInterval <<= 1;
257+
}
258+
try {
259+
Thread.sleep(sleepInterval);
260+
} catch (final InterruptedException e) {
261+
Thread.currentThread().interrupt();
262+
}
263+
}
251264
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,14 @@ public long getPipeMaxWaitFinishTime() {
143143
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
144144
}
145145

146+
public long getPipeSinkSubtaskSleepIntervalInitMs() {
147+
return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalInitMs();
148+
}
149+
150+
public long getPipeSinkSubtaskSleepIntervalMaxMs() {
151+
return COMMON_CONFIG.getPipeSinkSubtaskSleepIntervalMaxMs();
152+
}
153+
146154
/////////////////////////////// Source ///////////////////////////////
147155

148156
public int getPipeSourceAssignerDisruptorRingBufferSize() {
@@ -484,6 +492,8 @@ public void printAllConfigs() {
484492
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
485493
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
486494
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
495+
LOGGER.info("PipeSinkSubtaskSleepIntervalInitMs: {}", getPipeSinkSubtaskSleepIntervalInitMs());
496+
LOGGER.info("PipeSinkSubtaskSleepIntervalMaxMs: {}", getPipeSinkSubtaskSleepIntervalMaxMs());
487497

488498
LOGGER.info(
489499
"PipeSourceAssignerDisruptorRingBufferSize: {}",

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,17 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
296296
"pipe_retry_locally_for_user_conflict",
297297
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
298298

299+
config.setPipeSinkSubtaskSleepIntervalInitMs(
300+
Long.parseLong(
301+
properties.getProperty(
302+
"pipe_sink_subtask_sleep_interval_init_ms",
303+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalInitMs()))));
304+
config.setPipeSinkSubtaskSleepIntervalMaxMs(
305+
Long.parseLong(
306+
properties.getProperty(
307+
"pipe_sink_subtask_sleep_interval_max_ms",
308+
String.valueOf(config.getPipeSinkSubtaskSleepIntervalMaxMs()))));
309+
299310
config.setPipeSourceAssignerDisruptorRingBufferSize(
300311
Integer.parseInt(
301312
Optional.ofNullable(
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.pipe.task;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25+
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
26+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
28+
29+
import org.junit.After;
30+
import org.junit.Assert;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
34+
public class PipeSleepIntervalTest {
35+
private long oldPipeSinkSubtaskSleepIntervalInitMs;
36+
private long oldPipeSinkSubtaskSleepIntervalMaxMs;
37+
38+
@Before
39+
public void setUp() throws Exception {
40+
final CommonConfig config = CommonDescriptor.getInstance().getConfig();
41+
oldPipeSinkSubtaskSleepIntervalInitMs = config.getPipeSinkSubtaskSleepIntervalInitMs();
42+
oldPipeSinkSubtaskSleepIntervalMaxMs = config.getPipeSinkSubtaskSleepIntervalMaxMs();
43+
config.setPipeSinkSubtaskSleepIntervalInitMs(25L);
44+
config.setPipeSinkSubtaskSleepIntervalMaxMs(50L);
45+
}
46+
47+
@After
48+
public void tearDown() throws Exception {
49+
final CommonConfig config = CommonDescriptor.getInstance().getConfig();
50+
config.setPipeSinkSubtaskSleepIntervalInitMs(oldPipeSinkSubtaskSleepIntervalInitMs);
51+
config.setPipeSinkSubtaskSleepIntervalMaxMs(oldPipeSinkSubtaskSleepIntervalMaxMs);
52+
}
53+
54+
@Test
55+
public void test() {
56+
try (final PipeAbstractSinkSubtask subtask =
57+
new PipeAbstractSinkSubtask(null, 0, null) {
58+
@Override
59+
protected String getRootCause(Throwable throwable) {
60+
return null;
61+
}
62+
63+
@Override
64+
protected void report(EnrichedEvent event, PipeRuntimeException exception) {}
65+
66+
@Override
67+
protected boolean executeOnce() {
68+
return false;
69+
}
70+
}) {
71+
long startTime = System.currentTimeMillis();
72+
subtask.sleep4NonReportException();
73+
Assert.assertTrue(
74+
System.currentTimeMillis() - startTime
75+
>= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
76+
startTime = System.currentTimeMillis() - startTime;
77+
subtask.sleep4NonReportException();
78+
Assert.assertTrue(
79+
System.currentTimeMillis() - startTime
80+
>= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)