Skip to content

Commit 309d1eb

Browse files
committed
Support begin in read-only mode for Consensus Commit
1 parent b180847 commit 309d1eb

File tree

8 files changed

+873
-111
lines changed

8 files changed

+873
-111
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.scalar.db.api.Upsert;
2323
import com.scalar.db.common.AbstractDistributedTransactionManager;
2424
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
25+
import com.scalar.db.common.ReadOnlyDistributedTransaction;
2526
import com.scalar.db.config.DatabaseConfig;
2627
import com.scalar.db.exception.transaction.CommitConflictException;
2728
import com.scalar.db.exception.transaction.CrudConflictException;
@@ -137,22 +138,24 @@ private CommitHandler createCommitHandler() {
137138

138139
@Override
139140
public DistributedTransaction begin() {
140-
return begin(config.getIsolation());
141+
String txId = UUID.randomUUID().toString();
142+
return begin(txId);
141143
}
142144

143145
@Override
144146
public DistributedTransaction begin(String txId) {
145-
return begin(txId, config.getIsolation());
147+
return begin(txId, config.getIsolation(), false);
146148
}
147149

148150
@Override
149151
public DistributedTransaction beginReadOnly() {
150-
throw new UnsupportedOperationException("implement later");
152+
String txId = UUID.randomUUID().toString();
153+
return beginReadOnly(txId);
151154
}
152155

153156
@Override
154157
public DistributedTransaction beginReadOnly(String txId) {
155-
throw new UnsupportedOperationException("implement later");
158+
return begin(txId, config.getIsolation(), true);
156159
}
157160

158161
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -166,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
166169
@Deprecated
167170
@Override
168171
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
169-
return begin(txId, Isolation.valueOf(isolation.name()));
172+
return begin(txId, Isolation.valueOf(isolation.name()), false);
170173
}
171174

172175
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -189,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
189192
@Override
190193
public DistributedTransaction start(
191194
String txId, com.scalar.db.api.SerializableStrategy strategy) {
192-
return begin(txId, Isolation.SERIALIZABLE);
195+
return begin(txId, Isolation.SERIALIZABLE, false);
193196
}
194197

195198
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -199,17 +202,23 @@ public DistributedTransaction start(
199202
String txId,
200203
com.scalar.db.api.Isolation isolation,
201204
com.scalar.db.api.SerializableStrategy strategy) {
202-
return begin(txId, Isolation.valueOf(isolation.name()));
205+
return begin(txId, Isolation.valueOf(isolation.name()), false);
203206
}
204207

205208
@VisibleForTesting
206209
DistributedTransaction begin(Isolation isolation) {
207210
String txId = UUID.randomUUID().toString();
208-
return begin(txId, isolation);
211+
return begin(txId, isolation, false);
212+
}
213+
214+
@VisibleForTesting
215+
DistributedTransaction beginReadOnly(Isolation isolation) {
216+
String txId = UUID.randomUUID().toString();
217+
return begin(txId, isolation, true);
209218
}
210219

211220
@VisibleForTesting
212-
DistributedTransaction begin(String txId, Isolation isolation) {
221+
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
213222
checkArgument(!Strings.isNullOrEmpty(txId));
214223
checkNotNull(isolation);
215224
if (isGroupCommitEnabled()) {
@@ -224,27 +233,35 @@ DistributedTransaction begin(String txId, Isolation isolation) {
224233
Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor);
225234
CrudHandler crud =
226235
new CrudHandler(
227-
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
228-
ConsensusCommit consensus =
236+
storage,
237+
snapshot,
238+
tableMetadataManager,
239+
isIncludeMetadataEnabled,
240+
parallelExecutor,
241+
readOnly);
242+
DistributedTransaction transaction =
229243
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
230-
getNamespace().ifPresent(consensus::withNamespace);
231-
getTable().ifPresent(consensus::withTable);
232-
return consensus;
244+
if (readOnly) {
245+
transaction = new ReadOnlyDistributedTransaction(transaction);
246+
}
247+
getNamespace().ifPresent(transaction::withNamespace);
248+
getTable().ifPresent(transaction::withTable);
249+
return transaction;
233250
}
234251

235252
@Override
236253
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
237-
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)));
254+
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
238255
}
239256

240257
@Override
241258
public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStatusException {
242-
return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)));
259+
return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true);
243260
}
244261

245262
@Override
246263
public Scanner getScanner(Scan scan) throws CrudException {
247-
DistributedTransaction transaction = begin();
264+
DistributedTransaction transaction = beginReadOnly();
248265

249266
TransactionCrudOperable.Scanner scanner;
250267
try {
@@ -331,7 +348,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
331348
t -> {
332349
t.put(copyAndSetTargetToIfNot(put));
333350
return null;
334-
});
351+
},
352+
false);
335353
}
336354

