Skip to content

Commit 1a64df6

Browse files
authored
Merge branch 'master' into data-loader/add-ci-for-build
2 parents be8e0e2 + 960a825 commit 1a64df6

20 files changed

+1820
-304
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class CommitHandler {
3636
protected final Coordinator coordinator;
3737
private final TransactionTableMetadataManager tableMetadataManager;
3838
private final ParallelExecutor parallelExecutor;
39+
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
3940

4041
@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
4142

@@ -44,11 +45,13 @@ public CommitHandler(
4445
DistributedStorage storage,
4546
Coordinator coordinator,
4647
TransactionTableMetadataManager tableMetadataManager,
47-
ParallelExecutor parallelExecutor) {
48+
ParallelExecutor parallelExecutor,
49+
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
4850
this.storage = checkNotNull(storage);
4951
this.coordinator = checkNotNull(coordinator);
5052
this.tableMetadataManager = checkNotNull(tableMetadataManager);
5153
this.parallelExecutor = checkNotNull(parallelExecutor);
54+
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
5255
}
5356

5457
/**
@@ -106,42 +109,62 @@ private void waitBeforePreparationSnapshotHookFuture(
106109
}
107110
}
108111

109-
public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException {
112+
public void commit(Snapshot snapshot, boolean readOnly)
113+
throws CommitException, UnknownTransactionStatusException {
114+
boolean hasWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes();
115+
110116
Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);
111-
try {
112-
prepare(snapshot);
113-
} catch (PreparationException e) {
114-
safelyCallOnFailureBeforeCommit(snapshot);
115-
abortState(snapshot.getId());
116-
rollbackRecords(snapshot);
117-
if (e instanceof PreparationConflictException) {
118-
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
117+
118+
if (hasWritesOrDeletesInSnapshot) {
119+
try {
120+
prepare(snapshot);
121+
} catch (PreparationException e) {
122+
safelyCallOnFailureBeforeCommit(snapshot);
123+
abortState(snapshot.getId());
124+
rollbackRecords(snapshot);
125+
if (e instanceof PreparationConflictException) {
126+
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
127+
}
128+
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
129+
} catch (Exception e) {
130+
safelyCallOnFailureBeforeCommit(snapshot);
131+
throw e;
119132
}
120-
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
121-
} catch (Exception e) {
122-
safelyCallOnFailureBeforeCommit(snapshot);
123-
throw e;
124133
}
125134

126-
try {
127-
validate(snapshot);
128-
} catch (ValidationException e) {
129-
safelyCallOnFailureBeforeCommit(snapshot);
130-
abortState(snapshot.getId());
131-
rollbackRecords(snapshot);
132-
if (e instanceof ValidationConflictException) {
133-
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
135+
if (snapshot.hasReads()) {
136+
try {
137+
validate(snapshot);
138+
} catch (ValidationException e) {
139+
safelyCallOnFailureBeforeCommit(snapshot);
140+
141+
// If the transaction has no writes and deletes, we don't need to abort-state and
142+
// rollback-records since there are no changes to be made.
143+
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
144+
abortState(snapshot.getId());
145+
}
146+
if (hasWritesOrDeletesInSnapshot) {
147+
rollbackRecords(snapshot);
148+
}
149+
150+
if (e instanceof ValidationConflictException) {
151+
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
152+
}
153+
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
154+
} catch (Exception e) {
155+
safelyCallOnFailureBeforeCommit(snapshot);
156+
throw e;
134157
}
135-
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
136-
} catch (Exception e) {
137-
safelyCallOnFailureBeforeCommit(snapshot);
138-
throw e;
139158
}
140159

141160
waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null));
142161

143-
commitState(snapshot);
144-
commitRecords(snapshot);
162+
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
163+
commitState(snapshot);
164+
}
165+
if (hasWritesOrDeletesInSnapshot) {
166+
commitRecords(snapshot);
167+
}
145168
}
146169

147170
protected void handleCommitConflict(Snapshot snapshot, Exception cause)

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.scalar.db.api.DistributedStorage;
66
import com.scalar.db.api.TransactionState;
77
import com.scalar.db.exception.transaction.CommitConflictException;
8+
import com.scalar.db.exception.transaction.CommitException;
89
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
910
import com.scalar.db.transaction.consensuscommit.Coordinator.State;
1011
import com.scalar.db.util.groupcommit.Emittable;
@@ -28,15 +29,31 @@ public CommitHandlerWithGroupCommit(
2829
Coordinator coordinator,
2930
TransactionTableMetadataManager tableMetadataManager,
3031
ParallelExecutor parallelExecutor,
32+
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
3133
CoordinatorGroupCommitter groupCommitter) {
32-
super(storage, coordinator, tableMetadataManager, parallelExecutor);
34+
super(
35+
storage,
36+
coordinator,
37+
tableMetadataManager,
38+
parallelExecutor,
39+
coordinatorWriteOmissionOnReadOnlyEnabled);
3340

3441
checkNotNull(groupCommitter);
3542
// The methods of this emitter will be called via GroupCommitter.ready().
3643
groupCommitter.setEmitter(new Emitter(coordinator));
3744
this.groupCommitter = groupCommitter;
3845
}
3946

47+
@Override
48+
public void commit(Snapshot snapshot, boolean readOnly)
49+
throws CommitException, UnknownTransactionStatusException {
50+
if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) {
51+
cancelGroupCommitIfNeeded(snapshot.getId());
52+
}
53+
54+
super.commit(snapshot, readOnly);
55+
}
56+
4057
@Override
4158
protected void onFailureBeforeCommit(Snapshot snapshot) {
4259
cancelGroupCommitIfNeeded(snapshot.getId());

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
269269
CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId());
270270
}
271271

272-
commit.commit(crud.getSnapshot());
272+
commit.commit(crud.getSnapshot(), crud.isReadOnly());
273273
}
274274

275275
@Override
@@ -280,7 +280,7 @@ public void rollback() {
280280
logger.warn("Failed to close the scanner", e);
281281
}
282282

283-
if (groupCommitter != null) {
283+
if (groupCommitter != null && !crud.isReadOnly()) {
284284
groupCommitter.remove(crud.getSnapshot().getId());
285285
}
286286
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public class ConsensusCommitConfig {
3333
public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled";
3434
public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled";
3535

36+
public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
37+
PREFIX + "coordinator.write_omission_on_read_only.enabled";
38+
3639
public static final String PARALLEL_IMPLICIT_PRE_READ =
3740
PREFIX + "parallel_implicit_pre_read.enabled";
3841

@@ -73,10 +76,12 @@ public class ConsensusCommitConfig {
7376
private final boolean asyncCommitEnabled;
7477
private final boolean asyncRollbackEnabled;
7578

76-
private final boolean isIncludeMetadataEnabled;
79+
private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
7780

7881
private final boolean parallelImplicitPreReadEnabled;
7982

83+
private final boolean isIncludeMetadataEnabled;
84+
8085
private final boolean coordinatorGroupCommitEnabled;
8186
private final int coordinatorGroupCommitSlotCapacity;
8287
private final int coordinatorGroupCommitGroupSizeFixTimeoutMillis;
@@ -92,7 +97,9 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
9297
DatabaseConfig.TRANSACTION_MANAGER + " should be '" + TRANSACTION_MANAGER_NAME + "'");
9398
}
9499

95-
if (databaseConfig.getProperties().containsKey("scalar.db.isolation_level")) {
100+
Properties properties = databaseConfig.getProperties();
101+
102+
if (properties.containsKey("scalar.db.isolation_level")) {
96103
logger.warn(
97104
"The property \"scalar.db.isolation_level\" is deprecated and will be removed in 5.0.0. "
98105
+ "Please use \""
@@ -102,91 +109,78 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
102109
isolation =
103110
Isolation.valueOf(
104111
getString(
105-
databaseConfig.getProperties(),
112+
properties,
106113
ISOLATION_LEVEL,
107114
getString(
108-
databaseConfig.getProperties(),
115+
properties,
109116
"scalar.db.isolation_level", // for backward compatibility
110117
Isolation.SNAPSHOT.toString()))
111118
.toUpperCase(Locale.ROOT));
112119
if (isolation.equals(Isolation.SERIALIZABLE)) {
113120
validateCrossPartitionScanConfig(databaseConfig);
114121

115-
if (databaseConfig
116-
.getProperties()
117-
.containsKey("scalar.db.consensus_commit.serializable_strategy")) {
122+
if (properties.containsKey("scalar.db.consensus_commit.serializable_strategy")) {
118123
logger.warn(
119124
"The property \"scalar.db.consensus_commit.serializable_strategy\" is deprecated and will "
120125
+ "be removed in 5.0.0. The EXTRA_READ strategy is always used for the SERIALIZABLE "
121126
+ "isolation level.");
122127
}
123128
}
124129

125-
coordinatorNamespace = getString(databaseConfig.getProperties(), COORDINATOR_NAMESPACE, null);
130+
coordinatorNamespace = getString(properties, COORDINATOR_NAMESPACE, null);
126131

127132
parallelExecutorCount =
128-
getInt(
129-
databaseConfig.getProperties(),
130-
PARALLEL_EXECUTOR_COUNT,
131-
DEFAULT_PARALLEL_EXECUTOR_COUNT);
132-
parallelPreparationEnabled =
133-
getBoolean(databaseConfig.getProperties(), PARALLEL_PREPARATION_ENABLED, true);
134-
parallelCommitEnabled =
135-
getBoolean(databaseConfig.getProperties(), PARALLEL_COMMIT_ENABLED, true);
133+
getInt(properties, PARALLEL_EXECUTOR_COUNT, DEFAULT_PARALLEL_EXECUTOR_COUNT);
134+
parallelPreparationEnabled = getBoolean(properties, PARALLEL_PREPARATION_ENABLED, true);
135+
parallelCommitEnabled = getBoolean(properties, PARALLEL_COMMIT_ENABLED, true);
136136

137137
// Use the value of parallel commit for parallel validation and parallel rollback as default
138138
// value
139139
parallelValidationEnabled =
140-
getBoolean(
141-
databaseConfig.getProperties(), PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled);
140+
getBoolean(properties, PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled);
142141
parallelRollbackEnabled =
143-
getBoolean(
144-
databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);
142+
getBoolean(properties, PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);
145143

146-
asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false);
144+
asyncCommitEnabled = getBoolean(properties, ASYNC_COMMIT_ENABLED, false);
147145

148146
// Use the value of async commit for async rollback as default value
149-
asyncRollbackEnabled =
150-
getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);
147+
asyncRollbackEnabled = getBoolean(properties, ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);
148+
149+
coordinatorWriteOmissionOnReadOnlyEnabled =
150+
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);
151151

152-
isIncludeMetadataEnabled =
153-
getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false);
152+
parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
154153

155-
parallelImplicitPreReadEnabled =
156-
getBoolean(databaseConfig.getProperties(), PARALLEL_IMPLICIT_PRE_READ, true);
154+
isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
157155

158-
coordinatorGroupCommitEnabled =
159-
getBoolean(databaseConfig.getProperties(), COORDINATOR_GROUP_COMMIT_ENABLED, false);
156+
coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
160157
coordinatorGroupCommitSlotCapacity =
161158
getInt(
162-
databaseConfig.getProperties(),
159+
properties,
163160
COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY,
164161
DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY);
165162
coordinatorGroupCommitGroupSizeFixTimeoutMillis =
166163
getInt(
167-
databaseConfig.getProperties(),
164+
properties,
168165
COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS,
169166
DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS);
170167
coordinatorGroupCommitDelayedSlotMoveTimeoutMillis =
171168
getInt(
172-
databaseConfig.getProperties(),
169+
properties,
173170
COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS,
174171
DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS);
175172
coordinatorGroupCommitOldGroupAbortTimeoutMillis =
176173
getInt(
177-
databaseConfig.getProperties(),
174+
properties,
178175
COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS,
179176
DEFAULT_COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS);
180177
coordinatorGroupCommitTimeoutCheckIntervalMillis =
181178
getInt(
182-
databaseConfig.getProperties(),
179+
properties,
183180
COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS,
184181
DEFAULT_COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS);
185182
coordinatorGroupCommitMetricsMonitorLogEnabled =
186-
getBoolean(
187-
databaseConfig.getProperties(),
188-
COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED,
189-
false);
183+
getBoolean(properties, COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, false);
190184
}
191185

192186
public Isolation getIsolation() {
@@ -225,14 +219,18 @@ public boolean isAsyncRollbackEnabled() {
225219
return asyncRollbackEnabled;
226220
}
227221

228-
public boolean isIncludeMetadataEnabled() {
229-
return isIncludeMetadataEnabled;
222+
public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
223+
return coordinatorWriteOmissionOnReadOnlyEnabled;
230224
}
231225

232226
public boolean isParallelImplicitPreReadEnabled() {
233227
return parallelImplicitPreReadEnabled;
234228
}
235229

230+
public boolean isIncludeMetadataEnabled() {
231+
return isIncludeMetadataEnabled;
232+
}
233+
236234
public boolean isCoordinatorGroupCommitEnabled() {
237235
return coordinatorGroupCommitEnabled;
238236
}

0 commit comments

Comments
 (0)