22
22
import java .util .concurrent .ExecutorService ;
23
23
import java .util .concurrent .Executors ;
24
24
import java .util .concurrent .Semaphore ;
25
+ import java .util .concurrent .atomic .AtomicInteger ;
25
26
26
27
import org .junit .Assert ;
27
28
import org .junit .Before ;
28
29
import org .junit .Ignore ;
29
30
import org .junit .Test ;
31
+ import org .slf4j .Logger ;
32
+ import org .slf4j .LoggerFactory ;
30
33
31
34
import com .fasterxml .jackson .databind .node .ObjectNode ;
32
35
import com .redhat .lightblue .EntityVersion ;
42
45
43
46
public class BulkTest extends AbstractMediatorTest {
44
47
48
+ private static final Logger LOGGER = LoggerFactory .getLogger (BulkTest .class );
49
+
45
50
public interface FindCb {
46
51
Response call (FindRequest req );
47
52
}
@@ -139,41 +144,45 @@ public void bulkTest_fromJson() throws Exception {
139
144
140
145
BulkRequest breqParsed = BulkRequest .fromJson (breqJson );
141
146
142
- System . out . println ("asdf" );
147
+ LOGGER . debug ("asdf" );
143
148
}
144
149
145
150
private class PFindCb implements FindCb {
146
151
Semaphore sem = new Semaphore (0 );
147
- int nested = 0 ;
152
+ AtomicInteger nested = new AtomicInteger ( 0 ) ;
148
153
149
154
@ Override
150
- public Response call (FindRequest req ) {
151
- nested ++ ;
155
+ public Response call (FindRequest req ) {
156
+ nested . incrementAndGet () ;
152
157
try {
158
+ LOGGER .debug ("find: nested:" +nested +" waiting" );
153
159
sem .acquire ();
160
+ LOGGER .debug ("find: nested:" +nested +" acquired" );
154
161
return new Response ();
155
162
} catch (Exception e ) {
156
163
throw new RuntimeException (e );
157
164
} finally {
158
- nested -- ;
165
+ nested . decrementAndGet () ;
159
166
}
160
167
}
161
168
}
162
169
163
170
private class PInsertCb implements InsertCb {
164
171
Semaphore sem = new Semaphore (0 );
165
- int nested = 0 ;
172
+ AtomicInteger nested = new AtomicInteger ( 0 ) ;
166
173
167
174
@ Override
168
175
public Response call (InsertionRequest req ) {
169
- nested ++ ;
176
+ nested . incrementAndGet () ;
170
177
try {
178
+ LOGGER .debug ("insert: nested:" +nested +" waiting" );
171
179
sem .acquire ();
180
+ LOGGER .debug ("insert: nested:" +nested +" acquired" );
172
181
return new Response ();
173
182
} catch (Exception e ) {
174
183
throw new RuntimeException (e );
175
184
} finally {
176
- nested -- ;
185
+ nested . decrementAndGet () ;
177
186
}
178
187
}
179
188
}
@@ -221,26 +230,26 @@ public void parallelOrderedBulkTest() throws Exception {
221
230
@ Override
222
231
public void run () {
223
232
try {
224
- System . out . println ("Check if all 3 finds are waiting" );
225
- while (find .nested < 3 ) {
233
+ LOGGER . debug ("Check if all 3 finds are waiting" );
234
+ while (find .nested . get () < 3 ) {
226
235
Thread .sleep (1 );
227
236
}
228
- System . out . println ("Let the 3 find requests complete" );
237
+ LOGGER . debug ("Let the 3 find requests complete" );
229
238
find .sem .release (3 );
230
- System . out . println ("Busy wait" );
239
+ LOGGER . debug ("Busy wait" );
231
240
while (find .sem .availablePermits () > 0 ) {
232
241
Thread .sleep (1 );
233
242
}
234
- System . out . println ("Let insert complete" );
243
+ LOGGER . debug ("Let insert complete" );
235
244
insert .sem .release (1 );
236
245
while (insert .sem .availablePermits () > 0 ) {
237
246
Thread .sleep (1 );
238
247
}
239
- System . out . println ("Check if all 2 finds are waiting" );
240
- while (find .nested < 2 ) {
248
+ LOGGER . debug ("Check if all 2 finds are waiting" );
249
+ while (find .nested . get () < 2 ) {
241
250
Thread .sleep (1 );
242
251
}
243
- System . out . println ("Let the remaining 2 find requests complete" );
252
+ LOGGER . debug ("Let the remaining 2 find requests complete" );
244
253
find .sem .release (2 );
245
254
while (find .sem .availablePermits () > 0 ) {
246
255
Thread .sleep (1 );
@@ -249,7 +258,7 @@ public void run() {
249
258
while (insert .sem .availablePermits () > 0 ) {
250
259
Thread .sleep (1 );
251
260
}
252
- System . out . println ("Complete" );
261
+ LOGGER . debug ("Complete" );
253
262
valid = true ;
254
263
} catch (Exception e ) {
255
264
throw new RuntimeException (e );
@@ -258,10 +267,10 @@ public void run() {
258
267
};
259
268
validator .start ();
260
269
261
- System . out . println ("Ordered exec" );
270
+ LOGGER . debug ("Ordered exec" );
262
271
BulkResponse bresp = mediator .bulkRequest (breq );
263
272
validator .join ();
264
- System . out . println ("Ordered exec done" );
273
+ LOGGER . debug ("Ordered exec done" );
265
274
266
275
Assert .assertTrue (validator .valid );
267
276
}
@@ -299,7 +308,7 @@ public void parallelUnorderedBulkTest() throws Exception {
299
308
public void run () {
300
309
try {
301
310
// Wait until all 7 calls started
302
- while (find .nested + insert .nested < 7 ) {
311
+ while (find .nested . get () + insert .nested . get () < 7 ) {
303
312
Thread .sleep (1 );
304
313
}
305
314
@@ -308,7 +317,7 @@ public void run() {
308
317
insert .sem .release (2 );
309
318
310
319
// Wait until they're all completed
311
- while (find .nested > 0 && insert .nested > 0 ) {
320
+ while (find .nested . get () > 0 && insert .nested . get () > 0 ) {
312
321
Thread .sleep (1 );
313
322
}
314
323
@@ -319,10 +328,10 @@ public void run() {
319
328
}
320
329
};
321
330
validator .start ();
322
- System . out . println ("Unordered exec" );
331
+ LOGGER . debug ("Unordered exec" );
323
332
mediator .bulkRequest (breq );
324
333
validator .join ();
325
- System . out . println ("Unordered exec done" );
334
+ LOGGER . debug ("Unordered exec done" );
326
335
327
336
Assert .assertTrue (validator .valid );
328
337
}
0 commit comments