diff --git a/core/src/main/java/com/scalar/db/common/CoreError.java b/core/src/main/java/com/scalar/db/common/CoreError.java index fff02ce179..6664ea599a 100644 --- a/core/src/main/java/com/scalar/db/common/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/CoreError.java @@ -728,19 +728,35 @@ public enum CoreError implements ScalarDbError { "", ""), CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS( - Category.CONCURRENCY_ERROR, "0013", "The record being prepared already exists", "", ""), + Category.CONCURRENCY_ERROR, + "0013", + "The record being prepared already exists. Details: %s", + "", + ""), CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_PREPARING_RECORDS( - Category.CONCURRENCY_ERROR, "0014", "A conflict occurred when preparing records", "", ""), + Category.CONCURRENCY_ERROR, + "0014", + "A conflict occurred when preparing records. Details: %s", + "", + ""), CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE( Category.CONCURRENCY_ERROR, "0015", - "The committing state in the coordinator failed. The transaction has been aborted", + "The committing state in the coordinator failed. The transaction has been aborted. Details: %s", "", ""), CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ( - Category.CONCURRENCY_ERROR, "0016", "A conflict occurred during implicit pre-read", "", ""), + Category.CONCURRENCY_ERROR, + "0016", + "A conflict occurred during implicit pre-read. Details: %s", + "", + ""), CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD( - Category.CONCURRENCY_ERROR, "0017", "This record needs to be recovered", "", ""), + Category.CONCURRENCY_ERROR, + "0017", + "This record needs to be recovered. Table: %s; Partition Key: %s; Clustering Key: %s; Transaction ID that wrote the record: %s", + "", + ""), CONSENSUS_COMMIT_CONDITION_NOT_SATISFIED_BECAUSE_RECORD_NOT_EXISTS( Category.CONCURRENCY_ERROR, "0018", @@ -784,7 +800,11 @@ public enum CoreError implements ScalarDbError { "", ""), CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS( - Category.CONCURRENCY_ERROR, "0026", "A conflict occurred when committing records", "", ""), + Category.CONCURRENCY_ERROR, + "0026", + "A conflict occurred when committing records. Details: %s", + "", + ""), // // Errors for the internal error category @@ -890,20 +910,21 @@ public enum CoreError implements ScalarDbError { JDBC_TRANSACTION_BEGINNING_TRANSACTION_FAILED( Category.INTERNAL_ERROR, "0035", "Beginning a transaction failed. Details: %s", "", ""), CONSENSUS_COMMIT_PREPARING_RECORDS_FAILED( - Category.INTERNAL_ERROR, "0036", "Preparing records failed", "", ""), - CONSENSUS_COMMIT_VALIDATION_FAILED(Category.INTERNAL_ERROR, "0037", "Validation failed", "", ""), + Category.INTERNAL_ERROR, "0036", "Preparing records failed. Details: %s", "", ""), + CONSENSUS_COMMIT_VALIDATION_FAILED( + Category.INTERNAL_ERROR, "0037", "Validation failed. Details: %s", "", ""), CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED( - Category.INTERNAL_ERROR, "0038", "Executing implicit pre-read failed", "", ""), + Category.INTERNAL_ERROR, "0038", "Executing implicit pre-read failed. Details: %s", "", ""), CONSENSUS_COMMIT_READING_RECORD_FROM_STORAGE_FAILED( Category.INTERNAL_ERROR, "0039", - "Reading a record from the underlying storage failed", + "Reading a record from the underlying storage failed. Details: %s", "", ""), CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED( Category.INTERNAL_ERROR, "0040", - "Scanning records from the underlying storage failed", + "Scanning records from the underlying storage failed. Details: %s", "", ""), CONSENSUS_COMMIT_ROLLBACK_FAILED_BECAUSE_TRANSACTION_ALREADY_COMMITTED( @@ -919,7 +940,7 @@ public enum CoreError implements ScalarDbError { Category.INTERNAL_ERROR, "0044", "The Upsert operation failed. Details: %s", "", ""), JDBC_TRANSACTION_UPDATE_OPERATION_FAILED( Category.INTERNAL_ERROR, "0045", "The Update operation failed. Details: %s", "", ""), - HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED( + CONSENSUS_COMMIT_HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED( Category.INTERNAL_ERROR, "0046", "Handling the before-preparation snapshot hook failed. Details: %s", @@ -938,7 +959,7 @@ public enum CoreError implements ScalarDbError { CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED( Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""), CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED( - Category.INTERNAL_ERROR, "0058", "Committing records failed", "", ""), + Category.INTERNAL_ERROR, "0058", "Committing records failed. Details: %s", "", ""), // // Errors for the unknown transaction status error category @@ -952,21 +973,31 @@ public enum CoreError implements ScalarDbError { CONSENSUS_COMMIT_COMMITTING_STATE_FAILED_WITH_NO_MUTATION_EXCEPTION_BUT_COORDINATOR_STATUS_DOES_NOT_EXIST( Category.UNKNOWN_TRANSACTION_STATUS_ERROR, "0001", - "Committing state failed with NoMutationException, but the coordinator status does not exist", + "Committing state failed with NoMutationException, but the coordinator status does not exist. Details: %s", + "", + ""), + CONSENSUS_COMMIT_CANNOT_COORDINATOR_STATUS( + Category.UNKNOWN_TRANSACTION_STATUS_ERROR, + "0002", + "The coordinator status cannot be retrieved. Details: %s", "", ""), - CONSENSUS_COMMIT_CANNOT_GET_STATE( - Category.UNKNOWN_TRANSACTION_STATUS_ERROR, "0002", "The state cannot be retrieved", "", ""), CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS( Category.UNKNOWN_TRANSACTION_STATUS_ERROR, "0003", - "The coordinator status is unknown", + "The coordinator status is unknown. Details: %s", "", ""), CONSENSUS_COMMIT_ABORTING_STATE_FAILED_WITH_NO_MUTATION_EXCEPTION_BUT_COORDINATOR_STATUS_DOES_NOT_EXIST( Category.UNKNOWN_TRANSACTION_STATUS_ERROR, "0004", - "Aborting state failed with NoMutationException, but the coordinator status does not exist", + "Aborting state failed with NoMutationException, but the coordinator status does not exist. Details: %s", + "", + ""), + CONSENSUS_COMMIT_ONE_PHASE_COMMITTING_RECORDS_FAILED( + Category.UNKNOWN_TRANSACTION_STATUS_ERROR, + "0005", + "One-phase committing records failed. Details: %s", "", ""), ; diff --git a/core/src/main/java/com/scalar/db/exception/storage/NoMutationException.java b/core/src/main/java/com/scalar/db/exception/storage/NoMutationException.java index d11eb0aca3..92eb0f9255 100644 --- a/core/src/main/java/com/scalar/db/exception/storage/NoMutationException.java +++ b/core/src/main/java/com/scalar/db/exception/storage/NoMutationException.java @@ -1,12 +1,62 @@ package com.scalar.db.exception.storage; +import com.google.common.collect.ImmutableList; +import com.scalar.db.api.Delete; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; + public class NoMutationException extends ExecutionException { - public NoMutationException(String message) { - super(message); + private final List mutations; + + public NoMutationException( + String message, @SuppressFBWarnings("EI_EXPOSE_REP2") List mutations) { + super(addTransactionIdToMessage(message, mutations)); + this.mutations = mutations; } - public NoMutationException(String message, Throwable cause) { - super(message, cause); + public NoMutationException( + String message, + @SuppressFBWarnings("EI_EXPOSE_REP2") List mutations, + Throwable cause) { + super(addTransactionIdToMessage(message, mutations), cause); + this.mutations = mutations; + } + + public List getMutations() { + return ImmutableList.copyOf(mutations); + } + + private static String addTransactionIdToMessage( + String message, List mutations) { + StringBuilder builder = new StringBuilder(message).append(". Mutations: ["); + + boolean first = true; + for (Mutation mutation : mutations) { + assert mutation.forFullTableName().isPresent(); + assert mutation instanceof Put || mutation instanceof Delete; + + if (!first) { + builder.append(", "); + } else { + first = false; + } + builder + .append("{Type: ") + .append(mutation.getClass().getSimpleName()) + .append(", Table: ") + .append(mutation.forFullTableName().get()) + .append(", Partition Key: ") + .append(mutation.getPartitionKey()) + .append(", Clustering Key: ") + .append(mutation.getClusteringKey()) + .append(", Condition: ") + .append(mutation.getCondition()) + .append("}"); + } + + return builder.append("]").toString(); } } diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/BatchHandler.java b/core/src/main/java/com/scalar/db/storage/cassandra/BatchHandler.java index 1b141f5ec9..ce4ce7aed5 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/BatchHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/BatchHandler.java @@ -57,7 +57,7 @@ public void handle(List mutations) throws ExecutionException ResultSet results = execute(mutations); // it's for conditional update. non-conditional update always return true if (!results.wasApplied()) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage()); + throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), mutations); } } catch (WriteTimeoutException e) { logger.warn("Write timeout happened during batch mutate operation", e); diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/MutateStatementHandler.java b/core/src/main/java/com/scalar/db/storage/cassandra/MutateStatementHandler.java index 5a80a4dc69..f5683d47e5 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/MutateStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/MutateStatementHandler.java @@ -12,6 +12,7 @@ import com.scalar.db.common.CoreError; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.storage.NoMutationException; +import java.util.Collections; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; @@ -39,7 +40,8 @@ public ResultSet handle(Operation operation) throws ExecutionException { Mutation mutation = (Mutation) operation; if (mutation.getCondition().isPresent() && !results.one().getBool(0)) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage()); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), Collections.singletonList(mutation)); } return results; } catch (WriteTimeoutException e) { diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/BatchHandler.java b/core/src/main/java/com/scalar/db/storage/cosmos/BatchHandler.java index 25e2ab58dc..6342d4ef0a 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/BatchHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/BatchHandler.java @@ -49,7 +49,7 @@ public void handle(List mutations) throws ExecutionException try { executeStoredProcedure(mutations, tableMetadata); } catch (CosmosException e) { - throwException(e); + throwException(e, mutations); } catch (RuntimeException e) { throw new ExecutionException( CoreError.COSMOS_ERROR_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e); @@ -84,11 +84,13 @@ private void executeStoredProcedure( .execute(args, cosmosMutation.getStoredProcedureOptions()); } - private void throwException(CosmosException exception) throws ExecutionException { + private void throwException(CosmosException exception, List mutations) + throws ExecutionException { int statusCode = exception.getSubStatusCode(); if (statusCode == CosmosErrorCode.PRECONDITION_FAILED.get()) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), exception); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), mutations, exception); } else if (statusCode == CosmosErrorCode.RETRY_WITH.get()) { throw new RetriableExecutionException( CoreError.COSMOS_RETRY_WITH_ERROR_OCCURRED_IN_MUTATION.buildMessage( diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/MutateStatementHandler.java b/core/src/main/java/com/scalar/db/storage/cosmos/MutateStatementHandler.java index 491a2b56c4..e17aff9a06 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/MutateStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/MutateStatementHandler.java @@ -10,6 +10,7 @@ import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.exception.storage.RetriableExecutionException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.annotation.concurrent.ThreadSafe; @@ -32,7 +33,7 @@ public void handle(Mutation mutation) throws ExecutionException { try { execute(mutation); } catch (CosmosException e) { - throwException(e); + throwException(e, mutation); } catch (RuntimeException e) { throw new ExecutionException( CoreError.COSMOS_ERROR_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e); @@ -56,11 +57,15 @@ protected void executeStoredProcedure(Mutation mutation, TableMetadata tableMeta .execute(args, cosmosMutation.getStoredProcedureOptions()); } - private void throwException(CosmosException exception) throws ExecutionException { + private void throwException(CosmosException exception, Mutation mutation) + throws ExecutionException { int statusCode = exception.getSubStatusCode(); if (statusCode == CosmosErrorCode.PRECONDITION_FAILED.get()) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), exception); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), + Collections.singletonList(mutation), + exception); } else if (statusCode == CosmosErrorCode.RETRY_WITH.get()) { throw new RetriableExecutionException( CoreError.COSMOS_RETRY_WITH_ERROR_OCCURRED_IN_MUTATION.buildMessage( diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/BatchHandler.java b/core/src/main/java/com/scalar/db/storage/dynamo/BatchHandler.java index baf98dc488..bf3548b9c5 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/BatchHandler.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/BatchHandler.java @@ -86,7 +86,7 @@ public void handle(List mutations) throws ExecutionException boolean allReasonsAreTransactionConflicts = true; for (CancellationReason reason : e.cancellationReasons()) { if (reason.code().equals("ConditionalCheckFailed")) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), e); + throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), mutations, e); } if (!reason.code().equals("TransactionConflict") && !reason.code().equals("None")) { allReasonsAreTransactionConflicts = false; diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/DeleteStatementHandler.java b/core/src/main/java/com/scalar/db/storage/dynamo/DeleteStatementHandler.java index 6483d5a62b..cf5b276316 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/DeleteStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/DeleteStatementHandler.java @@ -12,6 +12,7 @@ import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.exception.storage.RetriableExecutionException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collections; import java.util.Map; import java.util.Optional; import javax.annotation.concurrent.ThreadSafe; @@ -50,7 +51,8 @@ public void handle(Delete delete) throws ExecutionException { try { delete(delete, tableMetadata); } catch (ConditionalCheckFailedException e) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), e); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), Collections.singletonList(delete), e); } catch (TransactionConflictException e) { throw new RetriableExecutionException( CoreError.DYNAMO_TRANSACTION_CONFLICT_OCCURRED_IN_MUTATION.buildMessage( diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/PutStatementHandler.java b/core/src/main/java/com/scalar/db/storage/dynamo/PutStatementHandler.java index 1318c40d04..6c622386ad 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/PutStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/PutStatementHandler.java @@ -13,6 +13,7 @@ import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.exception.storage.RetriableExecutionException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collections; import java.util.Map; import java.util.Optional; import javax.annotation.concurrent.ThreadSafe; @@ -51,7 +52,8 @@ public void handle(Put put) throws ExecutionException { try { execute(put, tableMetadata); } catch (ConditionalCheckFailedException e) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), e); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), Collections.singletonList(put), e); } catch (TransactionConflictException e) { throw new RetriableExecutionException( CoreError.DYNAMO_TRANSACTION_CONFLICT_OCCURRED_IN_MUTATION.buildMessage( diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java index ee2d4dbfe9..5fca851925 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java @@ -21,6 +21,7 @@ import com.scalar.db.exception.storage.RetriableExecutionException; import java.sql.Connection; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import java.util.Optional; import javax.annotation.concurrent.ThreadSafe; @@ -138,7 +139,8 @@ public void put(Put put) throws ExecutionException { try { connection = dataSource.getConnection(); if (!jdbcService.put(put, connection)) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage()); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), Collections.singletonList(put)); } } catch (SQLException e) { throw new ExecutionException( @@ -160,7 +162,8 @@ public void delete(Delete delete) throws ExecutionException { try { connection = dataSource.getConnection(); if (!jdbcService.delete(delete, connection)) { - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage()); + throw new NoMutationException( + CoreError.NO_MUTATION_APPLIED.buildMessage(), Collections.singletonList(delete)); } } catch (SQLException e) { throw new ExecutionException( @@ -210,7 +213,7 @@ public void mutate(List mutations) throws ExecutionException throw new ExecutionException( CoreError.JDBC_ERROR_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e); } - throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage()); + throw new NoMutationException(CoreError.NO_MUTATION_APPLIED.buildMessage(), mutations); } else { connection.commit(); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 845f351d56..b03f93d73d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -93,7 +93,8 @@ private Optional> invokeBeforePreparationSnapshotHook(Snapshot snap abortState(snapshot.getId()); rollbackRecords(snapshot); throw new CommitException( - CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), + CoreError.CONSENSUS_COMMIT_HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage( + e.getMessage()), e, snapshot.getId()); } @@ -113,7 +114,8 @@ private void waitBeforePreparationSnapshotHookFuture( abortState(snapshot.getId()); rollbackRecords(snapshot); throw new CommitException( - CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), + CoreError.CONSENSUS_COMMIT_HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage( + e.getMessage()), e, snapshot.getId()); } @@ -229,7 +231,9 @@ boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException { .collect(Collectors.toList())); } catch (ExecutionException e) { throw new CommitException( - CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(e.getMessage()), + e, + snapshot.getId()); } } @@ -242,7 +246,8 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause) if (state.equals(TransactionState.ABORTED)) { rollbackRecords(snapshot); throw new CommitConflictException( - CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(), + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage( + cause.getMessage()), cause, snapshot.getId()); } @@ -250,13 +255,15 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause) throw new UnknownTransactionStatusException( CoreError .CONSENSUS_COMMIT_COMMITTING_STATE_FAILED_WITH_NO_MUTATION_EXCEPTION_BUT_COORDINATOR_STATUS_DOES_NOT_EXIST - .buildMessage(), + .buildMessage(cause.getMessage()), cause, snapshot.getId()); } } catch (CoordinatorException ex) { throw new UnknownTransactionStatusException( - CoreError.CONSENSUS_COMMIT_CANNOT_GET_STATE.buildMessage(), ex, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_CANNOT_COORDINATOR_STATUS.buildMessage(ex.getMessage()), + ex, + snapshot.getId()); } } @@ -273,15 +280,21 @@ void onePhaseCommitRecords(Snapshot snapshot) storage.mutate(composer.get()); } catch (NoMutationException e) { throw new CommitConflictException( - CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(e.getMessage()), + e, + snapshot.getId()); } catch (RetriableExecutionException e) { throw new CommitConflictException( - CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage(), + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage( + e.getMessage()), e, snapshot.getId()); } catch (ExecutionException e) { throw new UnknownTransactionStatusException( - CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_ONE_PHASE_COMMITTING_RECORDS_FAILED.buildMessage( + e.getMessage()), + e, + snapshot.getId()); } } @@ -299,15 +312,20 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException { parallelExecutor.prepareRecords(tasks, snapshot.getId()); } catch (NoMutationException e) { throw new PreparationConflictException( - CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(e.getMessage()), + e, + snapshot.getId()); } catch (RetriableExecutionException e) { throw new PreparationConflictException( - CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_PREPARING_RECORDS.buildMessage(), + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_PREPARING_RECORDS.buildMessage( + e.getMessage()), e, snapshot.getId()); } catch (ExecutionException e) { throw new PreparationException( - CoreError.CONSENSUS_COMMIT_PREPARING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_PREPARING_RECORDS_FAILED.buildMessage(e.getMessage()), + e, + snapshot.getId()); } } @@ -317,7 +335,9 @@ public void validateRecords(Snapshot snapshot) throws ValidationException { snapshot.toSerializable(storage); } catch (ExecutionException e) { throw new ValidationException( - CoreError.CONSENSUS_COMMIT_VALIDATION_FAILED.buildMessage(), e, snapshot.getId()); + CoreError.CONSENSUS_COMMIT_VALIDATION_FAILED.buildMessage(e.getMessage()), + e, + snapshot.getId()); } } @@ -333,7 +353,9 @@ public void commitState(Snapshot snapshot) handleCommitConflict(snapshot, e); } catch (CoordinatorException e) { throw new UnknownTransactionStatusException( - CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(), e, id); + CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(e.getMessage()), + e, + id); } } @@ -350,7 +372,7 @@ public void commitRecords(Snapshot snapshot) { } parallelExecutor.commitRecords(tasks, snapshot.getId()); } catch (Exception e) { - logger.warn("Committing records failed. Transaction ID: {}", snapshot.getId(), e); + logger.info("Committing records failed. Transaction ID: {}", snapshot.getId(), e); // ignore since records are recovered lazily } } @@ -370,16 +392,20 @@ public TransactionState abortState(String id) throws UnknownTransactionStatusExc throw new UnknownTransactionStatusException( CoreError .CONSENSUS_COMMIT_ABORTING_STATE_FAILED_WITH_NO_MUTATION_EXCEPTION_BUT_COORDINATOR_STATUS_DOES_NOT_EXIST - .buildMessage(), + .buildMessage(e.getMessage()), e, id); } catch (CoordinatorException e1) { throw new UnknownTransactionStatusException( - CoreError.CONSENSUS_COMMIT_CANNOT_GET_STATE.buildMessage(), e1, id); + CoreError.CONSENSUS_COMMIT_CANNOT_COORDINATOR_STATUS.buildMessage(e1.getMessage()), + e1, + id); } } catch (CoordinatorException e) { throw new UnknownTransactionStatusException( - CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(), e, id); + CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(e.getMessage()), + e, + id); } } @@ -397,7 +423,7 @@ public void rollbackRecords(Snapshot snapshot) { } parallelExecutor.rollbackRecords(tasks, snapshot.getId()); } catch (Exception e) { - logger.warn("Rolling back records failed. Transaction ID: {}", snapshot.getId(), e); + logger.info("Rolling back records failed. Transaction ID: {}", snapshot.getId(), e); // ignore since records are recovered lazily } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 6918f7c3ba..c9ee33a85f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -104,7 +104,7 @@ private void commitStateViaGroupCommit(Snapshot snapshot) } catch (Exception e) { // This is an unexpected exception, but clean up resources just in case. cancelGroupCommitIfNeeded(id); - throw new AssertionError("Group commit unexpectedly failed. TransactionID:" + id, e); + throw new AssertionError("Group commit unexpectedly failed. TransactionID: " + id, e); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java index 720d976cb1..697b5c041f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java @@ -78,6 +78,7 @@ private void add(Selection base, @Nullable TransactionResult result) throws Exec // for rollforward in lazy recovery mutations.add(composeDelete(base, result)); } else { + assert result.getState().equals(TransactionState.COMMITTED); logger.debug( "The record was committed by the originated one " + "or rolled forward by another transaction: {}", diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index a94fb33307..718f672078 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -150,7 +150,7 @@ public void update(Update update) throws CrudException { throw new UnsatisfiedConditionException( ConsensusCommitUtils.convertUnsatisfiedConditionExceptionMessageForUpdate( e, update.getCondition().get()), - crud.getSnapshot().getId()); + getId()); } // If the condition is not specified, it means that the record does not exist. In this case, @@ -188,12 +188,16 @@ public void commit() throws CommitException, UnknownTransactionStatusException { crud.readIfImplicitPreReadEnabled(); } catch (CrudConflictException e) { throw new CommitConflictException( - CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(), + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage( + e.getMessage()), e, getId()); } catch (CrudException e) { throw new CommitException( - CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId()); + CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage( + e.getMessage()), + e, + getId()); } try { @@ -212,11 +216,11 @@ public void rollback() { try { crud.closeScanners(); } catch (CrudException e) { - logger.warn("Failed to close the scanner", e); + logger.warn("Failed to close the scanner. Transaction ID: {}", getId(), e); } if (groupCommitter != null && !crud.isReadOnly()) { - groupCommitter.remove(crud.getSnapshot().getId()); + groupCommitter.remove(getId()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index bf8c78249c..e3dba993cd 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -475,7 +475,8 @@ private void rollbackTransaction(DistributedTransaction transaction) { try { transaction.rollback(); } catch (RollbackException e) { - logger.warn("Rolling back the transaction failed", e); + logger.warn( + "Rolling back the transaction failed. Transaction ID: {}", transaction.getId(), e); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java index 935c494048..36c4ea2dda 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java @@ -1,5 +1,7 @@ package com.scalar.db.transaction.consensuscommit; +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*; + import com.google.common.annotations.VisibleForTesting; import com.scalar.db.api.ConditionalExpression; import com.scalar.db.api.Delete; @@ -7,7 +9,6 @@ import com.scalar.db.api.DeleteIfExists; import com.scalar.db.api.Mutation; import com.scalar.db.api.MutationCondition; -import com.scalar.db.api.Operation; import com.scalar.db.api.Put; import com.scalar.db.api.PutIf; import com.scalar.db.api.PutIfExists; @@ -28,16 +29,6 @@ public ConsensusCommitMutationOperationChecker( this.transactionTableMetadataManager = transactionTableMetadataManager; } - private TransactionTableMetadata getTableMetadata(Operation operation) throws ExecutionException { - TransactionTableMetadata metadata = - transactionTableMetadataManager.getTransactionTableMetadata(operation); - if (metadata == null) { - throw new IllegalArgumentException( - CoreError.TABLE_NOT_FOUND.buildMessage(operation.forFullTableName().get())); - } - return metadata; - } - /** * Checks the mutation validity * @@ -55,7 +46,8 @@ public void check(Mutation mutation) throws ExecutionException { } private void check(Put put) throws ExecutionException { - TransactionTableMetadata metadata = getTableMetadata(put); + TransactionTableMetadata metadata = + getTransactionTableMetadata(transactionTableMetadataManager, put); for (String column : put.getContainedColumnNames()) { if (metadata.getTransactionMetaColumnNames().contains(column)) { throw new IllegalArgumentException( @@ -92,7 +84,8 @@ private void check(Delete delete) throws ExecutionException { CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_ON_DELETE.buildMessage( condition.getClass().getSimpleName())); } - TransactionTableMetadata transactionMetadata = getTableMetadata(delete); + TransactionTableMetadata transactionMetadata = + getTransactionTableMetadata(transactionTableMetadataManager, delete); checkConditionIsNotTargetingMetadataColumns(condition, transactionMetadata); ConditionChecker conditionChecker = createConditionChecker(transactionMetadata.getTableMetadata()); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java index 7de3f326db..b6bb5fdaae 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java @@ -14,7 +14,6 @@ import com.scalar.db.api.Result; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; -import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.io.DataType; import com.scalar.db.io.Key; @@ -134,7 +133,7 @@ Optional getStateForGroupCommit(String fullId) throws Coordin private Optional getStateInternal(String id) throws CoordinatorException { Get get = createGetWith(id); - return get(get); + return get(get, id); } /** @@ -168,7 +167,7 @@ public Optional getStateByFullId(String fullId) throws Coordi public void putState(Coordinator.State state) throws CoordinatorException { Put put = createPutWith(state); - put(put); + put(put, state.getId()); } void putStateForGroupCommit( @@ -189,7 +188,7 @@ void putStateForGroupCommit( State state = new State(parentId, childIds, transactionState, createdAt); Put put = createPutWith(state); - put(put); + put(put, state.getId()); } public void putStateForLazyRecoveryRollback(String id) throws CoordinatorException { @@ -300,12 +299,10 @@ Get createGetWith(String id) { .forTable(TABLE); } - private Optional get(Get get) throws CoordinatorException { + private Optional get(Get get, String id) throws CoordinatorException { int counter = 0; + Exception exception = null; while (true) { - if (counter >= MAX_RETRY_COUNT) { - throw new CoordinatorException("Can't get coordinator state"); - } try { Optional result = storage.get(get); if (result.isPresent()) { @@ -313,10 +310,25 @@ private Optional get(Get get) throws CoordinatorException { } else { return Optional.empty(); } - } catch (ExecutionException e) { - logger.warn("Can't get coordinator state", e); + } catch (Exception e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + + if (counter + 1 >= MAX_RETRY_COUNT) { + throw new CoordinatorException("Can't get coordinator state", exception); + } + + logger.warn( + "Can't get coordinator state. Retrying... Attempt: {}; Transaction ID: {}", + counter, + id, + e); + + exponentialBackoff(counter++); } - exponentialBackoff(counter++); } } @@ -335,21 +347,34 @@ Put createPutWith(Coordinator.State state) { .forTable(TABLE); } - private void put(Put put) throws CoordinatorException { + private void put(Put put, String id) throws CoordinatorException { int counter = 0; + Exception exception = null; while (true) { - if (counter >= MAX_RETRY_COUNT) { - throw new CoordinatorException("Couldn't put coordinator state"); - } try { storage.put(put); break; } catch (NoMutationException e) { throw new CoordinatorConflictException("Mutation seems applied already", e); - } catch (ExecutionException e) { - logger.warn("Putting state in coordinator failed", e); + } catch (Exception e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + + if (counter + 1 >= MAX_RETRY_COUNT) { + throw new CoordinatorException("Couldn't put coordinator state", exception); + } + + logger.warn( + "Putting state in coordinator failed. Retrying... Attempt: {}; Transaction ID: {}", + counter, + id, + e); + + exponentialBackoff(counter++); } - exponentialBackoff(counter++); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 0c34613e8c..4a8fd44e75 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -272,7 +272,8 @@ private LinkedHashMap scanInternal(Scan scan) exception = e; } throw new CrudException( - CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(), + CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage( + exception.getMessage()), exception, snapshot.getId()); } finally { @@ -280,7 +281,7 @@ private LinkedHashMap scanInternal(Scan scan) try { scanner.close(); } catch (IOException e) { - logger.warn("Failed to close the scanner", e); + logger.warn("Failed to close the scanner. Transaction ID: {}", snapshot.getId(), e); } } } @@ -511,15 +512,14 @@ public void waitForRecoveryCompletionIfNecessary() throws CrudException { recoveryResult.recoveryFuture.get(); } } catch (java.util.concurrent.ExecutionException e) { - if (e.getCause() instanceof CrudConflictException) { - throw new CrudConflictException( - e.getCause().getMessage(), e.getCause(), snapshot.getId()); + Throwable cause = e.getCause(); + if (cause instanceof CrudException) { + throw (CrudException) cause; } throw new CrudException( - CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage( - e.getCause().getMessage()), - e.getCause(), + CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(cause.getMessage()), + cause, snapshot.getId()); } catch (Exception e) { throw new CrudException( @@ -536,15 +536,14 @@ void waitForRecoveryCompletion() throws CrudException { try { recoveryResult.recoveryFuture.get(); } catch (java.util.concurrent.ExecutionException e) { - if (e.getCause() instanceof CrudConflictException) { - throw new CrudConflictException( - e.getCause().getMessage(), e.getCause(), snapshot.getId()); + Throwable cause = e.getCause(); + if (cause instanceof CrudException) { + throw (CrudException) cause; } throw new CrudException( - CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage( - e.getCause().getMessage()), - e.getCause(), + CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(cause.getMessage()), + cause, snapshot.getId()); } catch (Exception e) { throw new CrudException( @@ -572,7 +571,8 @@ Optional getFromStorage(Get get) throws CrudException { } } catch (ExecutionException e) { throw new CrudException( - CoreError.CONSENSUS_COMMIT_READING_RECORD_FROM_STORAGE_FAILED.buildMessage(), + CoreError.CONSENSUS_COMMIT_READING_RECORD_FROM_STORAGE_FAILED.buildMessage( + e.getMessage()), e, snapshot.getId()); } @@ -592,7 +592,8 @@ private Scanner scanFromStorage(Scan scan) throws CrudException { } } catch (ExecutionException e) { throw new CrudException( - CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(), + CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage( + e.getMessage()), e, snapshot.getId()); } @@ -719,11 +720,15 @@ private Selection prepareStorageSelection(Selection selection) { private TransactionTableMetadata getTransactionTableMetadata(Operation operation) throws CrudException { + assert operation.forFullTableName().isPresent(); + try { return ConsensusCommitUtils.getTransactionTableMetadata(tableMetadataManager, operation); } catch (ExecutionException e) { throw new CrudException( - CoreError.GETTING_TABLE_METADATA_FAILED.buildMessage(), e, snapshot.getId()); + CoreError.GETTING_TABLE_METADATA_FAILED.buildMessage(operation.forFullTableName().get()), + e, + snapshot.getId()); } } @@ -824,7 +829,8 @@ public Optional one() throws CrudException { } catch (ExecutionException e) { closeScanner(); throw new CrudException( - CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(), + CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage( + e.getMessage()), e, snapshot.getId()); } catch (CrudException e) { @@ -878,7 +884,7 @@ private void closeScanner() { try { scanner.close(); } catch (IOException e) { - logger.warn("Failed to close the scanner", e); + logger.warn("Failed to close the scanner. Transaction ID: {}", snapshot.getId(), e); } } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java index 876e73fe54..ac9199c408 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java @@ -76,7 +76,7 @@ public void prepareRecords(List tasks, String transactionI config.isParallelPreparationEnabled(), false, stopOnError, - "preparation", + "prepareRecords", transactionId); } catch (ValidationConflictException | CrudException e) { throw new AssertionError( @@ -89,7 +89,12 @@ public void validateRecords(List tasks, String transaction throws ExecutionException, ValidationConflictException { try { executeTasks( - tasks, config.isParallelValidationEnabled(), false, true, "validation", transactionId); + tasks, + config.isParallelValidationEnabled(), + false, + true, + "validateRecords", + transactionId); } catch (CrudException e) { throw new AssertionError( "Tasks for validating a transaction should not throw CrudException", e); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java index 4c0079b181..4c1ec9786b 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java @@ -131,7 +131,10 @@ private Optional getCoordinatorState(String transactionId) try { return coordinator.getState(transactionId); } catch (CoordinatorException e) { - throw new CrudException(e.getMessage(), e, transactionId); + throw new CrudException( + CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(e.getMessage()), + e, + transactionId); } } @@ -154,12 +157,20 @@ private void throwUncommittedRecordExceptionIfTransactionNotExpired( Selection selection, TransactionResult result, String transactionId) - throws UncommittedRecordException { + throws CrudException { + assert selection.forFullTableName().isPresent(); + if (!state.isPresent() && !recovery.isTransactionExpired(result)) { + TransactionTableMetadata transactionTableMetadata = + getTransactionTableMetadata(selection, transactionId); throw new UncommittedRecordException( selection, result, - CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(), + CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage( + selection.forFullTableName().get(), + ScalarDbUtils.getPartitionKey(result, transactionTableMetadata.getTableMetadata()), + ScalarDbUtils.getClusteringKey(result, transactionTableMetadata.getTableMetadata()), + result.getId()), transactionId); } } @@ -282,11 +293,15 @@ private void addNullBeforeImageColumns( private TransactionTableMetadata getTransactionTableMetadata( Operation operation, String transactionId) throws CrudException { + assert operation.forFullTableName().isPresent(); + try { return ConsensusCommitUtils.getTransactionTableMetadata(tableMetadataManager, operation); } catch (ExecutionException e) { throw new CrudException( - CoreError.GETTING_TABLE_METADATA_FAILED.buildMessage(), e, transactionId); + CoreError.GETTING_TABLE_METADATA_FAILED.buildMessage(operation.forFullTableName().get()), + e, + transactionId); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryHandler.java index 0709ba1bd5..ae5989dcf4 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryHandler.java @@ -1,6 +1,7 @@ package com.scalar.db.transaction.consensuscommit; import static com.google.common.base.Preconditions.checkNotNull; +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*; import com.google.common.annotations.VisibleForTesting; import com.scalar.db.api.DistributedStorage; @@ -9,6 +10,8 @@ import com.scalar.db.api.TransactionState; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.storage.NoMutationException; +import com.scalar.db.io.Key; +import com.scalar.db.util.ScalarDbUtils; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Optional; @@ -37,8 +40,6 @@ public RecoveryHandler( public void recover( Selection selection, TransactionResult result, Optional state) throws ExecutionException, CoordinatorException { - logger.debug("Recovering for {}", result.getId()); - if (state.isPresent()) { if (state.get().getState().equals(TransactionState.COMMITTED)) { rollforwardRecord(selection, result); @@ -52,17 +53,34 @@ public void recover( @VisibleForTesting void rollbackRecord(Selection selection, TransactionResult result) throws ExecutionException { - logger.debug( - "Rollback for {}, {} mutated by {}", - selection.getPartitionKey(), - selection.getClusteringKey(), + assert selection.forFullTableName().isPresent(); + + TransactionTableMetadata tableMetadata = + getTransactionTableMetadata(tableMetadataManager, selection); + Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata.getTableMetadata()); + Optional clusteringKey = + ScalarDbUtils.getClusteringKey(result, tableMetadata.getTableMetadata()); + + logger.info( + "Rolling back for a record. Table: {}; Partition Key: {}; Clustering Key: {}; Transaction ID that wrote the record: {}", + selection.forFullTableName().get(), + partitionKey, + clusteringKey, result.getId()); RollbackMutationComposer composer = createRollbackMutationComposer(selection, result); try { mutate(composer.get()); - } catch (NoMutationException ignored) { + } catch (NoMutationException e) { + logger.info( + "Rolling back for a record failed. Table: {}; Partition Key: {}; Clustering Key: {}; Transaction ID that wrote the record: {}", + selection.forFullTableName().get(), + partitionKey, + clusteringKey, + result.getId(), + e); + // This can happen when the record has already been rolled back by another transaction. In // this case, we just ignore it. } @@ -79,17 +97,34 @@ RollbackMutationComposer createRollbackMutationComposer( @VisibleForTesting void rollforwardRecord(Selection selection, TransactionResult result) throws ExecutionException { - logger.debug( - "Rollforward for {}, {} mutated by {}", - selection.getPartitionKey(), - selection.getClusteringKey(), + assert selection.forFullTableName().isPresent(); + + TransactionTableMetadata tableMetadata = + getTransactionTableMetadata(tableMetadataManager, selection); + Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata.getTableMetadata()); + Optional clusteringKey = + ScalarDbUtils.getClusteringKey(result, tableMetadata.getTableMetadata()); + + logger.info( + "Rolling forward for a record. Table: {}; Partition Key: {}; Clustering Key: {}; Transaction ID that wrote the record: {}", + selection.forFullTableName().get(), + partitionKey, + clusteringKey, result.getId()); CommitMutationComposer composer = createCommitMutationComposer(selection, result); try { mutate(composer.get()); - } catch (NoMutationException ignored) { + } catch (NoMutationException e) { + logger.info( + "Rolling forward for a record failed. Table: {}; Partition Key: {}; Clustering Key: {}; Transaction ID that wrote the record: {}", + selection.forFullTableName().get(), + partitionKey, + clusteringKey, + result.getId(), + e); + // This can happen when the record has already been committed by another transaction. In this // case, we just ignore it. } @@ -106,13 +141,29 @@ CommitMutationComposer createCommitMutationComposer(Selection selection, Transac private void abortIfExpired(Selection selection, TransactionResult result) throws CoordinatorException, ExecutionException { + assert selection.forFullTableName().isPresent(); + if (!isTransactionExpired(result)) { return; } try { coordinator.putStateForLazyRecoveryRollback(result.getId()); - } catch (CoordinatorConflictException ignored) { + } catch (CoordinatorConflictException e) { + TransactionTableMetadata tableMetadata = + getTransactionTableMetadata(tableMetadataManager, selection); + Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata.getTableMetadata()); + Optional clusteringKey = + ScalarDbUtils.getClusteringKey(result, tableMetadata.getTableMetadata()); + + logger.info( + "Putting state in coordinator for a record failed. Table: {}; Partition Key: {}; Clustering Key: {}; Transaction ID that wrote the record: {}", + selection.forFullTableName().get(), + partitionKey, + clusteringKey, + result.getId(), + e); + // This can happen when the record has already been rolled back by another transaction. In // this case, we just ignore it. return; diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index a76ad4acbc..e3cf2755f1 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -706,7 +706,7 @@ private void validateScanResults( try { scanner.close(); } catch (IOException e) { - logger.warn("Failed to close the scanner", e); + logger.warn("Failed to close the scanner. Transaction ID: {}", getId(), e); } } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java index f7ba7b3ab2..2a8ac9b2f8 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java @@ -146,7 +146,7 @@ public void update(Update update) throws CrudException { throw new UnsatisfiedConditionException( ConsensusCommitUtils.convertUnsatisfiedConditionExceptionMessageForUpdate( e, update.getCondition().get()), - crud.getSnapshot().getId()); + getId()); } // If the condition is not specified, it means that the record does not exist. In this case, @@ -185,12 +185,16 @@ public void prepare() throws PreparationException { crud.readIfImplicitPreReadEnabled(); } catch (CrudConflictException e) { throw new PreparationConflictException( - CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(), + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage( + e.getMessage()), e, getId()); } catch (CrudException e) { throw new PreparationException( - CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId()); + CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage( + e.getMessage()), + e, + getId()); } try { @@ -238,7 +242,7 @@ public void rollback() throws RollbackException { try { crud.closeScanners(); } catch (CrudException e) { - logger.warn("Failed to close the scanner", e); + logger.warn("Failed to close the scanner. Transaction ID: {}", getId(), e); } if (!needRollback) { @@ -246,7 +250,7 @@ public void rollback() throws RollbackException { } try { - TransactionState state = commit.abortState(crud.getSnapshot().getId()); + TransactionState state = commit.abortState(getId()); if (state == TransactionState.COMMITTED) { throw new RollbackException( CoreError.CONSENSUS_COMMIT_ROLLBACK_FAILED_BECAUSE_TRANSACTION_ALREADY_COMMITTED diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 4a82b5d9ad..7d848d9b21 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -423,7 +423,8 @@ private void rollbackTransaction(TwoPhaseCommitTransaction transaction) { try { transaction.rollback(); } catch (RollbackException e) { - logger.warn("Rolling back the transaction failed", e); + logger.warn( + "Rolling back the transaction failed. Transaction ID: {}", transaction.getId(), e); } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutorTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutorTest.java index 3b61c0d764..d4b66f6465 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutorTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutorTest.java @@ -348,9 +348,8 @@ private TransactionResult prepareRolledForwardResult() { execute_ReturnLatestResultAndRecoverType_TransactionNotExpiredAndNoCoordinatorState_ShouldThrowUncommittedRecordException() throws CoordinatorException, ExecutionException { // Arrange - TransactionResult transactionResult = mock(TransactionResult.class); - when(transactionResult.getId()).thenReturn(ANY_ID_1); - when(coordinator.getState(ANY_ID_1)).thenReturn(Optional.empty()); + TransactionResult transactionResult = prepareResult(TransactionState.PREPARED); + when(coordinator.getState(ANY_ID_2)).thenReturn(Optional.empty()); when(recovery.isTransactionExpired(transactionResult)).thenReturn(false); // Act Assert diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryHandlerTest.java index 626795292e..50c7134840 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryHandlerTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.scalar.db.api.DistributedStorage; @@ -35,6 +36,8 @@ public class RecoveryHandlerTest { + private static final String ANY_NAMESPACE_NAME = "tbl"; + private static final String ANY_TABLE_NAME = "tbl"; private static final String ANY_NAME_1 = "name1"; private static final String ANY_TEXT_1 = "text1"; private static final String ANY_ID_1 = "id1"; @@ -49,7 +52,8 @@ public class RecoveryHandlerTest { @Mock private DistributedStorage storage; @Mock private Coordinator coordinator; - @Mock private TransactionTableMetadataManager tableMetadataManager; + @Mock private TransactionTableMetadataManager transactionTableMetadataManager; + @Mock private TransactionTableMetadata transactionTableMetadata; @Mock private Selection selection; private RecoveryHandler handler; @@ -59,7 +63,15 @@ public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); // Arrange - handler = spy(new RecoveryHandler(storage, coordinator, tableMetadataManager)); + handler = spy(new RecoveryHandler(storage, coordinator, transactionTableMetadataManager)); + + when(selection.forFullTableName()) + .thenReturn(Optional.of(ANY_NAMESPACE_NAME + "." + ANY_TABLE_NAME)); + when(selection.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(selection.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(transactionTableMetadataManager.getTransactionTableMetadata(selection)) + .thenReturn(transactionTableMetadata); + when(transactionTableMetadata.getTableMetadata()).thenReturn(TABLE_METADATA); } private TransactionResult prepareResult(long preparedAt, TransactionState transactionState) {