Skip to content

Commit 01f0a8f

Browse files
authored
Procedure: Fix procedure framework data race (apache#14250)
* fix procedure framework data race * fix review * spotless
1 parent a409dab commit 01f0a8f

File tree

2 files changed

+64
-50
lines changed

2 files changed

+64
-50
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.concurrent.ThreadName;
2323
import org.apache.iotdb.commons.utils.TestOnly;
24+
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
2425
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
2526
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
2627
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
@@ -41,6 +42,7 @@
4142
import java.util.Deque;
4243
import java.util.HashSet;
4344
import java.util.List;
45+
import java.util.Objects;
4446
import java.util.Set;
4547
import java.util.concurrent.ConcurrentHashMap;
4648
import java.util.concurrent.CopyOnWriteArrayList;
@@ -329,68 +331,79 @@ private void executeProcedure(Procedure<Env> proc) {
329331
LOG.warn("Rollback stack is null for {}", proc.getProcId());
330332
return;
331333
}
332-
do {
333-
if (!rootProcStack.acquire()) {
334-
if (rootProcStack.setRollback()) {
335-
switch (executeRootStackRollback(rootProcId, rootProcStack)) {
336-
case LOCK_ACQUIRED:
337-
break;
338-
case LOCK_EVENT_WAIT:
339-
LOG.info("LOCK_EVENT_WAIT rollback " + proc);
340-
rootProcStack.unsetRollback();
341-
break;
342-
case LOCK_YIELD_WAIT:
343-
rootProcStack.unsetRollback();
344-
scheduler.yield(proc);
345-
break;
346-
default:
347-
throw new UnsupportedOperationException();
348-
}
349-
} else {
350-
if (!proc.wasExecuted()) {
351-
switch (executeRollback(proc)) {
334+
ProcedureLockState lockState = null;
335+
try {
336+
do {
337+
if (!rootProcStack.acquire()) {
338+
if (rootProcStack.setRollback()) {
339+
lockState = executeRootStackRollback(rootProcId, rootProcStack);
340+
switch (lockState) {
352341
case LOCK_ACQUIRED:
353342
break;
354343
case LOCK_EVENT_WAIT:
355-
LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
344+
LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
345+
rootProcStack.unsetRollback();
356346
break;
357347
case LOCK_YIELD_WAIT:
348+
rootProcStack.unsetRollback();
358349
scheduler.yield(proc);
359350
break;
360351
default:
361352
throw new UnsupportedOperationException();
362353
}
354+
} else {
355+
if (!proc.wasExecuted()) {
356+
switch (executeRollback(proc)) {
357+
case LOCK_ACQUIRED:
358+
break;
359+
case LOCK_EVENT_WAIT:
360+
LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
361+
break;
362+
case LOCK_YIELD_WAIT:
363+
scheduler.yield(proc);
364+
break;
365+
default:
366+
throw new UnsupportedOperationException();
367+
}
368+
}
363369
}
364-
}
365-
break;
366-
}
367-
ProcedureLockState lockState = acquireLock(proc);
368-
switch (lockState) {
369-
case LOCK_ACQUIRED:
370-
executeProcedure(rootProcStack, proc);
371370
break;
372-
case LOCK_YIELD_WAIT:
373-
case LOCK_EVENT_WAIT:
374-
LOG.info("{} lockstate is {}", proc, lockState);
375-
break;
376-
default:
377-
throw new UnsupportedOperationException();
378-
}
379-
rootProcStack.release();
380-
381-
if (proc.isSuccess()) {
382-
// update metrics on finishing the procedure
383-
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
384-
LOG.debug("{} finished in {}ms successfully.", proc, proc.elapsedTime());
385-
if (proc.getProcId() == rootProcId) {
386-
rootProcedureCleanup(proc);
387-
} else {
388-
executeCompletionCleanup(proc);
389371
}
390-
return;
391-
}
372+
lockState = acquireLock(proc);
373+
switch (lockState) {
374+
case LOCK_ACQUIRED:
375+
executeProcedure(rootProcStack, proc);
376+
break;
377+
case LOCK_YIELD_WAIT:
378+
case LOCK_EVENT_WAIT:
379+
LOG.info("{} lockstate is {}", proc, lockState);
380+
break;
381+
default:
382+
throw new UnsupportedOperationException();
383+
}
384+
rootProcStack.release();
385+
386+
if (proc.isSuccess()) {
387+
// update metrics on finishing the procedure
388+
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
389+
LOG.debug("{} finished in {}ms successfully.", proc, proc.elapsedTime());
390+
if (proc.getProcId() == rootProcId) {
391+
rootProcedureCleanup(proc);
392+
} else {
393+
executeCompletionCleanup(proc);
394+
}
395+
return;
396+
}
392397

393-
} while (rootProcStack.isFailed());
398+
} while (rootProcStack.isFailed());
399+
} finally {
400+
// Only after procedure has completed execution can it be allowed to be rescheduled to prevent
401+
// data races
402+
if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
403+
LOG.info("procedureId {} wait for lock.", proc.getProcId());
404+
((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
405+
}
406+
}
394407
}
395408

396409
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
4747
LOG.info("procedureId {} acquire lock.", getProcId());
4848
return ProcedureLockState.LOCK_ACQUIRED;
4949
}
50-
configNodeProcedureEnv.getNodeLock().waitProcedure(this);
51-
LOG.info("procedureId {} wait for lock.", getProcId());
50+
LOG.info(
51+
"procedureId {} acquire lock failed, will wait for lock after finishing execution.",
52+
getProcId());
5253
return ProcedureLockState.LOCK_EVENT_WAIT;
5354
} finally {
5455
configNodeProcedureEnv.getSchedulerLock().unlock();

0 commit comments

Comments
 (0)