Skip to content

Commit 7975e90

Browse files
committed
Use BeforeConvertCallback.onBeforeConvert(…) outcome for the actual insert.
We now use correctly the result of the `onBeforeConvert` callback for insert instead of the original entity. Closes #1295
1 parent 06e46a0 commit 7975e90

File tree

4 files changed

+174
-98
lines changed

4 files changed

+174
-98
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,24 @@
2323
import java.util.stream.Collectors;
2424
import java.util.stream.StreamSupport;
2525

26+
import com.datastax.oss.driver.api.core.CqlIdentifier;
27+
import com.datastax.oss.driver.api.core.CqlSession;
28+
import com.datastax.oss.driver.api.core.DriverException;
29+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
30+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
31+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
32+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
33+
import com.datastax.oss.driver.api.core.cql.ResultSet;
34+
import com.datastax.oss.driver.api.core.cql.Row;
35+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
36+
import com.datastax.oss.driver.api.core.cql.Statement;
37+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
38+
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
39+
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
40+
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
41+
import com.datastax.oss.driver.api.querybuilder.select.Select;
42+
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
43+
import com.datastax.oss.driver.api.querybuilder.update.Update;
2644
import org.apache.commons.logging.Log;
2745
import org.apache.commons.logging.LogFactory;
2846

@@ -66,25 +84,6 @@
6684
import org.springframework.util.Assert;
6785
import org.springframework.util.concurrent.ListenableFuture;
6886

