@@ -215,55 +215,6 @@ public void GetSetSync(bool preserveOrder)
215
215
}
216
216
}
217
217
218
- [ Theory ]
219
- [ InlineData ( true , true ) ]
220
- [ InlineData ( true , false ) ]
221
- [ InlineData ( false , true ) ]
222
- [ InlineData ( false , false ) ]
223
- public async Task MassiveBulkOpsAsync ( bool preserveOrder , bool withContinuation )
224
- {
225
- #if DEBUG
226
- var oldAsyncCompletionCount = ConnectionMultiplexer . GetAsyncCompletionWorkerCount ( ) ;
227
- #endif
228
- using ( var muxer = Create ( ) )
229
- {
230
- muxer . PreserveAsyncOrder = preserveOrder ;
231
- RedisKey key = "MBOA" ;
232
- var conn = muxer . GetDatabase ( ) ;
233
- await conn . PingAsync ( ) . ForAwait ( ) ;
234
- #if NETCOREAPP1_0
235
- int number = 0 ;
236
- #endif
237
- Action < Task > nonTrivial = delegate
238
- {
239
- #if ! NETCOREAPP1_0
240
- Thread . SpinWait ( 5 ) ;
241
- #else
242
- for ( int i = 0 ; i < 50 ; i ++ )
243
- {
244
- number ++ ;
245
- }
246
- #endif
247
- } ;
248
- var watch = Stopwatch . StartNew ( ) ;
249
- for ( int i = 0 ; i <= AsyncOpsQty ; i ++ )
250
- {
251
- var t = conn . StringSetAsync ( key , i ) ;
252
- #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
253
- if ( withContinuation ) t . ContinueWith ( nonTrivial ) ;
254
- #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
255
- }
256
- Assert . Equal ( AsyncOpsQty , await conn . StringGetAsync ( key ) . ForAwait ( ) ) ;
257
- watch . Stop ( ) ;
258
- Output . WriteLine ( "{2}: Time for {0} ops: {1}ms ({3}, {4}); ops/s: {5}" , AsyncOpsQty , watch . ElapsedMilliseconds , Me ( ) ,
259
- withContinuation ? "with continuation" : "no continuation" , preserveOrder ? "preserve order" : "any order" ,
260
- AsyncOpsQty / watch . Elapsed . TotalSeconds ) ;
261
- #if DEBUG
262
- Output . WriteLine ( "Async completion workers: " + ( ConnectionMultiplexer . GetAsyncCompletionWorkerCount ( ) - oldAsyncCompletionCount ) ) ;
263
- #endif
264
- }
265
- }
266
-
267
218
[ Theory ]
268
219
[ InlineData ( false , false ) ]
269
220
[ InlineData ( true , true ) ]
@@ -347,187 +298,6 @@ public void GetWithExpiryWrongTypeSync()
347
298
Assert . Equal ( "WRONGTYPE Operation against a key holding the wrong kind of value" , ex . Message ) ;
348
299
}
349
300
350
- #if FEATURE_BOOKSLEEVE
351
- [ Theory ]
352
- [ InlineData ( true , true , ResultCompletionMode . ConcurrentIfContinuation ) ]
353
- [ InlineData ( true , false , ResultCompletionMode . ConcurrentIfContinuation ) ]
354
- [ InlineData ( false , true , ResultCompletionMode . ConcurrentIfContinuation ) ]
355
- [ InlineData ( false , false , ResultCompletionMode . ConcurrentIfContinuation ) ]
356
- [ InlineData ( true , true , ResultCompletionMode . Concurrent ) ]
357
- [ InlineData ( true , false , ResultCompletionMode . Concurrent ) ]
358
- [ InlineData ( false , true , ResultCompletionMode . Concurrent ) ]
359
- [ InlineData ( false , false , ResultCompletionMode . Concurrent ) ]
360
- [ InlineData ( true , true , ResultCompletionMode . PreserveOrder ) ]
361
- [ InlineData ( true , false , ResultCompletionMode . PreserveOrder ) ]
362
- [ InlineData ( false , true , ResultCompletionMode . PreserveOrder ) ]
363
- [ InlineData ( false , false , ResultCompletionMode . PreserveOrder ) ]
364
- public void MassiveBulkOpsAsyncOldStyle ( bool withContinuation , bool suspendFlush , ResultCompletionMode completionMode )
365
- {
366
- using ( var conn = GetOldStyleConnection ( ) )
367
- {
368
- const int db = 0 ;
369
- string key = "MBOQ" ;
370
- conn . CompletionMode = completionMode ;
371
- conn . Wait ( conn . Server . Ping ( ) ) ;
372
- Action < Task > nonTrivial = delegate
373
- {
374
- Thread . SpinWait ( 5 ) ;
375
- } ;
376
- var watch = Stopwatch . StartNew ( ) ;
377
-
378
- if ( suspendFlush ) conn . SuspendFlush ( ) ;
379
- try
380
- {
381
-
382
- for ( int i = 0 ; i <= AsyncOpsQty ; i ++ )
383
- {
384
- var t = conn . Strings . Set ( db , key , i ) ;
385
- if ( withContinuation ) t . ContinueWith ( nonTrivial ) ;
386
- }
387
- } finally
388
- {
389
- if ( suspendFlush ) conn . ResumeFlush ( ) ;
390
- }
391
- int val = ( int ) conn . Wait ( conn . Strings . GetInt64 ( db , key ) ) ;
392
- Assert . Equal ( AsyncOpsQty , val ) ;
393
- watch . Stop ( ) ;
394
- Output . WriteLine ( "{2}: Time for {0} ops: {1}ms ({3}, {4}, {5}); ops/s: {6}" , AsyncOpsQty , watch . ElapsedMilliseconds , Me ( ) ,
395
- withContinuation ? "with continuation" : "no continuation" ,
396
- suspendFlush ? "suspend flush" : "flush at whim" ,
397
- completionMode , AsyncOpsQty / watch . Elapsed . TotalSeconds ) ;
398
- }
399
- }
400
- #endif
401
-
402
- [ Theory ]
403
- [ InlineData ( true , 1 ) ]
404
- [ InlineData ( false , 1 ) ]
405
- [ InlineData ( true , 5 ) ]
406
- [ InlineData ( false , 5 ) ]
407
- [ InlineData ( true , 10 ) ]
408
- [ InlineData ( false , 10 ) ]
409
- [ InlineData ( true , 50 ) ]
410
- [ InlineData ( false , 50 ) ]
411
- public void MassiveBulkOpsSync ( bool preserveOrder , int threads )
412
- {
413
- int workPerThread = SyncOpsQty / threads ;
414
- using ( var muxer = Create ( syncTimeout : 30000 ) )
415
- {
416
- muxer . PreserveAsyncOrder = preserveOrder ;
417
- RedisKey key = "MBOS" ;
418
- var conn = muxer . GetDatabase ( ) ;
419
- conn . KeyDelete ( key ) ;
420
- #if DEBUG
421
- long oldAlloc = ConnectionMultiplexer . GetResultBoxAllocationCount ( ) ;
422
- long oldWorkerCount = ConnectionMultiplexer . GetAsyncCompletionWorkerCount ( ) ;
423
- #endif
424
- var timeTaken = RunConcurrent ( delegate
425
- {
426
- for ( int i = 0 ; i < workPerThread ; i ++ )
427
- {
428
- conn . StringIncrement ( key ) ;
429
- }
430
- } , threads ) ;
431
-
432
- int val = ( int ) conn . StringGet ( key ) ;
433
- Assert . Equal ( workPerThread * threads , val ) ;
434
- Output . WriteLine ( "{2}: Time for {0} ops on {4} threads: {1}ms ({3}); ops/s: {5}" ,
435
- threads * workPerThread , timeTaken . TotalMilliseconds , Me ( )
436
- , preserveOrder ? "preserve order" : "any order" , threads , ( workPerThread * threads ) / timeTaken . TotalSeconds ) ;
437
- #if DEBUG
438
- long newAlloc = ConnectionMultiplexer . GetResultBoxAllocationCount ( ) ;
439
- long newWorkerCount = ConnectionMultiplexer . GetAsyncCompletionWorkerCount ( ) ;
440
- Output . WriteLine ( "ResultBox allocations: {0}; workers {1}" , newAlloc - oldAlloc , newWorkerCount - oldWorkerCount ) ;
441
- Assert . True ( newAlloc - oldAlloc <= 2 * threads , "number of box allocations" ) ;
442
- #endif
443
- }
444
- }
445
-
446
- #if FEATURE_BOOKSLEEVE
447
- [ Theory ]
448
- [ InlineData ( ResultCompletionMode . Concurrent , 1 ) ]
449
- [ InlineData ( ResultCompletionMode . ConcurrentIfContinuation , 1 ) ]
450
- [ InlineData ( ResultCompletionMode . PreserveOrder , 1 ) ]
451
- [ InlineData ( ResultCompletionMode . Concurrent , 5 ) ]
452
- [ InlineData ( ResultCompletionMode . ConcurrentIfContinuation , 5 ) ]
453
- [ InlineData ( ResultCompletionMode . PreserveOrder , 5 ) ]
454
- [ InlineData ( ResultCompletionMode . Concurrent , 10 ) ]
455
- [ InlineData ( ResultCompletionMode . ConcurrentIfContinuation , 10 ) ]
456
- [ InlineData ( ResultCompletionMode . PreserveOrder , 10 ) ]
457
- [ InlineData ( ResultCompletionMode . Concurrent , 50 ) ]
458
- [ InlineData ( ResultCompletionMode . ConcurrentIfContinuation , 50 ) ]
459
- [ InlineData ( ResultCompletionMode . PreserveOrder , 50 ) ]
460
- public void MassiveBulkOpsSyncOldStyle ( ResultCompletionMode completionMode , int threads )
461
- {
462
- int workPerThread = SyncOpsQty / threads ;
463
-
464
- using ( var conn = GetOldStyleConnection ( ) )
465
- {
466
- const int db = 0 ;
467
- string key = "MBOQ" ;
468
- conn . CompletionMode = completionMode ;
469
- conn . Wait ( conn . Keys . Remove ( db , key ) ) ;
470
-
471
- var timeTaken = RunConcurrent ( delegate
472
- {
473
- for ( int i = 0 ; i < workPerThread ; i ++ )
474
- {
475
- conn . Wait ( conn . Strings . Increment ( db , key ) ) ;
476
- }
477
- } , threads ) ;
478
-
479
- int val = ( int ) conn . Wait ( conn . Strings . GetInt64 ( db , key ) ) ;
480
- Assert . Equal ( workPerThread * threads , val ) ;
481
-
482
- Output . WriteLine ( "{2}: Time for {0} ops on {4} threads: {1}ms ({3}); ops/s: {5}" , workPerThread * threads , timeTaken . TotalMilliseconds , Me ( ) ,
483
- completionMode , threads , ( workPerThread * threads ) / timeTaken . TotalSeconds ) ;
484
- }
485
- }
486
- #endif
487
-
488
- [ Theory ]
489
- [ InlineData ( true , 1 ) ]
490
- [ InlineData ( false , 1 ) ]
491
- [ InlineData ( true , 5 ) ]
492
- [ InlineData ( false , 5 ) ]
493
- public void MassiveBulkOpsFireAndForget ( bool preserveOrder , int threads )
494
- {
495
- using ( var muxer = Create ( syncTimeout : 30000 ) )
496
- {
497
- muxer . PreserveAsyncOrder = preserveOrder ;
498
- #if DEBUG
499
- long oldAlloc = ConnectionMultiplexer . GetResultBoxAllocationCount ( ) ;
500
- #endif
501
- RedisKey key = "MBOF" ;
502
- var conn = muxer . GetDatabase ( ) ;
503
- conn . Ping ( ) ;
504
-
505
- conn . KeyDelete ( key , CommandFlags . FireAndForget ) ;
506
- int perThread = AsyncOpsQty / threads ;
507
- var elapsed = RunConcurrent ( delegate
508
- {
509
- for ( int i = 0 ; i < perThread ; i ++ )
510
- {
511
- conn . StringIncrement ( key , flags : CommandFlags . FireAndForget ) ;
512
- }
513
- conn . Ping ( ) ;
514
- } , threads ) ;
515
- var val = ( long ) conn . StringGet ( key ) ;
516
- Assert . Equal ( perThread * threads , val ) ;
517
-
518
- Output . WriteLine ( "{2}: Time for {0} ops over {5} threads: {1:###,###}ms ({3}); ops/s: {4:###,###,##0}" ,
519
- val , elapsed . TotalMilliseconds , Me ( ) ,
520
- preserveOrder ? "preserve order" : "any order" ,
521
- val / elapsed . TotalSeconds , threads ) ;
522
- #if DEBUG
523
- long newAlloc = ConnectionMultiplexer . GetResultBoxAllocationCount ( ) ;
524
- Output . WriteLine ( "ResultBox allocations: {0}" ,
525
- newAlloc - oldAlloc ) ;
526
- Assert . True ( newAlloc - oldAlloc <= 4 ) ;
527
- #endif
528
- }
529
- }
530
-
531
301
#if DEBUG
532
302
[ Theory ]
533
303
[ InlineData ( true ) ]
0 commit comments