Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public DistributedTransaction begin() {

@Override
public DistributedTransaction begin(String txId) {
return begin(txId, config.getIsolation(), false);
return begin(txId, config.getIsolation(), false, false);
}

@Override
Expand All @@ -155,7 +155,7 @@ public DistributedTransaction beginReadOnly() {

@Override
public DistributedTransaction beginReadOnly(String txId) {
return begin(txId, config.getIsolation(), true);
return begin(txId, config.getIsolation(), true, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -169,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
@Deprecated
@Override
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
return begin(txId, Isolation.valueOf(isolation.name()), false);
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -192,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
@Override
public DistributedTransaction start(
String txId, com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.SERIALIZABLE, false);
return begin(txId, Isolation.SERIALIZABLE, false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -202,23 +202,24 @@ public DistributedTransaction start(
String txId,
com.scalar.db.api.Isolation isolation,
com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.valueOf(isolation.name()), false);
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

@VisibleForTesting
DistributedTransaction begin(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, false);
return begin(txId, isolation, false, false);
}

@VisibleForTesting
DistributedTransaction beginReadOnly(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, true);
return begin(txId, isolation, true, false);
}

@VisibleForTesting
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
DistributedTransaction begin(
String txId, Isolation isolation, boolean readOnly, boolean singleOperation) {
checkArgument(!Strings.isNullOrEmpty(txId));
checkNotNull(isolation);
if (isGroupCommitEnabled()) {
Expand All @@ -238,7 +239,8 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly);
readOnly,
singleOperation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
singleOperation);
oneOperation);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 042d1e1. Thanks!

DistributedTransaction transaction =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
if (readOnly) {
Expand All @@ -249,6 +251,11 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
return transaction;
}

private DistributedTransaction beginSingleOperation(boolean readOnly) {
String txId = UUID.randomUUID().toString();
return begin(txId, config.getIsolation(), readOnly, true);
}

@Override
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
Expand All @@ -261,7 +268,7 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat

@Override
public Scanner getScanner(Scan scan) throws CrudException {
DistributedTransaction transaction = beginReadOnly();
DistributedTransaction transaction = beginSingleOperation(true);

TransactionCrudOperable.Scanner scanner;
try {
Expand Down Expand Up @@ -431,12 +438,7 @@ private <R> R executeTransaction(
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
boolean readOnly)
throws CrudException, UnknownTransactionStatusException {
DistributedTransaction transaction;
if (readOnly) {
transaction = beginReadOnly();
} else {
transaction = begin();
}
DistributedTransaction transaction = beginSingleOperation(readOnly);

try {
R result = throwableFunction.apply(transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class CrudHandler {
// Whether the transaction is in read-only mode or not.
private final boolean readOnly;

// Whether the transaction is in single-operation mode or not. Single-operation mode refers to
// executing a CRUD operation directly through `DistributedTransactionManager` without explicitly
// beginning a transaction.
private final boolean singleOperation;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced single-operation mode, a mode for transactions executed without explicitly beginning a transaction, such as DistributedTransactionManager.get(), DistributedTransactionManager.scan(), and DistributedTransactionManager.insert().


private final List<ConsensusCommitScanner> scanners = new ArrayList<>();

@SuppressFBWarnings("EI_EXPOSE_REP2")
Expand All @@ -60,14 +65,16 @@ public CrudHandler(
TransactionTableMetadataManager tableMetadataManager,
boolean isIncludeMetadataEnabled,
ParallelExecutor parallelExecutor,
boolean readOnly) {
boolean readOnly,
boolean singleOperation) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
this.singleOperation = singleOperation;
}

@VisibleForTesting
Expand All @@ -78,14 +85,16 @@ public CrudHandler(
boolean isIncludeMetadataEnabled,
MutationConditionsValidator mutationConditionsValidator,
ParallelExecutor parallelExecutor,
boolean readOnly) {
boolean readOnly,
boolean singleOperation) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = mutationConditionsValidator;
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
this.singleOperation = singleOperation;
}

public Optional<Result> get(Get originalGet) throws CrudException {
Expand All @@ -102,11 +111,17 @@ public Optional<Result> get(Get originalGet) throws CrudException {
key = new Snapshot.Key(get);
}

readUnread(key, get);

return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
if (isSnapshotReadRequired()) {
readUnread(key, get);
return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
} else {
Optional<TransactionResult> result = read(key, get);
return snapshot
.mergeResult(key, result, get.getConjunctions())
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
}
}

// Only for a Get with index, the argument `key` is null
Expand All @@ -120,7 +135,7 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
if (result.isPresent() || get.getConjunctions().isEmpty()) {
Expand All @@ -142,8 +157,8 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
}
}
}
snapshot.putIntoGetSet(get, result);
return;
putIntoGetSetInSnapshot(get, result);
return result;
}
throw new UncommittedRecordException(
get,
Expand Down Expand Up @@ -204,7 +219,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(Scan scan)
}
}

snapshot.putIntoScanSet(scan, results);
putIntoScanSetInSnapshot(scan, results);

return results;
}
Expand Down Expand Up @@ -263,9 +278,43 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
}
}

