30
30
import org .junit .Test ;
31
31
import org .xmldb .api .base .XMLDBException ;
32
32
33
+ import java .util .List ;
34
+ import java .util .ArrayList ;
33
35
import java .util .Optional ;
34
36
import java .util .concurrent .*;
37
+ import java .util .function .Consumer ;
35
38
36
39
import static junit .framework .TestCase .assertTrue ;
37
- import static org .junit .Assert .assertEquals ;
38
- import static org .junit .Assert .fail ;
40
+ import static org .junit .Assert .*;
39
41
40
42
/**
41
43
* @author <a href="mailto:[email protected] ">Adam Retter</a>
@@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
135
137
// test requires at least 2 leasedBrokers to prove the issue
136
138
assertTrue (maxBrokers > 1 );
137
139
140
+ final ExecutorService executor = Executors .newFixedThreadPool (maxBrokers + 1 );
141
+ final List <Future <Void >> tasks = new ArrayList <>(maxBrokers );
138
142
final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch (1 );
139
143
final CountDownLatch releaseLatch = new CountDownLatch (1 );
140
144
try {
141
145
142
146
// lease all brokers
143
- final Thread brokerUsers [] = new Thread [maxBrokers ];
144
147
final CountDownLatch acquiredLatch = new CountDownLatch (maxBrokers );
145
-
146
- final Thread firstBrokerUser = new Thread (new BrokerUser (pool , acquiredLatch , firstBrokerReleaseLatch ), "first-brokerUser" );
147
- brokerUsers [0 ] = firstBrokerUser ;
148
- brokerUsers [0 ].start ();
149
- for (int i = 1 ; i < maxBrokers ; i ++) {
150
- brokerUsers [i ] = new Thread (new BrokerUser (pool , acquiredLatch , releaseLatch ));
151
- brokerUsers [i ].start ();
148
+ Future <Void > firstBrokerUser = executor .submit (new BrokerUser (pool , acquiredLatch , firstBrokerReleaseLatch ));
149
+ for (int count = 1 ; count < maxBrokers ; count ++) {
150
+ tasks .add (executor .submit (new BrokerUser (pool , acquiredLatch , releaseLatch )));
152
151
}
153
152
154
153
// wait for all brokers to be acquired
@@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
160
159
161
160
// create a new thread and attempt to get an additional broker
162
161
final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch (1 );
163
- final Thread additionalBrokerUser = new Thread (new BrokerUser (pool , additionalBrokerAcquiredLatch , releaseLatch ), "additional-brokerUser" );
164
162
assertEquals (1 , additionalBrokerAcquiredLatch .getCount ());
165
- additionalBrokerUser . start ( );
163
+ executor . submit ( new BrokerUser ( pool , additionalBrokerAcquiredLatch , releaseLatch ) );
166
164
167
165
// we should not be able to acquire an additional broker, as we have already leased max
168
166
Thread .sleep (500 ); // just to ensure the other thread has done something
@@ -172,23 +170,85 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
172
170
assertEquals (1 , firstBrokerReleaseLatch .getCount ());
173
171
firstBrokerReleaseLatch .countDown ();
174
172
assertEquals (0 , firstBrokerReleaseLatch .getCount ());
175
- firstBrokerUser .join (); // wait for the first broker lease thread to complete
173
+ firstBrokerUser .get (); // wait for the first broker lease thread to complete
176
174
177
175
// check that the additional broker lease has now been acquired
178
176
Thread .sleep (500 ); // just to ensure the other thread has done something
179
177
assertEquals (0 , additionalBrokerAcquiredLatch .getCount ());
180
178
179
+ executor .shutdown ();
181
180
} finally {
182
181
// release all brokers from brokerUsers
183
182
if (firstBrokerReleaseLatch .getCount () == 1 ) {
184
183
firstBrokerReleaseLatch .countDown ();
185
184
}
186
185
releaseLatch .countDown ();
186
+ assertTrue (executor .awaitTermination (1 , TimeUnit .SECONDS ));
187
+ for (Future <Void > task : tasks ) {
188
+ task .get ();
189
+ }
190
+ for (Runnable task : executor .shutdownNow ()) {
191
+ assertNotNull (task );
192
+ }
193
+ }
194
+ }
195
+
196
+ @ Test
197
+ public void concurrentShutdownAndUse () throws InterruptedException , ExecutionException {
198
+ final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
199
+ final int maxBrokers = pool .getMax ();
200
+
201
+ // test requires at least 5 leasedBrokers to prove the issue
202
+ assertTrue (maxBrokers > 4 );
203
+
204
+ final CountDownLatch readyLatch = new CountDownLatch (1 );
205
+ final CountDownLatch executeLatch = new CountDownLatch (1 );
206
+ final ExecutorService executor = Executors .newFixedThreadPool (maxBrokers );
207
+ final List <Future <Void >> tasks = new ArrayList <>(maxBrokers );
208
+ final Consumer <BrokerPool > brokerAquire = brokerPool -> {
209
+ try (final DBBroker broker = brokerPool .getBroker ()) {
210
+ } catch (EXistException e ) {
211
+ throw new RuntimeException (e );
212
+ }
213
+ };
214
+ for (int count =0 ; count < maxBrokers ; count ++) {
215
+ tasks .add (executor .submit (new PoolAction (pool , readyLatch , executeLatch , count == 0 ? BrokerPool ::shutdown : brokerAquire )));
216
+ }
217
+
218
+ TimeUnit .SECONDS .sleep (1 );
219
+ readyLatch .countDown ();
220
+
221
+ assertTrue (executor .awaitTermination (1 , TimeUnit .MINUTES ));
222
+ for (Future <Void > task : tasks ) {
223
+ task .get ();
224
+ }
225
+ for (Runnable task : executor .shutdownNow ()) {
226
+ assertNotNull (task );
227
+ }
228
+ }
229
+
230
+ static class PoolAction implements Callable <Void > {
231
+ private final BrokerPool brokerPool ;
232
+ private final CountDownLatch readyLatch ;
233
+ private final CountDownLatch excuteLatch ;
234
+ private final Consumer <BrokerPool > action ;
235
+
236
+ PoolAction (final BrokerPool brokerPool , CountDownLatch readyLatch , CountDownLatch excuteLatch , Consumer <BrokerPool > action ) {
237
+ this .brokerPool = brokerPool ;
238
+ this .readyLatch = readyLatch ;
239
+ this .excuteLatch = excuteLatch ;
240
+ this .action = action ;
241
+ }
242
+ @ Override
243
+ public Void call () throws InterruptedException {
244
+ readyLatch .await ();
245
+ action .accept (brokerPool );
246
+ return null ;
187
247
}
188
248
}
189
249
190
- public static class BrokerUser implements Runnable {
191
250
251
+ public static class BrokerUser implements Callable <Void > {
192
252
final BrokerPool brokerPool ;
193
253
private final CountDownLatch acquiredLatch ;
194
254
private final CountDownLatch releaseLatch ;
@@ -200,7 +260,7 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc
200
260
}
201
261
202
262
@ Override
203
- public void run () {
263
+ public Void call () throws EXistException , InterruptedException {
204
264
try (final DBBroker broker = brokerPool .getBroker ()) {
205
265
206
266
// signal that we have acquired the broker
@@ -210,9 +270,8 @@ public void run() {
210
270
// wait for signal to release the broker
211
271
releaseLatch .await ();
212
272
213
- } catch (final EXistException | InterruptedException e ) {
214
- fail (e .getMessage ());
215
273
}
274
+ return null ;
216
275
}
217
276
}
218
277
0 commit comments