Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions proto/redpanda/core/admin/internal/datalake/v1/datalake.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,26 @@ service DatalakeService {
authz: SUPERUSER
};
}
// Sends a request to the coordinator to reset the pending state of a given
// Iceberg topic. The topic is identified by name in the request message; the
// response message carries no fields.
//
// Authorization: restricted to superusers (see `authz` below).
rpc CoordinatorResetPendingState(CoordinatorResetPendingStateRequest)
returns (CoordinatorResetPendingStateResponse) {
option (redpanda.pbgen.rpc) = {
authz: SUPERUSER
};
}
}

// Request selecting which topics' coordinator state should be returned.
message GetCoordinatorStateRequest {
// Names of topics whose state to return. If empty, returns state for all
// topics.
repeated string topics_filter = 1;
}
// Request to reset the coordinator's pending state for a single Iceberg
// topic.
message CoordinatorResetPendingStateRequest {
// Name of the topic whose pending state should be reset.
string topic_name = 1;
}

// Response to CoordinatorResetPendingState. The message declares no fields.
message CoordinatorResetPendingStateResponse {}

message GetCoordinatorStateResponse {
CoordinatorState state = 1;
Expand Down
48 changes: 48 additions & 0 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,54 @@ coordinator::sync_get_usage_stats() {
co_return result;
}

/// Replicates a reset_pending_state update for `topic` and waits for it to be
/// applied to the coordinator state machine.
///
/// Sequence: check maybe_gate(), sync the STM (10s timeout), validate the
/// update against the current state via reset_pending_state_update::build(),
/// replicate it as a datalake_coordinator batch, then re-read the state to
/// confirm that the topic no longer reports pending entries.
///
/// \param topic topic whose pending entries should be cleared.
/// \param topic_revision revision the caller believes the topic has; build()
///        rejects the request when it cannot be applied to the current state.
/// \return std::nullopt on success. Gate/sync/replication failures are
///         propagated (the latter two through convert_stm_errc); a rejected
///         update or leftover pending entries yield errc::stm_apply_error.
ss::future<checked<std::nullopt_t, coordinator::errc>>
coordinator::sync_reset_pending_state(
  model::topic topic, model::revision_id topic_revision) {
    // Kept alive for the whole coroutine, exactly like the other checks below
    // it is only inspected for an error here.
    auto gate_holder = maybe_gate();
    if (gate_holder.has_error()) {
        co_return gate_holder.error();
    }

    auto synced = co_await stm_->sync(10s);
    if (synced.has_error()) {
        co_return convert_stm_errc(synced.error());
    }

    // Validate against the freshly synced state before replicating anything.
    auto update = reset_pending_state_update::build(
      stm_->state(), topic, topic_revision);
    if (update.has_error()) {
        vlog(
          datalake_log.info,
          "Rejecting reset_pending_state request (topic: {}, rev: {}): {}",
          topic,
          topic_revision,
          update.error());
        co_return errc::stm_apply_error;
    }

    storage::record_batch_builder batch(
      model::record_batch_type::datalake_coordinator, model::offset{0});
    batch.add_raw_kv(
      serde::to_iobuf(reset_pending_state_update::key),
      serde::to_iobuf(std::move(update.value())));

    auto replicated = co_await stm_->replicate_and_wait(
      synced.value(), std::move(batch).build(), as_);
    if (replicated.has_error()) {
        co_return convert_stm_errc(replicated.error());
    }

    // Post-condition check: a topic that is absent from the state trivially
    // has nothing pending; a present topic must report no pending entries.
    const auto& topics = stm_->state().topic_to_state;
    const auto topic_it = topics.find(topic);
    const bool pending_remains = topic_it != topics.end()
                                 && topic_it->second.has_pending_entries();
    if (!pending_remains) {
        co_return std::nullopt;
    }

    vlog(
      datalake_log.info,
      "Failed reset_pending_state request (topic: {}, rev: {}): "
      "pending entries still remain",
      topic,
      topic_revision);
    co_return errc::stm_apply_error;
}

ss::future<
checked<chunked_hash_map<model::topic, topic_state>, coordinator::errc>>
coordinator::sync_get_topic_state(chunked_vector<model::topic> topics_filter) {
Expand Down
3 changes: 3 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class coordinator {

ss::future<checked<datalake_usage_stats, errc>> sync_get_usage_stats();

/// Replicates a reset_pending_state update for `topic` at `topic_revision`
/// and waits for it to be applied. Resolves to std::nullopt on success and to
/// an errc otherwise, including the case where the topic still reports
/// pending entries after the update has been replicated.
ss::future<checked<std::nullopt_t, errc>> sync_reset_pending_state(
model::topic topic, model::revision_id topic_revision);

ss::future<checked<chunked_hash_map<model::topic, topic_state>, errc>>
sync_get_topic_state(chunked_vector<model::topic> topics);

Expand Down
34 changes: 34 additions & 0 deletions src/v/datalake/coordinator/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,40 @@ ss::future<usage_stats_reply> frontend::get_usage_stats(
&client::get_usage_stats>(std::move(request), bool(local_only_exec));
}

/// Executes a reset_pending_state request against the coordinator hosted on
/// `shard` of this node.
///
/// \param request topic and topic revision to reset.
/// \param coordinator_partition ntp used to look the coordinator up in the
///        coordinator manager on the target shard.
/// \param shard shard on which the lookup and the reset are run.
/// \return a reply whose errc is errc::not_leader when the manager has no
///         coordinator for the partition, the translated coordinator error
///         on failure, or errc::ok on success.
ss::future<reset_pending_state_reply> frontend::reset_pending_state_locally(
  reset_pending_state_request request,
  const model::ntp& coordinator_partition,
  ss::shard_id shard) {
    auto gate_holder = _gate.hold();
    // `request` is captured by reference: it lives in this coroutine's frame,
    // which is not destroyed until the awaited cross-shard call has finished.
    auto reply = co_await _coordinator_mgr->invoke_on(
      shard,
      [coordinator_partition, &request](coordinator_manager& mgr) mutable {
          auto crd = mgr.get(coordinator_partition);
          if (!crd) {
              return ssx::now(reset_pending_state_reply{errc::not_leader});
          }
          return crd
            ->sync_reset_pending_state(request.topic, request.topic_revision)
            .then([](auto result) {
                // Map the coordinator-level result onto the RPC error code.
                if (result.has_error()) {
                    return reset_pending_state_reply{
                      to_rpc_errc(result.error())};
                }
                return reset_pending_state_reply{errc::ok};
            });
      });
    co_return reply;
}

/// Entry point for resetting a topic's pending coordinator state.
///
/// Holds the frontend gate and hands the request to process<>, instantiated
/// with the local handler (reset_pending_state_locally) and the RPC client
/// method (client::reset_pending_state).
///
/// \param request topic and topic revision to reset.
/// \param local_only_exec forwarded to process<> as a plain bool.
ss::future<reset_pending_state_reply> frontend::reset_pending_state(
  reset_pending_state_request request, local_only local_only_exec) {
    auto gate_holder = _gate.hold();
    auto reply = co_await process<
      &frontend::reset_pending_state_locally,
      &client::reset_pending_state>(
      std::move(request), static_cast<bool>(local_only_exec));
    co_return reply;
}

ss::future<get_topic_state_reply> frontend::get_topic_state_locally(
get_topic_state_request request,
const model::ntp& coordinator_partition,
Expand Down
8 changes: 8 additions & 0 deletions src/v/datalake/coordinator/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::future<usage_stats_reply>
get_usage_stats(usage_stats_request, local_only = local_only::no);

/// Asks the datalake coordinator to reset the pending state of the topic
/// named in the request. The outcome is reported through the reply's errc.
/// The second argument defaults to local_only::no.
ss::future<reset_pending_state_reply> reset_pending_state(
reset_pending_state_request, local_only = local_only::no);

ss::future<get_topic_state_reply>
get_topic_state(get_topic_state_request, local_only = local_only::no);

Expand Down Expand Up @@ -139,6 +142,11 @@ class frontend : public ss::peering_sharded_service<frontend> {
const model::ntp& coordinator_partition,
ss::shard_id);

/// Runs the reset on the given shard against the coordinator registered for
/// `coordinator_partition`. Replies with errc::not_leader when that shard's
/// coordinator manager has no coordinator for the partition.
ss::future<reset_pending_state_reply> reset_pending_state_locally(
reset_pending_state_request,
const model::ntp& coordinator_partition,
ss::shard_id);

ss::future<get_topic_state_reply> get_topic_state_locally(
get_topic_state_request,
const model::ntp& coordinator_partition,
Expand Down
5 changes: 5 additions & 0 deletions src/v/datalake/coordinator/rpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
"input_type": "usage_stats_request",
"output_type": "usage_stats_reply"
},
{
"name": "reset_pending_state",
"input_type": "reset_pending_state_request",
"output_type": "reset_pending_state_reply"
},
{
"name": "get_topic_state",
"input_type": "get_topic_state_request",
Expand Down
6 changes: 6 additions & 0 deletions src/v/datalake/coordinator/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ ss::future<get_topic_state_reply> service::get_topic_state(
std::move(request), frontend::local_only::yes);
}

/// RPC handler for reset_pending_state: forwards the request to the
/// shard-local frontend with local_only::yes. The streaming context is
/// unused.
ss::future<reset_pending_state_reply> service::reset_pending_state(
  reset_pending_state_request request, ::rpc::streaming_context&) {
    auto& local_frontend = _frontend->local();
    return local_frontend.reset_pending_state(
      std::move(request), frontend::local_only::yes);
}

}; // namespace datalake::coordinator::rpc
3 changes: 3 additions & 0 deletions src/v/datalake/coordinator/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class service final : public impl::datalake_coordinator_rpc_service {
ss::future<get_topic_state_reply> get_topic_state(
get_topic_state_request, ::rpc::streaming_context&) override;

/// RPC endpoint for reset_pending_state requests; the implementation hands
/// the request to the shard-local frontend with local_only::yes.
ss::future<reset_pending_state_reply> reset_pending_state(
reset_pending_state_request, ::rpc::streaming_context&) override;

private:
ss::sharded<frontend>* _frontend;
};
Expand Down
7 changes: 7 additions & 0 deletions src/v/datalake/coordinator/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
maybe_log_update_error(_log, key, o, res);
continue;
}
case update_key::reset_pending_state: {
auto update = serde::read<reset_pending_state_update>(val_p);
vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update);
auto res = update.apply(state_);
maybe_log_update_error(_log, key, o, res);
continue;
}
}
vlog(
_log.error,
Expand Down
59 changes: 59 additions & 0 deletions src/v/datalake/coordinator/state_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ std::ostream& operator<<(std::ostream& o, const update_key& u) {
return o << "update_key::mark_files_committed";
case update_key::topic_lifecycle_update:
return o << "update_key::topic_lifecycle_update";
case update_key::reset_pending_state:
return o << "update_key::reset_pending_state";
}
}

