Skip to content

Commit 0e209e7

Browse files
Pengzna and 彭俊植 authored
Refine CN consensus layer API for procedure robustness (apache#16303)
* refine cn consensus layer API for procedure
* refine and fix the whole logic
* fix comment
* fix comment

---------

Co-authored-by: 彭俊植 <[email protected]>
1 parent b3509d6 commit 0e209e7

File tree

7 files changed

+203
-40
lines changed

7 files changed

+203
-40
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -70,15 +70,30 @@ protected void periodicExecute(final Env env) {
7070
// Failed procedures aren't persisted in WAL.
7171
batchIds[batchCount++] = entry.getKey();
7272
if (batchCount == batchIds.length) {
73-
store.delete(batchIds, 0, batchCount);
74-
batchCount = 0;
73+
try {
74+
store.delete(batchIds, 0, batchCount);
75+
} catch (Exception e) {
76+
LOG.error("Error deleting completed procedures {}.", proc, e);
77+
// Do not remove from the completed map. Even this procedure may be restored
78+
// unexpectedly in another new CN leader, we do not need to do anything else since
79+
// procedures are idempotent.
80+
continue;
81+
} finally {
82+
batchCount = 0;
83+
}
7584
}
7685
it.remove();
7786
LOG.trace("Evict completed {}", proc);
7887
}
7988
}
8089
if (batchCount > 0) {
81-
store.delete(batchIds, 0, batchCount);
90+
try {
91+
store.delete(batchIds, 0, batchCount);
92+
} catch (Exception e) {
93+
// Even this procedure may be restored unexpectedly in another new CN leader, we do not need
94+
// to do anything else since procedures are idempotent.
95+
LOG.error("Error deleting completed procedures {}.", batchIds, e);
96+
}
8297
}
8398
}
8499
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java

Lines changed: 21 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
2525
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
2626
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
27+
import org.apache.iotdb.consensus.exception.ConsensusException;
2728

