@@ -422,6 +422,11 @@ struct BiDiChannelState {
422
422
bool sinkAlive () const {
423
423
return state_ != State::OnlyStreamOpen && state_ != State::Closed;
424
424
}
425
+ bool anyAlive () const { return state_ != State::Closed; }
426
+ bool bothAlive () const {
427
+ return state_ == State::StreamAndSinkOpen ||
428
+ state_ == State::AwaitingFirstResponse;
429
+ }
425
430
426
431
BiDiChannelState& markFirstResponseSent () {
427
432
if (state_ == State::AwaitingFirstResponse) {
@@ -467,13 +472,12 @@ class BiDiServerCallback {
467
472
public:
468
473
virtual ~BiDiServerCallback () = default ;
469
474
470
- FOLLY_NODISCARD virtual BiDiChannelState onStreamRequestN (uint64_t ) = 0;
471
- FOLLY_NODISCARD virtual BiDiChannelState onStreamCancel () = 0;
475
+ FOLLY_NODISCARD virtual bool onStreamRequestN (uint64_t ) = 0;
476
+ FOLLY_NODISCARD virtual bool onStreamCancel () = 0;
472
477
473
- FOLLY_NODISCARD virtual BiDiChannelState onSinkNext (StreamPayload&&) = 0;
474
- FOLLY_NODISCARD virtual BiDiChannelState onSinkError (
475
- folly::exception_wrapper) = 0;
476
- FOLLY_NODISCARD virtual BiDiChannelState onSinkComplete () = 0;
478
+ FOLLY_NODISCARD virtual bool onSinkNext (StreamPayload&&) = 0;
479
+ FOLLY_NODISCARD virtual bool onSinkError (folly::exception_wrapper) = 0;
480
+ FOLLY_NODISCARD virtual bool onSinkComplete () = 0;
477
481
478
482
virtual void resetClientCallback (BiDiClientCallback&) = 0;
479
483
};
@@ -482,26 +486,30 @@ class BiDiClientCallback {
482
486
public:
483
487
virtual ~BiDiClientCallback () = default ;
484
488
485
- FOLLY_NODISCARD virtual BiDiChannelState onFirstResponse (
489
+ FOLLY_NODISCARD virtual bool onFirstResponse (
486
490
FirstResponsePayload&&, folly::EventBase*, BiDiServerCallback*) = 0;
487
491
virtual void onFirstResponseError (folly::exception_wrapper) = 0;
488
492
489
- FOLLY_NODISCARD virtual BiDiChannelState onStreamNext (StreamPayload&&) = 0;
490
- FOLLY_NODISCARD virtual BiDiChannelState onStreamError (
491
- folly::exception_wrapper) = 0;
492
- FOLLY_NODISCARD virtual BiDiChannelState onStreamComplete () = 0;
493
+ FOLLY_NODISCARD virtual bool onStreamNext (StreamPayload&&) = 0;
494
+ FOLLY_NODISCARD virtual bool onStreamError (folly::exception_wrapper) = 0;
495
+ FOLLY_NODISCARD virtual bool onStreamComplete () = 0;
493
496
494
- FOLLY_NODISCARD virtual BiDiChannelState onSinkRequestN (uint64_t ) = 0;
495
- FOLLY_NODISCARD virtual BiDiChannelState onSinkCancel () = 0;
497
+ FOLLY_NODISCARD virtual bool onSinkRequestN (uint64_t ) = 0;
498
+ FOLLY_NODISCARD virtual bool onSinkCancel () = 0;
496
499
497
500
virtual void resetServerCallback (BiDiServerCallback&) = 0;
498
501
};
499
502
500
503
struct BiDiServerCallbackCancel {
501
504
void operator ()(BiDiServerCallback* biDiServerCallback) noexcept {
502
- auto state = biDiServerCallback->onStreamCancel ();
503
- // This should be handled as an early close.
504
- DCHECK (!state.sinkAlive () && !state.streamAlive ());
505
+ auto alive = biDiServerCallback->onStreamCancel ();
506
+ if (alive) {
507
+ TApplicationException ex (
508
+ TApplicationException::TApplicationExceptionType::INTERRUPTION,
509
+ " BiDi server callback canceled" );
510
+ alive = biDiServerCallback->onSinkError (std::move (ex));
511
+ }
512
+ DCHECK (!alive);
505
513
}
506
514
};
507
515
0 commit comments