Expand Down Expand Up @@ -214,6 +216,58 @@ mark_files_committed_update::apply(topics_state& state) {
return std::nullopt;
}

/// Creates a reset_pending_state_update for `topic` at `topic_revision`,
/// returning it only if can_apply() accepts it against `state`; otherwise the
/// error produced by can_apply() is returned.
checked<reset_pending_state_update, stm_update_error>
reset_pending_state_update::build(
  const topics_state& state,
  const model::topic& topic,
  model::revision_id topic_revision) {
    reset_pending_state_update candidate{
      .topic = topic,
      .topic_revision = topic_revision,
    };
    if (auto check = candidate.can_apply(state); check.has_error()) {
        return check.error();
    }
    return candidate;
}

/// Checks whether this update may be applied to `state`.
///
/// An update for a topic that is not tracked at all is accepted (applying it
/// is a no-op). For a tracked topic the update's revision must equal the
/// tracked revision, otherwise an stm_update_error describing the mismatch is
/// returned.
checked<std::nullopt_t, stm_update_error>
reset_pending_state_update::can_apply(const topics_state& state) {
    const auto found = state.topic_to_state.find(topic);
    const bool topic_known = found != state.topic_to_state.end();
    if (topic_known && topic_revision != found->second.revision) {
        return stm_update_error{fmt::format(
          "topic {} revision mismatch: got {}, current rev {}",
          topic,
          topic_revision,
          found->second.revision)};
    }
    // Either the topic is unknown or the revisions agree.
    return std::nullopt;
}

