2424import com .scalar .db .exception .transaction .UnsatisfiedConditionException ;
2525import com .scalar .db .util .ScalarDbUtils ;
2626import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
27- import java .util .Iterator ;
2827import java .util .List ;
2928import java .util .Optional ;
30- import javax .annotation .Nonnull ;
3129import javax .annotation .Nullable ;
3230import javax .annotation .concurrent .NotThreadSafe ;
3331import org .slf4j .Logger ;
@@ -51,24 +49,19 @@ public class ConsensusCommit extends AbstractDistributedTransaction {
5149 private static final Logger logger = LoggerFactory .getLogger (ConsensusCommit .class );
5250 private final CrudHandler crud ;
5351 private final CommitHandler commit ;
54- private final RecoveryHandler recovery ;
5552 private final ConsensusCommitMutationOperationChecker mutationOperationChecker ;
5653 @ Nullable private final CoordinatorGroupCommitter groupCommitter ;
57- private Runnable beforeRecoveryHook ;
5854
5955 @ SuppressFBWarnings ("EI_EXPOSE_REP2" )
6056 public ConsensusCommit (
6157 CrudHandler crud ,
6258 CommitHandler commit ,
63- RecoveryHandler recovery ,
6459 ConsensusCommitMutationOperationChecker mutationOperationChecker ,
6560 @ Nullable CoordinatorGroupCommitter groupCommitter ) {
6661 this .crud = checkNotNull (crud );
6762 this .commit = checkNotNull (commit );
68- this .recovery = checkNotNull (recovery );
6963 this .mutationOperationChecker = mutationOperationChecker ;
7064 this .groupCommitter = groupCommitter ;
71- this .beforeRecoveryHook = () -> {};
7265 }
7366
7467 @ Override
@@ -78,63 +71,18 @@ public String getId() {
7871
7972 @ Override
8073 public Optional <Result > get (Get get ) throws CrudException {
81- get = copyAndSetTargetToIfNot (get );
82- try {
83- return crud .get (get );
84- } catch (UncommittedRecordException e ) {
85- lazyRecovery (e );
86- throw e ;
87- }
74+ return crud .get (copyAndSetTargetToIfNot (get ));
8875 }
8976
9077 @ Override
9178 public List <Result > scan (Scan scan ) throws CrudException {
92- scan = copyAndSetTargetToIfNot (scan );
93- try {
94- return crud .scan (scan );
95- } catch (UncommittedRecordException e ) {
96- lazyRecovery (e );
97- throw e ;
98- }
79+ return crud .scan (copyAndSetTargetToIfNot (scan ));
9980 }
10081
10182 @ Override
10283 public Scanner getScanner (Scan scan ) throws CrudException {
10384 scan = copyAndSetTargetToIfNot (scan );
104- Scanner scanner = crud .getScanner (scan );
105-
106- return new Scanner () {
107- @ Override
108- public Optional <Result > one () throws CrudException {
109- try {
110- return scanner .one ();
111- } catch (UncommittedRecordException e ) {
112- lazyRecovery (e );
113- throw e ;
114- }
115- }
116-
117- @ Override
118- public List <Result > all () throws CrudException {
119- try {
120- return scanner .all ();
121- } catch (UncommittedRecordException e ) {
122- lazyRecovery (e );
123- throw e ;
124- }
125- }
126-
127- @ Override
128- public void close () throws CrudException {
129- scanner .close ();
130- }
131-
132- @ Nonnull
133- @ Override
134- public Iterator <Result > iterator () {
135- return scanner .iterator ();
136- }
137- };
85+ return crud .getScanner (scan );
13886 }
13987
14088 /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -143,12 +91,7 @@ public Iterator<Result> iterator() {
14391 public void put (Put put ) throws CrudException {
14492 put = copyAndSetTargetToIfNot (put );
14593 checkMutation (put );
146- try {
147- crud .put (put );
148- } catch (UncommittedRecordException e ) {
149- lazyRecovery (e );
150- throw e ;
151- }
94+ crud .put (put );
15295 }
15396
15497 /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -165,12 +108,7 @@ public void put(List<Put> puts) throws CrudException {
165108 public void delete (Delete delete ) throws CrudException {
166109 delete = copyAndSetTargetToIfNot (delete );
167110 checkMutation (delete );
168- try {
169- crud .delete (delete );
170- } catch (UncommittedRecordException e ) {
171- lazyRecovery (e );
172- throw e ;
173- }
111+ crud .delete (delete );
174112 }
175113
176114 /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -196,12 +134,7 @@ public void upsert(Upsert upsert) throws CrudException {
196134 upsert = copyAndSetTargetToIfNot (upsert );
197135 Put put = ConsensusCommitUtils .createPutForUpsert (upsert );
198136 checkMutation (put );
199- try {
200- crud .put (put );
201- } catch (UncommittedRecordException e ) {
202- lazyRecovery (e );
203- throw e ;
204- }
137+ crud .put (put );
205138 }
206139
207140 @ Override
@@ -222,9 +155,6 @@ public void update(Update update) throws CrudException {
222155
223156 // If the condition is not specified, it means that the record does not exist. In this case,
224157 // we do nothing
225- } catch (UncommittedRecordException e ) {
226- lazyRecovery (e );
227- throw e ;
228158 }
229159 }
230160
@@ -257,9 +187,6 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
257187 try {
258188 crud .readIfImplicitPreReadEnabled ();
259189 } catch (CrudConflictException e ) {
260- if (e instanceof UncommittedRecordException ) {
261- lazyRecovery ((UncommittedRecordException ) e );
262- }
263190 throw new CommitConflictException (
264191 CoreError .CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ .buildMessage (),
265192 e ,
@@ -269,6 +196,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
269196 CoreError .CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED .buildMessage (), e , getId ());
270197 }
271198
199+ try {
200+ crud .waitForRecoveryCompletionIfNecessary ();
201+ } catch (CrudException e ) {
202+ throw new CommitException (e .getMessage (), e , getId ());
203+ }
204+
272205 commit .commit (crud .getSnapshot (), crud .isReadOnly ());
273206 }
274207
@@ -295,22 +228,6 @@ CommitHandler getCommitHandler() {
295228 return commit ;
296229 }
297230
298- @ VisibleForTesting
299- RecoveryHandler getRecoveryHandler () {
300- return recovery ;
301- }
302-
303- @ VisibleForTesting
304- void setBeforeRecoveryHook (Runnable beforeRecoveryHook ) {
305- this .beforeRecoveryHook = beforeRecoveryHook ;
306- }
307-
308- private void lazyRecovery (UncommittedRecordException e ) {
309- logger .debug ("Recover uncommitted records: {}" , e .getResults ());
310- beforeRecoveryHook .run ();
311- e .getResults ().forEach (r -> recovery .recover (e .getSelection (), r ));
312- }
313-
314231 private void checkMutation (Mutation mutation ) throws CrudException {
315232 try {
316233 mutationOperationChecker .check (mutation );
0 commit comments