Skip to content

Commit da299be

Browse files
committed
controller/api: parallelize reconcliation state fetches
1 parent bfd9761 commit da299be

File tree

1 file changed

+47
-42
lines changed

1 file changed

+47
-42
lines changed

src/v/cluster/controller_api.cc

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -356,51 +356,56 @@ controller_api::get_partitions_leader_reconfiguration_state(
356356
const chunked_vector<model::ntp>& partitions,
357357
model::timeout_clock::time_point timeout) {
358358
auto& updates_in_progress = _topics.local().updates_in_progress();
359-
360359
absl::node_hash_map<model::ntp, partition_reconfiguration_state> states;
361-
for (auto& ntp : partitions) {
362-
auto progress_it = updates_in_progress.find(ntp);
363-
if (progress_it == updates_in_progress.end()) {
364-
continue;
365-
}
366-
auto p_as = _topics.local().get_partition_assignment(ntp);
367-
if (!p_as) {
368-
continue;
369-
}
370-
partition_reconfiguration_state state;
371-
state.ntp = ntp;
372-
373-
state.current_assignment = std::move(p_as->replicas);
374-
state.previous_assignment = progress_it->second.get_previous_replicas();
375-
state.state = progress_it->second.get_state();
376-
state.policy = progress_it->second.get_reconfiguration_policy();
377-
378-
auto reconciliation_state
379-
= co_await get_partition_leader_reconciliation_state(ntp, timeout);
380-
if (reconciliation_state.has_value()) {
381-
for (auto& operation :
382-
reconciliation_state.value().pending_operations()) {
383-
if (operation.recovery_state) {
384-
state.current_partition_size
385-
= operation.recovery_state->local_size;
386-
for (auto& [id, recovery_state] :
387-
operation.recovery_state->replicas) {
388-
state.replicas.push_back(
389-
replica_bytes{
390-
.node = id,
391-
.bytes_left = recovery_state.bytes_left,
392-
.bytes_transferred = state.current_partition_size
393-
- recovery_state.bytes_left,
394-
.offset = recovery_state.last_offset,
395-
});
360+
co_await ss::max_concurrent_for_each(
361+
partitions,
362+
16,
363+
[this, &updates_in_progress, &states, timeout](
364+
const model::ntp& ntp) -> ss::future<> {
365+
auto progress_it = updates_in_progress.find(ntp);
366+
if (progress_it == updates_in_progress.end()) {
367+
return ss::now();
368+
}
369+
auto p_as = _topics.local().get_partition_assignment(ntp);
370+
if (!p_as) {
371+
return ss::now();
372+
}
373+
partition_reconfiguration_state state;
374+
state.ntp = ntp;
375+
376+
state.current_assignment = std::move(p_as->replicas);
377+
state.previous_assignment
378+
= progress_it->second.get_previous_replicas();
379+
state.state = progress_it->second.get_state();
380+
state.policy = progress_it->second.get_reconfiguration_policy();
381+
382+
return get_partition_leader_reconciliation_state(ntp, timeout)
383+
.then([&states, &ntp, state = std::move(state)](
384+
result<ntp_reconciliation_state> r) mutable {
385+
if (r.has_value()) {
386+
for (auto& operation : r.value().pending_operations()) {
387+
if (operation.recovery_state) {
388+
state.current_partition_size
389+
= operation.recovery_state->local_size;
390+
for (auto& [id, recovery_state] :
391+
operation.recovery_state->replicas) {
392+
state.replicas.push_back(
393+
replica_bytes{
394+
.node = id,
395+
.bytes_left = recovery_state.bytes_left,
396+
.bytes_transferred
397+
= state.current_partition_size
398+
- recovery_state.bytes_left,
399+
.offset = recovery_state.last_offset,
400+
});
401+
}
402+
}
396403
}
397404
}
398-
}
399-
}
400-
401-
states.emplace(ntp, std::move(state));
402-
}
403-
405+
states.emplace(ntp, std::move(state));
406+
return ss::now();
407+
});
408+
});
404409
chunked_vector<partition_reconfiguration_state> ret;
405410
ret.reserve(states.size());
406411
for (auto& [_, state] : states) {

0 commit comments

Comments
 (0)