Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.scalar.db.api.Upsert;
import com.scalar.db.common.AbstractDistributedTransactionManager;
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
import com.scalar.db.common.ReadOnlyDistributedTransaction;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CrudConflictException;
Expand Down Expand Up @@ -137,22 +138,24 @@ private CommitHandler createCommitHandler() {

@Override
public DistributedTransaction begin() {
return begin(config.getIsolation());
String txId = UUID.randomUUID().toString();
return begin(txId);
}

@Override
public DistributedTransaction begin(String txId) {
return begin(txId, config.getIsolation());
return begin(txId, config.getIsolation(), false);
}

@Override
public DistributedTransaction beginReadOnly() {
throw new UnsupportedOperationException("implement later");
String txId = UUID.randomUUID().toString();
return beginReadOnly(txId);
}

@Override
public DistributedTransaction beginReadOnly(String txId) {
throw new UnsupportedOperationException("implement later");
return begin(txId, config.getIsolation(), true);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -166,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
@Deprecated
@Override
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
return begin(txId, Isolation.valueOf(isolation.name()));
return begin(txId, Isolation.valueOf(isolation.name()), false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -189,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
@Override
public DistributedTransaction start(
String txId, com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.SERIALIZABLE);
return begin(txId, Isolation.SERIALIZABLE, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -199,17 +202,23 @@ public DistributedTransaction start(
String txId,
com.scalar.db.api.Isolation isolation,
com.scalar.db.api.SerializableStrategy strategy) {
return begin(txId, Isolation.valueOf(isolation.name()));
return begin(txId, Isolation.valueOf(isolation.name()), false);
}

@VisibleForTesting
DistributedTransaction begin(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation);
return begin(txId, isolation, false);
}

@VisibleForTesting
DistributedTransaction beginReadOnly(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, true);
}

@VisibleForTesting
DistributedTransaction begin(String txId, Isolation isolation) {
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
checkArgument(!Strings.isNullOrEmpty(txId));
checkNotNull(isolation);
if (isGroupCommitEnabled()) {
Expand All @@ -224,27 +233,35 @@ DistributedTransaction begin(String txId, Isolation isolation) {
Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
ConsensusCommit consensus =
storage,
snapshot,
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly);
DistributedTransaction transaction =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
getNamespace().ifPresent(consensus::withNamespace);
getTable().ifPresent(consensus::withTable);
return consensus;
if (readOnly) {
transaction = new ReadOnlyDistributedTransaction(transaction);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap the transaction object with ReadOnlyDistributedTransaction for read-only transactions.

}
getNamespace().ifPresent(transaction::withNamespace);
getTable().ifPresent(transaction::withTable);
return transaction;
}

@Override
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)));
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
}

@Override
public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStatusException {
return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)));
return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true);
}

@Override
public Scanner getScanner(Scan scan) throws CrudException {
DistributedTransaction transaction = begin();
DistributedTransaction transaction = beginReadOnly();

TransactionCrudOperable.Scanner scanner;
try {
Expand Down Expand Up @@ -331,7 +348,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
t -> {
t.put(copyAndSetTargetToIfNot(put));
return null;
});
},
false);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -342,7 +360,8 @@ public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusEx
t -> {
t.put(copyAndSetTargetToIfNot(puts));
return null;
});
},
false);
}

@Override
Expand All @@ -351,7 +370,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus
t -> {
t.insert(copyAndSetTargetToIfNot(insert));
return null;
});
},
false);
}

@Override
Expand All @@ -360,7 +380,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus
t -> {
t.upsert(copyAndSetTargetToIfNot(upsert));
return null;
});
},
false);
}

@Override
Expand All @@ -369,7 +390,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus
t -> {
t.update(copyAndSetTargetToIfNot(update));
return null;
});
},
false);
}

@Override
Expand All @@ -378,7 +400,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
t -> {
t.delete(copyAndSetTargetToIfNot(delete));
return null;
});
},
false);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -389,7 +412,8 @@ public void delete(List<Delete> deletes) throws CrudException, UnknownTransactio
t -> {
t.delete(copyAndSetTargetToIfNot(deletes));
return null;
});
},
false);
}

@Override
Expand All @@ -399,13 +423,21 @@ public void mutate(List<? extends Mutation> mutations)
t -> {
t.mutate(copyAndSetTargetToIfNot(mutations));
return null;
});
},
false);
}