2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -196,7 +197,7 @@ public void deserialize(ByteBuffer byteBuffer) {
196197
byteBuffer.get(resultArr);
197198
}
198199
// has lock
199-
if (byteBuffer.get() == 1) {
200+
if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) {
200201
this.lockedWhenLoading();
201202
}
202203
}
@@ -300,8 +301,15 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
300301
}
301302
ProcedureLockState state = acquireLock(env);
302303
if (state == ProcedureLockState.LOCK_ACQUIRED) {
303-
locked = true;
304-
store.update(this);
304+
try {
305+
locked = true;
306+
store.update(this);
307+
} catch (Exception e) {
308+
// Do not need to do anything else. New leader which restore this procedure from a wrong
309+
// state will reexecute it and converge to the correct state since procedures are
310+
// idempotent.
311+
LOG.warn("pid={} Failed to persist lock state to store.", this.procId, e);
312+
}
305313
}
306314
return state;
307315
}
@@ -312,12 +320,19 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
312320
* @param env environment
313321
* @param store ProcedureStore
314322
*/
315-
public final void doReleaseLock(Env env, IProcedureStore store) {
323+
public final void doReleaseLock(Env env, IProcedureStore store) throws Exception {
316324
locked = false;
317-
if (getState() != ProcedureState.ROLLEDBACK) {
325+
if (getState() == ProcedureState.ROLLEDBACK) {
326+
LOG.info("Force write unlock state to raft for pid={}", this.procId);
327+
}
328+
try {
318329
store.update(this);
330+
// do not release lock when consensus layer is not working
331+
releaseLock(env);
332+
} catch (ConsensusException e) {
333+
LOG.error("pid={} Failed to persist unlock state to store.", this.procId, e);
334+
throw e;
319335
}
320-
releaseLock(env);
321336
}
322337

323338
public final void restoreLock(Env env) {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

Lines changed: 71 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.procedure;
2121

2222
import org.apache.iotdb.commons.concurrent.ThreadName;
23+
import org.apache.iotdb.commons.utils.RetryUtils;
2324
import org.apache.iotdb.commons.utils.TestOnly;
2425
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
2526
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -221,7 +222,8 @@ Long getRootProcedureId(Procedure<Env> proc) {
221222

222223
private void releaseLock(Procedure<Env> procedure, boolean force) {
223224
if (force || !procedure.holdLock(this.environment) || procedure.isFinished()) {
224-
procedure.doReleaseLock(this.environment, store);
225+
RetryUtils.executeWithEndlessBackoffRetry(
226+
() -> procedure.doReleaseLock(this.environment, store), "procedure release lock");
225227
}
226228
}
227229

@@ -477,7 +479,11 @@ private void countDownChildren(RootProcedureStack rootProcStack, Procedure<Env>
477479
}
478480
if (parent != null && parent.tryRunnable()) {
479481
// If success, means all its children have completed, move parent to front of the queue.
480-
store.update(parent);
482+
// Must endless retry here, since this step is not idempotent and can not be re-execute
483+
// correctly in new CN leader.
484+
RetryUtils.executeWithEndlessBackoffRetry(
485+
() -> store.update(parent), "count down children procedure");
486+
// do not add this procedure when exception occurred
481487
scheduler.addFront(parent);
482488
LOG.info(
483489
"Finished subprocedure pid={}, resume processing ppid={}",
@@ -506,21 +512,44 @@ private void updateStoreOnExecution(
506512
if (LOG.isDebugEnabled()) {
507513
LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs));
508514
}
509-
store.update(subprocs);
515+
try {
516+
store.update(subprocs);
517+
} catch (Exception e) {
518+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
519+
// state when restore this procedure.
520+
LOG.warn("Failed to update subprocs on execution", e);
521+
}
510522
} else {
511523
LOG.debug("Store update {}", proc);
512524
if (proc.isFinished() && !proc.hasParent()) {
513525
final long[] childProcIds = rootProcStack.getSubprocedureIds();
514526
if (childProcIds != null) {
515-
store.delete(childProcIds);
516-
for (long childProcId : childProcIds) {
517-
procedures.remove(childProcId);
527+
try {
528+
store.delete(childProcIds);
529+
// do not remove these procedures when exception occurred
530+
for (long childProcId : childProcIds) {
531+
procedures.remove(childProcId);
532+
}
533+
} catch (Exception e) {
534+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
535+
// state when restore this procedure.
536+
LOG.warn("Failed to delete subprocedures on execution", e);
518537
}
519538
} else {
520-
store.update(proc);
539+
try {
540+
store.update(proc);
541+
} catch (Exception e) {
542+
LOG.warn("Failed to update procedure on execution", e);
543+
}
521544
}
522545
} else {
523-
store.update(proc);
546+
try {
547+
store.update(proc);
548+
} catch (Exception e) {
549+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
550+
// state when restore this procedure.
551+
LOG.warn("Failed to update procedure on execution", e);
552+
}
524553
}
525554
}
526555
}
@@ -577,7 +606,9 @@ private ProcedureLockState executeRootStackRollback(
577606
if (exception == null) {
578607
exception = procedureStack.getException();
579608
rootProcedure.setFailure(exception);
580-
store.update(rootProcedure);
609+
// Endless retry since this step is not idempotent.
610+
RetryUtils.executeWithEndlessBackoffRetry(
611+
() -> store.update(rootProcedure), "root procedure rollback");
581612
}
582613
List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
583614
int stackTail = subprocStack.size();
@@ -653,18 +684,37 @@ private void cleanupAfterRollback(Procedure<Env> procedure) {
653684
procedure.updateMetricsOnFinish(getEnvironment(), procedure.elapsedTime(), false);
654685

655686
if (procedure.hasParent()) {
656-
store.delete(procedure.getProcId());
657-
procedures.remove(procedure.getProcId());
687+
try {
688+
store.delete(procedure.getProcId());
689+
// do not remove this procedure when exception occurred
690+
procedures.remove(procedure.getProcId());
691+
} catch (Exception e) {
692+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
693+
// state when restore this procedure.
694+
LOG.warn("Failed to delete procedure on rollback", e);
695+
}
658696
} else {
659697
final long[] childProcIds = rollbackStack.get(procedure.getProcId()).getSubprocedureIds();
660-
if (childProcIds != null) {
661-
store.delete(childProcIds);
662-
} else {
663-
store.update(procedure);
698+
try {
699+
if (childProcIds != null) {
700+
store.delete(childProcIds);
701+
} else {
702+
store.update(procedure);
703+
}
704+
} catch (Exception e) {
705+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
706+
// state when restore this procedure.
707+
LOG.warn("Failed to delete procedure on rollback", e);
664708
}
665709
}
666710
} else {
667-
store.update(procedure);
711+
try {
712+
store.update(procedure);
713+
} catch (Exception e) {
714+
// Do nothing since this step is idempotent. New CN leader can converge to the correct
715+
// state when restore this procedure.
716+
LOG.warn("Failed to update procedure on rollback", e);
717+
}
668718
}
669719
}
670720

@@ -916,7 +966,11 @@ public long submitProcedure(Procedure<Env> procedure) {
916966
procedure.setProcId(store.getNextProcId());
917967
procedure.setProcRunnable();
918968
// Commit the transaction
919-
store.update(procedure);
969+
try {
970+
store.update(procedure);
971+
} catch (Exception e) {
972+
LOG.error("Failed to update store procedure {}", procedure, e);
973+
}
920974
LOG.debug("{} is stored.", procedure);
921975
// Add the procedure to the executor
922976
return pushProcedure(procedure);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,12 +19,16 @@
1919

2020
package org.apache.iotdb.confignode.procedure;
2121

22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
2225
import java.util.concurrent.DelayQueue;
2326
import java.util.concurrent.Delayed;
2427
import java.util.concurrent.TimeUnit;
2528

2629
public class TimeoutExecutorThread<Env> extends StoppableThread {
2730

31+
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutExecutorThread.class);
2832
private static final int DELAY_QUEUE_TIMEOUT = 20;
2933
private final ProcedureExecutor<Env> executor;
3034
private final DelayQueue<ProcedureDelayContainer<Env>> queue = new DelayQueue<>();
@@ -71,7 +75,13 @@ public void run() {
7175
long rootProcId = executor.getRootProcedureId(procedure);
7276
RootProcedureStack<Env> rollbackStack = executor.getRollbackStack(rootProcId);
7377
rollbackStack.abort();
74-
executor.getStore().update(procedure);
78+
try {
79+
executor.getStore().update(procedure);
80+
} catch (Exception e) {
81+
// Do nothing since new CN leader can converge to the correct state when restore this
82+
// procedure.
83+
LOGGER.warn("Failed to update procedure {}", procedure, e);
84+
}
7585
executor.getScheduler().addFront(procedure);
7686
}
7787
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -89,43 +89,55 @@ public long getNextProcId() {
8989
}
9090

9191
@Override
92-
public void update(Procedure<ConfigNodeProcedureEnv> procedure) {
92+
public void update(Procedure<ConfigNodeProcedureEnv> procedure) throws Exception {
9393
Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure), "Procedure type is null");
9494
final UpdateProcedurePlan updateProcedurePlan = new UpdateProcedurePlan(procedure);
9595
try {
9696
configManager.getConsensusManager().write(updateProcedurePlan);
9797
} catch (ConsensusException e) {
98-
LOG.warn("Failed in the write API executing the consensus layer due to: ", e);
98+
LOG.warn(
99+
"pid={} Failed in the write update API executing the consensus layer due to: ",
100+
procedure.getProcId(),
101+
e);
102+
// In consensus layer API, do nothing but just throw an exception to let upper caller handle
103+
// it.
104+
throw e;
99105
}
100106
}
101107

102108
@Override
103-
public void update(Procedure[] subprocs) {
109+
public void update(Procedure[] subprocs) throws Exception {
104110
for (Procedure subproc : subprocs) {
105111
update(subproc);
106112
}
107113
}
108114

109115
@Override
110-
public void delete(long procId) {
116+
public void delete(long procId) throws Exception {
111117
DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
112118
deleteProcedurePlan.setProcId(procId);
113119
try {
114120
configManager.getConsensusManager().write(deleteProcedurePlan);
115121
} catch (ConsensusException e) {
116-
LOG.warn("Failed in the write API executing the consensus layer due to: ", e);
122+
LOG.warn(
123+
"pid={} Failed in the write delete API executing the consensus layer due to: ",
124+
procId,
125+
e);
126+
// In consensus layer API, do nothing but just throw an exception to let upper caller handle
127+
// it.
128+
throw e;
117129
}
118130
}
119131

120132
@Override
121-
public void delete(long[] childProcIds) {
133+
public void delete(long[] childProcIds) throws Exception {
122134
for (long childProcId : childProcIds) {
123135
delete(childProcId);
124136
}
125137
}
126138

127139
@Override
128-
public void delete(long[] batchIds, int startIndex, int batchCount) {
140+
public void delete(long[] batchIds, int startIndex, int batchCount) throws Exception {
129141
for (int i = startIndex; i < batchCount; i++) {
130142
delete(batchIds[i]);
131143
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -38,15 +38,15 @@ public interface IProcedureStore<Env> {
3838

3939
long getNextProcId();
4040

41-
void update(Procedure<Env> procedure);
41+
void update(Procedure<Env> procedure) throws Exception;
4242

43-
void update(Procedure<Env>[] subprocs);
43+
void update(Procedure<Env>[] subprocs) throws Exception;
4444

45-
void delete(long procId);
45+
void delete(long procId) throws Exception;
4646

47-
void delete(long[] childProcIds);
47+
void delete(long[] childProcIds) throws Exception;
4848

49-
void delete(long[] batchIds, int startIndex, int batchCount);
49+
void delete(long[] batchIds, int startIndex, int batchCount) throws Exception;
5050

5151
void cleanup();
5252

0 commit comments

Comments
 (0)