/// Applies the update to `state`: after re-validating with can_apply(), the
/// pending_entries of every partition tracked for the topic are cleared.
/// A topic that is not present in `state` is left untouched and the call
/// still succeeds.
checked<std::nullopt_t, stm_update_error>
reset_pending_state_update::apply(topics_state& state) {
    if (auto check = can_apply(state); check.has_error()) {
        return check.error();
    }
    auto found = state.topic_to_state.find(topic);
    if (found != state.topic_to_state.end()) {
        for (auto& [partition_id, partition_state] :
             found->second.pid_to_pending_files) {
            partition_state.pending_entries.clear();
        }
    }
    return std::nullopt;
}

checked<bool, stm_update_error>
topic_lifecycle_update::can_apply(const topics_state& state) {
auto topic_it = state.topic_to_state.find(topic);
Expand Down Expand Up @@ -328,4 +382,9 @@ std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) {
return o;
}

// Writes the update to `o` as `{topic: <topic>, revision: <topic_revision>}`
// and returns the stream.
std::ostream& operator<<(std::ostream& o, const reset_pending_state_update& u) {
fmt::print(o, "{{topic: {}, revision: {}}}", u.topic, u.topic_revision);
return o;
}

} // namespace datalake::coordinator
23 changes: 23 additions & 0 deletions src/v/datalake/coordinator/state_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ enum class update_key : uint8_t {
add_files = 0,
mark_files_committed = 1,
topic_lifecycle_update = 2,
reset_pending_state = 3,
};
std::ostream& operator<<(std::ostream&, const update_key&);