69-
import com.datastax.oss.driver.api.core.CqlIdentifier;
70-
import com.datastax.oss.driver.api.core.CqlSession;
71-
import com.datastax.oss.driver.api.core.DriverException;
72-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
73-
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
74-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
75-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
76-
import com.datastax.oss.driver.api.core.cql.ResultSet;
77-
import com.datastax.oss.driver.api.core.cql.Row;
78-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
79-
import com.datastax.oss.driver.api.core.cql.Statement;
80-
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
81-
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
82-
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
83-
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
84-
import com.datastax.oss.driver.api.querybuilder.select.Select;
85-
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
86-
import com.datastax.oss.driver.api.querybuilder.update.Update;
87-
8887
/**
8988
* Primary implementation of {@link AsyncCassandraOperations}. It simplifies the use of asynchronous Cassandra usage and
9089
* helps to avoid common errors. It executes core Cassandra workflow. This class executes CQL queries or updates,
@@ -434,7 +433,8 @@ public <T> ListenableFuture<List<T>> select(Query query, Class<T> entityClass) t
434433
Assert.notNull(query, "Query must not be null");
435434
Assert.notNull(entityClass, "Entity type must not be null");
436435

437-
return select(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass)).build(), entityClass);
436+
return select(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass))
437+
.build(), entityClass);
438438
}
439439

440440
/* (non-Javadoc)
@@ -448,7 +448,8 @@ public <T> ListenableFuture<Void> select(Query query, Consumer<T> entityConsumer
448448
Assert.notNull(entityConsumer, "Entity Consumer must not be empty");
449449
Assert.notNull(entityClass, "Entity type must not be null");
450450

451-
return select(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass)).build(), entityConsumer,
451+
return select(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass))
452+
.build(), entityConsumer,
452453
entityClass);
453454
}
454455

@@ -461,7 +462,8 @@ public <T> ListenableFuture<T> selectOne(Query query, Class<T> entityClass) thro
461462
Assert.notNull(query, "Query must not be null");
462463
Assert.notNull(entityClass, "Entity type must not be null");
463464

464-
return selectOne(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass)).build(),
465+
return selectOne(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass))
466+
.build(),
465467
entityClass);
466468
}
467469

@@ -474,7 +476,8 @@ public <T> ListenableFuture<Slice<T>> slice(Query query, Class<T> entityClass) t
474476
Assert.notNull(query, "Query must not be null");
475477
Assert.notNull(entityClass, "Entity type must not be null");
476478

477-
return slice(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass)).build(), entityClass);
479+
return slice(getStatementFactory().select(query, getRequiredPersistentEntity(entityClass))
480+
.build(), entityClass);
478481
}
479482

480483
/* (non-Javadoc)
@@ -488,7 +491,8 @@ public ListenableFuture<Boolean> update(Query query, org.springframework.data.ca
488491
Assert.notNull(update, "Update must not be null");
489492
Assert.notNull(entityClass, "Entity type must not be null");
490493

491-
return doExecute(getStatementFactory().update(query, update, getRequiredPersistentEntity(entityClass)).build(),
494+
return doExecute(getStatementFactory().update(query, update, getRequiredPersistentEntity(entityClass))
495+
.build(),
492496
AsyncResultSet::wasApplied);
493497
}
494498

@@ -514,7 +518,8 @@ private ListenableFuture<Boolean> doDelete(Query query, Class<?> entityClass, Cq
514518

515519
ListenableFuture<Boolean> future = doExecute(delete, AsyncResultSet::wasApplied);
516520

517-
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {});
521+
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {
522+
});
518523

519524
return future;
520525
}
@@ -557,7 +562,8 @@ ListenableFuture<Long> doCount(Query query, Class<?> entityClass, CqlIdentifier
557562

558563
SingleColumnRowMapper<Long> mapper = SingleColumnRowMapper.newInstance(Long.class);
559564

560-
Row row = DataAccessUtils.requiredSingleResult(Streamable.of(it.currentPage()).toList());
565+
Row row = DataAccessUtils.requiredSingleResult(Streamable.of(it.currentPage())
566+
.toList());
561567
return mapper.mapRow(row, 0);
562568
});
563569

@@ -640,7 +646,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> doInsert(T entity, WriteOptio
640646
getConverter().getConversionService());
641647
CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
642648

643-
T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entity;
649+
T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : source.getBean();
644650

645651
StatementBuilder<RegularInsert> builder = getStatementFactory().insert(entityToUse, options, persistentEntity,
646652
tableName);
@@ -760,7 +766,8 @@ private ListenableFuture<WriteResult> doDeleteVersioned(Object entity, QueryOpti
760766
StatementBuilder<Delete> delete = getStatementFactory().delete(entity, options, getConverter(), tableName);
761767
;
762768

763-
return executeDelete(entity, tableName, source.appendVersionCondition(delete).build(), result -> {
769+
return executeDelete(entity, tableName, source.appendVersionCondition(delete)
770+
.build(), result -> {
764771

765772
if (!result.wasApplied()) {
766773
throw new OptimisticLockingFailureException(
@@ -774,7 +781,8 @@ private ListenableFuture<WriteResult> doDelete(Object entity, QueryOptions optio
774781

775782
StatementBuilder<Delete> delete = getStatementFactory().delete(entity, options, getConverter(), tableName);
776783

777-
return executeDelete(entity, tableName, delete.build(), result -> {});
784+
return executeDelete(entity, tableName, delete.build(), result -> {
785+
});
778786
}
779787

780788
/* (non-Javadoc)
@@ -795,7 +803,8 @@ public ListenableFuture<Boolean> deleteById(Object id, Class<?> entityClass) {
795803
maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName));
796804

797805
ListenableFuture<Boolean> future = doExecute(delete, AsyncResultSet::wasApplied);
798-
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {});
806+
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {
807+
});
799808

800809
return future;
801810
}
@@ -815,7 +824,8 @@ public ListenableFuture<Void> truncate(Class<?> entityClass) {
815824
maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName));
816825

817826
ListenableFuture<Boolean> future = doExecute(statement, AsyncResultSet::wasApplied);
818-
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)), e -> {});
827+
future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)), e -> {
828+
});
819829

820830
return new MappingListenableFutureAdapter<>(future, aBoolean -> null);
821831
}
@@ -840,7 +850,8 @@ protected AsyncPreparedStatementHandler createPreparedStatementHandler(Statement
840850
private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIdentifier tableName,
841851
SimpleStatement statement) {
842852

843-
return executeSave(entity, tableName, statement, ignore -> {});
853+
return executeSave(entity, tableName, statement, ignore -> {
854+
});
844855
}
845856

846857
private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIdentifier tableName,
@@ -924,11 +935,13 @@ private <T> ListenableFuture<T> doExecute(Statement<?> statement, Function<Async
924935
}
925936

926937
private static List<Row> getFirstPage(AsyncResultSet resultSet) {
927-
return StreamSupport.stream(resultSet.currentPage().spliterator(), false).collect(Collectors.toList());
938+
return StreamSupport.stream(resultSet.currentPage().spliterator(), false)
939+
.collect(Collectors.toList());
928940
}
929941

930942
private static int getConfiguredPageSize(CqlSession session) {
931-
return session.getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000);
943+
return session.getContext().getConfig().getDefaultProfile()
944+
.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000);
932945
}
933946

934947
private int getEffectivePageSize(Statement<?> statement) {

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,24 @@
2020
import java.util.function.Function;
2121
import java.util.stream.Stream;
2222

23+
import com.datastax.oss.driver.api.core.CqlIdentifier;
24+
import com.datastax.oss.driver.api.core.CqlSession;
25+
import com.datastax.oss.driver.api.core.DriverException;
26+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
27+
import com.datastax.oss.driver.api.core.cql.BatchType;
28+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
29+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
30+
import com.datastax.oss.driver.api.core.cql.ResultSet;
31+
import com.datastax.oss.driver.api.core.cql.Row;
32+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
33+
import com.datastax.oss.driver.api.core.cql.Statement;
34+
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
35+
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
36+
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
37+
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
38+
import com.datastax.oss.driver.api.querybuilder.select.Select;
39+
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
40+
import com.datastax.oss.driver.api.querybuilder.update.Update;
2341
import org.apache.commons.logging.Log;
2442
import org.apache.commons.logging.LogFactory;
2543

@@ -61,25 +79,6 @@
6179
import org.springframework.lang.Nullable;
6280
import org.springframework.util.Assert;
6381

64-
import com.datastax.oss.driver.api.core.CqlIdentifier;
65-
import com.datastax.oss.driver.api.core.CqlSession;
66-
import com.datastax.oss.driver.api.core.DriverException;
67-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
68-
import com.datastax.oss.driver.api.core.cql.BatchType;
69-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
70-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
71-
import com.datastax.oss.driver.api.core.cql.ResultSet;
72-
import com.datastax.oss.driver.api.core.cql.Row;
73-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
74-
import com.datastax.oss.driver.api.core.cql.Statement;
75-
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
76-
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
77-
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
78-
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
79-
import com.datastax.oss.driver.api.querybuilder.select.Select;
80-
import com.datastax.oss.driver.api.querybuilder.truncate.Truncate;
81-
import com.datastax.oss.driver.api.querybuilder.update.Update;
82-
8382
/**
8483
* Primary implementation of {@link CassandraOperations}. It simplifies the use of Cassandra usage and helps to avoid
8584
* common errors. It executes core Cassandra workflow. This class executes CQL queries or updates, initiating iteration
@@ -670,7 +669,7 @@ <T> EntityWriteResult<T> doInsert(T entity, WriteOptions options, CqlIdentifier
670669
AdaptibleEntity<T> source = getEntityOperations().forEntity(maybeCallBeforeConvert(entity, tableName),
671670
getConverter().getConversionService());
672671

673-
T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entity;
672+
T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : source.getBean();
674673

675674
StatementBuilder<RegularInsert> builder = getStatementFactory().insert(entityToUse, options,
676675
source.getPersistentEntity(), tableName);
@@ -737,7 +736,8 @@ private <T> EntityWriteResult<T> doUpdateVersioned(T entity, UpdateOptions optio
737736
T toSave = source.incrementVersion();
738737

739738
StatementBuilder<Update> builder = getStatementFactory().update(toSave, options, persistentEntity, tableName);
740-
SimpleStatement update = source.appendVersionCondition(builder, previousVersion).build();
739+
SimpleStatement update = source.appendVersionCondition(builder, previousVersion)
740+
.build();
741741

742742
return executeSave(toSave, tableName, update, result -> {
743743

@@ -781,7 +781,8 @@ public WriteResult delete(Object entity, QueryOptions options) {
781781
StatementBuilder<Delete> builder = getStatementFactory().delete(entity, options, getConverter(), tableName);
782782

783783
return source.isVersionedEntity()
784-
? doDeleteVersioned(source.appendVersionCondition(builder).build(), entity, source, tableName)
784+
? doDeleteVersioned(source.appendVersionCondition(builder)
785+
.build(), entity, source, tableName)
785786
: doDelete(builder.build(), entity, tableName);
786787

787788
}
@@ -800,7 +801,8 @@ private WriteResult doDeleteVersioned(SimpleStatement statement, Object entity,
800801
}
801802

802803
private WriteResult doDelete(SimpleStatement delete, Object entity, CqlIdentifier tableName) {
803-
return executeDelete(entity, tableName, delete, result -> {});
804+
return executeDelete(entity, tableName, delete, result -> {
805+
});
804806
}
805807

806808
/* (non-Javadoc)
@@ -900,7 +902,8 @@ protected PreparedStatementHandler createPreparedStatementHandler(Statement<?> s
900902
}
901903

902904
private <T> EntityWriteResult<T> executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement) {
903-
return executeSave(entity, tableName, statement, ignore -> {});
905+
return executeSave(entity, tableName, statement, ignore -> {
906+
});
904907
}
905908

906909
private <T> EntityWriteResult<T> executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement,
@@ -977,7 +980,8 @@ private <T> T doExecute(Statement<?> statement, Function<ResultSet, T> mappingFu
977980
}
978981

979982
private int getConfiguredPageSize(CqlSession session) {
980-
return session.getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000);
983+
return session.getContext().getConfig().getDefaultProfile()
984+
.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000);
981985
}
982986

983987
@SuppressWarnings("ConstantConditions")

spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/AsyncCassandraTemplateUnitTests.java

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,26 @@
1515
*/
1616
package org.springframework.data.cassandra.core;
1717

