Skip to content

Commit 9558ae8

Browse files
committed
Update
1 parent 7b5af66 commit 9558ae8

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

src/ensemble_scheduler/ensemble_scheduler.cc

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -711,17 +711,25 @@ EnsembleContext::ResponseComplete(
711711
*context->step_mutexes_[downstream_step_idx]);
712712

713713
// Block if downstream inflight count >= limit. Timeout to prevent
714-
// potential deadlock. Unblocks when downstream completes a request.
714+
// potential deadlock. Unblocks when downstream completes a request
715+
// or request is cancelled.
715716
auto timeout = std::chrono::seconds(kMutexTimeoutSeconds);
717+
auto cancelled = [&]() {
718+
auto& req = context->request_tracker_->Request();
719+
return (req == nullptr) || req->IsCancelled();
720+
};
721+
716722
bool capacity_available =
717723
context->step_cvs_[downstream_step_idx]->wait_for(
718724
lk, timeout, [&] {
719-
return context->step_inflight_response_counts_
720-
[downstream_step_idx] <
721-
context->info_->max_inflight_responses_;
725+
return cancelled() ||
726+
(context->step_inflight_response_counts_
727+
[downstream_step_idx] <
728+
context->info_->max_inflight_responses_);
722729
});
723730

724-
if (!capacity_available) {
731+
// Log error only if timeout occurred (not cancellation).
732+
if (!capacity_available && !cancelled()) {
725733
LOG_ERROR
726734
<< "[Internal Error] Ensemble '"
727735
<< context->info_->ensemble_name_ << "' step " << this_step_idx
@@ -975,11 +983,12 @@ EnsembleContext::UpdateEnsembleState(
975983
inflight_step_counter_--;
976984

977985
size_t completed_step_idx = completed_step->step_idx_;
978-
step_inflight_response_counts_[completed_step_idx]--;
979986

980-
// Notify any producer threads blocked waiting for this step's capacity
987+
// Decrement step_inflight_response_counts_, then notify any producer
988+
// threads blocked waiting for this step's capacity
981989
if (info_->max_inflight_responses_ > 0 && !step_cvs_.empty()) {
982990
std::lock_guard<std::mutex> lk(*step_mutexes_[completed_step_idx]);
991+
step_inflight_response_counts_[completed_step_idx]--;
983992
step_cvs_[completed_step_idx]->notify_one();
984993
}
985994
}
@@ -1028,7 +1037,10 @@ EnsembleContext::GetNextSteps(
10281037

10291038
// Track as inflight. Checked by producers for backpressure; decremented on
10301039
// completion.
1031-
step_inflight_response_counts_[idx.first]++;
1040+
if (info_->max_inflight_responses_ > 0 && !step_mutexes_.empty()) {
1041+
std::lock_guard<std::mutex> lk(*step_mutexes_[idx.first]);
1042+
step_inflight_response_counts_[idx.first]++;
1043+
}
10321044
}
10331045
inflight_step_counter_ += steps->size();
10341046

0 commit comments

Comments
 (0)