diff --git a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto index a2db311035a16..111418b27bcec 100644 --- a/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto +++ b/proto/redpanda/core/admin/internal/datalake/v1/datalake.proto @@ -32,6 +32,14 @@ service DatalakeService { authz: SUPERUSER }; } + // Sends a request to the coordinator to reset the pending state of a given + // Iceberg topic. + rpc CoordinatorResetPendingState(CoordinatorResetPendingStateRequest) + returns (CoordinatorResetPendingStateResponse) { + option (redpanda.pbgen.rpc) = { + authz: SUPERUSER + }; + } } message GetCoordinatorStateRequest { @@ -39,6 +47,11 @@ message GetCoordinatorStateRequest { // topics. repeated string topics_filter = 1; } +message CoordinatorResetPendingStateRequest { + string topic_name = 1; +} + +message CoordinatorResetPendingStateResponse {} message GetCoordinatorStateResponse { CoordinatorState state = 1; diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 0c307eecb40af..7e3c2de162aaa 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -857,6 +857,54 @@ coordinator::sync_get_usage_stats() { co_return result; } +ss::future> +coordinator::sync_reset_pending_state( + model::topic topic, model::revision_id topic_revision) { + auto gate = maybe_gate(); + if (gate.has_error()) { + co_return gate.error(); + } + auto sync_res = co_await stm_->sync(10s); + if (sync_res.has_error()) { + co_return convert_stm_errc(sync_res.error()); + } + + auto reset_res = reset_pending_state_update::build( + stm_->state(), topic, topic_revision); + if (reset_res.has_error()) { + vlog( + datalake_log.info, + "Rejecting reset_pending_state request (topic: {}, rev: {}): {}", + topic, + topic_revision, + reset_res.error()); + co_return errc::stm_apply_error; + } + storage::record_batch_builder builder( + model::record_batch_type::datalake_coordinator, model::offset{0}); + builder.add_raw_kv( + serde::to_iobuf(reset_pending_state_update::key), + serde::to_iobuf(std::move(reset_res.value()))); + auto repl_res = co_await stm_->replicate_and_wait( + sync_res.value(), std::move(builder).build(), as_); + if (repl_res.has_error()) { + co_return convert_stm_errc(repl_res.error()); + } + const auto t_it = stm_->state().topic_to_state.find(topic); + if ( + t_it != stm_->state().topic_to_state.end() + && t_it->second.has_pending_entries()) { + vlog( + datalake_log.info, + "Failed reset_pending_state request (topic: {}, rev: {}): " + "pending entries still remain", + topic, + topic_revision); + co_return errc::stm_apply_error; + } + co_return std::nullopt; +} + ss::future< checked, coordinator::errc>> coordinator::sync_get_topic_state(chunked_vector topics_filter) { diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index 5e5a8a20a37ac..95e7961807dba 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -86,6 +86,9 @@ class coordinator { ss::future> sync_get_usage_stats(); + ss::future> sync_reset_pending_state( + model::topic topic, model::revision_id topic_revision); + ss::future, errc>> sync_get_topic_state(chunked_vector topics); diff --git a/src/v/datalake/coordinator/frontend.cc b/src/v/datalake/coordinator/frontend.cc index 33b4282a86d16..c30332bb89c44 100644 --- a/src/v/datalake/coordinator/frontend.cc +++ b/src/v/datalake/coordinator/frontend.cc @@ -459,6 +459,40 @@ ss::future frontend::get_usage_stats( &client::get_usage_stats>(std::move(request), bool(local_only_exec)); } +ss::future frontend::reset_pending_state_locally( + reset_pending_state_request request, + const model::ntp& coordinator_partition, + ss::shard_id shard) { + auto holder = _gate.hold(); + co_return co_await _coordinator_mgr->invoke_on( + shard, + [coordinator_partition, &request](coordinator_manager& mgr) mutable { + auto partition = mgr.get(coordinator_partition); + if (!partition) { + return ssx::now(reset_pending_state_reply{errc::not_leader}); + } + return partition + ->sync_reset_pending_state(request.topic, request.topic_revision) + .then([](auto result) { + reset_pending_state_reply resp{}; + if (result.has_error()) { + resp.errc = to_rpc_errc(result.error()); + } else { + resp.errc = errc::ok; + } + return ssx::now(std::move(resp)); + }); + }); +} + +ss::future frontend::reset_pending_state( + reset_pending_state_request request, local_only local_only_exec) { + auto holder = _gate.hold(); + co_return co_await process< + &frontend::reset_pending_state_locally, + &client::reset_pending_state>(std::move(request), bool(local_only_exec)); +} + ss::future frontend::get_topic_state_locally( get_topic_state_request request, const model::ntp& coordinator_partition, diff --git a/src/v/datalake/coordinator/frontend.h b/src/v/datalake/coordinator/frontend.h index 9cd4c00bda5cd..fae240c55f891 100644 --- a/src/v/datalake/coordinator/frontend.h +++ b/src/v/datalake/coordinator/frontend.h @@ -74,6 +74,9 @@ class frontend : public ss::peering_sharded_service { ss::future get_usage_stats(usage_stats_request, local_only = local_only::no); + ss::future reset_pending_state( + reset_pending_state_request, local_only = local_only::no); + ss::future get_topic_state(get_topic_state_request, local_only = local_only::no); @@ -139,6 +142,11 @@ class frontend : public ss::peering_sharded_service { const model::ntp& coordinator_partition, ss::shard_id); + ss::future reset_pending_state_locally( + reset_pending_state_request, + const model::ntp& coordinator_partition, + ss::shard_id); + ss::future get_topic_state_locally( get_topic_state_request, const model::ntp& coordinator_partition, diff --git a/src/v/datalake/coordinator/rpc.json b/src/v/datalake/coordinator/rpc.json index 026f2a0bd3ea3..1ba46808af00e 100644 --- a/src/v/datalake/coordinator/rpc.json +++ b/src/v/datalake/coordinator/rpc.json @@ -30,6 +30,11 @@ "input_type": "usage_stats_request", "output_type": "usage_stats_reply" }, + { + "name": "reset_pending_state", + "input_type": "reset_pending_state_request", + "output_type": "reset_pending_state_reply" + }, { "name": "get_topic_state", "input_type": "get_topic_state_request", diff --git a/src/v/datalake/coordinator/service.cc b/src/v/datalake/coordinator/service.cc index 5fe082f08fc32..bf8a3131ecbf8 100644 --- a/src/v/datalake/coordinator/service.cc +++ b/src/v/datalake/coordinator/service.cc @@ -58,4 +58,10 @@ ss::future service::get_topic_state( std::move(request), frontend::local_only::yes); } +ss::future service::reset_pending_state( + reset_pending_state_request request, ::rpc::streaming_context&) { + return _frontend->local().reset_pending_state( + std::move(request), frontend::local_only::yes); +} + }; // namespace datalake::coordinator::rpc diff --git a/src/v/datalake/coordinator/service.h b/src/v/datalake/coordinator/service.h index b24f17340d8cd..5bd8486c0ea57 100644 --- a/src/v/datalake/coordinator/service.h +++ b/src/v/datalake/coordinator/service.h @@ -39,6 +39,9 @@ class service final : public impl::datalake_coordinator_rpc_service { ss::future get_topic_state( get_topic_state_request, ::rpc::streaming_context&) override; + ss::future reset_pending_state( + reset_pending_state_request, ::rpc::streaming_context&) override; + private: ss::sharded* _frontend; }; diff --git a/src/v/datalake/coordinator/state_machine.cc b/src/v/datalake/coordinator/state_machine.cc index be53a915c9be5..9334c420b07aa 100644 --- a/src/v/datalake/coordinator/state_machine.cc +++ b/src/v/datalake/coordinator/state_machine.cc @@ -133,6 +133,13 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) { maybe_log_update_error(_log, key, o, res); continue; } + case update_key::reset_pending_state: { + auto update = serde::read(val_p); + vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update); + auto res = update.apply(state_); + maybe_log_update_error(_log, key, o, res); + continue; + } } vlog( _log.error, diff --git a/src/v/datalake/coordinator/state_update.cc b/src/v/datalake/coordinator/state_update.cc index a500f0817b597..1753560aa1946 100644 --- a/src/v/datalake/coordinator/state_update.cc +++ b/src/v/datalake/coordinator/state_update.cc @@ -24,6 +24,8 @@ std::ostream& operator<<(std::ostream& o, const update_key& u) { return o << "update_key::mark_files_committed"; case update_key::topic_lifecycle_update: return o << "update_key::topic_lifecycle_update"; + case update_key::reset_pending_state: + return o << "update_key::reset_pending_state"; } } @@ -214,6 +216,58 @@ mark_files_committed_update::apply(topics_state& state) { return std::nullopt; } +checked +reset_pending_state_update::build( + const topics_state& state, + const model::topic& topic, + model::revision_id topic_revision) { + reset_pending_state_update update{ + .topic = topic, + .topic_revision = topic_revision, + }; + auto allowed = update.can_apply(state); + if (allowed.has_error()) { + return allowed.error(); + } + return update; +} + +checked +reset_pending_state_update::can_apply(const topics_state& state) { + auto topic_it = state.topic_to_state.find(topic); + if (topic_it == state.topic_to_state.end()) { + // No topic at all, the reset is a no-op. + return std::nullopt; + } + const auto& cur_topic = topic_it->second; + if (topic_revision != cur_topic.revision) { + return stm_update_error{fmt::format( + "topic {} revision mismatch: got {}, current rev {}", + topic, + topic_revision, + cur_topic.revision)}; + } + return std::nullopt; +} + +checked +reset_pending_state_update::apply(topics_state& state) { + auto allowed = can_apply(state); + if (allowed.has_error()) { + return allowed.error(); + } + auto topic_it = state.topic_to_state.find(topic); + if (topic_it == state.topic_to_state.end()) { + // No topic at all, the reset is a no-op. + return std::nullopt; + } + auto& tp_state = topic_it->second; + for (auto& [pid, p_state] : tp_state.pid_to_pending_files) { + p_state.pending_entries.clear(); + } + return std::nullopt; +} + checked topic_lifecycle_update::can_apply(const topics_state& state) { auto topic_it = state.topic_to_state.find(topic); @@ -328,4 +382,9 @@ std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) { return o; } +std::ostream& operator<<(std::ostream& o, const reset_pending_state_update& u) { + fmt::print(o, "{{topic: {}, revision: {}}}", u.topic, u.topic_revision); + return o; +} + } // namespace datalake::coordinator diff --git a/src/v/datalake/coordinator/state_update.h b/src/v/datalake/coordinator/state_update.h index ce77daa3808c2..b8eabdb95e9fc 100644 --- a/src/v/datalake/coordinator/state_update.h +++ b/src/v/datalake/coordinator/state_update.h @@ -25,6 +25,7 @@ enum class update_key : uint8_t { add_files = 0, mark_files_committed = 1, topic_lifecycle_update = 2, + reset_pending_state = 3, }; std::ostream& operator<<(std::ostream&, const update_key&); @@ -88,6 +89,28 @@ struct mark_files_committed_update uint64_t kafka_bytes_processed{0}; }; +struct reset_pending_state_update + : public serde::envelope< + reset_pending_state_update, + serde::version<0>, + serde::compat_version<0>> { + static constexpr auto key{update_key::reset_pending_state}; + static checked build( + const topics_state&, + const model::topic&, + model::revision_id topic_revision); + + auto serde_fields() { return std::tie(topic, topic_revision); } + + checked can_apply(const topics_state&); + checked apply(topics_state&); + friend std::ostream& + operator<<(std::ostream&, const reset_pending_state_update&); + + model::topic topic; + model::revision_id topic_revision; +}; + // An update to change topic lifecycle state after it has been deleted. struct topic_lifecycle_update : public serde::envelope< diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h index f923821c1bad7..1bcfaae7dce2f 100644 --- a/src/v/datalake/coordinator/types.h +++ b/src/v/datalake/coordinator/types.h @@ -332,6 +332,46 @@ struct usage_stats_request auto serde_fields() { return std::tie(coordinator_partition); } }; +struct reset_pending_state_reply + : serde::envelope< + reset_pending_state_reply, + serde::version<0>, + serde::compat_version<0>> { + reset_pending_state_reply() = default; + explicit reset_pending_state_reply(errc err) + : errc(err) {} + + friend std::ostream& + operator<<(std::ostream&, const reset_pending_state_reply&); + + errc errc; + + auto serde_fields() { return std::tie(errc); } +}; + +struct reset_pending_state_request + : serde::envelope< + reset_pending_state_request, + serde::version<0>, + serde::compat_version<0>> { + using resp_t = reset_pending_state_reply; + + model::topic topic; + model::revision_id topic_revision; + + reset_pending_state_request() = default; + explicit reset_pending_state_request( + const model::topic& topic, model::revision_id rev) + : topic(topic) + , topic_revision(rev) {} + friend std::ostream& + operator<<(std::ostream&, const reset_pending_state_request&); + + const model::topic& get_topic() const { return topic; } + + auto serde_fields() { return std::tie(topic, topic_revision); } +}; + struct get_topic_state_reply : serde::envelope< get_topic_state_reply, diff --git a/src/v/redpanda/admin/services/datalake/datalake.cc b/src/v/redpanda/admin/services/datalake/datalake.cc index 713908048d065..4e03ed180dd27 100644 --- a/src/v/redpanda/admin/services/datalake/datalake.cc +++ b/src/v/redpanda/admin/services/datalake/datalake.cc @@ -193,4 +193,22 @@ datalake_service_impl::get_coordinator_state( co_return response; } +ss::future +datalake_service_impl::coordinator_reset_pending_state( + serde::pb::rpc::context, + proto::admin::coordinator_reset_pending_state_request req) { + datalake::coordinator::reset_pending_state_request fe_req; + fe_req.topic = model::topic{req.get_topic_name()}; + if (!_coordinator_fe->local_is_initialized()) { + throw serde::pb::rpc::unavailable_exception( + "Datalake coordinator frontend not initialized"); + } + auto fe_res = co_await _coordinator_fe->local().reset_pending_state(fe_req); + if (fe_res.errc != datalake::coordinator::errc::ok) { + throw serde::pb::rpc::internal_exception( + fmt::format("Datalake coordinator error: {}", fe_res.errc)); + } + co_return proto::admin::coordinator_reset_pending_state_response{}; +} + } // namespace admin diff --git a/src/v/redpanda/admin/services/datalake/datalake.h b/src/v/redpanda/admin/services/datalake/datalake.h index 7a210645d98e3..39cd08faaafc9 100644 --- a/src/v/redpanda/admin/services/datalake/datalake.h +++ b/src/v/redpanda/admin/services/datalake/datalake.h @@ -28,6 +28,11 @@ class datalake_service_impl : public proto::admin::datalake_service { serde::pb::rpc::context, proto::admin::get_coordinator_state_request) override; + ss::future + coordinator_reset_pending_state( + serde::pb::rpc::context, + proto::admin::coordinator_reset_pending_state_request) override; + private: admin::proxy::client _proxy_client;