18-
import static org.assertj.core.api.Assertions.*;
19-
import static org.mockito.ArgumentMatchers.*;
20-
import static org.mockito.Mockito.*;
21-
import static org.springframework.data.cassandra.core.query.Criteria.*;
22-
2318
import java.util.ArrayList;
2419
import java.util.Collections;
2520
import java.util.List;
2621
import java.util.concurrent.CompletableFuture;
2722
import java.util.concurrent.ExecutionException;
2823
import java.util.concurrent.Future;
2924

25+
import com.datastax.oss.driver.api.core.CqlIdentifier;
26+
import com.datastax.oss.driver.api.core.CqlSession;
27+
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
28+
import com.datastax.oss.driver.api.core.context.DriverContext;
29+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
30+
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
31+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
32+
import com.datastax.oss.driver.api.core.cql.Row;
33+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
34+
import com.datastax.oss.driver.api.core.cql.Statement;
35+
import com.datastax.oss.driver.api.core.type.DataTypes;
36+
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
37+
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
3038
import org.junit.jupiter.api.BeforeEach;
3139
import org.junit.jupiter.api.Test;
3240
import org.junit.jupiter.api.extension.ExtendWith;
@@ -48,19 +56,10 @@
4856
import org.springframework.data.mapping.callback.EntityCallbacks;
4957
import org.springframework.util.concurrent.ListenableFuture;
5058

