2020#include " cluster/partition_balancer_backend.h"
2121#include " cluster/partition_balancer_rpc_service.h"
2222#include " cluster/partition_balancer_types.h"
23+ #include " cluster/partition_leaders_table.h"
2324#include " cluster/partition_manager.h"
2425#include " cluster/shard_table.h"
2526#include " cluster/topic_table.h"
@@ -49,6 +50,7 @@ controller_api::controller_api(
4950 ss::sharded<members_table>& members,
5051 ss::sharded<partition_balancer_backend>& partition_balancer,
5152 ss::sharded<partition_manager>& partition_manager,
53+ ss::sharded<partition_leaders_table>& partition_leaders,
5254 ss::sharded<ss::abort_source>& as)
5355 : _self(self)
5456 , _backend(backend)
@@ -58,6 +60,7 @@ controller_api::controller_api(
5860 , _members(members)
5961 , _partition_balancer(partition_balancer)
6062 , _partition_manager(partition_manager)
63+ , _partition_leaders(partition_leaders)
6164 , _as(as) {}
6265
6366ss::future<chunked_vector<ntp_reconciliation_state>>
@@ -283,6 +286,22 @@ controller_api::get_reconciliation_state(
283286 });
284287}
285288
289+ ss::future<result<ntp_reconciliation_state>>
290+ controller_api::get_partition_leader_reconciliation_state (
291+ model::ntp ntp, model::timeout_clock::time_point timeout) {
292+ auto leader = _partition_leaders.local ().get_leader (ntp);
293+ if (!leader) {
294+ vlog (
295+ clusterlog.debug ,
296+ " can't get partition leader for ntp {} to get its reconciliation "
297+ " state" ,
298+ ntp);
299+ co_return result<ntp_reconciliation_state>(errc::no_leader_controller);
300+ }
301+ co_return co_await get_reconciliation_state (
302+ *leader, std::move (ntp), timeout);
303+ }
304+
286305ss::future<result<ntp_reconciliation_state>>
287306controller_api::get_reconciliation_state (
288307 model::node_id id, model::ntp ntp, model::timeout_clock::time_point timeout) {
@@ -333,51 +352,60 @@ ss::future<std::error_code> controller_api::wait_for_topic(
333352}
334353
335354ss::future<result<chunked_vector<partition_reconfiguration_state>>>
336- controller_api::get_partitions_reconfiguration_state (
355+ controller_api::get_partitions_leader_reconfiguration_state (
337356 const chunked_vector<model::ntp>& partitions,
338- model::timeout_clock::time_point) {
357+ model::timeout_clock::time_point timeout ) {
339358 auto & updates_in_progress = _topics.local ().updates_in_progress ();
340-
341359 absl::node_hash_map<model::ntp, partition_reconfiguration_state> states;
342- for (auto & ntp : partitions) {
343- auto progress_it = updates_in_progress.find (ntp);
344- if (progress_it == updates_in_progress.end ()) {
345- continue ;
346- }
347- auto p_as = _topics.local ().get_partition_assignment (ntp);
348- if (!p_as) {
349- continue ;
350- }
351- partition_reconfiguration_state state;
352- state.ntp = ntp;
353-
354- state.current_assignment = std::move (p_as->replicas );
355- state.previous_assignment = progress_it->second .get_previous_replicas ();
356- state.state = progress_it->second .get_state ();
357- state.policy = progress_it->second .get_reconfiguration_policy ();
358-
359- auto reconciliation_state = co_await get_reconciliation_state (ntp);
360- for (auto & operation : reconciliation_state.pending_operations ()) {
361- if (operation.recovery_state ) {
362- state.current_partition_size
363- = operation.recovery_state ->local_size ;
364- for (auto & [id, recovery_state] :
365- operation.recovery_state ->replicas ) {
366- state.replicas .push_back (
367- replica_bytes{
368- .node = id,
369- .bytes_left = recovery_state.bytes_left ,
370- .bytes_transferred = state.current_partition_size
371- - recovery_state.bytes_left ,
372- .offset = recovery_state.last_offset ,
373- });
360+ co_await ss::max_concurrent_for_each (
361+ partitions,
362+ 16 ,
363+ [this , &updates_in_progress, &states, timeout](
364+ const model::ntp& ntp) -> ss::future<> {
365+ auto progress_it = updates_in_progress.find (ntp);
366+ if (progress_it == updates_in_progress.end ()) {
367+ return ss::now ();
368+ }
369+ auto p_as = _topics.local ().get_partition_assignment (ntp);
370+ if (!p_as) {
371+ return ss::now ();
372+ }
373+ partition_reconfiguration_state state;
374+ state.ntp = ntp;
375+
376+ state.current_assignment = std::move (p_as->replicas );
377+ state.previous_assignment
378+ = progress_it->second .get_previous_replicas ();
379+ state.state = progress_it->second .get_state ();
380+ state.policy = progress_it->second .get_reconfiguration_policy ();
381+
382+ return get_partition_leader_reconciliation_state (ntp, timeout)
383+ .then ([&states, &ntp, state = std::move (state)](
384+ result<ntp_reconciliation_state> r) mutable {
385+ if (r.has_value ()) {
386+ for (auto & operation : r.value ().pending_operations ()) {
387+ if (operation.recovery_state ) {
388+ state.current_partition_size
389+ = operation.recovery_state ->local_size ;
390+ for (auto & [id, recovery_state] :
391+ operation.recovery_state ->replicas ) {
392+ state.replicas .push_back (
393+ replica_bytes{
394+ .node = id,
395+ .bytes_left = recovery_state.bytes_left ,
396+ .bytes_transferred
397+ = state.current_partition_size
398+ - recovery_state.bytes_left ,
399+ .offset = recovery_state.last_offset ,
400+ });
401+ }
402+ }
403+ }
374404 }
375- }
376- }
377-
378- states.emplace (ntp, std::move (state));
379- }
380-
405+ states.emplace (ntp, std::move (state));
406+ return ss::now ();
407+ });
408+ });
381409 chunked_vector<partition_reconfiguration_state> ret;
382410 ret.reserve (states.size ());
383411 for (auto & [_, state] : states) {
@@ -499,7 +527,7 @@ controller_api::get_node_decommission_progress(
499527 // replicas that are moving from decommissioned node are still present on a
500528 // node but their metadata is update, add them explicitly
501529 ret.replicas_left += moving_from_node.size ();
502- auto states = co_await get_partitions_reconfiguration_state (
530+ auto states = co_await get_partitions_leader_reconfiguration_state (
503531 std::move (moving_from_node), timeout);
504532
505533 if (states) {
0 commit comments