1212
1313#include " model/fundamental.h"
1414#include " model/namespace.h"
15+ #include " model/timeout_clock.h"
1516#include " serde/protobuf/rpc.h"
1617
1718#include < seastar/core/coroutine.hh>
1819
20+ #include < fmt/format.h>
21+
22+ namespace {
23+ using namespace std ::chrono_literals;
24+ constexpr auto health_report_query_timeout = 10s;
25+ } // namespace
26+
1927namespace admin {
2028
2129seastar::future<proto::admin::gc::advance_epoch_response>
@@ -64,6 +72,52 @@ gc_service_impl::advance_epoch(
6472 co_return response;
6573}
6674
75+ auto gc_service_impl::populate_epochs (topic_partition_epoch_map tp_epochs)
76+ -> ss::future<topic_partition_epoch_map> {
77+ auto health_report = co_await _health_monitor->local ().get_cluster_health (
78+ cluster::cluster_report_filter{},
79+ cluster::force_refresh::yes,
80+ model::timeout_clock::now () + health_report_query_timeout);
81+
82+ if (!health_report.has_value ()) {
83+ throw serde::pb::rpc::unavailable_exception (
84+ fmt::format (
85+ " Error retrieving cluster health report: {}" ,
86+ health_report.error ()));
87+ }
88+ fmt::print (std::cerr, " POPULATE EPOCHS: {}\n " , tp_epochs.size ());
89+
90+ for (const auto & node_health : health_report.value ().node_reports ) {
91+ for (const auto & [tp_ns, partition_statuses] : node_health->topics ) {
92+ auto tp_it = tp_epochs.find (tp_ns.tp );
93+ if (tp_it == tp_epochs.end ()) {
94+ fmt::print (std::cerr, " {}: NOT FOUND\n " , tp_ns);
95+ continue ;
96+ }
97+ for (const auto & [pid, p_status] : partition_statuses) {
98+ const auto maybe_max_gc_epoch
99+ = p_status.cloud_topic_max_gc_eligible_epoch ;
100+ auto p_it = tp_it->second .find (pid);
101+ if (p_it == tp_it->second .end ()) {
102+ fmt::print (
103+ std::cerr, " {}/{}: PARTITION NOT FOUND\n " , tp_ns, pid);
104+ continue ;
105+ }
106+ if (!maybe_max_gc_epoch.has_value ()) {
107+ fmt::print (std::cerr, " {}/{}: NO EPOCH\n " , tp_ns, pid);
108+ continue ;
109+ }
110+ p_it->second = std::max (p_it->second , maybe_max_gc_epoch);
111+ fmt::print (
112+ std::cerr, " {}/{}: EPOCH: {}\n " , tp_ns, pid, p_it->second );
113+ }
114+ }
115+ co_await ss::maybe_yield ();
116+ }
117+
118+ co_return std::move (tp_epochs);
119+ }
120+
67121seastar::future<proto::admin::gc::get_epoch_response>
68122gc_service_impl::get_epoch (
69123 serde::pb::rpc::context, proto::admin::gc::get_epoch_request req) {
@@ -77,20 +131,19 @@ gc_service_impl::get_epoch(
77131 // 3. Get the current GC epoch for the partition
78132 // 4. Create result entry with success/failure status
79133
80- // Stub: Return empty response for now
81134 chunked_vector<topic_partition_get_epoch_result> results;
135+ topic_partition_epoch_map epochs;
82136 for (const auto & tp : req.get_partitions ()) {
83- topic_partition_get_epoch_result result;
137+ auto cfg = _topic_table->local ().get_topic_cfg (
138+ model::topic_namespace_view{
139+ model::kafka_namespace, model::topic_view{tp.get_topic ()}});
84140
141+ topic_partition_get_epoch_result result;
85142 proto::common::topic_partition result_tp;
86143 result_tp.set_topic (ss::sstring{tp.get_topic ()});
87144 result_tp.set_partition (tp.get_partition ());
88145 result.set_partition (std::move (result_tp));
89146
90- auto cfg = _topic_table->local ().get_topic_cfg (
91- model::topic_namespace_view{
92- model::kafka_namespace, model::topic_view{tp.get_topic ()}});
93-
94147 if (!cfg.has_value ()) {
95148 result.set_error (error::gc_error_topic_not_found);
96149 } else if (auto p = tp.get_partition ();
@@ -99,12 +152,47 @@ gc_service_impl::get_epoch(
99152 } else if (!cfg.value ().is_cloud_topic ()) {
100153 result.set_error (error::gc_error_not_cloud_topic);
101154 } else {
102- result.set_error (error::gc_error_failed);
155+ // if the requested tp is good, stage it for a health report scan
156+ epochs[model::topic_view{result.get_partition ().get_topic ()}]
157+ .try_emplace (
158+ model::partition_id{result.get_partition ().get_partition ()},
159+ std::nullopt );
103160 }
104161
105162 results.push_back (std::move (result));
106163 }
107164
165+ if (!epochs.empty ()) {
166+ epochs = co_await populate_epochs (std::move (epochs));
167+ for (auto & r : results) {
168+ auto tp_it = epochs.find (
169+ model::topic_view{r.get_partition ().get_topic ()});
170+ if (tp_it == epochs.end ()) {
171+ vassert (
172+ r.has_error (),
173+ " Epoch not populated for {}, expected error!" ,
174+ r.get_partition ().get_topic ());
175+ continue ;
176+ }
177+ auto p_it = tp_it->second .find (
178+ model::partition_id{r.get_partition ().get_partition ()});
179+ if (p_it == tp_it->second .end ()) {
180+ vassert (
181+ r.has_error (),
182+ " Epoch not populated for {}/{}, expected error!" ,
183+ r.get_partition ().get_topic (),
184+ r.get_partition ().get_partition ());
185+ continue ;
186+ }
187+ if (!p_it->second .has_value ()) {
188+ // TODO(oren): better error code i guess
189+ r.set_error (error::gc_error_failed);
190+ } else {
191+ r.set_epoch (p_it->second .value ());
192+ }
193+ }
194+ }
195+
108196 response.set_partitions (std::move (results));
109197
110198 co_return response;
0 commit comments