1515 */
1616package org .springframework .data .cassandra .core ;
1717
18+ import reactor .core .publisher .Flux ;
19+ import reactor .core .publisher .Mono ;
20+ import reactor .core .publisher .SynchronousSink ;
21+
1822import java .util .Collections ;
1923import java .util .function .BiConsumer ;
2024import java .util .function .Function ;
2125
22- import com .datastax .oss .driver .api .core .CqlIdentifier ;
23- import com .datastax .oss .driver .api .core .DriverException ;
24- import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
25- import com .datastax .oss .driver .api .core .context .DriverContext ;
26- import com .datastax .oss .driver .api .core .cql .BatchType ;
27- import com .datastax .oss .driver .api .core .cql .BoundStatement ;
28- import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
29- import com .datastax .oss .driver .api .core .cql .Row ;
30- import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
31- import com .datastax .oss .driver .api .core .cql .Statement ;
32- import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
33- import com .datastax .oss .driver .api .querybuilder .delete .Delete ;
34- import com .datastax .oss .driver .api .querybuilder .insert .Insert ;
35- import com .datastax .oss .driver .api .querybuilder .insert .RegularInsert ;
36- import com .datastax .oss .driver .api .querybuilder .select .Select ;
37- import com .datastax .oss .driver .api .querybuilder .truncate .Truncate ;
38- import com .datastax .oss .driver .api .querybuilder .update .Update ;
3926import org .reactivestreams .Publisher ;
4027import org .slf4j .Logger ;
4128import org .slf4j .LoggerFactory ;
42- import reactor .core .publisher .Flux ;
43- import reactor .core .publisher .Mono ;
44- import reactor .core .publisher .SynchronousSink ;
4529
4630import org .springframework .beans .BeansException ;
4731import org .springframework .context .ApplicationContext ;
8266import org .springframework .lang .Nullable ;
8367import org .springframework .util .Assert ;
8468
69+ import com .datastax .oss .driver .api .core .CqlIdentifier ;
70+ import com .datastax .oss .driver .api .core .DriverException ;
71+ import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
72+ import com .datastax .oss .driver .api .core .context .DriverContext ;
73+ import com .datastax .oss .driver .api .core .cql .BatchType ;
74+ import com .datastax .oss .driver .api .core .cql .BoundStatement ;
75+ import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
76+ import com .datastax .oss .driver .api .core .cql .Row ;
77+ import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
78+ import com .datastax .oss .driver .api .core .cql .Statement ;
79+ import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
80+ import com .datastax .oss .driver .api .querybuilder .delete .Delete ;
81+ import com .datastax .oss .driver .api .querybuilder .insert .Insert ;
82+ import com .datastax .oss .driver .api .querybuilder .insert .RegularInsert ;
83+ import com .datastax .oss .driver .api .querybuilder .select .Select ;
84+ import com .datastax .oss .driver .api .querybuilder .truncate .Truncate ;
85+ import com .datastax .oss .driver .api .querybuilder .update .Update ;
86+
8587/**
8688 * Primary implementation of {@link ReactiveCassandraOperations}. It simplifies the use of Reactive Cassandra usage and
8789 * helps to avoid common errors. It executes core Cassandra workflow. This class executes CQL queries or updates,
@@ -852,6 +854,19 @@ public ReactiveUpdate update(Class<?> domainType) {
852854 // Implementation hooks and utility methods
853855 // -------------------------------------------------------------------------
854856
857+ /**
858+ * Create a new statement-based {@link ReactivePreparedStatementHandler} using the statement passed in.
859+ * <p>
860+ * This method allows for the creation to be overridden by subclasses.
861+ *
862+ * @param statement the statement to be prepared.
863+ * @return the new {@link PreparedStatementHandler} to use.
864+ * @since 3.2.10
865+ */
866+ protected ReactivePreparedStatementHandler createPreparedStatementHandler (Statement <?> statement ) {
867+ return new PreparedStatementHandler (statement );
868+ }
869+
855870 private <T > Mono <EntityWriteResult <T >> executeSave (T entity , CqlIdentifier tableName , SimpleStatement statement ) {
856871 return executeSave (entity , tableName , statement , (writeResult , sink ) -> sink .next (writeResult ));
857872 }
@@ -888,7 +903,7 @@ private <T> Flux<T> doQuery(Statement<?> statement, RowMapper<T> rowMapper) {
888903
889904 if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
890905
891- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
906+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
892907 return getReactiveCqlOperations ().query (statementHandler , statementHandler , rowMapper );
893908 }
894909
@@ -899,7 +914,7 @@ private <T> Mono<T> doExecute(Statement<?> statement, Function<ReactiveResultSet
899914
900915 if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
901916
902- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
917+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
903918 return getReactiveCqlOperations ()
904919 .query (statementHandler , statementHandler , rs -> Mono .just (mappingFunction .apply (rs ))).next ();
905920 }
@@ -912,7 +927,7 @@ private <T> Mono<T> doExecuteAndFlatMap(Statement<?> statement,
912927
913928 if (PreparedStatementDelegate .canPrepare (isUsePreparedStatements (), statement , logger )) {
914929
915- PreparedStatementHandler statementHandler = new PreparedStatementHandler (statement );
930+ ReactivePreparedStatementHandler statementHandler = createPreparedStatementHandler (statement );
916931 return getReactiveCqlOperations ().query (statementHandler , statementHandler , mappingFunction ::apply ).next ();
917932 }
918933
@@ -948,9 +963,7 @@ public String getCql() {
948963 }
949964 }
950965
951- return getReactiveCqlOperations ()
952- .execute (new GetConfiguredPageSize ())
953- .single ();
966+ return getReactiveCqlOperations ().execute (new GetConfiguredPageSize ()).single ();
954967 }
955968
956969 @ SuppressWarnings ("unchecked" )
@@ -1020,14 +1033,26 @@ protected <T> Mono<T> maybeCallBeforeSave(T object, CqlIdentifier tableName, Sta
10201033 return Mono .just (object );
10211034 }
10221035
1036+ /**
1037+ * General callback interface used to create and bind prepared CQL statements.
1038+ * <p>
1039+ * This interface prepares the CQL statement and sets values on a {@link PreparedStatement} as union-type comprised
1040+ * from {@link ReactivePreparedStatementCreator}, {@link PreparedStatementBinder}, and {@link CqlProvider}.
1041+ *
1042+ * @since 3.2.10
1043+ */
1044+ public interface ReactivePreparedStatementHandler
1045+ extends ReactivePreparedStatementCreator , PreparedStatementBinder , CqlProvider {
1046+
1047+ }
1048+
10231049 /**
10241050 * Utility class to prepare a {@link SimpleStatement} and bind values associated with the statement to a
10251051 * {@link BoundStatement}.
10261052 *
10271053 * @since 3.2
10281054 */
1029- private static class PreparedStatementHandler
1030- implements ReactivePreparedStatementCreator , PreparedStatementBinder , CqlProvider {
1055+ public static class PreparedStatementHandler implements ReactivePreparedStatementHandler {
10311056
10321057 private final SimpleStatement statement ;
10331058
0 commit comments