@@ -694,55 +694,6 @@ EnsembleContext::ResponseComplete(
694694 auto pool = step_raw_ptr->ctx_ ->CallbackPool ();
695695 auto fn = [response, flags, step_raw_ptr]() {
696696 auto step_ptr = std::unique_ptr<Step>(step_raw_ptr);
697- auto & context = step_ptr->ctx_ ;
698- size_t this_step_idx = step_ptr->step_idx_ ;
699- const auto & istep = context->info_ ->steps_ [this_step_idx];
700-
701- // Block this producer if downstream consumers are overloaded.
702- // Prevents memory exhaustion by limiting concurrent inflight responses.
703- if (context->info_ ->max_inflight_responses_ > 0 &&
704- !context->step_cv_vec_ .empty ()) {
705- for (const auto & output_pair : istep.output_to_tensor_ ) {
706- const auto & tensor_name = output_pair.second ;
707- const auto & downstream_steps = (*context->tensor_to_step_ )[tensor_name];
708-
709- for (const auto & downstream_step_idx : downstream_steps) {
710- std::unique_lock<std::mutex> lk (
711- *context->step_mutexes_ [downstream_step_idx]);
712-
713- // Block if downstream inflight count >= limit. Timeout to prevent
714- // potential deadlock. Unblocks when downstream completes a request
715- // or request is cancelled.
716- auto timeout = std::chrono::seconds (kMutexTimeoutSeconds );
717- auto cancelled = [&]() {
718- auto & req = context->request_tracker_ ->Request ();
719- return (req == nullptr ) || req->IsCancelled ();
720- };
721-
722- bool capacity_available =
723- context->step_cv_vec_ [downstream_step_idx]->wait_for (
724- lk, timeout, [&] {
725- return cancelled () ||
726- (context->step_inflight_response_counts_
727- [downstream_step_idx] <
728- context->info_ ->max_inflight_responses_ );
729- });
730-
731- // Log error only if timeout occurred (not cancellation).
732- if (!capacity_available && !cancelled ()) {
733- LOG_ERROR
734- << " [Internal Error] Ensemble '"
735- << context->info_ ->ensemble_name_ << " ' step " << this_step_idx
736- << " blocked waiting for downstream step "
737- << downstream_step_idx << " (inflight: "
738- << context->step_inflight_response_counts_ [downstream_step_idx]
739- << " >= limit: " << context->info_ ->max_inflight_responses_
740- << " ) for " << kMutexTimeoutSeconds
741- << " seconds. Proceeding to avoid deadlock." ;
742- }
743- }
744- }
745- }
746697 step_ptr->response_flags_ = flags;
747698 step_ptr->response_ = response;
748699
@@ -1483,6 +1434,39 @@ EnsembleContext::ScheduleSteps(
14831434{
14841435 for (auto & step : steps) {
14851436 step->ctx_ = context;
1437+ size_t this_step_idx = step->step_idx_ ;
1438+
1439+ // Block if this step is overloaded.
1440+ if (context->info_ ->max_inflight_responses_ > 0 &&
1441+ !context->step_cv_vec_ .empty ()) {
1442+ std::unique_lock<std::mutex> lk (*context->step_mutexes_ [this_step_idx]);
1443+
1444+ auto timeout = std::chrono::seconds (kMutexTimeoutSeconds );
1445+ auto cancelled = [&]() {
1446+ auto & req = context->request_tracker_ ->Request ();
1447+ return (req == nullptr ) || req->IsCancelled ();
1448+ };
1449+
1450+ bool capacity_available = context->step_cv_vec_ [this_step_idx]->wait_for (
1451+ lk, timeout, [&] {
1452+ return cancelled () ||
1453+ (context->step_inflight_response_counts_ [this_step_idx] <
1454+ context->info_ ->max_inflight_responses_ );
1455+ });
1456+
1457+ // Log error only if timeout occurred (not cancellation).
1458+ if (!capacity_available && !cancelled ()) {
1459+ LOG_ERROR << " [Internal Error] Ensemble '"
1460+ << context->info_ ->ensemble_name_
1461+ << " ' unable to schedule step " << this_step_idx
1462+ << " (inflight: "
1463+ << context->step_inflight_response_counts_ [this_step_idx]
1464+ << " >= limit: " << context->info_ ->max_inflight_responses_
1465+ << " ) for " << kMutexTimeoutSeconds
1466+ << " seconds. Proceeding to avoid deadlock." ;
1467+ }
1468+ }
1469+
14861470 bool should_schedule = false ;
14871471 // Must release lock before InferAsync to avoid deadlock, as the same thread
14881472 // will be calling request/response callbacks on cache hits, which will
0 commit comments