private boolean isSnapshotReadRequired() {
// In single-operation mode, we don't need snapshot read
return !singleOperation;
}

private boolean isValidationOrSnapshotReadRequired() {
return snapshot.isValidationRequired() || isSnapshotReadRequired();
}

private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
// If neither validation nor snapshot read is required, we don't need to put the result into
// the get set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoGetSet(get, result);
}
}

private void putIntoScanSetInSnapshot(
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
// If neither validation nor snapshot read is required, we don't need to put the results into
// the scan set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoScanSet(scan, results);
}
}

private void putIntoScannerSetInSnapshot(
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
// if validation is not required, we don't need to put the results into the scanner set
if (snapshot.isValidationRequired()) {
snapshot.putIntoScannerSet(scan, results);
}
}

private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
// In read-only mode, we don't need to verify the overlap
if (!readOnly) {
// In either read-only mode or single-operation mode, we don't need to verify the overlap
if (!readOnly && !singleOperation) {
snapshot.verifyNoOverlap(scan, results);
}
}
Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the main part of the optimizations.

The optimizations can be summarized as follows:

  • If snapshot reads and serializable validation are not required, we can avoid adding records to the getSet and scanSet, thereby reducing memory usage. This is because they are only used for snapshot reads and serializable validation.
  • If serializable validation is not required, we can skip adding ScannerInfo to the scannerSet, since it is used solely for that purpose.
  • In addition to read-only mode, we can also skip verifying overlaps between the scan and the records in the writeSet and deleteSet in single-operation mode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a single-crud operation is a self-join reading the same record more than twice, is it OK not to use snapshot read?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single-operation mode here means the mode for transactions that are executed without explicitly beginning a transaction, such as the following methods:

DistributedTransactionManger transactionManager = ...;

Optional<Result> result = transactionManager.get(Get.newBuilder()...);

List<Result> results = transactionManager.scan(Scan.newBuilder()...);

Scanner scanner = transactionManager.getScanner(Scan.newBuilder()...);

transactionManager.insert(Insert.newBuilder()...);

transactionManager.upsert(Upsert.newBuilder()...);

transactionManager.update(Update.newBuilder()...);

transactionManager.delete(Delete.newBuilder()...);

transactionManager.mutate(Arrays.asList(Delete.newBuilder()..., Update.newBuilder()...));

In these cases, the transaction is committed immediately after the operation is done. So we don't need snapshot reads in such cases. Does this answer your question?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brfrn169 Probably, I am confused with single-crud operation transaction and single-operation mode.
If they are different things, it's pretty confusing so I think we should change the name.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay. This isn’t related to single CRUD operation transactions — it’s specific to Consensus Commit.

Do you have any suggestions for what we should call transactions that are executed without explicitly beginning a transaction?

Expand Down Expand Up @@ -432,7 +481,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera
private final List<String> originalProjections;
private final Scanner scanner;

private final LinkedHashMap<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
@Nullable private final LinkedHashMap<Snapshot.Key, TransactionResult> results;
private final AtomicBoolean fullyScanned = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();

Expand All @@ -441,6 +490,14 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
this.scan = scan;
this.originalProjections = originalProjections;
scanner = scanFromStorage(scan);

if (isValidationOrSnapshotReadRequired()) {
results = new LinkedHashMap<>();
} else {
// If neither validation nor snapshot read is required, we don't need to put the results
// into the scan set
results = null;
}
}

@Override
Expand All @@ -456,7 +513,10 @@ public Optional<Result> one() throws CrudException {
Snapshot.Key key = new Snapshot.Key(scan, r.get());
TransactionResult result = new TransactionResult(r.get());
processScanResult(key, scan, result);
results.put(key, result);

if (results != null) {
results.put(key, result);
}

TableMetadata metadata = getTableMetadata(scan);
return Optional.of(
Expand Down Expand Up @@ -499,10 +559,10 @@ public void close() {
if (fullyScanned.get()) {
// If the scanner is fully scanned, we can treat it as a normal scan, and put the results
// into the scan set
snapshot.putIntoScanSet(scan, results);
putIntoScanSetInSnapshot(scan, results);
} else {
// If the scanner is not fully scanned, put the results into the scanner set
snapshot.putIntoScannerSet(scan, results);
putIntoScannerSetInSnapshot(scan, results);
}

verifyNoOverlap(scan, results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,22 @@ private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionRes
}
}

private Optional<TransactionResult> mergeResult(
public Optional<TransactionResult> mergeResult(
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
throws CrudException {
return mergeResult(key, result)
.filter(
r ->
// We need to apply conditions if it is a merged result because the transaction’s
// write makes the record no longer match the conditions. Of course, we can just
// return the result without checking the condition if there is no condition.
!r.isMergedResult()
|| conjunctions.isEmpty()
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
Optional<TransactionResult> ret = mergeResult(key, result);

if (conjunctions.isEmpty()) {
// We can just return the result without checking the condition if there is no condition.
return ret;
}

return ret.filter(
r ->
// We need to apply conditions if it is a merged result because the transaction’s write
// makes the record no longer match the conditions.
!r.isMergedResult()
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
}

private TableMetadata getTableMetadata(Key key) throws CrudException {
Expand Down
Loading