337355
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -342,7 +360,8 @@ public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusEx
342360
t -> {
343361
t.put(copyAndSetTargetToIfNot(puts));
344362
return null;
345-
});
363+
},
364+
false);
346365
}
347366

348367
@Override
@@ -351,7 +370,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus
351370
t -> {
352371
t.insert(copyAndSetTargetToIfNot(insert));
353372
return null;
354-
});
373+
},
374+
false);
355375
}
356376

357377
@Override
@@ -360,7 +380,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus
360380
t -> {
361381
t.upsert(copyAndSetTargetToIfNot(upsert));
362382
return null;
363-
});
383+
},
384+
false);
364385
}
365386

366387
@Override
@@ -369,7 +390,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus
369390
t -> {
370391
t.update(copyAndSetTargetToIfNot(update));
371392
return null;
372-
});
393+
},
394+
false);
373395
}
374396

375397
@Override
@@ -378,7 +400,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
378400
t -> {
379401
t.delete(copyAndSetTargetToIfNot(delete));
380402
return null;
381-
});
403+
},
404+
false);
382405
}
383406

384407
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -389,7 +412,8 @@ public void delete(List<Delete> deletes) throws CrudException, UnknownTransactio
389412
t -> {
390413
t.delete(copyAndSetTargetToIfNot(deletes));
391414
return null;
392-
});
415+
},
416+
false);
393417
}
394418

395419
@Override
@@ -399,13 +423,21 @@ public void mutate(List<? extends Mutation> mutations)
399423
t -> {
400424
t.mutate(copyAndSetTargetToIfNot(mutations));
401425
return null;
402-
});
426+
},
427+
false);
403428
}
404429

