Skip to content

Commit 299180a

Browse files
luoluoyuyu authored and JackieTien97 committed
[To dev/1.3] Pipe: Fix the stuck state caused by unfair lock in Sink start phase (#16100) (#16106)
* Pipe: Fix the stuck state caused by unfair lock in Sink start phase * fix (cherry picked from commit 2df3c45)
1 parent fedfa10 commit 299180a

File tree

4 files changed

+48
-43
lines changed

4 files changed

+48
-43
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
222222
// Try to remove the events as much as possible
223223
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
224224

225-
highPriorityLockTaskCount.incrementAndGet();
226225
try {
227-
synchronized (highPriorityLockTaskCount) {
228-
highPriorityLockTaskCount.notifyAll();
229-
}
226+
increaseHighPriorityTaskCount();
230227

231228
// synchronized to use the lastEvent & lastExceptionEvent
232229
synchronized (this) {
@@ -265,7 +262,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
265262
}
266263
}
267264
} finally {
268-
highPriorityLockTaskCount.decrementAndGet();
265+
decreaseHighPriorityTaskCount();
269266
}
270267

271268
if (outputPipeConnector instanceof IoTDBConnector) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,12 @@ public synchronized void start() {
124124
}
125125

126126
if (runningTaskCount == 0) {
127-
executor.start(subtask.getTaskID());
127+
try {
128+
subtask.increaseHighPriorityTaskCount();
129+
executor.start(subtask.getTaskID());
130+
} finally {
131+
subtask.decreaseHighPriorityTaskCount();
132+
}
128133
}
129134

130135
runningTaskCount++;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,8 @@ private boolean onPipeConnectionException(final Throwable throwable) {
170170
MAX_RETRY_TIMES,
171171
e);
172172
try {
173-
synchronized (highPriorityLockTaskCount) {
174-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
175-
// no deadlock.
176-
if (highPriorityLockTaskCount.get() == 0) {
177-
highPriorityLockTaskCount.wait(
178-
retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
179-
}
180-
}
173+
sleepIfNoHighPriorityTask(
174+
retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
181175
} catch (final InterruptedException interruptedException) {
182176
LOGGER.info(
183177
"Interrupted while sleeping, will retry to handshake with the target system.",
@@ -254,17 +248,4 @@ protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() {
254248
lastExceptionEvent = null;
255249
}
256250
}
257-
258-
private void preScheduleLowPriorityTask(int maxRetries) {
259-
while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
260-
try {
261-
// Introduce a short delay to avoid CPU spinning
262-
Thread.sleep(10);
263-
} catch (InterruptedException e) {
264-
Thread.currentThread().interrupt();
265-
LOGGER.warn("Interrupted while waiting for the high priority lock task.", e);
266-
break;
267-
}
268-
}
269-
}
270251
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,8 @@ private void onEnrichedEventFailure(final Throwable throwable) {
9090
throwable.getMessage(),
9191
throwable);
9292
try {
93-
synchronized (highPriorityLockTaskCount) {
94-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
95-
// no deadlock.
96-
if (highPriorityLockTaskCount.get() == 0) {
97-
highPriorityLockTaskCount.wait(
98-
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
99-
}
100-
}
93+
sleepIfNoHighPriorityTask(
94+
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
10195
} catch (final InterruptedException e) {
10296
LOGGER.warn(
10397
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -164,14 +158,8 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
164158
throwable.getMessage(),
165159
throwable);
166160
try {
167-
synchronized (highPriorityLockTaskCount) {
168-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
169-
// no deadlock.
170-
if (highPriorityLockTaskCount.get() == 0) {
171-
highPriorityLockTaskCount.wait(
172-
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
173-
}
174-
}
161+
sleepIfNoHighPriorityTask(
162+
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
175163
} catch (final InterruptedException e) {
176164
LOGGER.warn(
177165
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -183,4 +171,38 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
183171

184172
submitSelf();
185173
}
174+
175+
protected void preScheduleLowPriorityTask(int maxRetries) {
176+
while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
177+
try {
178+
// Introduce a short delay to avoid CPU spinning
179+
Thread.sleep(10);
180+
} catch (InterruptedException e) {
181+
Thread.currentThread().interrupt();
182+
LOGGER.warn("Interrupted while waiting for the high priority lock task.", e);
183+
break;
184+
}
185+
}
186+
}
187+
188+
protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
189+
synchronized (highPriorityLockTaskCount) {
190+
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
191+
// no deadlock.
192+
if (highPriorityLockTaskCount.get() > 0) {
193+
highPriorityLockTaskCount.wait(sleepMillis);
194+
}
195+
}
196+
}
197+
198+
public void increaseHighPriorityTaskCount() {
199+
highPriorityLockTaskCount.incrementAndGet();
200+
synchronized (highPriorityLockTaskCount) {
201+
highPriorityLockTaskCount.notifyAll();
202+
}
203+
}
204+
205+
public void decreaseHighPriorityTaskCount() {
206+
highPriorityLockTaskCount.decrementAndGet();
207+
}
186208
}

0 commit comments

Comments
 (0)