Expand Down Expand Up @@ -88,6 +89,28 @@ struct mark_files_committed_update
uint64_t kafka_bytes_processed{0};
};

// An update that resets the pending state tracked for a topic.
struct reset_pending_state_update
: public serde::envelope<
reset_pending_state_update,
serde::version<0>,
serde::compat_version<0>> {
// Key identifying this update type.
static constexpr auto key{update_key::reset_pending_state};
// Constructs an update for the given topic and revision and validates it
// against the given state; returns an error if it cannot be applied.
static checked<reset_pending_state_update, stm_update_error> build(
const topics_state&,
const model::topic&,
model::revision_id topic_revision);

// Fields serialized by serde, in this order.
auto serde_fields() { return std::tie(topic, topic_revision); }

// Reports whether the update may be applied to the given state.
checked<std::nullopt_t, stm_update_error> can_apply(const topics_state&);
// Applies the update to the given state.
checked<std::nullopt_t, stm_update_error> apply(topics_state&);
friend std::ostream&
operator<<(std::ostream&, const reset_pending_state_update&);

// Topic whose pending state is reset.
model::topic topic;
// Topic revision the update was built for.
model::revision_id topic_revision;
};

// An update to change topic lifecycle state after it has been deleted.
struct topic_lifecycle_update
: public serde::envelope<
Expand Down
40 changes: 40 additions & 0 deletions src/v/datalake/coordinator/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,46 @@ struct usage_stats_request
auto serde_fields() { return std::tie(coordinator_partition); }
};

