1919import reactor .core .publisher .Mono ;
2020
2121import java .util .Map ;
22- import java .util .Optional ;
2322import java .util .function .Function ;
23+ import java .util .function .Supplier ;
2424
2525import org .reactivestreams .Publisher ;
2626
3030import org .springframework .data .cassandra .ReactiveSession ;
3131import org .springframework .data .cassandra .ReactiveSessionFactory ;
3232import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33+ import org .springframework .data .util .Lazy ;
3334import org .springframework .lang .Nullable ;
3435import org .springframework .util .Assert ;
3536
@@ -303,7 +304,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
303304
304305 Assert .notNull (action , "Callback object must not be null" );
305306
306- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
307+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
307308 }
308309
309310 // -------------------------------------------------------------------------
@@ -437,14 +438,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
437438 Assert .notNull (statement , "CQL Statement must not be null" );
438439 Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
439440
441+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
442+
440443 return createFlux (statement , (session , stmt ) -> {
441444
442445 if (logger .isDebugEnabled ()) {
443- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
446+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
444447 }
445448
446449 return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
447- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
450+ }).onErrorMap (translateException ("Query" , cql ));
448451 }
449452
450453 /* (non-Javadoc)
@@ -504,20 +507,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
504507
505508 Assert .notNull (statement , "CQL Statement must not be null" );
506509
510+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
511+
507512 return createMono (statement , (session , executedStatement ) -> {
508513
509514 if (logger .isDebugEnabled ()) {
510- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
515+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
511516 }
512517
513518 return session .execute (applyStatementSettings (executedStatement ));
514- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
519+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
515520 }
516521
517522 @ Override
518523 public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
519524 return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
520- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
525+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
521526 }
522527
523528 // -------------------------------------------------------------------------
@@ -534,14 +539,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
534539 Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
535540 Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
536541
542+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
543+
537544 return createFlux (session -> {
538545
539546 if (logger .isDebugEnabled ()) {
540- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
547+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
541548 }
542549
543550 return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
544- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
551+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
545552 }
546553
547554 /* (non-Javadoc)
@@ -580,7 +587,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
580587 : preparedStatement .bind ());
581588
582589 return session .execute (applyStatementSettings (boundStatement ));
583- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
590+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
584591 }
585592
586593 /* (non-Javadoc)
@@ -820,7 +827,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
820827 * @see CqlProvider
821828 */
822829 protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
823- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
830+ return translateException (task , () -> cql );
831+ }
832+
833+ /**
834+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
835+ *
836+ * @param task readable text describing the task being attempted
837+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
838+ * @return the exception translation {@link Function}
839+ * @since 3.2.10
840+ * @see CqlProvider
841+ */
842+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
843+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
824844 : throwable ;
825845 }
826846
0 commit comments