Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public DistributedTransaction begin() {

@Override
public DistributedTransaction begin(String txId) {
return begin(txId, config.getIsolation(), false);
return begin(txId, config.getIsolation(), false, false);
}

@Override
Expand All @@ -155,7 +155,7 @@ public DistributedTransaction beginReadOnly() {

@Override
public DistributedTransaction beginReadOnly(String txId) {
return begin(txId, config.getIsolation(), true);
return begin(txId, config.getIsolation(), true, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -169,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
@Deprecated
@Override
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
return begin(txId, Isolation.valueOf(isolation.name()), false);
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -192,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
@Override
public DistributedTransaction start(
String txId, com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.SERIALIZABLE, false);
return begin(txId, Isolation.SERIALIZABLE, false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -202,23 +202,24 @@ public DistributedTransaction start(
String txId,
com.scalar.db.api.Isolation isolation,
com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.valueOf(isolation.name()), false);
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

@VisibleForTesting
DistributedTransaction begin(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, false);
return begin(txId, isolation, false, false);
}

@VisibleForTesting
DistributedTransaction beginReadOnly(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, true);
return begin(txId, isolation, true, false);
}

@VisibleForTesting
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
DistributedTransaction begin(
String txId, Isolation isolation, boolean readOnly, boolean oneOperation) {
checkArgument(!Strings.isNullOrEmpty(txId));
checkNotNull(isolation);
if (isGroupCommitEnabled()) {
Expand All @@ -238,7 +239,8 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly);
readOnly,
oneOperation);
DistributedTransaction transaction =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
if (readOnly) {
Expand All @@ -249,6 +251,11 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
return transaction;
}

private DistributedTransaction beginOneOperation(boolean readOnly) {
String txId = UUID.randomUUID().toString();
return begin(txId, config.getIsolation(), readOnly, true);
}

@Override
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
Expand All @@ -261,7 +268,7 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat

@Override
public Scanner getScanner(Scan scan) throws CrudException {
DistributedTransaction transaction = beginReadOnly();
DistributedTransaction transaction = beginOneOperation(true);

TransactionCrudOperable.Scanner scanner;
try {
Expand Down Expand Up @@ -431,12 +438,7 @@ private <R> R executeTransaction(
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
boolean readOnly)
throws CrudException, UnknownTransactionStatusException {
DistributedTransaction transaction;
if (readOnly) {
transaction = beginReadOnly();
} else {
transaction = begin();
}
DistributedTransaction transaction = beginOneOperation(readOnly);

try {
R result = throwableFunction.apply(transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class CrudHandler {
// Whether the transaction is in read-only mode or not.
private final boolean readOnly;

// Whether the transaction is in one-operation mode or not. One-operation mode refers to executing
// a CRUD operation directly through `DistributedTransactionManager` without explicitly beginning
// a transaction.
private final boolean oneOperation;

private final List<ConsensusCommitScanner> scanners = new ArrayList<>();

@SuppressFBWarnings("EI_EXPOSE_REP2")
Expand All @@ -60,14 +65,16 @@ public CrudHandler(
TransactionTableMetadataManager tableMetadataManager,
boolean isIncludeMetadataEnabled,
ParallelExecutor parallelExecutor,
boolean readOnly) {
boolean readOnly,
boolean oneOperation) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
this.oneOperation = oneOperation;
}

@VisibleForTesting
Expand All @@ -78,14 +85,16 @@ public CrudHandler(
boolean isIncludeMetadataEnabled,
MutationConditionsValidator mutationConditionsValidator,
ParallelExecutor parallelExecutor,
boolean readOnly) {
boolean readOnly,
boolean oneOperation) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = mutationConditionsValidator;
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
this.oneOperation = oneOperation;
}

public Optional<Result> get(Get originalGet) throws CrudException {
Expand All @@ -102,11 +111,17 @@ public Optional<Result> get(Get originalGet) throws CrudException {
key = new Snapshot.Key(get);
}

readUnread(key, get);

return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
if (isSnapshotReadRequired()) {
readUnread(key, get);
return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
} else {
Optional<TransactionResult> result = read(key, get);
return snapshot
.mergeResult(key, result, get.getConjunctions())
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
}
}

// Only for a Get with index, the argument `key` is null
Expand All @@ -120,7 +135,7 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
if (result.isPresent() || get.getConjunctions().isEmpty()) {
Expand All @@ -142,8 +157,8 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
}
}
}
snapshot.putIntoGetSet(get, result);
return;
putIntoGetSetInSnapshot(get, result);
return result;
}
throw new UncommittedRecordException(
get,
Expand Down Expand Up @@ -204,7 +219,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(Scan scan)
}
}

snapshot.putIntoScanSet(scan, results);
putIntoScanSetInSnapshot(scan, results);

return results;
}
Expand Down Expand Up @@ -263,9 +278,43 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
}
}

