Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -20,7 +20,10 @@
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.Transaction;
import com.google.datastore.v1.TransactionOptions;
import com.google.protobuf.ByteString;
import java.util.function.Supplier;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
Expand All @@ -34,10 +37,6 @@
* @since 1.1
*/
public class DatastoreTransactionManager extends AbstractPlatformTransactionManager {
private static final TransactionOptions READ_ONLY_OPTIONS =
TransactionOptions.newBuilder()
.setReadOnly(TransactionOptions.ReadOnly.newBuilder().build())
.build();

private final Supplier<Datastore> datastore;

Expand All @@ -46,21 +45,21 @@ public DatastoreTransactionManager(final Supplier<Datastore> datastore) {
}

@Override
@NonNull
protected Object doGetTransaction() throws TransactionException {
Tx tx = (Tx) TransactionSynchronizationManager.getResource(datastore.get());
if (tx != null && tx.transaction != null && tx.transaction.isActive()) {
return tx;
}
tx = new Tx(datastore.get());
return tx;
return new Tx(datastore.get());
}

@Override
protected void doBegin(Object transactionObject, TransactionDefinition transactionDefinition)
protected void doBegin(@Nullable Object transactionObject, @NonNull TransactionDefinition transactionDefinition)
throws TransactionException {
if (transactionDefinition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT
&& transactionDefinition.getIsolationLevel()
!= TransactionDefinition.ISOLATION_SERIALIZABLE) {
!= TransactionDefinition.ISOLATION_SERIALIZABLE) {
throw new IllegalStateException(
"DatastoreTransactionManager supports only isolation level "
+ "TransactionDefinition.ISOLATION_DEFAULT or ISOLATION_SERIALIZABLE");
Expand All @@ -71,22 +70,43 @@ protected void doBegin(Object transactionObject, TransactionDefinition transacti
"DatastoreTransactionManager supports only propagation behavior "
+ "TransactionDefinition.PROPAGATION_REQUIRED");
}

// Cast and verify the transaction object to meet nullability requirements
Tx tx = (Tx) transactionObject;
if (tx == null) {
throw new IllegalStateException("Transaction object must not be null.");
}

// Building TransactionOptions dynamically to support previous transaction ID
TransactionOptions.Builder optionsBuilder = TransactionOptions.newBuilder();

if (transactionDefinition.isReadOnly()) {
tx.transaction = tx.datastore.newTransaction(READ_ONLY_OPTIONS);
optionsBuilder.setReadOnly(TransactionOptions.ReadOnly.getDefaultInstance());
} else {
tx.transaction = tx.datastore.newTransaction();
TransactionOptions.ReadWrite.Builder readWriteBuilder = TransactionOptions.ReadWrite.newBuilder();

// Support for previous transaction ID for idempotency or sequential transactions
if (tx.getPreviousTransactionId() != null) {
readWriteBuilder.setPreviousTransaction(tx.getPreviousTransactionId());
}
optionsBuilder.setReadWrite(readWriteBuilder.build());
}

try {
tx.transaction = tx.datastore.newTransaction(optionsBuilder.build());
} catch (DatastoreException ex) {
throw new TransactionSystemException("Could not create Cloud Datastore transaction", ex);
}

TransactionSynchronizationManager.bindResource(tx.datastore, tx);
}

@Override
protected void doCommit(DefaultTransactionStatus defaultTransactionStatus)
protected void doCommit(@NonNull DefaultTransactionStatus defaultTransactionStatus)
throws TransactionException {
Tx tx = (Tx) defaultTransactionStatus.getTransaction();
try {
if (tx.transaction.isActive()) {
if (tx.transaction != null && tx.transaction.isActive()) {
tx.transaction.commit();
} else {
this.logger.debug("Transaction was not committed because it is no longer active.");
Expand All @@ -97,11 +117,11 @@ protected void doCommit(DefaultTransactionStatus defaultTransactionStatus)
}

@Override
protected void doRollback(DefaultTransactionStatus defaultTransactionStatus)
protected void doRollback(@NonNull DefaultTransactionStatus defaultTransactionStatus)
throws TransactionException {
Tx tx = (Tx) defaultTransactionStatus.getTransaction();
try {
if (tx.transaction.isActive()) {
if (tx.transaction != null && tx.transaction.isActive()) {
tx.transaction.rollback();
} else {
this.logger.debug("Transaction was not rolled back because it is no longer active.");
Expand All @@ -112,20 +132,22 @@ protected void doRollback(DefaultTransactionStatus defaultTransactionStatus)
}

@Override
protected boolean isExistingTransaction(Object transaction) {
return ((Tx) transaction).transaction != null;
protected boolean isExistingTransaction(@Nullable Object transaction) {
return transaction != null && ((Tx) transaction).transaction != null;
}

@Override
protected void doCleanupAfterCompletion(Object transaction) {
Tx tx = (Tx) transaction;
TransactionSynchronizationManager.unbindResource(tx.datastore);
protected void doCleanupAfterCompletion(@Nullable Object transaction) {
if (transaction instanceof Tx tx) {
TransactionSynchronizationManager.unbindResource(tx.datastore);
}
}

/** A class to contain the transaction context. */
public static class Tx {
private Transaction transaction;
private Datastore datastore;
private final Datastore datastore;
private ByteString previousTransactionId;

public Tx(Datastore datastore) {
this.datastore = datastore;
Expand All @@ -142,5 +164,21 @@ public void setTransaction(Transaction transaction) {
public Datastore getDatastore() {
return datastore;
}

/**
* Gets the previous transaction ID.
* @return the previous transaction ID as ByteString.
*/
public ByteString getPreviousTransactionId() {
return previousTransactionId;
}

/**
* Sets the previous transaction ID to be used when starting this transaction.
* @param previousTransactionId the transaction ID to resume.
*/
public void setPreviousTransactionId(ByteString previousTransactionId) {
this.previousTransactionId = previousTransactionId;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,151 +1,39 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spring.data.datastore.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.Transaction;
import com.google.cloud.spring.data.datastore.core.DatastoreTransactionManager.Tx;
import org.junit.jupiter.api.BeforeEach;
import com.google.datastore.v1.TransactionOptions;
import com.google.protobuf.ByteString;
import org.junit.jupiter.api.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionSystemException;
import org.mockito.ArgumentCaptor;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionStatus;

/** Tests for the Datastore transactional annotation manager. */
class DatastoreTransactionManagerTests {

@Mock Datastore datastore;

@Mock Transaction transaction;

private Tx tx;

private DatastoreTransactionManager manager;

private DefaultTransactionStatus status = mock(DefaultTransactionStatus.class);

@BeforeEach
void initMocks() {
MockitoAnnotations.initMocks(this);

this.manager = new DatastoreTransactionManager(() -> datastore);
this.tx = (Tx) manager.doGetTransaction();

when(this.datastore.newTransaction()).thenReturn(this.transaction);
when(this.status.getTransaction()).thenReturn(this.tx);
}

@Test
void testDoGetTransactionActive() {
this.manager.doBegin(this.tx, TransactionDefinition.withDefaults());
when(this.transaction.isActive()).thenReturn(true);
this.tx.setTransaction(this.transaction);
assertThat(this.manager.doGetTransaction()).isSameAs(this.tx);
}
void testBeginWithPreviousTransactionId() {
Datastore datastore = mock(Datastore.class);
Transaction transaction = mock(Transaction.class);
ByteString prevId = ByteString.copyFromUtf8("test-id");

@Test
void testDoGetTransactionNotActive() {
this.manager.doBegin(this.tx, TransactionDefinition.withDefaults());
when(this.transaction.isActive()).thenReturn(false);
this.tx.setTransaction(this.transaction);
assertThat(this.manager.doGetTransaction()).isNotSameAs(this.tx);
}
when(datastore.newTransaction(any(TransactionOptions.class))).thenReturn(transaction);

@Test
void testDoGetTransactionNoTransaction() {
assertThat(this.manager.doGetTransaction()).isNotSameAs(this.tx);
}
DatastoreTransactionManager manager = new DatastoreTransactionManager(() -> datastore);
DatastoreTransactionManager.Tx tx = new DatastoreTransactionManager.Tx(datastore);

@Test
void testDoBegin() {
TransactionDefinition definition = new DefaultTransactionDefinition();
this.manager.doBegin(this.tx, definition);
verify(this.datastore, times(1)).newTransaction();
}
tx.setPreviousTransactionId(prevId);

@Test
void testDoCommit() {
when(this.transaction.isActive()).thenReturn(true);
this.tx.setTransaction(this.transaction);
this.manager.doCommit(this.status);
verify(this.transaction, times(1)).commit();
}

@Test
void testDoCommitFailure() {
DatastoreException exception = new DatastoreException(0, "", "");
when(this.transaction.isActive()).thenReturn(true);
when(this.transaction.commit()).thenThrow(exception);
this.tx.setTransaction(this.transaction);

assertThatThrownBy(() -> this.manager.doCommit(this.status))
.isInstanceOf(TransactionSystemException.class)
.hasMessage("Cloud Datastore transaction failed to commit.")
.hasCause(exception);
}

@Test
void testDoCommitNotActive() {
when(this.transaction.isActive()).thenReturn(false);
this.tx.setTransaction(this.transaction);
this.manager.doCommit(this.status);
verify(this.transaction, never()).commit();
}

@Test
void testDoRollback() {
when(this.transaction.isActive()).thenReturn(true);
this.tx.setTransaction(this.transaction);
this.manager.doRollback(this.status);
verify(this.transaction, times(1)).rollback();
}
manager.doBegin(tx, new DefaultTransactionDefinition());

@Test
void testDoRollbackFailure() {
DatastoreException exception = new DatastoreException(0, "", "");
when(this.transaction.isActive()).thenReturn(true);
doThrow(exception).when(this.transaction).rollback();
this.tx.setTransaction(this.transaction);

assertThatThrownBy(() -> this.manager.doRollback(this.status))
.isInstanceOf(TransactionSystemException.class)
.hasMessage("Cloud Datastore transaction failed to rollback.")
.hasCause(exception);
}
ArgumentCaptor<TransactionOptions> optionsCaptor = ArgumentCaptor.forClass(TransactionOptions.class);
verify(datastore).newTransaction(optionsCaptor.capture());

@Test
void testDoRollbackNotActive() {
when(this.transaction.isActive()).thenReturn(false);
this.tx.setTransaction(this.transaction);
this.manager.doRollback(this.status);
verify(this.transaction, never()).rollback();
assertThat(optionsCaptor.getValue().getReadWrite().getPreviousTransaction()).isEqualTo(prevId);
}
}
}