Skip to content

Commit 185ca4a

Browse files
authored
Refactoring the callbacks for the group commit error handling in CommitHandler (#2390)
1 parent 00949f0 commit 185ca4a

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,20 @@ public CommitHandler(
5151
this.parallelExecutor = checkNotNull(parallelExecutor);
5252
}
5353

54-
protected void onPrepareFailure(Snapshot snapshot) {}
54+
/**
55+
* A callback invoked when any exception occurs before committing transactions.
56+
*
57+
* @param snapshot the failed snapshot.
58+
*/
59+
protected void onFailureBeforeCommit(Snapshot snapshot) {}
5560

56-
protected void onValidateFailure(Snapshot snapshot) {}
61+
private void safelyCallOnFailureBeforeCommit(Snapshot snapshot) {
62+
try {
63+
onFailureBeforeCommit(snapshot);
64+
} catch (Exception e) {
65+
logger.warn("Failed to call the callback. Transaction ID: {}", snapshot.getId(), e);
66+
}
67+
}
5768

5869
private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snapshot)
5970
throws UnknownTransactionStatusException, CommitException {
@@ -65,11 +76,9 @@ private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snap
6576
return Optional.of(
6677
beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets()));
6778
} catch (Exception e) {
79+
safelyCallOnFailureBeforeCommit(snapshot);
6880
abortState(snapshot.getId());
6981
rollbackRecords(snapshot);
70-
// TODO: This method is actually a part of preparation phase. But the callback method name
71-
// `onPrepareFailure()` should be renamed to more reasonable one.
72-
onPrepareFailure(snapshot);
7382
throw new CommitException(
7483
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
7584
e,
@@ -87,11 +96,9 @@ private void waitBeforePreparationSnapshotHookFuture(
8796
try {
8897
snapshotHookFuture.get();
8998
} catch (Exception e) {
99+
safelyCallOnFailureBeforeCommit(snapshot);
90100
abortState(snapshot.getId());
91101
rollbackRecords(snapshot);
92-
// TODO: This method is actually a part of validation phase. But the callback method name
93-
// `onValidateFailure()` should be renamed to more reasonable one.
94-
onValidateFailure(snapshot);
95102
throw new CommitException(
96103
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
97104
e,
@@ -104,28 +111,30 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
104111
try {
105112
prepare(snapshot);
106113
} catch (PreparationException e) {
114+
safelyCallOnFailureBeforeCommit(snapshot);
107115
abortState(snapshot.getId());
108116
rollbackRecords(snapshot);
109117
if (e instanceof PreparationConflictException) {
110118
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
111119
}
112120
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
113121
} catch (Exception e) {
114-
onPrepareFailure(snapshot);
122+
safelyCallOnFailureBeforeCommit(snapshot);
115123
throw e;
116124
}
117125

118126
try {
119127
validate(snapshot);
120128
} catch (ValidationException e) {
129+
safelyCallOnFailureBeforeCommit(snapshot);
121130
abortState(snapshot.getId());
122131
rollbackRecords(snapshot);
123132
if (e instanceof ValidationConflictException) {
124133
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
125134
}
126135
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
127136
} catch (Exception e) {
128-
onValidateFailure(snapshot);
137+
safelyCallOnFailureBeforeCommit(snapshot);
129138
throw e;
130139
}
131140

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,7 @@ public CommitHandlerWithGroupCommit(
3838
}
3939

4040
@Override
41-
protected void onPrepareFailure(Snapshot snapshot) {
42-
cancelGroupCommitIfNeeded(snapshot.getId());
43-
}
44-
45-
@Override
46-
protected void onValidateFailure(Snapshot snapshot) {
41+
protected void onFailureBeforeCommit(Snapshot snapshot) {
4742
cancelGroupCommitIfNeeded(snapshot.getId());
4843
}
4944

@@ -77,7 +72,12 @@ private void commitStateViaGroupCommit(Snapshot snapshot)
7772
}
7873

7974
private void cancelGroupCommitIfNeeded(String id) {
80-
groupCommitter.remove(id);
75+
try {
76+
groupCommitter.remove(id);
77+
} catch (Exception e) {
78+
logger.warn(
79+
"Unexpectedly failed to remove the snapshot ID from the group committer. ID: {}", id);
80+
}
8181
}
8282

8383
@Override

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv
183183
verify(storage, times(4)).mutate(anyList());
184184
verifyCoordinatorPutState(TransactionState.COMMITTED);
185185
verifySnapshotHook(withSnapshotHook, readWriteSets);
186-
verify(handler, never()).onPrepareFailure(any());
187-
verify(handler, never()).onValidateFailure(any());
186+
verify(handler, never()).onFailureBeforeCommit(any());
188187
}
189188

190189
@ParameterizedTest
@@ -206,8 +205,7 @@ public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean
206205
verify(storage, times(2)).mutate(anyList());
207206
verifyCoordinatorPutState(TransactionState.COMMITTED);
208207
verifySnapshotHook(withSnapshotHook, readWriteSets);
209-
verify(handler, never()).onPrepareFailure(any());
210-
verify(handler, never()).onValidateFailure(any());
208+
verify(handler, never()).onFailureBeforeCommit(any());
211209
}
212210

213211
@Test
@@ -230,6 +228,7 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept
230228
verify(coordinator, never())
231229
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
232230
verify(handler).rollbackRecords(snapshot);
231+
verify(handler).onFailureBeforeCommit(snapshot);
233232
}
234233

235234
@Test
@@ -252,6 +251,7 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow
252251
verify(coordinator, never())
253252
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
254253
verify(handler).rollbackRecords(snapshot);
254+
verify(handler).onFailureBeforeCommit(snapshot);
255255
}
256256

257257
@Test
@@ -274,6 +274,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
274274
verify(coordinator, never())
275275
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
276276
verify(handler).rollbackRecords(snapshot);
277+
verify(handler).onFailureBeforeCommit(snapshot);
277278
}
278279

279280
@Test
@@ -303,6 +304,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
303304
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
304305
verify(coordinator).getState(anyId());
305306
verify(handler).rollbackRecords(snapshot);
307+
verify(handler).onFailureBeforeCommit(snapshot);
306308
}
307309

308310
@Test
@@ -330,6 +332,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
330332
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
331333
verify(coordinator).getState(anyId());
332334
verify(handler, never()).rollbackRecords(snapshot);
335+
verify(handler).onFailureBeforeCommit(snapshot);
333336
}
334337

335338
@Test
@@ -357,6 +360,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
357360
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
358361
verify(coordinator).getState(anyId());
359362
verify(handler, never()).rollbackRecords(snapshot);
363+
verify(handler).onFailureBeforeCommit(snapshot);
360364
}
361365

362366
@Test
@@ -382,6 +386,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
382386
verify(coordinator, never())
383387
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
384388
verify(handler, never()).rollbackRecords(snapshot);
389+
verify(handler).onFailureBeforeCommit(snapshot);
385390
}
386391

387392
@Test
@@ -405,6 +410,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR
405410
verify(coordinator, never())
406411
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
407412
verify(handler).rollbackRecords(snapshot);
413+
verify(handler).onFailureBeforeCommit(snapshot);
408414
}
409415

410416
@Test
@@ -428,6 +434,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
428434
verify(coordinator, never())
429435
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
430436
verify(handler).rollbackRecords(snapshot);
437+
verify(handler).onFailureBeforeCommit(snapshot);
431438
}
432439

433440
@Test
@@ -458,6 +465,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
458465
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
459466
verify(coordinator).getState(anyId());
460467
verify(handler).rollbackRecords(snapshot);
468+
verify(handler).onFailureBeforeCommit(snapshot);
461469
}
462470

463471
@Test
@@ -486,6 +494,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
486494
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
487495
verify(coordinator).getState(anyId());
488496
verify(handler, never()).rollbackRecords(snapshot);
497+
verify(handler).onFailureBeforeCommit(snapshot);
489498
}
490499

491500
@Test
@@ -514,6 +523,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
514523
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
515524
verify(coordinator).getState(anyId());
516525
verify(handler, never()).rollbackRecords(snapshot);
526+
verify(handler).onFailureBeforeCommit(snapshot);
517527
}
518528

519529
@Test
@@ -540,6 +550,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
540550
verify(coordinator, never())
541551
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
542552
verify(handler, never()).rollbackRecords(snapshot);
553+
verify(handler).onFailureBeforeCommit(snapshot);
543554
}
544555

545556
@Test
@@ -564,6 +575,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
564575
verifyCoordinatorPutState(TransactionState.COMMITTED);
565576
verify(coordinator).getState(anyId());
566577
verify(handler, never()).rollbackRecords(snapshot);
578+
verify(handler, never()).onFailureBeforeCommit(any());
567579
}
568580

569581
@Test
@@ -587,6 +599,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
587599
verifyCoordinatorPutState(TransactionState.COMMITTED);
588600
verify(coordinator).getState(anyId());
589601
verify(handler).rollbackRecords(snapshot);
602+
verify(handler, never()).onFailureBeforeCommit(any());
590603
}
591604

592605
@Test
@@ -609,6 +622,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
609622
verifyCoordinatorPutState(TransactionState.COMMITTED);
610623
verify(coordinator).getState(anyId());
611624
verify(handler, never()).rollbackRecords(snapshot);
625+
verify(handler, never()).onFailureBeforeCommit(any());
612626
}
613627

614628
@Test
@@ -631,6 +645,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
631645
verifyCoordinatorPutState(TransactionState.COMMITTED);
632646
verify(coordinator).getState(anyId());
633647
verify(handler, never()).rollbackRecords(snapshot);
648+
verify(handler, never()).onFailureBeforeCommit(any());
634649
}
635650

636651
@Test
@@ -649,6 +664,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown()
649664
verify(storage, times(2)).mutate(anyList());
650665
verifyCoordinatorPutState(TransactionState.COMMITTED);
651666
verify(handler, never()).rollbackRecords(snapshot);
667+
verify(handler, never()).onFailureBeforeCommit(any());
652668
}
653669

654670
@Test
@@ -687,8 +703,7 @@ public Future<Void> handle(
687703
// This means `commit()` waited until the callback was completed before throwing
688704
// an exception from `commitState()`.
689705
assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(2));
690-
verify(handler, never()).onPrepareFailure(any());
691-
verify(handler, never()).onValidateFailure(any());
706+
verify(handler, never()).onFailureBeforeCommit(any());
692707
}
693708

694709
@Test
@@ -710,8 +725,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException()
710725
verify(coordinator, never())
711726
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
712727
verify(handler).rollbackRecords(snapshot);
713-
verify(handler).onPrepareFailure(any());
714-
verify(handler, never()).onValidateFailure(any());
728+
verify(handler).onFailureBeforeCommit(any());
715729
}
716730

717731
@Test
@@ -735,8 +749,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException()
735749
verify(coordinator, never())
736750
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
737751
verify(handler).rollbackRecords(snapshot);
738-
verify(handler, never()).onPrepareFailure(any());
739-
verify(handler).onValidateFailure(snapshot);
752+
verify(handler).onFailureBeforeCommit(snapshot);
740753
}
741754

742755
protected void doThrowExceptionWhenCoordinatorPutState(

0 commit comments

Comments
 (0)