3535
3636public class OracleJdbcConnection implements Connection {
3737
38+ private static final Completable <?> NULL_HANDLER = (res , err ) -> {};
39+
3840 private static final Logger log = LoggerFactory .getLogger (OracleJdbcConnection .class );
3941
4042 private final ClientMetrics metrics ;
@@ -49,6 +51,7 @@ public class OracleJdbcConnection implements Connection {
4951 // Command pipeline state
5052 @ SuppressWarnings ("rawtypes" )
5153 private final Deque <CommandBase > pending = new ArrayDeque <>();
54+ private final Deque <Completable <?>> completables = new ArrayDeque <>();
5255 private Promise <Void > closePromise ;
5356 private boolean inflight , executing ;
5457
@@ -128,6 +131,7 @@ public void close(Holder holder, Completable<Void> promise) {
128131 closePromise = context .promise ();
129132 future = closePromise .future ().andThen (ar -> holder .handleClosed ());
130133 pending .add (CloseConnectionCommand .INSTANCE );
134+ completables .add (NULL_HANDLER );
131135 checkPending ();
132136 } else {
133137 future = closePromise .future ();
@@ -172,9 +176,9 @@ public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
172176 }
173177
174178 private <R > void doSchedule (CommandBase <R > cmd , Completable <R > handler ) {
175- cmd .handler = handler ;
176179 if (closePromise == null ) {
177180 pending .add (cmd );
181+ completables .add (handler );
178182 checkPending ();
179183 } else {
180184 cmd .fail (VertxException .noStackTrace ("Connection is no longer active" ));
@@ -190,11 +194,13 @@ private void checkPending() {
190194 executing = true ;
191195 CommandBase cmd ;
192196 while (!inflight && (cmd = pending .poll ()) != null ) {
197+ Completable handler = completables .poll ();
193198 inflight = true ;
194199 if (metrics != null && cmd instanceof CloseConnectionCommand ) {
195200 metrics .close ();
196201 }
197202 OracleCommand action = wrap (cmd );
203+ action .handler = handler ;
198204 Future <Void > future = action .processCommand (cmd );
199205 CommandBase capture = cmd ;
200206 future .onComplete (ar -> actionComplete (capture , action , ar ));
0 commit comments