51-
import com.datastax.oss.driver.api.core.CqlIdentifier;
52-
import com.datastax.oss.driver.api.core.CqlSession;
53-
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
54-
import com.datastax.oss.driver.api.core.context.DriverContext;
55-
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
56-
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
57-
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
58-
import com.datastax.oss.driver.api.core.cql.Row;
59-
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
60-
import com.datastax.oss.driver.api.core.cql.Statement;
61-
import com.datastax.oss.driver.api.core.type.DataTypes;
62-
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
63-
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
59+
import static org.assertj.core.api.Assertions.*;
60+
import static org.mockito.ArgumentMatchers.*;
61+
import static org.mockito.Mockito.*;
62+
import static org.springframework.data.cassandra.core.query.Criteria.*;
6463

6564
/**
6665
* Unit tests for {@link AsyncCassandraTemplate}.
@@ -326,7 +325,28 @@ void insertShouldInsertEntity() {
326325
assertThat(beforeSave).isSameAs(user);
327326
}
328327

329-
@Test // DATACASS-618
328+
@Test
329+
// GH-1295
330+
void insertShouldConsiderEntityAfterCallback() {
331+
332+
when(resultSet.wasApplied()).thenReturn(true);
333+
334+
User user = new User("heisenberg", "Walter", "White");
335+
336+
EntityCallbacks callbacks = EntityCallbacks.create();
337+
callbacks.addEntityCallback((BeforeConvertCallback<Object>) (entity, tableName) -> new User("ww", "Walter", "White"));
338+
template.setEntityCallbacks(callbacks);
339+
340+
ListenableFuture<User> future = template.insert(user);
341+
342+
assertThat(getUninterruptibly(future)).isNotSameAs(user);
343+
verify(session).executeAsync(statementCaptor.capture());
344+
assertThat(render(statementCaptor.getValue()))
345+
.isEqualTo("INSERT INTO users (firstname,id,lastname) VALUES ('Walter','ww','White')");
346+
}
347+
348+
@Test
349+
// DATACASS-618
330350
void insertShouldInsertVersionedEntity() {
331351

332352
when(resultSet.wasApplied()).thenReturn(true);

0 commit comments

Comments
 (0)