@@ -375,9 +375,9 @@ class EnsembleContext {
375375 size_t inflight_step_counter_;
376376
377377 // Backpressure support: Limits memory growth from decoupled models.
378- // Tracks inflight responses per step; blocks producers when downstream
379- // consumers are overloaded. Only active if max_inflight_responses_ > 0.
380- std::vector<size_t > step_inflight_response_counts_ ;
378+ // Tracks inflight requests per step; blocks producers when downstream
379+ // consumers are overloaded. Only active if max_inflight_requests_ > 0.
380+ std::vector<size_t > step_inflight_request_counts_ ;
381381 std::vector<std::unique_ptr<std::mutex>> step_mutexes_;
382382 std::vector<std::unique_ptr<std::condition_variable>> step_cv_vec_;
383383
@@ -518,9 +518,9 @@ EnsembleContext::EnsembleContext(
518518
519519 // Initialize backpressure tracking if enabled.
520520 size_t num_steps = info_->steps_ .size ();
521- step_inflight_response_counts_ .resize (num_steps, 0 );
521+ step_inflight_request_counts_ .resize (num_steps, 0 );
522522
523- if (info_->max_inflight_responses_ > 0 ) {
523+ if (info_->max_inflight_requests_ > 0 ) {
524524 step_mutexes_.resize (num_steps);
525525 step_cv_vec_.resize (num_steps);
526526
@@ -935,11 +935,11 @@ EnsembleContext::UpdateEnsembleState(
935935
936936 size_t completed_step_idx = completed_step->step_idx_ ;
937937
938- // Decrement step_inflight_response_counts_ , then notify any producer
938+ // Decrement step_inflight_request_counts_ , then notify any producer
939939 // threads blocked waiting for this step's capacity
940- if (info_->max_inflight_responses_ > 0 && !step_cv_vec_.empty ()) {
940+ if (info_->max_inflight_requests_ > 0 && !step_cv_vec_.empty ()) {
941941 std::lock_guard<std::mutex> lk (*step_mutexes_[completed_step_idx]);
942- step_inflight_response_counts_ [completed_step_idx]--;
942+ step_inflight_request_counts_ [completed_step_idx]--;
943943 step_cv_vec_[completed_step_idx]->notify_one ();
944944 }
945945 }
@@ -985,13 +985,6 @@ EnsembleContext::GetNextSteps(
985985 for (const auto & idx : next_step_idx) {
986986 steps->emplace_back ();
987987 RETURN_IF_ERROR (InitStep (idx.first , idx.second , &(steps->back ())));
988-
989- // Track as inflight. Checked by producers for backpressure; decremented on
990- // completion.
991- if (info_->max_inflight_responses_ > 0 && !step_mutexes_.empty ()) {
992- std::lock_guard<std::mutex> lk (*step_mutexes_[idx.first ]);
993- step_inflight_response_counts_[idx.first ]++;
994- }
995988 }
996989 inflight_step_counter_ += steps->size ();
997990
@@ -1436,8 +1429,8 @@ EnsembleContext::ScheduleSteps(
14361429 step->ctx_ = context;
14371430 size_t this_step_idx = step->step_idx_ ;
14381431
1439- // Block if this step is overloaded.
1440- if (context->info_ ->max_inflight_responses_ > 0 &&
1432+ // Apply backpressure to downstream steps only, not the entry step
1433+ if ((this_step_idx != 0 ) && context->info_ ->max_inflight_requests_ > 0 &&
14411434 !context->step_cv_vec_ .empty ()) {
14421435 std::unique_lock<std::mutex> lk (*context->step_mutexes_ [this_step_idx]);
14431436
@@ -1450,8 +1443,8 @@ EnsembleContext::ScheduleSteps(
14501443 bool capacity_available =
14511444 context->step_cv_vec_ [this_step_idx]->wait_for (lk, timeout, [&] {
14521445 return cancelled () ||
1453- (context->step_inflight_response_counts_ [this_step_idx] <
1454- context->info_ ->max_inflight_responses_ );
1446+ (context->step_inflight_request_counts_ [this_step_idx] <
1447+ context->info_ ->max_inflight_requests_ );
14551448 });
14561449
14571450 // Log error only if timeout occurred (not cancellation).
@@ -1460,8 +1453,8 @@ EnsembleContext::ScheduleSteps(
14601453 << context->info_ ->ensemble_name_
14611454 << " ' unable to schedule step " << this_step_idx
14621455 << " (inflight: "
1463- << context->step_inflight_response_counts_ [this_step_idx]
1464- << " >= limit: " << context->info_ ->max_inflight_responses_
1456+ << context->step_inflight_request_counts_ [this_step_idx]
1457+ << " >= limit: " << context->info_ ->max_inflight_requests_
14651458 << " ) for " << kMutexTimeoutSeconds
14661459 << " seconds. Proceeding to avoid deadlock." ;
14671460 }
@@ -1496,6 +1489,15 @@ EnsembleContext::ScheduleSteps(
14961489 std::unique_ptr<InferenceRequest> request = std::move (step->request_ );
14971490 auto step_status = context->is_ ->InferAsync (request);
14981491 if (step_status.IsOk ()) {
1492+ // Increment inflight counter AFTER successful scheduling. Always
1493+ // increment for ALL steps (including step 0) to ensure symmetry with
1494+ // decrement and prevent underflow when steps complete.
1495+ if (context->info_ ->max_inflight_requests_ > 0 &&
1496+ !context->step_mutexes_ .empty ()) {
1497+ std::lock_guard<std::mutex> lk (
1498+ *context->step_mutexes_ [this_step_idx]);
1499+ context->step_inflight_request_counts_ [this_step_idx]++;
1500+ }
14991501 step.release ();
15001502 continue ;
15011503 } else {
@@ -1678,15 +1680,15 @@ EnsembleScheduler::EnsembleScheduler(
16781680 }
16791681 callback_pool_ = is_->EnsembleCallbackPool ();
16801682
1681- // Backpressure configuration from protobuf field. Limits concurrent responses
1683+ // Backpressure configuration from protobuf field. Limits concurrent requests
16821684 // from decoupled steps to prevent memory growth. Value of 0 means unlimited.
16831685 if (config.has_ensemble_scheduling ()) {
1684- info_->max_inflight_responses_ =
1685- config.ensemble_scheduling ().max_inflight_responses ();
1686- if (info_->max_inflight_responses_ > 0 ) {
1686+ info_->max_inflight_requests_ =
1687+ config.ensemble_scheduling ().max_inflight_requests ();
1688+ if (info_->max_inflight_requests_ > 0 ) {
16871689 LOG_INFO << " Ensemble model '" << config.name ()
1688- << " ' configured with max_inflight_responses : "
1689- << info_->max_inflight_responses_ ;
1690+ << " ' configured with max_inflight_requests : "
1691+ << info_->max_inflight_requests_ ;
16901692 }
16911693 }
16921694}
0 commit comments