@@ -116,9 +116,33 @@ datalake_service_impl::datalake_service_impl(
116116 : _proxy_client(std::move(proxy_client))
117117 , _coordinator_fe(coordinator_fe) {}
118118
119+ <<<<<<< HEAD
119120ss::future<proto::admin::get_coordinator_state_response>
120121datalake_service_impl::get_coordinator_state (
121122 serde::pb::rpc::context, proto::admin::get_coordinator_state_request req) {
123+ =======
124+ ss::future<proto::admin::coordinator_reset_pending_state_response>
125+ datalake_service_impl::coordinator_reset_pending_state (
126+ serde::pb::rpc::context,
127+ proto::admin::coordinator_reset_pending_state_request req) {
128+ datalake::coordinator::reset_pending_state_request fe_req;
129+ fe_req.topic = model::topic{req.get_topic_name ()};
130+ if (!_coordinator_fe->local_is_initialized ()) {
131+ throw serde::pb::rpc::unavailable_exception (
132+ " Datalake coordinator frontend not initialized" );
133+ }
134+ auto fe_res = co_await _coordinator_fe->local ().reset_pending_state (fe_req);
135+ if (fe_res.errc != datalake::coordinator::errc::ok) {
136+ throw serde::pb::rpc::internal_exception (
137+ fmt::format (" Datalake coordinator error: {}" , fe_res.errc ));
138+ }
139+ co_return proto::admin::coordinator_reset_pending_state_response{};
140+ }
141+
142+ ss::future<proto::admin::coordinator_get_state_response>
143+ datalake_service_impl::coordinator_get_state (
144+ serde::pb::rpc::context, proto::admin::coordinator_get_state_request req) {
145+ >>>>>>> 120d3b7539 (reset admin)
122146 if (!_coordinator_fe->local_is_initialized ()) {
123147 throw serde::pb::rpc::unavailable_exception (
124148 " Datalake coordinator frontend not initialized" );
0 commit comments