diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java index f5cdb0bcf9..2dc86ebf42 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java @@ -21,10 +21,12 @@ import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.util.ActiveExpiringMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +105,11 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId)); } + /** + * The methods of this class are synchronized to be thread-safe because the rollback() method may + * be called from the expiration handler in a different thread while other methods are being + * executed. + */ @VisibleForTesting class ActiveTransaction extends DecoratedDistributedTransaction { @@ -124,7 +131,37 @@ public synchronized List scan(Scan scan) throws CrudException { @Override public synchronized Scanner getScanner(Scan scan) throws CrudException { - return super.getScanner(scan); + Scanner scanner = super.getScanner(scan); + return new Scanner() { + @Override + public Optional one() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.one(); + } + } + + @Override + public List all() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.all(); + } + } + + @Override + public void close() throws CrudException { + synchronized (ActiveTransaction.this) { + scanner.close(); + } + } + + @Nonnull + @Override + public Iterator iterator() { + synchronized (ActiveTransaction.this) { + return scanner.iterator(); + } + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java index af5d630ef2..820fba1fcc 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java @@ -23,10 +23,12 @@ import com.scalar.db.exception.transaction.ValidationException; import com.scalar.db.util.ActiveExpiringMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,6 +111,11 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId)); } + /** + * The methods of this class are synchronized to be thread-safe because the rollback() method may + * be called from the expiration handler in a different thread while other methods are being + * executed. + */ @VisibleForTesting class ActiveTransaction extends DecoratedTwoPhaseCommitTransaction { @@ -130,7 +137,37 @@ public synchronized List scan(Scan scan) throws CrudException { @Override public synchronized Scanner getScanner(Scan scan) throws CrudException { - return super.getScanner(scan); + Scanner scanner = super.getScanner(scan); + return new Scanner() { + @Override + public Optional one() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.one(); + } + } + + @Override + public List all() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.all(); + } + } + + @Override + public void close() throws CrudException { + synchronized (ActiveTransaction.this) { + scanner.close(); + } + } + + @Nonnull + @Override + public Iterator iterator() { + synchronized (ActiveTransaction.this) { + return scanner.iterator(); + } + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */