Commit 223a8e6

Merge pull request #28795 from Lazin/ct/read-pipeline-parallel-fetch

ct: Read scheduler component

2 parents: 8db1aea + d11c87c

12 files changed: +519 -0 lines changed

src/v/cloud_topics/BUILD

Lines changed: 2 additions & 0 deletions
@@ -133,8 +133,10 @@ redpanda_cc_library(
         "//src/v/cloud_topics/level_zero/pipeline:read_pipeline",
         "//src/v/cloud_topics/level_zero/pipeline:write_pipeline",
         "//src/v/cloud_topics/level_zero/read_fanout",
+        "//src/v/cloud_topics/level_zero/read_request_scheduler",
         "//src/v/cloud_topics/level_zero/reader:fetch_handler",
         "//src/v/cloud_topics/level_zero/write_request_scheduler",
+        "//src/v/config",
         "//src/v/model",
         "//src/v/ssx:sharded_service_container",
         "//src/v/storage",

src/v/cloud_topics/data_plane_impl.cc

Lines changed: 15 additions & 0 deletions
@@ -20,8 +20,10 @@
 #include "cloud_topics/level_zero/pipeline/read_pipeline.h"
 #include "cloud_topics/level_zero/pipeline/write_pipeline.h"
 #include "cloud_topics/level_zero/read_fanout/read_fanout.h"
+#include "cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.h"
 #include "cloud_topics/level_zero/reader/fetch_request_handler.h"
 #include "cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.h"
+#include "config/configuration.h"
 #include "model/fundamental.h"
 #include "ssx/sharded_service_container.h"
 #include "storage/api.h"
@@ -73,6 +75,13 @@ class impl
             return _read_pipeline.local().register_read_pipeline_stage();
         }));

+        if (config::shard_local_cfg().cloud_topics_parallel_fetch_enabled()) {
+            co_await construct_service(
+              _read_request_scheduler, ss::sharded_parameter([this] {
+                  return _read_pipeline.local().register_read_pipeline_stage();
+              }));
+        }
+
         co_await construct_service(
           _fetch_handler,
           ss::sharded_parameter([this] {
@@ -93,6 +102,10 @@ class impl
           [](auto& s) { return s.start(); });
         co_await _batcher.invoke_on_all([](auto& s) { return s.start(); });
         co_await _read_fanout.invoke_on_all([](auto& s) { return s.start(); });
+        if (_read_request_scheduler.local_is_initialized()) {
+            co_await _read_request_scheduler.invoke_on_all(
+              [](auto& s) { return s.start(); });
+        }
         co_await _fetch_handler.invoke_on_all(
           [](auto& s) { return s.start(); });
         co_await _batch_cache.invoke_on_all([](auto& s) { return s.start(); });
@@ -160,6 +173,8 @@ class impl
     // Read path
     ss::sharded<l0::read_pipeline<>> _read_pipeline;
     ss::sharded<l0::read_fanout> _read_fanout;
+    ss::sharded<l0::read_request_scheduler> _read_request_scheduler;
+
     ss::sharded<l0::fetch_handler> _fetch_handler;
     // Batch cache
     ss::sharded<batch_cache> _batch_cache;

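
For readers less familiar with Seastar, the hunks above follow the "optionally constructed sharded service" pattern: the scheduler is only constructed when the cloud_topics_parallel_fetch_enabled flag is set, and later lifecycle calls are guarded with local_is_initialized() so they become no-ops when the flag is off. Below is a minimal, standalone sketch of that pattern; demo_service and the hard-coded flag value are hypothetical stand-ins, not the production code.

#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

// Hypothetical stand-in for a feature-gated sharded service.
struct demo_service {
    seastar::future<> start() { return seastar::make_ready_future<>(); }
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, []() -> seastar::future<> {
        // Stand-in for config::shard_local_cfg().cloud_topics_parallel_fetch_enabled().
        const bool parallel_fetch_enabled = true;

        seastar::sharded<demo_service> svc;
        if (parallel_fetch_enabled) {
            co_await svc.start(); // construct one instance per shard
        }
        // The guard turns the start path into a no-op when the flag is off and
        // the sharded service was never constructed.
        if (svc.local_is_initialized()) {
            co_await svc.invoke_on_all([](auto& s) { return s.start(); });
            co_await svc.stop(); // also runs demo_service::stop() on each shard
        }
    });
}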
src/v/cloud_topics/level_zero/common/BUILD

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@ package(
         "//src/v/cloud_topics/level_zero/frontend_reader:__pkg__",
         "//src/v/cloud_topics/level_zero/pipeline:__pkg__",
         "//src/v/cloud_topics/level_zero/pipeline/tests:__pkg__",
+        "//src/v/cloud_topics/level_zero/read_request_scheduler:__pkg__",
+        "//src/v/cloud_topics/level_zero/read_request_scheduler/tests:__pkg__",
         "//src/v/cloud_topics/level_zero/reader:__pkg__",
         "//src/v/cloud_topics/level_zero/reader/tests:__pkg__",
         "//src/v/cloud_topics/level_zero/stm:__pkg__",

src/v/cloud_topics/level_zero/pipeline/BUILD

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@ package(default_visibility = [
     "//src/v/cloud_topics/level_zero/batcher/tests:__pkg__",
     "//src/v/cloud_topics/level_zero/read_fanout:__pkg__",
     "//src/v/cloud_topics/level_zero/read_fanout/tests:__pkg__",
+    "//src/v/cloud_topics/level_zero/read_request_scheduler:__pkg__",
+    "//src/v/cloud_topics/level_zero/read_request_scheduler/tests:__pkg__",
     "//src/v/cloud_topics/level_zero/reader:__pkg__",
     "//src/v/cloud_topics/level_zero/reader/tests:__pkg__",
     "//src/v/cloud_topics/level_zero/tests:__pkg__",

src/v/cloud_topics/level_zero/pipeline/read_pipeline.h

Lines changed: 2 additions & 0 deletions
@@ -116,6 +116,8 @@ class read_pipeline
         _parent->_probe.register_micro_probe(p);
     }

+    pipeline_stage id() const noexcept { return _ps; }
+
 private:
     pipeline_stage _ps;
     read_pipeline<Clock>* _parent;

src/v/cloud_topics/level_zero/read_request_scheduler/BUILD

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = [
    ":__subpackages__",
    "//src/v/cloud_topics:__pkg__",
])

redpanda_cc_library(
    name = "read_request_scheduler",
    srcs = [
        "read_request_scheduler.cc",
    ],
    hdrs = [
        "read_request_scheduler.h",
    ],
    deps = [
        "//src/v/base",
        "//src/v/bytes",
        "//src/v/bytes:iobuf",
        "//src/v/cloud_topics:logger",
        "//src/v/cloud_topics:types",
        "//src/v/cloud_topics/level_zero/common:extent_meta",
        "//src/v/cloud_topics/level_zero/pipeline:base_pipeline",
        "//src/v/cloud_topics/level_zero/pipeline:event_filter",
        "//src/v/cloud_topics/level_zero/pipeline:read_pipeline",
        "//src/v/cloud_topics/level_zero/pipeline:read_request",
        "//src/v/config",
        "//src/v/ssx:future_util",
        "@seastar",
    ],
)

src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.cc

Lines changed: 172 additions & 0 deletions

@@ -0,0 +1,172 @@
/*
 * Copyright 2025 Redpanda Data, Inc.
 *
 * Licensed as a Redpanda Enterprise file under the Redpanda Community
 * License (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
 */
#include "cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.h"

#include "cloud_topics/logger.h"
#include "ssx/future-util.h"

#include <chrono>

using namespace std::chrono_literals;

namespace cloud_topics::l0 {

read_request_scheduler::read_request_scheduler(
  read_pipeline<ss::lowres_clock>::stage stage)
  : _stage(std::move(stage)) {}

ss::future<> read_request_scheduler::start() {
    vlog(cd_log.debug, "Read Request Scheduler start");
    ssx::spawn_with_gate(_gate, [this] { return bg_loop(); });
    co_return;
}

ss::future<> read_request_scheduler::stop() { co_await _gate.close(); }

namespace {
ss::shard_id shard_for(const read_request<ss::lowres_clock>& req) {
    std::hash<ss::sstring> hasher;
    // The request is generated from the placeholder batch.
    // The placeholder batch can't span multiple objects, so it's safe
    // to check only the first extent.
    auto h = hasher(req.query.meta.front().id.name);
    auto shard = h % ss::smp::count;
    return static_cast<ss::shard_id>(shard);
}

std::unique_ptr<read_request<ss::lowres_clock>> make_proxy(
  ss::shard_id target_shard,
  const read_request<ss::lowres_clock>& req,
  ss::lowres_clock::time_point timeout,
  retry_chain_node* target_rtc,
  pipeline_stage id) {
    vassert(
      ss::this_shard_id() == target_shard,
      "make_proxy called on the wrong shard");
    dataplane_query query;
    query.output_size_estimate = req.query.output_size_estimate;
    query.meta = req.query.meta.copy();
    auto proxy = std::make_unique<read_request<ss::lowres_clock>>(
      req.ntp, std::move(query), timeout, target_rtc, id);
    return proxy;
}

} // namespace

void read_request_scheduler::schedule_on(
  read_request<ss::lowres_clock>& source_req, ss::shard_id target) {
    if (target == ss::this_shard_id()) {
        // Fast path: just push source_req down the pipeline.
        _stage.push_next_stage(source_req);
        return;
    }

    // Check shutdown before launching a cross-shard RPC.
    if (_stage.stopped()) {
        source_req.set_value(errc::shutting_down);
        return;
    }

    auto proxy = container().invoke_on(
      target, [target, &source_req](read_request_scheduler& s) {
          return s.proxy_read_request(source_req, target);
      });

    auto ack = proxy.then(
      [&source_req](read_request<ss::lowres_clock>::response_t resp) {
          if (resp.has_value()) {
              source_req.set_value(std::move(resp.value()));
          } else {
              source_req.set_value(resp.error());
          }
      });

    // Note: We intentionally do NOT hold the gate here. The gate is only used
    // for the bg_loop fiber. These fire-and-forget continuations must be able
    // to complete even during shutdown to avoid deadlock.
    ssx::background = std::move(ack);
}

ss::future<read_request<ss::lowres_clock>::response_t>
read_request_scheduler::proxy_read_request(
  const read_request<ss::lowres_clock>& source_req, ss::shard_id target) {
    auto now = ss::lowres_clock::now();
    auto timeout = source_req.expiration_time;
    if (timeout < now) {
        co_return std::unexpected(errc::timeout);
    }
    // Use the pipeline stage id from the _stage object and not from the
    // request. The request belongs to another pipeline and its stage id
    // doesn't make sense on the current shard.
    auto proxy = make_proxy(
      target, source_req, timeout, &_stage.get_root_rtc(), _stage.id());

    // Check if the pipeline is shutting down before awaiting the response.
    if (_stage.stopped()) {
        co_return std::unexpected(errc::shutting_down);
    }

    auto f = proxy->response.get_future();
    _stage.push_next_stage(*proxy);
    auto res = co_await ss::coroutine::as_future(std::move(f));
    if (res.failed()) {
        auto ex = res.get_exception();
        // Check for shutdown exceptions explicitly.
        if (ssx::is_shutdown_exception(ex)) {
            co_return std::unexpected(errc::shutting_down);
        }
        co_return std::unexpected(errc::unexpected_failure);
    }
    co_return std::move(res.get());
}

ss::future<> read_request_scheduler::bg_loop() {
    while (!_stage.stopped()) {
        // NOTE(1): requests are vectorized, but it's not guaranteed
        // that all extents in a request target the same object. If they
        // don't, the scheduler uses the first extent to decide the
        // target shard. This can lead to suboptimal distribution of
        // requests across shards and some edge cases. To avoid this,
        // the caller of 'materialize' must ensure that requests are
        // split so that all extents in a request target the same
        // object. This is not a correctness problem; the only side
        // effect is that we may download the same object on multiple
        // shards in parallel in some cases.
        //
        // NOTE(2): cache locality is not a concern here because,
        // unlike the write path, the read path is only used when there
        // is a cache miss. Normally, we will not hit this code path if
        // the cache is working well and there are no leadership
        // transfers. The goal here is to brute-force the
        // reconciliation of cache misses as fast as possible.
        auto res = co_await _stage.pull_fetch_requests(10_MiB);
        if (!res.has_value()) {
            if (res.error() == errc::shutting_down) {
                break;
            }
            vlog(
              _stage.logger().error,
              "Failed to pull fetch requests: {}",
              res.error());
            _stage.register_pipeline_error(res.error());
            continue;
        }
        auto list = std::move(res.value());
        while (!list.requests.empty()) {
            auto front = &list.requests.front();
            list.requests.pop_front();
            auto target_shard = shard_for(*front);
            schedule_on(*front, target_shard);
        }
    }
}

} // namespace cloud_topics::l0
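
The shard_for() helper above is the heart of the fan-out policy: the object name of the first extent is hashed and reduced modulo the shard count, so every request that references the same object lands on the same core. A small standalone illustration of that mapping, where shard_count stands in for ss::smp::count and the object names are made up:

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>

// Same idea as shard_for(): hash the object name, wrap around the shard count.
std::size_t shard_for(const std::string& object_name, std::size_t shard_count) {
    return std::hash<std::string>{}(object_name) % shard_count;
}

int main() {
    const std::size_t shard_count = 8;
    // Two requests for the same object always map to the same shard index...
    std::cout << shard_for("a1b2c3.l0", shard_count) << '\n';
    std::cout << shard_for("a1b2c3.l0", shard_count) << '\n';
    // ...while a different object may map to a different one.
    std::cout << shard_for("d4e5f6.l0", shard_count) << '\n';
}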

src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.h

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
/*
 * Copyright 2025 Redpanda Data, Inc.
 *
 * Licensed as a Redpanda Enterprise file under the Redpanda Community
 * License (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
 */

#pragma once

#include "base/seastarx.h"
#include "cloud_topics/level_zero/pipeline/read_pipeline.h"
#include "cloud_topics/level_zero/pipeline/read_request.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sharded.hh>

namespace cloud_topics::l0 {
/// Read Request Scheduler
///
/// This is a simple fan-out scheduler for read requests.
/// It directs read requests to different shards based on the
/// object id. Requests that target the same object id always
/// go to the same shard.
class read_request_scheduler
  : public ss::peering_sharded_service<read_request_scheduler> {
public:
    explicit read_request_scheduler(
      read_pipeline<ss::lowres_clock>::stage stage);

    ss::future<> start();

    ss::future<> stop();

private:
    ss::future<> bg_loop();

    /// Schedules request processing on the target shard.
    ///
    /// The method sends the request to the target shard.
    /// The 'target shard' is the shard that performs the processing of the
    /// request. The 'source shard' is the shard that owns the request. In some
    /// cases the source shard and the target shard are the same; the
    /// source_req request is then just propagated down the pipeline.
    ///
    /// \param target Target shard (the shard that should process the request)
    /// \param source_req Request to process. The request is owned by the
    /// source shard.
    void schedule_on(
      read_request<ss::lowres_clock>& source_req, ss::shard_id target);

    ss::future<read_request<ss::lowres_clock>::response_t> proxy_read_request(
      const read_request<ss::lowres_clock>& source_req, ss::shard_id target);

    read_pipeline<ss::lowres_clock>::stage _stage;
    ss::gate _gate;
};
} // namespace cloud_topics::l0
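
The data_plane_impl.cc hunk earlier in this commit shows how this class is wired into the service container. Below is a hedged sketch of the same wiring with the surrounding container abstracted away; the free functions here are illustrative only and are not part of this commit.

#include "cloud_topics/level_zero/pipeline/read_pipeline.h"
#include "cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sharded.hh>

namespace l0 = cloud_topics::l0;

ss::future<> wire_up_scheduler(
  ss::sharded<l0::read_pipeline<>>& read_pipeline,
  ss::sharded<l0::read_request_scheduler>& scheduler) {
    // Each shard registers its own stage with its local read pipeline and
    // constructs its own scheduler instance from it.
    co_await scheduler.start(ss::sharded_parameter([&read_pipeline] {
        return read_pipeline.local().register_read_pipeline_stage();
    }));
    // Spawns the bg_loop() fiber on every shard.
    co_await scheduler.invoke_on_all([](auto& s) { return s.start(); });
}

ss::future<> tear_down_scheduler(
  ss::sharded<l0::read_request_scheduler>& scheduler) {
    // sharded::stop() invokes read_request_scheduler::stop() on every shard,
    // which closes the gate and waits for the background fiber to finish.
    co_await scheduler.stop();
}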

src/v/cloud_topics/level_zero/read_request_scheduler/tests/BUILD

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
load("//bazel:test.bzl", "redpanda_cc_gtest")

redpanda_cc_gtest(
    name = "read_request_scheduler_test",
    timeout = "short",
    srcs = [
        "read_request_scheduler_test.cc",
    ],
    cpu = 4,
    deps = [
        "//src/v/base",
        "//src/v/cloud_topics:types",
        "//src/v/cloud_topics/level_zero/pipeline:event_filter",
        "//src/v/cloud_topics/level_zero/pipeline:pipeline_stage",
        "//src/v/cloud_topics/level_zero/pipeline:read_pipeline",
        "//src/v/cloud_topics/level_zero/pipeline:read_request",
        "//src/v/cloud_topics/level_zero/read_request_scheduler",
        "//src/v/config",
        "//src/v/model",
        "//src/v/model/tests:random",
        "//src/v/test_utils:gtest",
        "//src/v/utils:uuid",
        "@googletest//:gtest",
        "@seastar",
    ],
)
