@@ -385,6 +385,8 @@ private void PushCurrentBuffer()
385
385
currentBufferIndex = 0 ;
386
386
}
387
387
388
+ public Action OnBeforeFlush { get ; set ; }
389
+
388
390
public bool FlushSendBuffer ( )
389
391
{
390
392
try
@@ -394,6 +396,9 @@ public bool FlushSendBuffer()
394
396
395
397
if ( cmdBuffer . Count > 0 )
396
398
{
399
+ if ( OnBeforeFlush != null )
400
+ OnBeforeFlush ( ) ;
401
+
397
402
if ( ! Env . IsMono && sslStream == null )
398
403
{
399
404
socket . Send ( cmdBuffer ) ; //Optimized for Windows
@@ -462,132 +467,83 @@ private int SafeReadByte()
462
467
}
463
468
}
464
469
465
- protected void SendExpectSuccess ( params byte [ ] [ ] cmdWithBinaryArgs )
470
+ protected T SendReceive < T > ( byte [ ] [ ] cmdWithBinaryArgs , Func < T > fn , Action < Func < T > > completePipelineFn = null )
466
471
{
467
472
if ( ! SendCommand ( cmdWithBinaryArgs ) )
468
473
throw CreateConnectionError ( ) ;
469
474
470
475
if ( Pipeline != null )
471
476
{
472
- Pipeline . CompleteVoidQueuedCommand ( ExpectSuccess ) ;
473
- return ;
474
- }
475
- ExpectSuccess ( ) ;
476
- }
477
-
478
- protected long SendExpectLong ( params byte [ ] [ ] cmdWithBinaryArgs )
479
- {
480
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
481
- throw CreateConnectionError ( ) ;
477
+ if ( completePipelineFn == null )
478
+ throw new NotSupportedException ( "Pipeline is not supported." ) ;
482
479
483
- if ( Pipeline != null )
484
- {
485
- Pipeline . CompleteLongQueuedCommand ( ReadLong ) ;
486
- return default ( long ) ;
480
+ completePipelineFn ( fn ) ;
481
+ return default ( T ) ;
487
482
}
488
- return ReadLong ( ) ;
483
+ return fn ( ) ;
489
484
}
490
485
491
- protected byte [ ] SendExpectData ( params byte [ ] [ ] cmdWithBinaryArgs )
486
+ protected void SendReceiveVoid ( byte [ ] [ ] cmdWithBinaryArgs , Action fn , Action < Action > completePipelineFn )
492
487
{
493
488
if ( ! SendCommand ( cmdWithBinaryArgs ) )
494
489
throw CreateConnectionError ( ) ;
495
490
496
491
if ( Pipeline != null )
497
492
{
498
- Pipeline . CompleteBytesQueuedCommand ( ReadData ) ;
499
- return null ;
493
+ if ( completePipelineFn == null )
494
+ throw new NotSupportedException ( "Pipeline is not supported." ) ;
495
+
496
+ completePipelineFn ( fn ) ;
497
+ return ;
500
498
}
501
- return ReadData ( ) ;
499
+ fn ( ) ;
502
500
}
503
501
504
- protected string SendExpectString ( params byte [ ] [ ] cmdWithBinaryArgs )
502
+ protected void SendExpectSuccess ( params byte [ ] [ ] cmdWithBinaryArgs )
505
503
{
506
- var bytes = SendExpectData ( cmdWithBinaryArgs ) ;
507
- return bytes . FromUtf8Bytes ( ) ;
504
+ SendReceiveVoid ( cmdWithBinaryArgs , ExpectSuccess , Pipeline != null ? Pipeline . CompleteVoidQueuedCommand : ( Action < Action > ) null ) ;
508
505
}
509
506
510
- protected double SendExpectDouble ( params byte [ ] [ ] cmdWithBinaryArgs )
507
+ protected long SendExpectLong ( params byte [ ] [ ] cmdWithBinaryArgs )
511
508
{
512
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
513
- throw CreateConnectionError ( ) ;
514
-
515
- if ( Pipeline != null )
516
- {
517
- Pipeline . CompleteDoubleQueuedCommand ( ReadDouble ) ;
518
- return Double . NaN ;
519
- }
520
-
521
- return ReadDouble ( ) ;
509
+ return SendReceive ( cmdWithBinaryArgs , ReadLong , Pipeline != null ? Pipeline . CompleteLongQueuedCommand : ( Action < Func < long > > ) null ) ;
522
510
}
523
511
524
- public double ReadDouble ( )
512
+ protected byte [ ] SendExpectData ( params byte [ ] [ ] cmdWithBinaryArgs )
525
513
{
526
- var bytes = ReadData ( ) ;
527
- return ( bytes == null ) ? double . NaN : ParseDouble ( bytes ) ;
514
+ return SendReceive ( cmdWithBinaryArgs , ReadData , Pipeline != null ? Pipeline . CompleteBytesQueuedCommand : ( Action < Func < byte [ ] > > ) null ) ;
528
515
}
529
516
530
- public static double ParseDouble ( byte [ ] doubleBytes )
517
+ protected double SendExpectDouble ( params byte [ ] [ ] cmdWithBinaryArgs )
531
518
{
532
- var doubleString = Encoding . UTF8 . GetString ( doubleBytes ) ;
533
-
534
- double d ;
535
- double . TryParse ( doubleString , NumberStyles . Any , CultureInfo . InvariantCulture . NumberFormat , out d ) ;
536
-
537
- return d ;
519
+ return SendReceive ( cmdWithBinaryArgs , ReadDouble , Pipeline != null ? Pipeline . CompleteDoubleQueuedCommand : ( Action < Func < double > > ) null ) ;
538
520
}
539
521
540
522
protected string SendExpectCode ( params byte [ ] [ ] cmdWithBinaryArgs )
541
523
{
542
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
543
- throw CreateConnectionError ( ) ;
544
-
545
- if ( Pipeline != null )
546
- {
547
- Pipeline . CompleteStringQueuedCommand ( ExpectCode ) ;
548
- return null ;
549
- }
550
-
551
- return ExpectCode ( ) ;
524
+ return SendReceive ( cmdWithBinaryArgs , ExpectCode , Pipeline != null ? Pipeline . CompleteStringQueuedCommand : ( Action < Func < string > > ) null ) ;
552
525
}
553
526
554
527
protected byte [ ] [ ] SendExpectMultiData ( params byte [ ] [ ] cmdWithBinaryArgs )
555
528
{
556
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
557
- throw CreateConnectionError ( ) ;
558
-
559
- if ( Pipeline != null )
560
- {
561
- Pipeline . CompleteMultiBytesQueuedCommand ( ReadMultiData ) ;
562
- return new byte [ 0 ] [ ] ;
563
- }
564
- return ReadMultiData ( ) ;
529
+ return SendReceive ( cmdWithBinaryArgs , ReadMultiData , Pipeline != null ? Pipeline . CompleteMultiBytesQueuedCommand : ( Action < Func < byte [ ] [ ] > > ) null )
530
+ ?? new byte [ 0 ] [ ] ;
565
531
}
566
532
567
533
protected object [ ] SendExpectDeeplyNestedMultiData ( params byte [ ] [ ] cmdWithBinaryArgs )
568
534
{
569
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
570
- throw CreateConnectionError ( ) ;
571
-
572
- if ( Pipeline != null )
573
- {
574
- throw new NotSupportedException ( "Pipeline is not supported." ) ;
575
- }
576
-
577
- return ReadDeeplyNestedMultiData ( ) ;
535
+ return SendReceive ( cmdWithBinaryArgs , ReadDeeplyNestedMultiData ) ;
578
536
}
579
537
580
538
protected RedisData SendExpectComplexResponse ( params byte [ ] [ ] cmdWithBinaryArgs )
581
539
{
582
- if ( ! SendCommand ( cmdWithBinaryArgs ) )
583
- throw CreateConnectionError ( ) ;
584
-
585
- if ( Pipeline != null )
586
- {
587
- throw new NotSupportedException ( "Pipeline is not supported." ) ;
588
- }
540
+ return SendReceive ( cmdWithBinaryArgs , ReadComplexResponse ) ;
541
+ }
589
542
590
- return ReadComplexResponse ( ) ;
543
+ protected string SendExpectString ( params byte [ ] [ ] cmdWithBinaryArgs )
544
+ {
545
+ var bytes = SendExpectData ( cmdWithBinaryArgs ) ;
546
+ return bytes . FromUtf8Bytes ( ) ;
591
547
}
592
548
593
549
protected void Log ( string fmt , params object [ ] args )
@@ -706,6 +662,22 @@ public long ReadLong()
706
662
throw CreateResponseError ( "Unknown reply on integer response: " + c + s ) ;
707
663
}
708
664
665
+ public double ReadDouble ( )
666
+ {
667
+ var bytes = ReadData ( ) ;
668
+ return ( bytes == null ) ? double . NaN : ParseDouble ( bytes ) ;
669
+ }
670
+
671
+ public static double ParseDouble ( byte [ ] doubleBytes )
672
+ {
673
+ var doubleString = Encoding . UTF8 . GetString ( doubleBytes ) ;
674
+
675
+ double d ;
676
+ double . TryParse ( doubleString , NumberStyles . Any , CultureInfo . InvariantCulture . NumberFormat , out d ) ;
677
+
678
+ return d ;
679
+ }
680
+
709
681
private byte [ ] ReadData ( )
710
682
{
711
683
var r = ReadLine ( ) ;
0 commit comments