Skip to content

Commit a8da13d

Browse files
committed
Fix error handling for mutations in Cassandra (#2827)
1 parent e46f8a3 commit a8da13d

File tree

4 files changed

+27
-33
lines changed

4 files changed

+27
-33
lines changed

core/src/main/java/com/scalar/db/storage/cassandra/BatchHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import com.google.common.annotations.VisibleForTesting;
1212
import com.scalar.db.api.Mutation;
1313
import com.scalar.db.common.CoreError;
14+
import com.scalar.db.exception.storage.ExecutionException;
1415
import com.scalar.db.exception.storage.NoMutationException;
15-
import com.scalar.db.exception.storage.RetriableExecutionException;
1616
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1717
import java.util.List;
1818
import javax.annotation.concurrent.ThreadSafe;
@@ -48,12 +48,11 @@ public BatchHandler(Session session, StatementHandlerManager handlers) {
4848
* must be for the same partition.
4949
*
5050
* @param mutations a list of {@code Mutation}s to execute
51-
* @throws RetriableExecutionException if it fails, but it can be retried
5251
* @throws NoMutationException if at least one of conditional {@code Mutation}s fails because it
5352
* didn't meet the condition
53+
* @throws ExecutionException if the execution fails
5454
*/
55-
public void handle(List<? extends Mutation> mutations)
56-
throws RetriableExecutionException, NoMutationException {
55+
public void handle(List<? extends Mutation> mutations) throws ExecutionException {
5756
try {
5857
ResultSet results = execute(mutations);
5958
// it's for conditional update. non-conditional update always return true
@@ -64,16 +63,17 @@ public void handle(List<? extends Mutation> mutations)
6463
logger.warn("Write timeout happened during batch mutate operation", e);
6564
WriteType writeType = e.getWriteType();
6665
if (writeType == WriteType.BATCH_LOG) {
67-
throw new RetriableExecutionException(
68-
CoreError.CASSANDRA_LOGGING_FAILED_IN_BATCH.buildMessage(), e);
66+
// A timeout occurred while the coordinator was waiting for the batch log replicas to
67+
// acknowledge the log. Thus, the batch may or may not be applied.
68+
throw new ExecutionException(CoreError.CASSANDRA_LOGGING_FAILED_IN_BATCH.buildMessage(), e);
6969
} else if (writeType == WriteType.BATCH) {
7070
logger.warn("Logging succeeded, but mutations in the batch partially failed", e);
7171
} else {
72-
throw new RetriableExecutionException(
72+
throw new ExecutionException(
7373
CoreError.CASSANDRA_OPERATION_FAILED_IN_BATCH.buildMessage(writeType), e);
7474
}
7575
} catch (RuntimeException e) {
76-
throw new RetriableExecutionException(
76+
throw new ExecutionException(
7777
CoreError.CASSANDRA_ERROR_OCCURRED_IN_BATCH.buildMessage(e.getMessage()), e);
7878
}
7979
}

core/src/main/java/com/scalar/db/storage/cassandra/MutateStatementHandler.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import com.scalar.db.common.CoreError;
1313
import com.scalar.db.exception.storage.ExecutionException;
1414
import com.scalar.db.exception.storage.NoMutationException;
15-
import com.scalar.db.exception.storage.RetriableExecutionException;
1615
import javax.annotation.Nonnull;
1716
import javax.annotation.concurrent.ThreadSafe;
1817

@@ -28,9 +27,9 @@ public MutateStatementHandler(Session session) {
2827
*
2928
* @param operation {@link Mutation} operation
3029
* @return a {@code ResultSet}
31-
* @throws RetriableExecutionException if the execution fails, but it can be retriable
3230
* @throws ReadRepairableExecutionException if the execution partially fails, which can be
3331
* repaired by a following read
32+
* @throws ExecutionException if the execution fails
3433
*/
3534
@Override
3635
@Nonnull
@@ -45,8 +44,7 @@ public ResultSet handle(Operation operation) throws ExecutionException {
4544
return results;
4645
} catch (WriteTimeoutException e) {
4746
if (e.getWriteType() == WriteType.CAS) {
48-
// retry needs to be done if applications need to do the operation exactly
49-
throw new RetriableExecutionException(
47+
throw new ExecutionException(
5048
CoreError.CASSANDRA_WRITE_TIMEOUT_IN_PAXOS_PHASE_IN_MUTATION.buildMessage(), e);
5149
} else if (e.getWriteType() == WriteType.SIMPLE) {
5250
Mutation mutation = (Mutation) operation;
@@ -55,8 +53,7 @@ public ResultSet handle(Operation operation) throws ExecutionException {
5553
throw new ReadRepairableExecutionException(
5654
CoreError.CASSANDRA_WRITE_TIMEOUT_IN_LEARN_PHASE_IN_MUTATION.buildMessage(), e);
5755
} else {
58-
// retry needs to be done if applications need to do the operation exactly
59-
throw new RetriableExecutionException(
56+
throw new ExecutionException(
6057
CoreError.CASSANDRA_WRITE_TIMEOUT_SIMPLE_WRITE_OPERATION_FAILED_IN_MUTATION
6158
.buildMessage(),
6259
e);
@@ -66,7 +63,7 @@ public ResultSet handle(Operation operation) throws ExecutionException {
6663
CoreError.CASSANDRA_WRITE_TIMEOUT_WITH_OTHER_WRITE_TYPE_IN_MUTATION.buildMessage(), e);
6764
}
6865
} catch (RuntimeException e) {
69-
throw new RetriableExecutionException(
66+
throw new ExecutionException(
7067
CoreError.CASSANDRA_ERROR_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e);
7168
}
7269
}

core/src/test/java/com/scalar/db/storage/cassandra/BatchHandlerTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import com.scalar.db.api.Put;
2121
import com.scalar.db.api.PutIfExists;
2222
import com.scalar.db.api.PutIfNotExists;
23+
import com.scalar.db.exception.storage.ExecutionException;
2324
import com.scalar.db.exception.storage.NoMutationException;
24-
import com.scalar.db.exception.storage.RetriableExecutionException;
2525
import com.scalar.db.io.Key;
2626
import java.util.Arrays;
2727
import java.util.List;
@@ -178,7 +178,7 @@ public void handle_CorrectHandlerAndAtLeastOneConditionalPutGiven_ShouldSetConsi
178178
}
179179

180180
@Test
181-
public void handle_WTEThrownInLoggingInBatchExecution_ShouldThrowRetriableExecutionException() {
181+
public void handle_WTEThrownInLoggingInBatchExecution_ShouldThrowExecutionException() {
182182
// Arrange
183183
configureBehavior();
184184
mutations = prepareConditionalPuts();
@@ -188,7 +188,7 @@ public void handle_WTEThrownInLoggingInBatchExecution_ShouldThrowRetriableExecut
188188

189189
// Act Assert
190190
assertThatThrownBy(() -> batch.handle(mutations))
191-
.isInstanceOf(RetriableExecutionException.class)
191+
.isInstanceOf(ExecutionException.class)
192192
.hasCause(e);
193193
}
194194

@@ -206,7 +206,7 @@ public void handle_WTEThrownInMutationInBatchExecution_ShouldExecuteProperly() {
206206
}
207207

208208
@Test
209-
public void handle_WTEThrownInCasInBatchExecution_ShouldThrowRetriableExecutionException() {
209+
public void handle_WTEThrownInCasInBatchExecution_ShouldThrowExecutionException() {
210210
// Arrange
211211
configureBehavior();
212212
mutations = prepareConditionalPuts();
@@ -216,13 +216,12 @@ public void handle_WTEThrownInCasInBatchExecution_ShouldThrowRetriableExecutionE
216216

217217
// Act Assert
218218
assertThatThrownBy(() -> batch.handle(mutations))
219-
.isInstanceOf(RetriableExecutionException.class)
219+
.isInstanceOf(ExecutionException.class)
220220
.hasCause(e);
221221
}
222222

223223
@Test
224-
public void
225-
handle_WTEThrownInSimpleWriteInBatchExecution_ShouldThrowRetriableExecutionException() {
224+
public void handle_WTEThrownInSimpleWriteInBatchExecution_ShouldThrowExecutionException() {
226225
// Arrange
227226
configureBehavior();
228227
mutations = prepareConditionalPuts();
@@ -232,12 +231,12 @@ public void handle_WTEThrownInCasInBatchExecution_ShouldThrowRetriableExecutionE
232231

233232
// Act Assert
234233
assertThatThrownBy(() -> batch.handle(mutations))
235-
.isInstanceOf(RetriableExecutionException.class)
234+
.isInstanceOf(ExecutionException.class)
236235
.hasCause(e);
237236
}
238237

239238
@Test
240-
public void handle_DriverExceptionThrownInExecution_ShouldThrowRetriableExecutionException() {
239+
public void handle_DriverExceptionThrownInExecution_ShouldThrowExecutionException() {
241240
// Arrange
242241
configureBehavior();
243242
mutations = prepareConditionalPuts();
@@ -246,7 +245,7 @@ public void handle_DriverExceptionThrownInExecution_ShouldThrowRetriableExecutio
246245

247246
// Act Assert
248247
assertThatThrownBy(() -> batch.handle(mutations))
249-
.isInstanceOf(RetriableExecutionException.class)
248+
.isInstanceOf(ExecutionException.class)
250249
.hasCause(e);
251250
}
252251

core/src/test/java/com/scalar/db/storage/cassandra/InsertStatementHandlerTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.scalar.db.api.PutIfNotExists;
2929
import com.scalar.db.exception.storage.ExecutionException;
3030
import com.scalar.db.exception.storage.NoMutationException;
31-
import com.scalar.db.exception.storage.RetriableExecutionException;
3231
import com.scalar.db.io.Key;
3332
import org.junit.jupiter.api.BeforeEach;
3433
import org.junit.jupiter.api.Test;
@@ -330,7 +329,7 @@ public void setConsistency_PutOperationWithIfNotExistsGiven_ShouldPrepareWithQuo
330329
}
331330

332331
@Test
333-
public void handle_WTEWithCasThrown_ShouldThrowProperExecutionException() {
332+
public void handle_WTEWithCasThrown_ShouldThrowExecutionException() {
334333
// Arrange
335334
put = preparePutWithClusteringKey();
336335
put.withCondition(new PutIfNotExists());
@@ -342,13 +341,12 @@ public void handle_WTEWithCasThrown_ShouldThrowProperExecutionException() {
342341

343342
// Act Assert
344343
assertThatThrownBy(() -> spy.handle(put))
345-
.isInstanceOf(RetriableExecutionException.class)
344+
.isInstanceOf(ExecutionException.class)
346345
.hasCause(toThrow);
347346
}
348347

349348
@Test
350-
public void
351-
handle_PutWithConditionGivenAndWTEWithSimpleThrown_ShouldThrowProperExecutionException() {
349+
public void handle_PutWithConditionGivenAndWTEWithSimpleThrown_ShouldThrowExecutionException() {
352350
// Arrange
353351
put = preparePutWithClusteringKey();
354352
put.withCondition(new PutIfNotExists());
@@ -366,7 +364,7 @@ public void handle_WTEWithCasThrown_ShouldThrowProperExecutionException() {
366364

367365
@Test
368366
public void
369-
handle_PutWithoutConditionGivenAndWTEWithSimpleThrown_ShouldThrowProperExecutionException() {
367+
handle_PutWithoutConditionGivenAndWTEWithSimpleThrown_ShouldThrowExecutionException() {
370368
// Arrange
371369
put = preparePutWithClusteringKey();
372370
spy = prepareSpiedInsertStatementHandler();
@@ -377,12 +375,12 @@ public void handle_WTEWithCasThrown_ShouldThrowProperExecutionException() {
377375

378376
// Act Assert
379377
assertThatThrownBy(() -> spy.handle(put))
380-
.isInstanceOf(RetriableExecutionException.class)
378+
.isInstanceOf(ExecutionException.class)
381379
.hasCause(toThrow);
382380
}
383381

384382
@Test
385-
public void handle_DriverExceptionThrown_ShouldThrowProperExecutionException() {
383+
public void handle_DriverExceptionThrown_ShouldThrowExecutionException() {
386384
// Arrange
387385
put = preparePutWithClusteringKey();
388386
spy = prepareSpiedInsertStatementHandler();

0 commit comments

Comments
 (0)