@@ -257,28 +257,31 @@ private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator
257257 }
258258
259259 private static BiConsumer <BackendMessage , SynchronousSink <BackendMessage >> handleReprepare (FluxSink <FrontendMessage > requests , ExtendedFlowOperator operator , MessageFactory messageFactory ) {
260-
261260 AtomicBoolean reprepared = new AtomicBoolean ();
262261
263262 return (message , sink ) -> {
264263
265- if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message ) && reprepared . compareAndSet ( false , true ) ) {
264+ if (message instanceof ErrorResponse && requiresReprepare ((ErrorResponse ) message )) {
266265
267266 operator .evictCachedStatement ();
268267
269- List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
270- if (!messages .contains (Sync .INSTANCE )) {
271- messages .add (0 , Sync .INSTANCE );
268+ if (reprepared .compareAndSet (false , true )) {
269+
270+ List <FrontendMessage .DirectEncoder > messages = messageFactory .createMessages ();
271+ if (!messages .contains (Sync .INSTANCE )) {
272+ messages .add (0 , Sync .INSTANCE );
273+ }
274+ requests .next (new CompositeFrontendMessage (messages ));
275+
276+ return ;
272277 }
273- requests .next (new CompositeFrontendMessage (messages ));
274- } else {
275- sink .next (message );
276278 }
279+
280+ sink .next (message );
277281 };
278282 }
279283
280284 private static boolean requiresReprepare (ErrorResponse errorResponse ) {
281-
282285 ErrorDetails details = new ErrorDetails (errorResponse .getFields ());
283286 String code = details .getCode ();
284287
@@ -308,7 +311,7 @@ interface MessageFactory {
308311 /**
309312 * Operator to encapsulate common activity around the extended flow. Subclasses {@link AtomicInteger} to capture the number of ReadyForQuery frames.
310313 */
311- static class ExtendedFlowOperator extends AtomicInteger {
314+ static class ExtendedFlowOperator extends AtomicInteger implements Predicate < BackendMessage > {
312315
313316 private final String sql ;
314317
@@ -332,7 +335,6 @@ public ExtendedFlowOperator(String sql, Binding binding, StatementCache cache, L
332335 this .values = values ;
333336 this .portal = portal ;
334337 this .forceBinary = forceBinary ;
335- set (1 );
336338 }
337339
338340 public void close (FluxSink <FrontendMessage > requests ) {
@@ -341,9 +343,6 @@ public void close(FluxSink<FrontendMessage> requests) {
341343 }
342344
343345 public void evictCachedStatement () {
344-
345- incrementAndGet ();
346-
347346 synchronized (this ) {
348347 this .name = null ;
349348 }
@@ -355,14 +354,16 @@ public void hydrateStatementCache() {
355354 }
356355
357356 public Predicate <BackendMessage > takeUntil () {
358- return m -> {
357+ return this ;
358+ }
359359
360- if (m instanceof ReadyForQuery ) {
361- return decrementAndGet () <= 0 ;
362- }
360+ @ Override
361+ public boolean test (BackendMessage backendMessage ) {
362+ if (backendMessage instanceof ReadyForQuery ) {
363+ return decrementAndGet () <= 0 ;
364+ }
363365
364- return false ;
365- };
366+ return false ;
366367 }
367368
368369 private boolean isPrepareRequired () {
@@ -380,6 +381,7 @@ public String getStatementName() {
380381 }
381382
382383 public List <FrontendMessage .DirectEncoder > getMessages (Collection <FrontendMessage .DirectEncoder > append ) {
384+ incrementAndGet ();
383385 List <FrontendMessage .DirectEncoder > messagesToSend = new ArrayList <>(6 );
384386
385387 if (isPrepareRequired ()) {
0 commit comments