2020
2121import java .util .Map ;
2222import java .util .function .Function ;
23+ import java .util .function .Supplier ;
2324
2425import org .reactivestreams .Publisher ;
2526
2930import org .springframework .data .cassandra .ReactiveSession ;
3031import org .springframework .data .cassandra .ReactiveSessionFactory ;
3132import org .springframework .data .cassandra .core .cql .session .DefaultReactiveSessionFactory ;
33+ import org .springframework .data .util .Lazy ;
3234import org .springframework .lang .Nullable ;
3335import org .springframework .util .Assert ;
3436
@@ -299,7 +301,7 @@ public <T> Flux<T> execute(ReactiveSessionCallback<T> action) throws DataAccessE
299301
300302 Assert .notNull (action , "Callback object must not be null" );
301303
302- return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , toCql (action )));
304+ return createFlux (action ).onErrorMap (translateException ("ReactiveSessionCallback" , () -> toCql (action )));
303305 }
304306
305307 // -------------------------------------------------------------------------
@@ -394,14 +396,16 @@ public <T> Flux<T> query(Statement<?> statement, ReactiveResultSetExtractor<T> r
394396 Assert .notNull (statement , "CQL Statement must not be null" );
395397 Assert .notNull (rse , "ReactiveResultSetExtractor must not be null" );
396398
399+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
400+
397401 return createFlux (statement , (session , stmt ) -> {
398402
399403 if (logger .isDebugEnabled ()) {
400- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
404+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
401405 }
402406
403407 return session .execute (applyStatementSettings (statement )).flatMapMany (rse ::extractData );
404- }).onErrorMap (translateException ("Query" , toCql ( statement ) ));
408+ }).onErrorMap (translateException ("Query" , cql ));
405409 }
406410
407411 @ Override
@@ -440,20 +444,22 @@ public Mono<ReactiveResultSet> queryForResultSet(Statement<?> statement) throws
440444
441445 Assert .notNull (statement , "CQL Statement must not be null" );
442446
447+ Lazy <String > cql = Lazy .of (() -> toCql (statement ));
448+
443449 return createMono (statement , (session , executedStatement ) -> {
444450
445451 if (logger .isDebugEnabled ()) {
446- logger .debug (String .format ("Executing statement [%s]" , toCql ( statement )));
452+ logger .debug (String .format ("Executing statement [%s]" , cql . get ( )));
447453 }
448454
449455 return session .execute (applyStatementSettings (executedStatement ));
450- }).onErrorMap (translateException ("QueryForResultSet" , toCql ( statement ) ));
456+ }).onErrorMap (translateException ("QueryForResultSet" , cql ));
451457 }
452458
453459 @ Override
454460 public Flux <Row > queryForRows (Statement <?> statement ) throws DataAccessException {
455461 return queryForResultSet (statement ).flatMapMany (ReactiveResultSet ::rows )
456- .onErrorMap (translateException ("QueryForRows" , toCql (statement )));
462+ .onErrorMap (translateException ("QueryForRows" , () -> toCql (statement )));
457463 }
458464
459465 // -------------------------------------------------------------------------
@@ -467,14 +473,16 @@ public <T> Flux<T> execute(ReactivePreparedStatementCreator psc, ReactivePrepare
467473 Assert .notNull (psc , "ReactivePreparedStatementCreator must not be null" );
468474 Assert .notNull (action , "ReactivePreparedStatementCallback object must not be null" );
469475
476+ Lazy <String > cql = Lazy .of (() -> toCql (psc ));
477+
470478 return createFlux (session -> {
471479
472480 if (logger .isDebugEnabled ()) {
473- logger .debug (String .format ("Preparing statement [%s] using %s" , toCql ( psc ), psc ));
481+ logger .debug (String .format ("Preparing statement [%s] using %s" , cql . get ( ), psc ));
474482 }
475483
476484 return psc .createPreparedStatement (session ).flatMapMany (ps -> action .doInPreparedStatement (session , ps ));
477- }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , toCql ( psc ) ));
485+ }).onErrorMap (translateException ("ReactivePreparedStatementCallback" , cql ));
478486 }
479487
480488 @ Override
@@ -510,7 +518,7 @@ public <T> Flux<T> query(ReactivePreparedStatementCreator psc,
510518 : preparedStatement .bind ());
511519
512520 return session .execute (applyStatementSettings (boundStatement ));
513- }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , toCql (psc )));
521+ }).flatMap (rse ::extractData )).onErrorMap (translateException ("Query" , () -> toCql (psc )));
514522 }
515523
516524 @ Override
@@ -696,7 +704,20 @@ protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
696704 * @see CqlProvider
697705 */
698706 protected Function <Throwable , Throwable > translateException (String task , @ Nullable String cql ) {
699- return throwable -> throwable instanceof DriverException ? translate (task , cql , (DriverException ) throwable )
707+ return translateException (task , () -> cql );
708+ }
709+
710+ /**
711+ * Exception translation {@link Function} intended for {@link Mono#onErrorMap(Function)} usage.
712+ *
713+ * @param task readable text describing the task being attempted
714+ * @param cql supplier of CQL query or update that caused the problem (may be {@literal null})
715+ * @return the exception translation {@link Function}
716+ * @since 3.3.3
717+ * @see CqlProvider
718+ */
719+ protected Function <Throwable , Throwable > translateException (String task , Supplier <String > cql ) {
720+ return throwable -> throwable instanceof DriverException ? translate (task , cql .get (), (DriverException ) throwable )
700721 : throwable ;
701722 }
702723
0 commit comments