Skip to content

Commit 9f14f7a

Browse files
authored
Merge branch 'master' into cassandra-permission-list
2 parents 173f850 + 7a01959 commit 9f14f7a

File tree

11 files changed

+315
-97
lines changed

11 files changed

+315
-97
lines changed

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,21 @@ subprojects {
2525
guavaVersion = '32.1.3-jre'
2626
slf4jVersion = '1.7.36'
2727
cassandraDriverVersion = '3.11.5'
28-
azureCosmosVersion = '4.71.0'
28+
azureCosmosVersion = '4.72.0'
2929
jooqVersion = '3.14.16'
3030
awssdkVersion = '2.31.3'
3131
commonsDbcp2Version = '2.13.0'
3232
mysqlDriverVersion = '8.4.0'
3333
postgresqlDriverVersion = '42.7.7'
3434
oracleDriverVersion = '23.8.0.25.04'
3535
sqlserverDriverVersion = '12.8.1.jre8'
36-
sqliteDriverVersion = '3.50.1.0'
36+
sqliteDriverVersion = '3.50.2.0'
3737
yugabyteDriverVersion = '42.7.3-yb-4'
3838
db2DriverVersion= '12.1.2.0'
39-
mariadDbDriverVersion = '3.5.3'
39+
mariadDbDriverVersion = '3.5.4'
4040
picocliVersion = '4.7.7'
4141
commonsTextVersion = '1.13.1'
42-
junitVersion = '5.11.4'
42+
junitVersion = '5.13.2'
4343
commonsLangVersion = '3.17.0'
4444
assertjVersion = '3.27.3'
4545
mockitoVersion = '4.11.0'

core/build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ dependencies {
132132
exclude group: 'org.slf4j', module: 'slf4j-api'
133133
}
134134
implementation "org.apache.commons:commons-text:${commonsTextVersion}"
135-
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
136-
testImplementation "org.junit.jupiter:junit-jupiter-params:${junitVersion}"
137-
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}"
135+
testImplementation platform("org.junit:junit-bom:${junitVersion}")
136+
testImplementation 'org.junit.jupiter:junit-jupiter'
137+
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
138138
testImplementation "org.assertj:assertj-core:${assertjVersion}"
139139
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
140140
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
261261
}
262262