/// Reply to a reset_pending_state_request. Carries only the outcome code of
/// the operation.
struct reset_pending_state_reply
  : serde::envelope<
      reset_pending_state_reply,
      serde::version<0>,
      serde::compat_version<0>> {
    reset_pending_state_reply() = default;
    explicit reset_pending_state_reply(errc err)
      : errc(err) {}

    friend std::ostream&
    operator<<(std::ostream&, const reset_pending_state_reply&);

    // Value-initialized so that a default-initialized reply
    // (`reset_pending_state_reply r;`) never carries an indeterminate code.
    // This matches what `reset_pending_state_reply{}` already produced.
    errc errc{};

    auto serde_fields() { return std::tie(errc); }
};

// Request asking the coordinator to reset the pending state of a topic.
struct reset_pending_state_request
: serde::envelope<
reset_pending_state_request,
serde::version<0>,
serde::compat_version<0>> {
// Reply type paired with this request.
using resp_t = reset_pending_state_reply;

// Topic whose pending state should be reset.
model::topic topic;
// Revision of the topic the request refers to.
model::revision_id topic_revision;

reset_pending_state_request() = default;
explicit reset_pending_state_request(
const model::topic& topic, model::revision_id rev)
: topic(topic)
, topic_revision(rev) {}
// NOTE(review): the definition of this operator is not visible alongside
// this declaration -- confirm it is provided in the corresponding source
// file.
friend std::ostream&
operator<<(std::ostream&, const reset_pending_state_request&);

// Accessor for the topic named by the request.
const model::topic& get_topic() const { return topic; }

// Fields serialized by serde, in this order.
auto serde_fields() { return std::tie(topic, topic_revision); }
};

struct get_topic_state_reply
: serde::envelope<
get_topic_state_reply,
Expand Down
18 changes: 18 additions & 0 deletions src/v/redpanda/admin/services/datalake/datalake.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,22 @@ datalake_service_impl::get_coordinator_state(
co_return response;
}

/// Admin handler for CoordinatorResetPendingState.
///
/// Translates the admin request into a coordinator frontend
/// reset_pending_state_request and awaits the frontend's reply.
///
/// \throws serde::pb::rpc::unavailable_exception if the coordinator frontend
///         is not initialized on this shard.
/// \throws serde::pb::rpc::internal_exception if the frontend replies with
///         any code other than errc::ok.
ss::future<proto::admin::coordinator_reset_pending_state_response>
datalake_service_impl::coordinator_reset_pending_state(
  serde::pb::rpc::context,
  proto::admin::coordinator_reset_pending_state_request req) {
    if (!_coordinator_fe->local_is_initialized()) {
        throw serde::pb::rpc::unavailable_exception(
          "Datalake coordinator frontend not initialized");
    }

    datalake::coordinator::reset_pending_state_request fe_req;
    fe_req.topic = model::topic{req.get_topic_name()};
    // NOTE(review): fe_req.topic_revision is never assigned, so the request
    // is sent with a default-constructed revision. The coordinator-side
    // update rejects a revision that differs from the one it tracks for an
    // existing topic -- confirm whether the revision should be looked up (or
    // carried in the admin request) before calling the frontend.
    const auto fe_res
      = co_await _coordinator_fe->local().reset_pending_state(fe_req);
    if (fe_res.errc == datalake::coordinator::errc::ok) {
        co_return proto::admin::coordinator_reset_pending_state_response{};
    }
    throw serde::pb::rpc::internal_exception(
      fmt::format("Datalake coordinator error: {}", fe_res.errc));
}

} // namespace admin
5 changes: 5 additions & 0 deletions src/v/redpanda/admin/services/datalake/datalake.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class datalake_service_impl : public proto::admin::datalake_service {
serde::pb::rpc::context,
proto::admin::get_coordinator_state_request) override;

/// Admin endpoint that asks the datalake coordinator to reset the pending
/// state of the topic named in the request. The implementation throws an
/// RPC exception when the coordinator frontend is not initialized or when
/// the coordinator reports an error.
ss::future<proto::admin::coordinator_reset_pending_state_response>
coordinator_reset_pending_state(
serde::pb::rpc::context,
proto::admin::coordinator_reset_pending_state_request) override;

private:
admin::proxy::client _proxy_client;

Expand Down
Loading