2020
2121import java .util .Map ;
2222import java .util .function .Function ;
23+ import java .util .function .Supplier ;
2324
2425import org .reactivestreams .Publisher ;
2526
2930import org .springframework .data .cassandra .ReactiveSession ;
3031import org .springframework .data .cassandra .ReactiveSessionFactory ;
3132import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33+ import org .springframework .data .util .Lazy ;
3234import org .springframework .lang .Nullable ;
3335import org .springframework .util .Assert ;
3436
@@ -302,7 +304,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
302304
303305 Assert .notNull (action , "Callback object must not be null" );
304306
305- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
307+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
306308 }
307309
308310 // -------------------------------------------------------------------------
@@ -436,14 +438,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
436438 Assert .notNull (statement , "CQL Statement must not be null" );
437439 Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
438440
441+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
442+
439443 return createFlux (statement , (session , stmt ) -> {
440444
441445 if (logger .isDebugEnabled ()) {
442- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
446+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
443447 }
444448
445449 return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
446- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
450+ }).onErrorMap (translateException ("Query" , cql ));
447451 }
448452
449453 /* (non-Javadoc)
@@ -503,20 +507,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
503507
504508 Assert .notNull (statement , "CQL Statement must not be null" );
505509
510+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
511+
506512 return createMono (statement , (session , executedStatement ) -> {
507513
508514 if (logger .isDebugEnabled ()) {
509- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
515+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
510516 }
511517
512518 return session .execute (applyStatementSettings (executedStatement ));
513- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
519+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
514520 }
515521
516522 @ Override
517523 public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
518524 return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
519- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
525+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
520526 }
521527
522528 // -------------------------------------------------------------------------
@@ -533,14 +539,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
533539 Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
534540 Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
535541
542+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
543+
536544 return createFlux (session -> {
537545
538546 if (logger .isDebugEnabled ()) {
539- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
547+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
540548 }
541549
542550 return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
543- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
551+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
544552 }
545553
546554 /* (non-Javadoc)
@@ -579,7 +587,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
579587 : preparedStatement .bind ());
580588
581589 return session .execute (applyStatementSettings (boundStatement ));
582- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
590+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
583591 }
584592
585593 /* (non-Javadoc)
@@ -819,7 +827,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
819827 * @see CqlProvider
820828 */
821829 protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
822- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
830+ return translateException (task , () -> cql );
831+ }
832+
833+ /**
834+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
835+ *
836+ * @param task readable text describing the task being attempted
837+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
838+ * @return the exception translation {@link Function}
839+ * @since 3.3.3
840+ * @see CqlProvider
841+ */
842+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
843+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
823844 : throwable ;
824845 }
825846
0 commit comments