263263
@VisibleForTesting
264-
void onePhaseCommitRecords(Snapshot snapshot) throws CommitException {
264+
void onePhaseCommitRecords(Snapshot snapshot)
265+
throws CommitConflictException, UnknownTransactionStatusException {
265266
try {
266267
OnePhaseCommitMutationComposer composer =
267268
new OnePhaseCommitMutationComposer(snapshot.getId(), tableMetadataManager);
@@ -279,7 +280,7 @@ void onePhaseCommitRecords(Snapshot snapshot) throws CommitException {
279280
e,
280281
snapshot.getId());
281282
} catch (ExecutionException e) {
282-
throw new CommitException(
283+
throw new UnknownTransactionStatusException(
283284
CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
284285
}
285286
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java

Lines changed: 66 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ public void executeImplicitPreRead(List<ParallelExecutorTask> tasks, String tran
147147
}
148148
}
149149

150-
private void executeTasks(
150+
@VisibleForTesting
151+
void executeTasks(
151152
List<ParallelExecutorTask> tasks,
152153
boolean parallel,
153154
boolean noWait,
@@ -158,14 +159,14 @@ private void executeTasks(
158159
if (tasks.size() == 1 && !noWait) {
159160
// If there is only one task and noWait is false, we can run it directly without parallel
160161
// execution.
161-
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
162+
tasks.get(0).run();
162163
return;
163164
}
164165

165166
if (parallel) {
166167
executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId);
167168
} else {
168-
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
169+
executeTasksSerially(tasks, stopOnError);
169170
}
170171
}
171172

@@ -180,94 +181,92 @@ private void executeTasksInParallel(
180181

181182
CompletionService<Void> completionService =
182183
new ExecutorCompletionService<>(parallelExecutorService);
183-
tasks.forEach(
184-
t ->
185-
completionService.submit(
186-
() -> {
187-
try {
188-
t.run();
189-
} catch (Exception e) {
190-
logger.warn(
191-
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
192-
throw e;
193-
}
194-
return null;
195-
}));
196-
197-
if (!noWait) {
198-
Exception exception = null;
199-
for (int i = 0; i < tasks.size(); i++) {
200-
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);
201184

202-
try {
203-
Uninterruptibles.getUninterruptibly(future);
204-
} catch (java.util.concurrent.ExecutionException e) {
205-
if (e.getCause() instanceof ExecutionException) {
206-
if (!stopOnError) {
207-
exception = (ExecutionException) e.getCause();
208-
} else {
209-
throw (ExecutionException) e.getCause();
210-
}
211-
} else if (e.getCause() instanceof ValidationConflictException) {
212-
if (!stopOnError) {
213-
exception = (ValidationConflictException) e.getCause();
214-
} else {
215-
throw (ValidationConflictException) e.getCause();
216-
}
217-
} else if (e.getCause() instanceof CrudException) {
218-
if (!stopOnError) {
219-
exception = (CrudException) e.getCause();
220-
} else {
221-
throw (CrudException) e.getCause();
185+
// Submit tasks
186+
for (ParallelExecutorTask task : tasks) {
187+
completionService.submit(
188+
() -> {
189+
try {
190+
task.run();
191+
} catch (Exception e) {
192+
logger.warn(
193+
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
194+
throw e;
222195
}
223-
} else if (e.getCause() instanceof RuntimeException) {
224-
throw (RuntimeException) e.getCause();
225-
} else if (e.getCause() instanceof Error) {
226-
throw (Error) e.getCause();
196+
return null;
197+
});
198+
}
199+
200+
// Optionally wait for completion
201+
if (noWait) {
202+
return;
203+
}
204+
205+
Throwable throwable = null;
206+
207+
for (int i = 0; i < tasks.size(); i++) {
208+
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);
209+
try {
210+
Uninterruptibles.getUninterruptibly(future);
211+
} catch (java.util.concurrent.ExecutionException e) {
212+
Throwable cause = e.getCause();
213+
214+
if (stopOnError) {
215+
rethrow(cause);
216+
} else {
217+
if (throwable == null) {
218+
throwable = cause;
227219
} else {
228-
throw new AssertionError("Can't reach here. Maybe a bug", e);
220+
throwable.addSuppressed(cause);
229221
}
230222
}
231223
}
224+
}
232225

233-
if (!stopOnError && exception != null) {
234-
if (exception instanceof ExecutionException) {
235-
throw (ExecutionException) exception;
236-
} else if (exception instanceof ValidationConflictException) {
237-
throw (ValidationConflictException) exception;
238-
} else {
239-
throw (CrudException) exception;
240-
}
241-
}
226+
// Rethrow exception if necessary
227+
if (!stopOnError && throwable != null) {
228+
rethrow(throwable);
242229
}
243230
}
244231

245-
private void executeTasksSerially(
246-
List<ParallelExecutorTask> tasks, boolean stopOnError, String taskName, String transactionId)
232+
private void executeTasksSerially(List<ParallelExecutorTask> tasks, boolean stopOnError)
247233
throws ExecutionException, ValidationConflictException, CrudException {
248234
Exception exception = null;
249235
for (ParallelExecutorTask task : tasks) {
250236
try {
251237
task.run();
252238
} catch (ExecutionException | ValidationConflictException | CrudException e) {
253-
logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
254-
255239
if (!stopOnError) {
256-
exception = e;
240+
if (exception == null) {
241+
exception = e;
242+
} else {
243+
exception.addSuppressed(e);
244+
}
257245
} else {
258246
throw e;
259247
}
260248
}
261249
}
262250

263251
if (!stopOnError && exception != null) {
264-
if (exception instanceof ExecutionException) {
265-
throw (ExecutionException) exception;
266-
} else if (exception instanceof ValidationConflictException) {
267-
throw (ValidationConflictException) exception;
268-
} else {
269-
throw (CrudException) exception;
270-
}
252+
rethrow(exception);
253+
}
254+
}
255+
256+
private void rethrow(Throwable cause)
257+
throws ExecutionException, ValidationConflictException, CrudException {
258+
if (cause instanceof ExecutionException) {
259+
throw (ExecutionException) cause;
260+
} else if (cause instanceof ValidationConflictException) {
261+
throw (ValidationConflictException) cause;
262+
} else if (cause instanceof CrudException) {
263+
throw (CrudException) cause;
264+
} else if (cause instanceof RuntimeException) {
265+
throw (RuntimeException) cause;
266+
} else if (cause instanceof Error) {
267+
throw (Error) cause;
268+
} else {
269+
throw new AssertionError("Unexpected exception type", cause);
271270
}
272271
}
273272

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,7 @@ public void canOnePhaseCommit_WhenMutationsGrouperThrowsException_ShouldThrowCom
10891089

10901090
@Test
10911091
public void onePhaseCommitRecords_WhenSuccessful_ShouldMutateUsingComposerMutations()
1092-
throws CommitException, ExecutionException {
1092+
throws CommitConflictException, UnknownTransactionStatusException, ExecutionException {
10931093
// Arrange
10941094
Snapshot snapshot = spy(prepareSnapshotWithSamePartitionPut());
10951095
doNothing().when(storage).mutate(anyList());
@@ -1131,15 +1131,16 @@ public void onePhaseCommitRecords_WhenSuccessful_ShouldMutateUsingComposerMutati
11311131
}
11321132

11331133
@Test
1134-
public void onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowCommitException()
1135-
throws ExecutionException {
1134+
public void
1135+
onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowUnknownTransactionStatusException()
1136+
throws ExecutionException {
11361137
// Arrange
11371138
Snapshot snapshot = prepareSnapshotWithSamePartitionPut();
11381139
doThrow(ExecutionException.class).when(storage).mutate(anyList());
11391140

11401141
// Act Assert
11411142
assertThatThrownBy(() -> handler.onePhaseCommitRecords(snapshot))
1142-
.isInstanceOf(CommitException.class)
1143+
.isInstanceOf(UnknownTransactionStatusException.class)
11431144
.hasCauseInstanceOf(ExecutionException.class);
11441145
}
11451146

@@ -1162,17 +1163,19 @@ public void commit_OnePhaseCommitted_ShouldNotThrowAnyException()
11621163
}
11631164

11641165
@Test
1165-
public void commit_OnePhaseCommitted_CommitExceptionThrown_ShouldThrowCommitException()
1166-
throws CommitException {
1166+
public void
1167+
commit_OnePhaseCommitted_UnknownTransactionStatusExceptionThrown_ShouldThrowUnknownTransactionStatusException()
1168+
throws CommitException, UnknownTransactionStatusException {
11671169
// Arrange
11681170
CommitHandler handler = spy(createCommitHandlerWithOnePhaseCommit());
11691171
Snapshot snapshot = prepareSnapshotWithSamePartitionPut();
11701172

11711173
doReturn(true).when(handler).canOnePhaseCommit(snapshot);
1172-
doThrow(CommitException.class).when(handler).onePhaseCommitRecords(snapshot);
1174+
doThrow(UnknownTransactionStatusException.class).when(handler).onePhaseCommitRecords(snapshot);
11731175

11741176
// Act Assert
1175-
assertThatThrownBy(() -> handler.commit(snapshot, true)).isInstanceOf(CommitException.class);
1177+
assertThatThrownBy(() -> handler.commit(snapshot, true))
1178+
.isInstanceOf(UnknownTransactionStatusException.class);
11761179

11771180
verify(handler).onFailureBeforeCommit(snapshot);
11781181
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ public void onePhaseCommitRecords_WhenSuccessful_ShouldMutateUsingComposerMutati
254254
@Disabled("Enabling both one-phase commit and group commit is not supported")
255255
@Override
256256
@Test
257-
public void onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowCommitException() {}
257+
public void
258+
onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowUnknownTransactionStatusException() {}
258259

259260
@Disabled("Enabling both one-phase commit and group commit is not supported")
260261
@Override
@@ -264,5 +265,6 @@ public void commit_OnePhaseCommitted_ShouldNotThrowAnyException() {}
264265
@Disabled("Enabling both one-phase commit and group commit is not supported")
265266
@Override
266267
@Test
267-
public void commit_OnePhaseCommitted_CommitExceptionThrown_ShouldThrowCommitException() {}
268+
public void
269+
commit_OnePhaseCommitted_UnknownTransactionStatusExceptionThrown_ShouldThrowUnknownTransactionStatusException() {}
268270
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitterTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,9 @@ void remove_GivenAllFullTxIds_ShouldRemoveAll() throws Exception {
258258
}
259259
}
260260

261+
@SuppressWarnings("ClassCanBeStatic")
261262
@Nested
262-
static class CoordinatorGroupCommitKeyManipulatorTest {
263+
class CoordinatorGroupCommitKeyManipulatorTest {
263264
private final CoordinatorGroupCommitKeyManipulator keyManipulator =
264265
new CoordinatorGroupCommitKeyManipulator();
265266

0 commit comments

Comments
 (0)