30
30
import org .junit .Test ;
31
31
import org .xmldb .api .base .XMLDBException ;
32
32
33
+ import java .util .List ;
34
+ import java .util .ArrayList ;
33
35
import java .util .Optional ;
34
36
import java .util .concurrent .*;
37
+ import java .util .function .Consumer ;
35
38
36
39
import static junit .framework .TestCase .assertTrue ;
37
- import static org .junit .Assert .assertEquals ;
38
- import static org .junit .Assert .fail ;
40
+ import static org .junit .Assert .*;
39
41
40
42
/**
41
43
* @author <a href="mailto:[email protected] ">Adam Retter</a>
@@ -50,7 +52,7 @@ public void noPrivilegeEscalationThroughBrokerRelease() throws EXistException {
50
52
//take a broker with the guest user
51
53
final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
52
54
final Subject guestUser = pool .getSecurityManager ().getGuestSubject ();
53
- try (final DBBroker broker1 = pool .get (Optional .of (guestUser ))) {
55
+ try (final DBBroker broker1 = pool .get (Optional .of (guestUser ))) {
54
56
55
57
assertEquals ("Expected `guest` user, but was: " + broker1 .getCurrentSubject ().getName (), guestUser .getId (), broker1 .getCurrentSubject ().getId ());
56
58
@@ -70,7 +72,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
70
72
//take a broker with the SYSTEM user
71
73
final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
72
74
final Subject sysUser = pool .getSecurityManager ().getSystemSubject ();
73
- try (final DBBroker broker1 = pool .get (Optional .of (sysUser ))) {
75
+ try (final DBBroker broker1 = pool .get (Optional .of (sysUser ))) {
74
76
75
77
assertEquals ("Expected `SYSTEM` user, but was: " + broker1 .getCurrentSubject ().getName (), sysUser .getId (), broker1 .getCurrentSubject ().getId ());
76
78
@@ -88,7 +90,7 @@ public void privilegeStableWhenSubjectNull() throws EXistException {
88
90
public void guestDefaultPriviledge () throws EXistException {
89
91
//take a broker with default perms
90
92
final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
91
- try (final DBBroker broker1 = pool .getBroker ()) {
93
+ try (final DBBroker broker1 = pool .getBroker ()) {
92
94
93
95
final Subject guestUser = pool .getSecurityManager ().getGuestSubject ();
94
96
@@ -109,7 +111,7 @@ public void noPrivilegeEscalationThroughBrokerRelease_xmldb() throws EXistExcept
109
111
//take a broker with the guest user
110
112
final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
111
113
final Subject guestUser = pool .getSecurityManager ().getGuestSubject ();
112
- try (final DBBroker broker1 = pool .get (Optional .of (guestUser ))) {
114
+ try (final DBBroker broker1 = pool .get (Optional .of (guestUser ))) {
113
115
114
116
assertEquals ("Expected `guest` user, but was: " + broker1 .getCurrentSubject ().getName (), guestUser .getId (), broker1 .getCurrentSubject ().getId ());
115
117
@@ -135,20 +137,17 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
135
137
// test requires at least 2 leasedBrokers to prove the issue
136
138
assertTrue (maxBrokers > 1 );
137
139
140
+ final ExecutorService executor = Executors .newFixedThreadPool (maxBrokers + 1 );
141
+ final List <Future <Void >> tasks = new ArrayList <>(maxBrokers );
138
142
final CountDownLatch firstBrokerReleaseLatch = new CountDownLatch (1 );
139
143
final CountDownLatch releaseLatch = new CountDownLatch (1 );
140
144
try {
141
145
142
146
// lease all brokers
143
- final Thread brokerUsers [] = new Thread [maxBrokers ];
144
147
final CountDownLatch acquiredLatch = new CountDownLatch (maxBrokers );
145
-
146
- final Thread firstBrokerUser = new Thread (new BrokerUser (pool , acquiredLatch , firstBrokerReleaseLatch ), "first-brokerUser" );
147
- brokerUsers [0 ] = firstBrokerUser ;
148
- brokerUsers [0 ].start ();
149
- for (int i = 1 ; i < maxBrokers ; i ++) {
150
- brokerUsers [i ] = new Thread (new BrokerUser (pool , acquiredLatch , releaseLatch ));
151
- brokerUsers [i ].start ();
148
+ Future <Void > firstBrokerUser = executor .submit (new BrokerUser (pool , acquiredLatch , firstBrokerReleaseLatch ));
149
+ for (int count = 1 ; count < maxBrokers ; count ++) {
150
+ tasks .add (executor .submit (new BrokerUser (pool , acquiredLatch , releaseLatch )));
152
151
}
153
152
154
153
// wait for all brokers to be acquired
@@ -160,9 +159,8 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
160
159
161
160
// create a new thread and attempt to get an additional broker
162
161
final CountDownLatch additionalBrokerAcquiredLatch = new CountDownLatch (1 );
163
- final Thread additionalBrokerUser = new Thread (new BrokerUser (pool , additionalBrokerAcquiredLatch , releaseLatch ), "additional-brokerUser" );
164
162
assertEquals (1 , additionalBrokerAcquiredLatch .getCount ());
165
- additionalBrokerUser . start ( );
163
+ executor . submit ( new BrokerUser ( pool , additionalBrokerAcquiredLatch , releaseLatch ) );
166
164
167
165
// we should not be able to acquire an additional broker, as we have already leased max
168
166
Thread .sleep (500 ); // just to ensure the other thread has done something
@@ -172,23 +170,92 @@ public void canReleaseWhenSaturated() throws InterruptedException, ExecutionExce
172
170
assertEquals (1 , firstBrokerReleaseLatch .getCount ());
173
171
firstBrokerReleaseLatch .countDown ();
174
172
assertEquals (0 , firstBrokerReleaseLatch .getCount ());
175
- firstBrokerUser .join (); // wait for the first broker lease thread to complete
173
+ firstBrokerUser .get (); // wait for the first broker lease thread to complete
176
174
177
175
// check that the additional broker lease has now been acquired
178
176
Thread .sleep (500 ); // just to ensure the other thread has done something
179
177
assertEquals (0 , additionalBrokerAcquiredLatch .getCount ());
180
178
179
+ executor .shutdown ();
181
180
} finally {
182
181
// release all brokers from brokerUsers
183
- if (firstBrokerReleaseLatch .getCount () == 1 ) {
182
+ if (firstBrokerReleaseLatch .getCount () == 1 ) {
184
183
firstBrokerReleaseLatch .countDown ();
185
184
}
186
185
releaseLatch .countDown ();
186
+ assertTrue (executor .awaitTermination (1 , TimeUnit .SECONDS ));
187
+ for (Future <Void > task : tasks ) {
188
+ task .get ();
189
+ }
190
+ for (Runnable task : executor .shutdownNow ()) {
191
+ assertNotNull (task );
192
+ }
193
+ }
194
+ }
195
+
196
+ @ Test
197
+ public void concurrentShutdownAndUse () throws InterruptedException , ExecutionException {
198
+ final BrokerPool pool = existEmbeddedServer .getBrokerPool ();
199
+ final int maxBrokers = pool .getMax ();
200
+ final int taskAmount = maxBrokers * 50 ;
201
+
202
+ // test requires at least 5 leasedBrokers to prove the issue
203
+ assertTrue (maxBrokers > 4 );
204
+
205
+ final CountDownLatch readyLatch = new CountDownLatch (1 );
206
+ final CountDownLatch executeLatch = new CountDownLatch (1 );
207
+ final ExecutorService executor = Executors .newFixedThreadPool (taskAmount );
208
+ final List <Future <Void >> tasks = new ArrayList <>(taskAmount );
209
+ final Consumer <BrokerPool > brokerAquire = brokerPool -> {
210
+ try (final DBBroker broker = brokerPool .getBroker ()) {
211
+ TimeUnit .SECONDS .sleep (1 );
212
+ } catch (EXistException e ) {
213
+ throw new IllegalStateException (e );
214
+ } catch (InterruptedException e ) {
215
+ Thread .currentThread ().interrupt ();
216
+ throw new IllegalStateException (e );
217
+ }
218
+ };
219
+ for (int count = 0 ; count < taskAmount ; count ++) {
220
+ tasks .add (executor .submit (new PoolAction (pool , readyLatch , executeLatch , (count % 2 == 0 ) ? BrokerPool ::shutdown : brokerAquire )));
221
+ }
222
+ executor .shutdown ();
223
+
224
+ TimeUnit .SECONDS .sleep (2 );
225
+ readyLatch .countDown ();
226
+
227
+ assertTrue (executor .awaitTermination (1 , TimeUnit .MINUTES ));
228
+ for (Future <Void > task : tasks ) {
229
+ task .get ();
230
+ }
231
+ for (Runnable task : executor .shutdownNow ()) {
232
+ assertNotNull (task );
233
+ }
234
+ }
235
+
236
+ static class PoolAction implements Callable <Void > {
237
+ private final BrokerPool brokerPool ;
238
+ private final CountDownLatch readyLatch ;
239
+ private final CountDownLatch excuteLatch ;
240
+ private final Consumer <BrokerPool > action ;
241
+
242
+ PoolAction (final BrokerPool brokerPool , CountDownLatch readyLatch , CountDownLatch excuteLatch , Consumer <BrokerPool > action ) {
243
+ this .brokerPool = brokerPool ;
244
+ this .readyLatch = readyLatch ;
245
+ this .excuteLatch = excuteLatch ;
246
+ this .action = action ;
247
+ }
248
+
249
+ @ Override
250
+ public Void call () throws InterruptedException {
251
+ readyLatch .await ();
252
+ action .accept (brokerPool );
253
+ return null ;
187
254
}
188
255
}
189
256
190
- public static class BrokerUser implements Runnable {
191
257
258
+ public static class BrokerUser implements Callable <Void > {
192
259
final BrokerPool brokerPool ;
193
260
private final CountDownLatch acquiredLatch ;
194
261
private final CountDownLatch releaseLatch ;
@@ -200,8 +267,8 @@ public BrokerUser(final BrokerPool brokerPool, final CountDownLatch acquiredLatc
200
267
}
201
268
202
269
@ Override
203
- public void run () {
204
- try (final DBBroker broker = brokerPool .getBroker ()) {
270
+ public Void call () throws EXistException , InterruptedException {
271
+ try (final DBBroker broker = brokerPool .getBroker ()) {
205
272
206
273
// signal that we have acquired the broker
207
274
acquiredLatch .countDown ();
@@ -210,9 +277,8 @@ public void run() {
210
277
// wait for signal to release the broker
211
278
releaseLatch .await ();
212
279
213
- } catch (final EXistException | InterruptedException e ) {
214
- fail (e .getMessage ());
215
280
}
281
+ return null ;
216
282
}
217
283
}
218
284
0 commit comments