Skip to content

Commit 2bff78b

Browse files
committed
Move max_inflight_responses to config.proto
1 parent 4befcd9 commit 2bff78b

File tree

2 files changed

+20
-61
lines changed

2 files changed

+20
-61
lines changed

src/ensemble_scheduler/ensemble_scheduler.cc

Lines changed: 16 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ class EnsembleContext {
379379
// consumers are overloaded. Only active if max_inflight_responses_ > 0.
380380
std::vector<size_t> step_inflight_response_counts_;
381381
std::vector<std::unique_ptr<std::mutex>> step_mutexes_;
382-
std::vector<std::unique_ptr<std::condition_variable>> step_cvs_;
382+
std::vector<std::unique_ptr<std::condition_variable>> step_cv_vec_;
383383

384384
// pointer that either points to 'pruned_tensor_to_step_' or to
385385
// 'info_->tensor_to_step_' if all ensemble outputs are requested
@@ -522,11 +522,11 @@ EnsembleContext::EnsembleContext(
522522

523523
if (info_->max_inflight_responses_ > 0) {
524524
step_mutexes_.resize(num_steps);
525-
step_cvs_.resize(num_steps);
525+
step_cv_vec_.resize(num_steps);
526526

527527
for (size_t i = 0; i < num_steps; i++) {
528528
step_mutexes_[i] = std::make_unique<std::mutex>();
529-
step_cvs_[i] = std::make_unique<std::condition_variable>();
529+
step_cv_vec_[i] = std::make_unique<std::condition_variable>();
530530
}
531531
}
532532

@@ -701,7 +701,7 @@ EnsembleContext::ResponseComplete(
701701
// Block this producer if downstream consumers are overloaded.
702702
// Prevents memory exhaustion by limiting concurrent inflight responses.
703703
if (context->info_->max_inflight_responses_ > 0 &&
704-
!context->step_cvs_.empty()) {
704+
!context->step_cv_vec_.empty()) {
705705
for (const auto& output_pair : istep.output_to_tensor_) {
706706
const auto& tensor_name = output_pair.second;
707707
const auto& downstream_steps = (*context->tensor_to_step_)[tensor_name];
@@ -720,7 +720,7 @@ EnsembleContext::ResponseComplete(
720720
};
721721

722722
bool capacity_available =
723-
context->step_cvs_[downstream_step_idx]->wait_for(
723+
context->step_cv_vec_[downstream_step_idx]->wait_for(
724724
lk, timeout, [&] {
725725
return cancelled() ||
726726
(context->step_inflight_response_counts_
@@ -986,10 +986,10 @@ EnsembleContext::UpdateEnsembleState(
986986

987987
// Decrement step_inflight_response_counts_, then notify any producer
988988
// threads blocked waiting for this step's capacity
989-
if (info_->max_inflight_responses_ > 0 && !step_cvs_.empty()) {
989+
if (info_->max_inflight_responses_ > 0 && !step_cv_vec_.empty()) {
990990
std::lock_guard<std::mutex> lk(*step_mutexes_[completed_step_idx]);
991991
step_inflight_response_counts_[completed_step_idx]--;
992-
step_cvs_[completed_step_idx]->notify_one();
992+
step_cv_vec_[completed_step_idx]->notify_one();
993993
}
994994
}
995995
RETURN_IF_ERROR(ConsumeResponse(completed_step));
@@ -1536,52 +1536,12 @@ EnsembleContext::ScheduleSteps(
15361536

15371537
} // namespace
15381538

1539-
Status
1540-
EnsembleScheduler::ValidateConfig(const inference::ModelConfig& config)
1541-
{
1542-
// Validate max_ensemble_inflight_responses parameter if present
1543-
if (config.parameters().contains("max_ensemble_inflight_responses")) {
1544-
const auto& param =
1545-
config.parameters().at("max_ensemble_inflight_responses");
1546-
const std::string& value = param.string_value();
1547-
1548-
try {
1549-
const int parsed = std::stoi(value);
1550-
if (parsed <= 0) {
1551-
return Status(
1552-
Status::Code::INVALID_ARG,
1553-
"Invalid 'max_ensemble_inflight_responses' for ensemble model '" +
1554-
config.name() + "': value must be positive, got " +
1555-
std::to_string(parsed));
1556-
}
1557-
}
1558-
catch (const std::out_of_range& e) {
1559-
return Status(
1560-
Status::Code::INVALID_ARG,
1561-
"Invalid 'max_ensemble_inflight_responses' for ensemble model '" +
1562-
config.name() + "': value exceeds maximum allowed (" +
1563-
std::to_string(INT_MAX) + ")");
1564-
}
1565-
catch (const std::invalid_argument& e) {
1566-
return Status(
1567-
Status::Code::INVALID_ARG,
1568-
"Invalid 'max_ensemble_inflight_responses' for ensemble model '" +
1569-
config.name() + "': cannot parse value '" + value + "'");
1570-
}
1571-
}
1572-
1573-
return Status::Success;
1574-
}
1575-
15761539
Status
15771540
EnsembleScheduler::Create(
15781541
InferenceStatsAggregator* const stats_aggregator,
15791542
InferenceServer* const server, const ModelIdentifier& model_id,
15801543
const inference::ModelConfig& config, std::unique_ptr<Scheduler>* scheduler)
15811544
{
1582-
// Validate configuration before constructing scheduler
1583-
RETURN_IF_ERROR(ValidateConfig(config));
1584-
15851545
scheduler->reset(
15861546
new EnsembleScheduler(stats_aggregator, server, model_id, config));
15871547
return Status::Success;
@@ -1734,17 +1694,16 @@ EnsembleScheduler::EnsembleScheduler(
17341694
}
17351695
callback_pool_ = is_->EnsembleCallbackPool();
17361696

1737-
// Parse backpressure configuration. Limits concurrent responses from
1738-
// decoupled steps to prevent memory growth.
1739-
// Configuration is already validated in ValidateConfig()
1740-
if (config.parameters().contains("max_ensemble_inflight_responses")) {
1741-
const auto& param =
1742-
config.parameters().at("max_ensemble_inflight_responses");
1697+
// Backpressure configuration from protobuf field. Limits concurrent responses
1698+
// from decoupled steps to prevent memory growth. Value of 0 means unlimited.
1699+
if (config.has_ensemble_scheduling()) {
17431700
info_->max_inflight_responses_ =
1744-
static_cast<size_t>(std::stoi(param.string_value()));
1745-
LOG_INFO << "Ensemble model '" << config.name()
1746-
<< "' configured with max_ensemble_inflight_responses: "
1747-
<< info_->max_inflight_responses_;
1701+
config.ensemble_scheduling().max_inflight_responses();
1702+
if (info_->max_inflight_responses_ > 0) {
1703+
LOG_INFO << "Ensemble model '" << config.name()
1704+
<< "' configured with max_inflight_responses: "
1705+
<< info_->max_inflight_responses_;
1706+
}
17481707
}
17491708
}
17501709

src/ensemble_scheduler/ensemble_scheduler.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ struct EnsembleInfo {
8484
// backward path, ensemble tensor to the step that provides its data
8585
std::unordered_map<std::string, size_t> tensor_to_prev_step_;
8686

87-
// Maximum concurrent inflight responses from steps to downstream
88-
// consumers. Prevents memory growth by blocking producers when limit reached.
89-
// Value of 0 means unlimited (default). Configured via parameter
90-
// 'max_ensemble_inflight_responses' in ensemble config.pbtxt.
87+
// Maximum concurrent inflight responses from steps to downstream consumers.
88+
// Prevents memory growth by blocking producers when limit is reached.
89+
// Default value is 0, which indicates unlimited (no backpressure applied).
90+
// Configured via 'max_inflight_responses' parameter in config.pbtxt.
9191
size_t max_inflight_responses_ = 0;
9292
};
9393

0 commit comments

Comments
 (0)