405430
private <R> R executeTransaction(
406-
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction)
431+
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
432+
boolean readOnly)
407433
throws CrudException, UnknownTransactionStatusException {
408-
DistributedTransaction transaction = begin();
434+
DistributedTransaction transaction;
435+
if (readOnly) {
436+
transaction = beginReadOnly();
437+
} else {
438+
transaction = begin();
439+
}
440+
409441
try {
410442
R result = throwableFunction.apply(transaction);
411443
transaction.commit();

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public class CrudHandler {
4747
private final boolean isIncludeMetadataEnabled;
4848
private final MutationConditionsValidator mutationConditionsValidator;
4949
private final ParallelExecutor parallelExecutor;
50+
51+
// Whether the transaction is in read-only mode or not.
52+
private final boolean readOnly;
53+
5054
private final List<ConsensusCommitScanner> scanners = new ArrayList<>();
5155

5256
@SuppressFBWarnings("EI_EXPOSE_REP2")
@@ -55,13 +59,15 @@ public CrudHandler(
5559
Snapshot snapshot,
5660
TransactionTableMetadataManager tableMetadataManager,
5761
boolean isIncludeMetadataEnabled,
58-
ParallelExecutor parallelExecutor) {
62+
ParallelExecutor parallelExecutor,
63+
boolean readOnly) {
5964
this.storage = checkNotNull(storage);
6065
this.snapshot = checkNotNull(snapshot);
6166
this.tableMetadataManager = tableMetadataManager;
6267
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
6368
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
6469
this.parallelExecutor = parallelExecutor;
70+
this.readOnly = readOnly;
6571
}
6672

6773
@VisibleForTesting
@@ -71,13 +77,15 @@ public CrudHandler(
7177
TransactionTableMetadataManager tableMetadataManager,
7278
boolean isIncludeMetadataEnabled,
7379
MutationConditionsValidator mutationConditionsValidator,
74-
ParallelExecutor parallelExecutor) {
80+
ParallelExecutor parallelExecutor,
81+
boolean readOnly) {
7582
this.storage = checkNotNull(storage);
7683
this.snapshot = checkNotNull(snapshot);
7784
this.tableMetadataManager = tableMetadataManager;
7885
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
7986
this.mutationConditionsValidator = mutationConditionsValidator;
8087
this.parallelExecutor = parallelExecutor;
88+
this.readOnly = readOnly;
8189
}
8290

8391
public Optional<Result> get(Get originalGet) throws CrudException {
@@ -122,19 +130,19 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
122130
// conjunction or the result exists. This is because we don’t know whether the record
123131
// actually exists or not due to the conjunction.
124132
if (key != null) {
125-
snapshot.putIntoReadSet(key, result);
133+
putIntoReadSetInSnapshot(key, result);
126134
} else {
127135
// Only for a Get with index, the argument `key` is null
128136

129137
if (result.isPresent()) {
130138
// Only when we can get the record with the Get with index, we can put it into the read
131139
// set
132140
key = new Snapshot.Key(get, result.get());
133-
snapshot.putIntoReadSet(key, result);
141+
putIntoReadSetInSnapshot(key, result);
134142
}
135143
}
136144
}
137-
snapshot.putIntoGetSet(get, result); // for re-read and validation
145+
snapshot.putIntoGetSet(get, result);
138146
return;
139147
}
140148
throw new UncommittedRecordException(
@@ -148,7 +156,7 @@ public List<Result> scan(Scan originalScan) throws CrudException {
148156
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
149157
Scan scan = (Scan) prepareStorageSelection(originalScan);
150158
LinkedHashMap<Snapshot.Key, TransactionResult> results = scanInternal(scan);
151-
snapshot.verifyNoOverlap(scan, results);
159+
verifyNoOverlap(scan, results);
152160

153161
TableMetadata metadata = getTableMetadata(scan);
154162
return results.values().stream()
@@ -214,7 +222,7 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re
214222
// We always update the read set to create before image by using the latest record (result)
215223
// because another conflicting transaction might have updated the record after this
216224
// transaction read it first.
217-
snapshot.putIntoReadSet(key, Optional.of(result));
225+
putIntoReadSetInSnapshot(key, Optional.of(result));
218226
}
219227

220228
public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException {
@@ -248,6 +256,20 @@ public void closeScanners() throws CrudException {
248256
}
249257
}
250258

259+
private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) {
260+
// In read-only mode, we don't need to put the result into the read set
261+
if (!readOnly) {
262+
snapshot.putIntoReadSet(key, result);
263+
}
264+
}
265+
266+
private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
267+
// In read-only mode, we don't need to verify the overlap
268+
if (!readOnly) {
269+
snapshot.verifyNoOverlap(scan, results);
270+
}
271+
}
272+
251273
public void put(Put put) throws CrudException {
252274
Snapshot.Key key = new Snapshot.Key(put);
253275

@@ -483,7 +505,7 @@ public void close() {
483505
snapshot.putIntoScannerSet(scan, results);
484506
}
485507

486-
snapshot.verifyNoOverlap(scan, results);
508+
verifyNoOverlap(scan, results);
487509
}
488510

489511
@Override
@@ -554,7 +576,7 @@ public List<Result> all() throws CrudException {
554576
@Override
555577
public void close() {
556578
closed = true;
557-
snapshot.verifyNoOverlap(scan, results);
579+
verifyNoOverlap(scan, results);
558580
}
559581

560582
@Override

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,29 @@ public class Snapshot {
5858
private final Isolation isolation;
5959
private final TransactionTableMetadataManager tableMetadataManager;
6060
private final ParallelExecutor parallelExecutor;
61+
62+
// The read set stores information about the records that are read in this transaction. This is
63+
// used as a previous version for write operations.
6164
private final ConcurrentMap<Key, Optional<TransactionResult>> readSet;
65+
66+
// The get set stores information about the records retrieved by Get operations in this
67+
// transaction. This is used for validation and snapshot read.
6268
private final ConcurrentMap<Get, Optional<TransactionResult>> getSet;
69+
70+
// The scan set stores information about the records retrieved by Scan operations in this
71+
// transaction. This is used for validation and snapshot read.
6372
private final Map<Scan, LinkedHashMap<Key, TransactionResult>> scanSet;
64-
private final Map<Key, Put> writeSet;
65-
private final Map<Key, Delete> deleteSet;
6673

67-
// The scanner set used to store information about scanners that are not fully scanned
74+
// The scanner set stores information about scanners that are not fully scanned. This is used for
75+
// validation.
6876
private final List<ScannerInfo> scannerSet;
6977

78+
// The write set stores information about writes in this transaction.
79+
private final Map<Key, Put> writeSet;
80+
81+
// The delete set stores information about deletes in this transaction.
82+
private final Map<Key, Delete> deleteSet;
83+
7084
public Snapshot(
7185
String id,
7286
Isolation isolation,

core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,12 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is
170170
Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor);
171171
CrudHandler crud =
172172
new CrudHandler(
173-
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
173+
storage,
174+
snapshot,
175+
tableMetadataManager,
176+
isIncludeMetadataEnabled,
177+
parallelExecutor,
178+
false);
174179

175180
TwoPhaseConsensusCommit transaction =
176181
new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker);

0 commit comments

Comments
 (0)