Skip to content

Commit a32e1e8

Browse files
authored
Improve read process in Consensus Commit (#2798)
1 parent 88914f1 commit a32e1e8

33 files changed

+2737
-2468
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1241,6 +1241,8 @@ public enum CoreError implements ScalarDbError {
12411241
"Getting the storage information failed. Namespace: %s",
12421242
"",
12431243
""),
1244+
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
1245+
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),
12441246

12451247
//
12461248
// Errors for the unknown transaction status error category

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
77
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
88
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
9+
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;
910

1011
import com.google.common.annotations.VisibleForTesting;
1112
import com.scalar.db.api.ConditionBuilder;
@@ -105,7 +106,7 @@ private Put composePut(Operation base, @Nullable TransactionResult result)
105106
// Set before image columns to null
106107
if (result != null) {
107108
TransactionTableMetadata transactionTableMetadata =
108-
tableMetadataManager.getTransactionTableMetadata(base);
109+
getTransactionTableMetadata(tableMetadataManager, base);
109110
LinkedHashSet<String> beforeImageColumnNames =
110111
transactionTableMetadata.getBeforeImageColumnNames();
111112
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
@@ -137,8 +138,9 @@ private Key getPartitionKey(Operation base, @Nullable TransactionResult result)
137138
assert base instanceof Selection;
138139
if (result != null) {
139140
// for rollforward in lazy recovery
140-
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
141-
return ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata());
141+
TransactionTableMetadata transactionTableMetadata =
142+
getTransactionTableMetadata(tableMetadataManager, base);
143+
return ScalarDbUtils.getPartitionKey(result, transactionTableMetadata.getTableMetadata());
142144
} else {
143145
throw new AssertionError(
144146
"This path should not be reached since the EXTRA_WRITE strategy is deleted");
@@ -155,8 +157,9 @@ private Optional<Key> getClusteringKey(Operation base, @Nullable TransactionResu
155157
assert base instanceof Selection;
156158
if (result != null) {
157159
// for rollforward in lazy recovery
158-
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
159-
return ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata());
160+
TransactionTableMetadata transactionTableMetadata =
161+
getTransactionTableMetadata(tableMetadataManager, base);
162+
return ScalarDbUtils.getClusteringKey(result, transactionTableMetadata.getTableMetadata());
160163
} else {
161164
throw new AssertionError(
162165
"This path should not be reached since the EXTRA_WRITE strategy is deleted");

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 12 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,8 @@
2424
import com.scalar.db.exception.transaction.UnsatisfiedConditionException;
2525
import com.scalar.db.util.ScalarDbUtils;
2626
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27-
import java.util.Iterator;
2827
import java.util.List;
2928
import java.util.Optional;
30-
import javax.annotation.Nonnull;
3129
import javax.annotation.Nullable;
3230
import javax.annotation.concurrent.NotThreadSafe;
3331
import org.slf4j.Logger;
@@ -51,24 +49,19 @@ public class ConsensusCommit extends AbstractDistributedTransaction {
5149
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommit.class);
5250
private final CrudHandler crud;
5351
private final CommitHandler commit;
54-
private final RecoveryHandler recovery;
5552
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
5653
@Nullable private final CoordinatorGroupCommitter groupCommitter;
57-
private Runnable beforeRecoveryHook;
5854

5955
@SuppressFBWarnings("EI_EXPOSE_REP2")
6056
public ConsensusCommit(
6157
CrudHandler crud,
6258
CommitHandler commit,
63-
RecoveryHandler recovery,
6459
ConsensusCommitMutationOperationChecker mutationOperationChecker,
6560
@Nullable CoordinatorGroupCommitter groupCommitter) {
6661
this.crud = checkNotNull(crud);
6762
this.commit = checkNotNull(commit);
68-
this.recovery = checkNotNull(recovery);
6963
this.mutationOperationChecker = mutationOperationChecker;
7064
this.groupCommitter = groupCommitter;
71-
this.beforeRecoveryHook = () -> {};
7265
}
7366

7467
@Override
@@ -78,63 +71,18 @@ public String getId() {
7871

7972
@Override
8073
public Optional<Result> get(Get get) throws CrudException {
81-
get = copyAndSetTargetToIfNot(get);
82-
try {
83-
return crud.get(get);
84-
} catch (UncommittedRecordException e) {
85-
lazyRecovery(e);
86-
throw e;
87-
}
74+
return crud.get(copyAndSetTargetToIfNot(get));
8875
}
8976

9077
@Override
9178
public List<Result> scan(Scan scan) throws CrudException {
92-
scan = copyAndSetTargetToIfNot(scan);
93-
try {
94-
return crud.scan(scan);
95-
} catch (UncommittedRecordException e) {
96-
lazyRecovery(e);
97-
throw e;
98-
}
79+
return crud.scan(copyAndSetTargetToIfNot(scan));
9980
}
10081

10182
@Override
10283
public Scanner getScanner(Scan scan) throws CrudException {
10384
scan = copyAndSetTargetToIfNot(scan);
104-
Scanner scanner = crud.getScanner(scan);
105-
106-
return new Scanner() {
107-
@Override
108-
public Optional<Result> one() throws CrudException {
109-
try {
110-
return scanner.one();
111-
} catch (UncommittedRecordException e) {
112-
lazyRecovery(e);
113-
throw e;
114-
}
115-
}
116-
117-
@Override
118-
public List<Result> all() throws CrudException {
119-
try {
120-
return scanner.all();
121-
} catch (UncommittedRecordException e) {
122-
lazyRecovery(e);
123-
throw e;
124-
}
125-
}
126-
127-
@Override
128-
public void close() throws CrudException {
129-
scanner.close();
130-
}
131-
132-
@Nonnull
133-
@Override
134-
public Iterator<Result> iterator() {
135-
return scanner.iterator();
136-
}
137-
};
85+
return crud.getScanner(scan);
13886
}
13987

14088
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -143,12 +91,7 @@ public Iterator<Result> iterator() {
14391
public void put(Put put) throws CrudException {
14492
put = copyAndSetTargetToIfNot(put);
14593
checkMutation(put);
146-
try {
147-
crud.put(put);
148-
} catch (UncommittedRecordException e) {
149-
lazyRecovery(e);
150-
throw e;
151-
}
94+
crud.put(put);
15295
}
15396

15497
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -165,12 +108,7 @@ public void put(List<Put> puts) throws CrudException {
165108
public void delete(Delete delete) throws CrudException {
166109
delete = copyAndSetTargetToIfNot(delete);
167110
checkMutation(delete);
168-
try {
169-
crud.delete(delete);
170-
} catch (UncommittedRecordException e) {
171-
lazyRecovery(e);
172-
throw e;
173-
}
111+
crud.delete(delete);
174112
}
175113

176114
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -196,12 +134,7 @@ public void upsert(Upsert upsert) throws CrudException {
196134
upsert = copyAndSetTargetToIfNot(upsert);
197135
Put put = ConsensusCommitUtils.createPutForUpsert(upsert);
198136
checkMutation(put);
199-
try {
200-
crud.put(put);
201-
} catch (UncommittedRecordException e) {
202-
lazyRecovery(e);
203-
throw e;
204-
}
137+
crud.put(put);
205138
}
206139

207140
@Override
@@ -222,9 +155,6 @@ public void update(Update update) throws CrudException {
222155

223156
// If the condition is not specified, it means that the record does not exist. In this case,
224157
// we do nothing
225-
} catch (UncommittedRecordException e) {
226-
lazyRecovery(e);
227-
throw e;
228158
}
229159
}
230160

@@ -257,9 +187,6 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
257187
try {
258188
crud.readIfImplicitPreReadEnabled();
259189
} catch (CrudConflictException e) {
260-
if (e instanceof UncommittedRecordException) {
261-
lazyRecovery((UncommittedRecordException) e);
262-
}
263190
throw new CommitConflictException(
264191
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(),
265192
e,
@@ -269,6 +196,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
269196
CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId());
270197
}
271198

199+
try {
200+
crud.waitForRecoveryCompletionIfNecessary();
201+
} catch (CrudException e) {
202+
throw new CommitException(e.getMessage(), e, getId());
203+
}
204+
272205
commit.commit(crud.getSnapshot(), crud.isReadOnly());
273206
}
274207

@@ -295,22 +228,6 @@ CommitHandler getCommitHandler() {
295228
return commit;
296229
}
297230

298-
@VisibleForTesting
299-
RecoveryHandler getRecoveryHandler() {
300-
return recovery;
301-
}
302-
303-
@VisibleForTesting
304-
void setBeforeRecoveryHook(Runnable beforeRecoveryHook) {
305-
this.beforeRecoveryHook = beforeRecoveryHook;
306-
}
307-
308-
private void lazyRecovery(UncommittedRecordException e) {
309-
logger.debug("Recover uncommitted records: {}", e.getResults());
310-
beforeRecoveryHook.run();
311-
e.getResults().forEach(r -> recovery.recover(e.getSelection(), r));
312-
}
313-
314231
private void checkMutation(Mutation mutation) throws CrudException {
315232
try {
316233
mutationOperationChecker.check(mutation);

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -35,12 +35,8 @@ public class ConsensusCommitConfig {
3535

3636
public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
3737
PREFIX + "coordinator.write_omission_on_read_only.enabled";
38-
3938
public static final String PARALLEL_IMPLICIT_PRE_READ =
4039
PREFIX + "parallel_implicit_pre_read.enabled";
41-
42-
public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;
43-
4440
public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
4541

4642
public static final String COORDINATOR_GROUP_COMMIT_PREFIX = PREFIX + "coordinator.group_commit.";
@@ -59,6 +55,8 @@ public class ConsensusCommitConfig {
5955
public static final String COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED =
6056
COORDINATOR_GROUP_COMMIT_PREFIX + "metrics_monitor_log_enabled";
6157

58+
public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;
59+
6260
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY = 20;
6361
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS = 40;
6462
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS = 1200;
@@ -77,9 +75,7 @@ public class ConsensusCommitConfig {
7775
private final boolean asyncRollbackEnabled;
7876

7977
private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
80-
8178
private final boolean parallelImplicitPreReadEnabled;
82-
8379
private final boolean isIncludeMetadataEnabled;
8480

8581
private final boolean coordinatorGroupCommitEnabled;
@@ -149,10 +145,10 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
149145
coordinatorWriteOmissionOnReadOnlyEnabled =
150146
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);
151147

152-
parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
153-
154148
isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
155149

150+
parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
151+
156152
coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
157153
coordinatorGroupCommitSlotCapacity =
158154
getInt(

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
5252
private final TransactionTableMetadataManager tableMetadataManager;
5353
private final Coordinator coordinator;
5454
private final ParallelExecutor parallelExecutor;
55-
private final RecoveryHandler recovery;
55+
private final RecoveryExecutor recoveryExecutor;
5656
protected final CommitHandler commit;
5757
private final boolean isIncludeMetadataEnabled;
5858
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@@ -71,7 +71,8 @@ public ConsensusCommitManager(
7171
tableMetadataManager =
7272
new TransactionTableMetadataManager(
7373
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
74-
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
74+
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
75+
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
7576
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
7677
commit = createCommitHandler();
7778
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
@@ -90,7 +91,8 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
9091
tableMetadataManager =
9192
new TransactionTableMetadataManager(
9293
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
93-
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
94+
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
95+
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
9496
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
9597
commit = createCommitHandler();
9698
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
@@ -106,7 +108,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
106108
DatabaseConfig databaseConfig,
107109
Coordinator coordinator,
108110
ParallelExecutor parallelExecutor,
109-
RecoveryHandler recovery,
111+
RecoveryExecutor recoveryExecutor,
110112
CommitHandler commit,
111113
@Nullable CoordinatorGroupCommitter groupCommitter) {
112114
super(databaseConfig);
@@ -118,7 +120,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
118120
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
119121
this.coordinator = coordinator;
120122
this.parallelExecutor = parallelExecutor;
121-
this.recovery = recovery;
123+
this.recoveryExecutor = recoveryExecutor;
122124
this.commit = commit;
123125
this.groupCommitter = groupCommitter;
124126
this.isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
@@ -246,13 +248,14 @@ DistributedTransaction begin(
246248
new CrudHandler(
247249
storage,
248250
snapshot,
251+
recoveryExecutor,
249252
tableMetadataManager,
250253
isIncludeMetadataEnabled,
251254
parallelExecutor,
252255
readOnly,
253256
oneOperation);
254257
DistributedTransaction transaction =
255-
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
258+
new ConsensusCommit(crud, commit, mutationOperationChecker, groupCommitter);
256259
if (readOnly) {
257260
transaction = new ReadOnlyDistributedTransaction(transaction);
258261
}
@@ -511,6 +514,7 @@ public void close() {
511514
storage.close();
512515
admin.close();
513516
parallelExecutor.close();
517+
recoveryExecutor.close();
514518
if (isGroupCommitEnabled()) {
515519
assert groupCommitter != null;
516520
groupCommitter.close();

0 commit comments

Comments
 (0)