2323import io .vertx .ext .unit .TestContext ;
2424import io .vertx .ext .unit .junit .VertxUnitRunner ;
2525import io .vertx .sqlclient .*;
26+ import io .vertx .tests .sqlclient .ProxyServer ;
2627import org .junit .After ;
2728import org .junit .Before ;
2829import org .junit .Test ;
3637import java .util .concurrent .atomic .AtomicReference ;
3738import java .util .function .Function ;
3839
40+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
41+ import static java .util .concurrent .TimeUnit .SECONDS ;
42+
3943@ RunWith (VertxUnitRunner .class )
4044public abstract class MetricsTestBase {
4145
@@ -110,15 +114,20 @@ public void close() {
110114 pool .query ("SELECT * FROM immutable WHERE id=1" ).execute ().toCompletionStage ().toCompletableFuture ().get (20 , TimeUnit .SECONDS );
111115 ctx .assertEquals (0 , closeCount .get ());
112116 pool .close ();
113- long now = System .currentTimeMillis ();
114- while (closeCount .get () != 1 ) {
115- ctx .assertTrue (System .currentTimeMillis () - now < 20_000 );
116- Thread .sleep (100 );
117- }
117+ awaitQueueSize (ctx , closeCount , 1 );
118118 }
119119
120120 @ Test
121121 public void testQueuing (TestContext ctx ) throws Exception {
122+ testQueuing (ctx , false );
123+ }
124+
125+ @ Test
126+ public void testQueuingTimeout (TestContext ctx ) throws Exception {
127+ testQueuing (ctx , true );
128+ }
129+
130+ private void testQueuing (TestContext ctx , boolean timeout ) throws Exception {
122131 AtomicInteger queueSize = new AtomicInteger ();
123132 List <Object > enqueueMetrics = Collections .synchronizedList (new ArrayList <>());
124133 List <Object > dequeueMetrics = Collections .synchronizedList (new ArrayList <>());
@@ -136,26 +145,94 @@ public void dequeue(Object taskMetric) {
136145 queueSize .decrementAndGet ();
137146 }
138147 };
139- Pool pool = createPool (vertx , new PoolOptions ().setMaxSize (1 ).setName ("the-pool" ));
140- SqlConnection conn = pool .getConnection ().toCompletionStage ().toCompletableFuture ().get (20 , TimeUnit .SECONDS );
148+ PoolOptions poolOptions = new PoolOptions ().setMaxSize (1 ).setName ("the-pool" );
149+ if (timeout ) {
150+ poolOptions .setConnectionTimeout (2 ).setConnectionTimeoutUnit (SECONDS );
151+ }
152+ Pool pool = createPool (vertx , poolOptions );
153+ SqlConnection conn = pool .getConnection ().await (20 , SECONDS );
141154 int num = 16 ;
142155 List <Future <?>> futures = new ArrayList <>();
143156 for (int i = 0 ;i < num ;i ++) {
144- futures .add (pool .query ("SELECT * FROM immutable WHERE id=1" ).execute ());
145- }
146- long now = System .currentTimeMillis ();
147- while (queueSize .get () != num ) {
148- ctx .assertTrue (System .currentTimeMillis () - now < 20_000 );
149- Thread .sleep (100 );
157+ futures .add (pool .withConnection (sqlConn -> sqlConn .query ("SELECT * FROM immutable WHERE id=1" ).execute ()));
150158 }
159+ awaitQueueSize (ctx , queueSize , timeout ? 0 : num );
151160 conn .close ();
152- Future .join (futures ).toCompletionStage ().toCompletableFuture (). get ( 20 , TimeUnit . SECONDS );
161+ Future .join (futures ).otherwiseEmpty ().await ( 20 , SECONDS );
153162 ctx .assertEquals (0 , queueSize .get ());
154163 ctx .assertEquals (enqueueMetrics , dequeueMetrics );
155164 ctx .assertEquals ("sql" , poolType );
156165 ctx .assertEquals ("the-pool" , poolName );
157166 }
158167
168+ private void awaitQueueSize (TestContext ctx , AtomicInteger queueSize , int num ) throws InterruptedException {
169+ long now = System .currentTimeMillis ();
170+ for (; ; ) {
171+ if (queueSize .get () != num ) {
172+ if (System .currentTimeMillis () - now >= 20_000 ) {
173+ ctx .fail ("Timeout waiting for queue size " + queueSize .get () + " to be equal to " + num );
174+ } else {
175+ MILLISECONDS .sleep (500 );
176+ }
177+ } else {
178+ break ;
179+ }
180+ }
181+ }
182+
183+ @ Test
184+ public void testConnectionLost (TestContext ctx ) throws Exception {
185+ SqlConnectOptions connectOptions = connectOptions ();
186+ ProxyServer proxy = ProxyServer .create (vertx , connectOptions .getPort (), connectOptions .getHost ());
187+ AtomicReference <ProxyServer .Connection > firstConnection = new AtomicReference <>();
188+ proxy .proxyHandler (proxiedConn -> {
189+ if (firstConnection .compareAndSet (null , proxiedConn )) {
190+ proxiedConn .connect ();
191+ }
192+ });
193+ // Start proxy
194+ Async listenLatch = ctx .async ();
195+ proxy .listen (8080 , "localhost" , ctx .asyncAssertSuccess (res -> listenLatch .complete ()));
196+ listenLatch .awaitSuccess (20_000 );
197+
198+
199+ AtomicInteger queueSize = new AtomicInteger ();
200+ poolMetrics = new PoolMetrics () {
201+ @ Override
202+ public Object enqueue () {
203+ queueSize .incrementAndGet ();
204+ return null ;
205+ }
206+
207+ @ Override
208+ public void dequeue (Object taskMetric ) {
209+ queueSize .decrementAndGet ();
210+ }
211+ };
212+ PoolOptions poolOptions = new PoolOptions ()
213+ .setConnectionTimeout (500 )
214+ .setConnectionTimeoutUnit (MILLISECONDS )
215+ .setMaxSize (1 )
216+ .setName ("the-pool" );
217+ Pool pool = poolBuilder ()
218+ .with (poolOptions )
219+ .using (vertx )
220+ .connectingTo (connectOptions .setHost ("localhost" ).setPort (8080 ))
221+ .build ();
222+ SqlConnection conn = pool .getConnection ().await (20 , SECONDS );
223+ int num = 16 ;
224+ Async async = ctx .async (num + 1 );
225+ for (int i = 0 ; i < num ; i ++) {
226+ pool .withConnection (sqlConn -> sqlConn .query ("SELECT * FROM immutable WHERE id=1" ).execute ())
227+ .onComplete (ctx .asyncAssertFailure (t -> async .countDown ()));
228+ }
229+ conn .closeHandler (v -> async .countDown ());
230+ awaitQueueSize (ctx , queueSize , 16 );
231+ firstConnection .get ().clientSocket ().close ();
232+ async .await (20_000 );
233+ ctx .assertEquals (0 , queueSize .get ());
234+ }
235+
159236 @ Test
160237 public void testSimpleQuery (TestContext ctx ) {
161238 Function <SqlConnection , Future <?>> fn = conn -> conn .query ("SELECT * FROM immutable WHERE id=1" ).execute ();
0 commit comments