@@ -400,13 +400,13 @@ private void PushCurrentBuffer()
400
400
401
401
public Action OnBeforeFlush { get ; set ; }
402
402
403
- public void FlushAndResetSendBuffer ( )
403
+ internal void FlushAndResetSendBuffer ( )
404
404
{
405
405
FlushSendBuffer ( ) ;
406
406
ResetSendBuffer ( ) ;
407
407
}
408
408
409
- public void FlushSendBuffer ( )
409
+ internal void FlushSendBuffer ( )
410
410
{
411
411
if ( currentBufferIndex > 0 )
412
412
PushCurrentBuffer ( ) ;
@@ -458,7 +458,10 @@ private int SafeReadByte()
458
458
return Bstream . ReadByte ( ) ;
459
459
}
460
460
461
- protected T SendReceive < T > ( byte [ ] [ ] cmdWithBinaryArgs , Func < T > fn , Action < Func < T > > completePipelineFn = null )
461
+ protected T SendReceive < T > ( byte [ ] [ ] cmdWithBinaryArgs ,
462
+ Func < T > fn ,
463
+ Action < Func < T > > completePipelineFn = null ,
464
+ bool sendWithoutRead = false )
462
465
{
463
466
var i = 0 ;
464
467
Exception originalEx = null ;
@@ -478,7 +481,7 @@ protected T SendReceive<T>(byte[][] cmdWithBinaryArgs, Func<T> fn, Action<Func<T
478
481
{
479
482
FlushSendBuffer ( ) ;
480
483
}
481
- else
484
+ else if ( ! sendWithoutRead )
482
485
{
483
486
if ( completePipelineFn == null )
484
487
throw new NotSupportedException ( "Pipeline is not supported." ) ;
@@ -487,12 +490,15 @@ protected T SendReceive<T>(byte[][] cmdWithBinaryArgs, Func<T> fn, Action<Func<T
487
490
return default ( T ) ;
488
491
}
489
492
490
- var result = fn ( ) ;
493
+ var result = default ( T ) ;
494
+ if ( fn != null )
495
+ result = fn ( ) ;
491
496
492
497
if ( Pipeline == null )
493
- {
494
498
ResetSendBuffer ( ) ;
495
- }
499
+
500
+ if ( i > 0 )
501
+ Interlocked . Increment ( ref RedisState . TotalRetrySuccess ) ;
496
502
497
503
return result ;
498
504
}
@@ -512,70 +518,15 @@ protected T SendReceive<T>(byte[][] cmdWithBinaryArgs, Func<T> fn, Action<Func<T
512
518
513
519
var retry = DateTime . UtcNow - firstAttempt < retryTimeout ;
514
520
if ( ! retry )
515
- throw CreateRetryTimeoutException ( retryTimeout , originalEx ) ;
516
-
517
- Thread . Sleep ( GetBackOffMultiplier ( ++ i ) ) ;
518
- }
519
- }
520
- }
521
-
522
- protected void SendReceiveVoid ( byte [ ] [ ] cmdWithBinaryArgs , Action fn , Action < Action > completePipelineFn , bool sendWithoutRead = false )
523
- {
524
- var i = 0 ;
525
- Exception originalEx = null ;
526
-
527
- var firstAttempt = DateTime . UtcNow ;
528
-
529
- while ( true )
530
- {
531
- try
532
- {
533
- TryConnectIfNeeded ( ) ;
534
-
535
- if ( i == 0 ) //only write to buffer once
536
- WriteCommandToSendBuffer ( cmdWithBinaryArgs ) ;
537
-
538
- if ( Pipeline == null ) //pipeline will handle flush if in pipeline
539
- {
540
- FlushSendBuffer ( ) ;
541
- }
542
- else if ( ! sendWithoutRead )
543
- {
544
- if ( completePipelineFn == null )
545
- throw new NotSupportedException ( "Pipeline is not supported." ) ;
546
-
547
- completePipelineFn ( fn ) ;
548
- return ;
549
- }
550
-
551
- if ( fn != null )
552
- fn ( ) ;
553
-
554
- if ( Pipeline == null )
555
521
{
556
- ResetSendBuffer ( ) ;
557
- }
558
-
559
- return ;
560
- }
561
- catch ( Exception outerEx )
562
- {
563
- var retryableEx = outerEx as RedisRetryableException ;
522
+ if ( Pipeline == null )
523
+ ResetSendBuffer ( ) ;
564
524
565
- if ( outerEx is RedisException && retryableEx == null )
566
- throw ;
567
-
568
- var ex = retryableEx ?? GetRetryableException ( outerEx ) ;
569
- if ( ex == null )
570
- throw CreateConnectionError ( ) ;
571
-
572
- if ( originalEx == null )
573
- originalEx = ex ;
574
-
575
- var retry = DateTime . UtcNow - firstAttempt < retryTimeout ;
576
- if ( ! retry )
525
+ Interlocked . Increment ( ref RedisState . TotalRetryTimedout ) ;
577
526
throw CreateRetryTimeoutException ( retryTimeout , originalEx ) ;
527
+ }
578
528
529
+ Interlocked . Increment ( ref RedisState . TotalRetryCount ) ;
579
530
Thread . Sleep ( GetBackOffMultiplier ( ++ i ) ) ;
580
531
}
581
532
}
@@ -616,12 +567,17 @@ private static int GetBackOffMultiplier(int i)
616
567
617
568
protected void SendWithoutRead ( params byte [ ] [ ] cmdWithBinaryArgs )
618
569
{
619
- SendReceiveVoid ( cmdWithBinaryArgs , null , null , sendWithoutRead : true ) ;
570
+ SendReceive < long > ( cmdWithBinaryArgs , null , null , sendWithoutRead : true ) ;
620
571
}
621
572
622
573
protected void SendExpectSuccess ( params byte [ ] [ ] cmdWithBinaryArgs )
623
574
{
624
- SendReceiveVoid ( cmdWithBinaryArgs , ExpectSuccess , Pipeline != null ? Pipeline . CompleteVoidQueuedCommand : ( Action < Action > ) null ) ;
575
+ //Turn Action into Func Hack
576
+ var completePipelineFn = Pipeline != null
577
+ ? f => { Pipeline . CompleteVoidQueuedCommand ( ( ) => f ( ) ) ; }
578
+ : ( Action < Func < long > > ) null ;
579
+
580
+ SendReceive ( cmdWithBinaryArgs , ExpectSuccessFn , completePipelineFn ) ;
625
581
}
626
582
627
583
protected long SendExpectLong ( params byte [ ] [ ] cmdWithBinaryArgs )
@@ -699,6 +655,13 @@ protected void CmdLog(byte[][] args)
699
655
log . Debug ( "S: " + this . lastCommand ) ;
700
656
}
701
657
658
+ //Turn Action into Func Hack
659
+ protected long ExpectSuccessFn ( )
660
+ {
661
+ ExpectSuccess ( ) ;
662
+ return 0 ;
663
+ }
664
+
702
665
protected void ExpectSuccess ( )
703
666
{
704
667
int c = SafeReadByte ( ) ;
0 commit comments