private <R> R executeTransaction(
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction)
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
boolean readOnly)
throws CrudException, UnknownTransactionStatusException {
DistributedTransaction transaction = begin();
DistributedTransaction transaction;
if (readOnly) {
transaction = beginReadOnly();
} else {
transaction = begin();
}

try {
R result = throwableFunction.apply(transaction);
transaction.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public class CrudHandler {
private final boolean isIncludeMetadataEnabled;
private final MutationConditionsValidator mutationConditionsValidator;
private final ParallelExecutor parallelExecutor;

// Whether the transaction is in read-only mode or not.
private final boolean readOnly;

private final List<ConsensusCommitScanner> scanners = new ArrayList<>();

@SuppressFBWarnings("EI_EXPOSE_REP2")
Expand All @@ -55,13 +59,15 @@ public CrudHandler(
Snapshot snapshot,
TransactionTableMetadataManager tableMetadataManager,
boolean isIncludeMetadataEnabled,
ParallelExecutor parallelExecutor) {
ParallelExecutor parallelExecutor,
boolean readOnly) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
}

@VisibleForTesting
Expand All @@ -71,13 +77,15 @@ public CrudHandler(
TransactionTableMetadataManager tableMetadataManager,
boolean isIncludeMetadataEnabled,
MutationConditionsValidator mutationConditionsValidator,
ParallelExecutor parallelExecutor) {
ParallelExecutor parallelExecutor,
boolean readOnly) {
this.storage = checkNotNull(storage);
this.snapshot = checkNotNull(snapshot);
this.tableMetadataManager = tableMetadataManager;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationConditionsValidator = mutationConditionsValidator;
this.parallelExecutor = parallelExecutor;
this.readOnly = readOnly;
}

public Optional<Result> get(Get originalGet) throws CrudException {
Expand Down Expand Up @@ -122,19 +130,19 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
if (key != null) {
snapshot.putIntoReadSet(key, result);
putIntoReadSetInSnapshot(key, result);
} else {
// Only for a Get with index, the argument `key` is null

if (result.isPresent()) {
// Only when we can get the record with the Get with index, we can put it into the read
// set
key = new Snapshot.Key(get, result.get());
snapshot.putIntoReadSet(key, result);
putIntoReadSetInSnapshot(key, result);
}
}
}
snapshot.putIntoGetSet(get, result); // for re-read and validation
snapshot.putIntoGetSet(get, result);
return;
}
throw new UncommittedRecordException(
Expand All @@ -148,7 +156,7 @@ public List<Result> scan(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);
LinkedHashMap<Snapshot.Key, TransactionResult> results = scanInternal(scan);
snapshot.verifyNoOverlap(scan, results);
verifyNoOverlap(scan, results);

TableMetadata metadata = getTableMetadata(scan);
return results.values().stream()
Expand Down Expand Up @@ -214,7 +222,7 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.putIntoReadSet(key, Optional.of(result));
putIntoReadSetInSnapshot(key, Optional.of(result));
}

public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException {
Expand Down Expand Up @@ -248,6 +256,20 @@ public void closeScanners() throws CrudException {
}
}

private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an optimization, in read-only mode, we don't need to add the result to the read set, since the read set is only used for write operations.

// In read-only mode, we don't need to put the result into the read set
if (!readOnly) {
snapshot.putIntoReadSet(key, result);
}
}

private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we don't need to verify the overlap in read-only mode, since there are no write operations.

// In read-only mode, we don't need to verify the overlap
if (!readOnly) {
snapshot.verifyNoOverlap(scan, results);
}
}

public void put(Put put) throws CrudException {
Snapshot.Key key = new Snapshot.Key(put);

Expand Down Expand Up @@ -483,7 +505,7 @@ public void close() {
snapshot.putIntoScannerSet(scan, results);
}

snapshot.verifyNoOverlap(scan, results);
verifyNoOverlap(scan, results);
}

@Override
Expand Down Expand Up @@ -554,7 +576,7 @@ public List<Result> all() throws CrudException {
@Override
public void close() {
closed = true;
snapshot.verifyNoOverlap(scan, results);
verifyNoOverlap(scan, results);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,29 @@ public class Snapshot {
private final Isolation isolation;
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;

// The read set stores information about the records that are read in this transaction. This is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// used as a previous version for write operations.
private final ConcurrentMap<Key, Optional<TransactionResult>> readSet;

// The get set stores information about the records retrieved by Get operations in this
// transaction. This is used for validation and snapshot read.
private final ConcurrentMap<Get, Optional<TransactionResult>> getSet;

// The scan set stores information about the records retrieved by Scan operations in this
// transaction. This is used for validation and snapshot read.
private final Map<Scan, LinkedHashMap<Key, TransactionResult>> scanSet;
private final Map<Key, Put> writeSet;
private final Map<Key, Delete> deleteSet;

// The scanner set used to store information about scanners that are not fully scanned
// The scanner set stores information about scanners that are not fully scanned. This is used for
// validation.
private final List<ScannerInfo> scannerSet;

// The write set stores information about writes in this transaction.
private final Map<Key, Put> writeSet;

// The delete set stores information about deletes in this transaction.
private final Map<Key, Delete> deleteSet;

public Snapshot(
String id,
Isolation isolation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,12 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is
Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
storage,
snapshot,
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
false);

TwoPhaseConsensusCommit transaction =
new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker);
Expand Down
Loading