private boolean isSnapshotReadRequired() {
// In one-operation mode, we don't need snapshot read
return !oneOperation;
}

private boolean isValidationOrSnapshotReadRequired() {
return snapshot.isValidationRequired() || isSnapshotReadRequired();
}

private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
// If neither validation nor snapshot read is required, we don't need to put the result into
// the get set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoGetSet(get, result);
}
}

private void putIntoScanSetInSnapshot(
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
// If neither validation nor snapshot read is required, we don't need to put the results into
// the scan set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoScanSet(scan, results);
}
}

private void putIntoScannerSetInSnapshot(
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
// if validation is not required, we don't need to put the results into the scanner set
if (snapshot.isValidationRequired()) {
snapshot.putIntoScannerSet(scan, results);
}
}

private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
// In read-only mode, we don't need to verify the overlap
if (!readOnly) {
// In either read-only mode or one-operation mode, we don't need to verify the overlap
if (!readOnly && !oneOperation) {
snapshot.verifyNoOverlap(scan, results);
}
}
Expand Down Expand Up @@ -432,7 +481,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera
private final List<String> originalProjections;
private final Scanner scanner;

private final LinkedHashMap<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
@Nullable private final LinkedHashMap<Snapshot.Key, TransactionResult> results;
private final AtomicBoolean fullyScanned = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();

Expand All @@ -441,6 +490,14 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
this.scan = scan;
this.originalProjections = originalProjections;
scanner = scanFromStorage(scan);

if (isValidationOrSnapshotReadRequired()) {
results = new LinkedHashMap<>();
} else {
// If neither validation nor snapshot read is required, we don't need to put the results
// into the scan set
results = null;
}
}

@Override
Expand All @@ -456,7 +513,10 @@ public Optional<Result> one() throws CrudException {
Snapshot.Key key = new Snapshot.Key(scan, r.get());
TransactionResult result = new TransactionResult(r.get());
processScanResult(key, scan, result);
results.put(key, result);

if (results != null) {
results.put(key, result);
}

TableMetadata metadata = getTableMetadata(scan);
return Optional.of(
Expand Down Expand Up @@ -499,10 +559,10 @@ public void close() {
if (fullyScanned.get()) {
// If the scanner is fully scanned, we can treat it as a normal scan, and put the results
// into the scan set
snapshot.putIntoScanSet(scan, results);
putIntoScanSetInSnapshot(scan, results);
} else {
// If the scanner is not fully scanned, put the results into the scanner set
snapshot.putIntoScannerSet(scan, results);
putIntoScannerSetInSnapshot(scan, results);
}

verifyNoOverlap(scan, results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,22 @@ private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionRes
}
}

private Optional<TransactionResult> mergeResult(
public Optional<TransactionResult> mergeResult(
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
throws CrudException {
return mergeResult(key, result)
.filter(
r ->
// We need to apply conditions if it is a merged result because the transaction’s
// write makes the record no longer match the conditions. Of course, we can just
// return the result without checking the condition if there is no condition.
!r.isMergedResult()
|| conjunctions.isEmpty()
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
Optional<TransactionResult> ret = mergeResult(key, result);

if (conjunctions.isEmpty()) {
// We can just return the result without checking the condition if there is no condition.
return ret;
}

return ret.filter(
r ->
// We need to apply conditions if it is a merged result because the transaction’s write
// makes the record no longer match the conditions.
!r.isMergedResult()
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
}

private TableMetadata getTableMetadata(Key key) throws CrudException {
Expand Down
Loading