Skip to content

Commit ef679d2

Browse files
committed
Add query and stream features; enhance replication routes and logic
1 parent 6b2fb7a commit ef679d2

File tree

9 files changed

+318
-84
lines changed

9 files changed

+318
-84
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ features = [
6666
"http2",
6767
"json",
6868
"matched-path",
69+
"query",
6970
"tokio",
7071
"tracing",
7172
]
@@ -322,6 +323,7 @@ features = [
322323
"rustls",
323324
"rustls-native-certs",
324325
"socks",
326+
"stream",
325327
]
326328

327329
[workspace.dependencies.ring]

src/api/client/replication.rs

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@
44
//! - `GET /_tuwunel/replication/status` — current sequence number + role
55
//! - `GET /_tuwunel/replication/wal?since=N` — streaming WAL frame feed
66
//! - `GET /_tuwunel/replication/checkpoint` — full database checkpoint as tar
7+
//! - `POST /_tuwunel/replication/promote` — promote secondary to primary
8+
//! - `POST /_tuwunel/replication/demote` — demote primary back to secondary
79
810
use std::time::Duration;
911

1012
use axum::{
13+
Json,
1114
body::Body,
1215
extract::{Query, State},
1316
http::{StatusCode, header},
1417
response::{IntoResponse, Response},
1518
};
1619
use bytes::Bytes;
17-
use futures::StreamExt;
18-
use serde::Deserialize;
20+
use futures::{SinkExt, StreamExt};
21+
use serde::{Deserialize, Serialize};
1922
use tuwunel_core::Result;
2023
use tuwunel_database::{WalFrame, is_wal_gap_error};
2124

@@ -38,8 +41,16 @@ pub(crate) async fn replication_status(
3841
.await
3942
.unwrap_or(0);
4043

44+
let role = if services.server.config.rocksdb_primary_url.is_some()
45+
&& !services.replication.is_promoted()
46+
{
47+
"secondary"
48+
} else {
49+
"primary"
50+
};
51+
4152
axum::Json(serde_json::json!({
42-
"role": "primary",
53+
"role": role,
4354
"latest_sequence": seq,
4455
}))
4556
}
@@ -214,6 +225,90 @@ pub(crate) async fn replication_checkpoint(
214225
}
215226
}
216227

228+
/// `POST /_tuwunel/replication/promote`
229+
///
230+
/// Promotes this secondary to a standalone primary by stopping the replication
231+
/// worker. After this call returns the instance accepts writes and no longer
232+
/// tails the primary's WAL. The caller is responsible for updating the VIP or
233+
/// load balancer to route client traffic to this node.
234+
///
235+
/// Returns:
236+
/// - `200 OK` with `{"status":"promoted"}` on success.
237+
/// - `409 Conflict` if this instance is already a primary (no `rocksdb_primary_url`
238+
/// was configured, or it was already promoted).
239+
pub(crate) async fn replication_promote(
240+
State(services): State<crate::State>,
241+
) -> impl IntoResponse {
242+
if services.replication.is_promoted() {
243+
return (
244+
StatusCode::CONFLICT,
245+
axum::Json(serde_json::json!({"error": "already promoted"})),
246+
)
247+
.into_response();
248+
}
249+
250+
if services.server.config.rocksdb_primary_url.is_none() {
251+
return (
252+
StatusCode::CONFLICT,
253+
axum::Json(serde_json::json!({"error": "not a secondary; no rocksdb_primary_url configured"})),
254+
)
255+
.into_response();
256+
}
257+
258+
services.replication.promote();
259+
260+
axum::Json(serde_json::json!({"status": "promoted"})).into_response()
261+
}
262+
263+
/// Request body for `POST /_tuwunel/replication/demote`.
264+
#[derive(Debug, Deserialize, Serialize)]
265+
pub(crate) struct DemoteBody {
266+
/// URL of the new primary to replicate from (e.g. `http://host:8008`).
267+
pub primary_url: String,
268+
}
269+
270+
/// `POST /_tuwunel/replication/demote`
271+
///
272+
/// Demotes this promoted primary back to a secondary that replicates from
273+
/// `primary_url`. Resets the resume cursor and triggers a fresh checkpoint
274+
/// bootstrap from the new primary — the worker restarts replication without
275+
/// requiring a process restart.
276+
///
277+
/// Typical use case: the original primary comes back online after a failover
278+
/// and needs to re-join the cluster as a secondary under the newly promoted
279+
/// node.
280+
///
281+
/// Returns:
282+
/// - `200 OK` with `{"status":"demoted","primary_url":"..."}` on success.
283+
/// - `400 Bad Request` if `primary_url` is missing or empty.
284+
/// - `409 Conflict` if this instance is not currently promoted (i.e. it is
285+
/// already actively replicating or was never a secondary).
286+
pub(crate) async fn replication_demote(
287+
State(services): State<crate::State>,
288+
Json(body): Json<DemoteBody>,
289+
) -> impl IntoResponse {
290+
if body.primary_url.is_empty() {
291+
return (
292+
StatusCode::BAD_REQUEST,
293+
axum::Json(serde_json::json!({"error": "primary_url is required"})),
294+
)
295+
.into_response();
296+
}
297+
298+
match services.replication.demote(body.primary_url.clone()).await {
299+
| Ok(()) => axum::Json(serde_json::json!({
300+
"status": "demoted",
301+
"primary_url": body.primary_url,
302+
}))
303+
.into_response(),
304+
| Err(e) => (
305+
StatusCode::CONFLICT,
306+
axum::Json(serde_json::json!({"error": e.to_string()})),
307+
)
308+
.into_response(),
309+
}
310+
}
311+
217312
/// Creates a temporary directory that is automatically removed on drop.
218313
///
219314
/// We use a simple wrapper around `std::fs::create_dir_all` on a

src/api/router.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub(super) use self::{
2727
};
2828
use crate::{client, server};
2929

30-
pub fn build(router: Router<State>, server: &Server) -> Router<State> {
30+
pub fn build(router: Router<State>, server: &Server, state: State) -> Router<State> {
3131
let config = &server.config;
3232
let mut router = router
3333
.ruma_route(&client::get_timezone_key_route)
@@ -263,7 +263,15 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
263263
"/_tuwunel/replication/checkpoint",
264264
get(client::replication_checkpoint),
265265
)
266-
.layer(middleware::from_fn(check_replication_token)),
266+
.route(
267+
"/_tuwunel/replication/promote",
268+
post(client::replication_promote),
269+
)
270+
.route(
271+
"/_tuwunel/replication/demote",
272+
post(client::replication_demote),
273+
)
274+
.layer(middleware::from_fn_with_state(state, check_replication_token)),
267275
);
268276

269277
if config.allow_legacy_media {

src/api/router/replication_auth.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use axum::{
22
body::Body,
3-
extract::Request,
3+
extract::{Request, State},
44
http::StatusCode,
55
middleware::Next,
66
response::{IntoResponse, Response},
@@ -16,10 +16,11 @@ pub(super) const TOKEN_HEADER: &str = "x-tuwunel-replication-token";
1616
/// - `401 Unauthorized` if the token is missing or incorrect.
1717
/// - Passes through to the handler if the token matches.
1818
pub(crate) async fn check_replication_token(
19-
axum::extract::State(services): axum::extract::State<super::State>,
19+
State(services): State<super::State>,
2020
request: Request<Body>,
2121
next: Next,
2222
) -> Response {
23+
2324
let Some(ref expected) = services.server.config.rocksdb_replication_token else {
2425
return (
2526
StatusCode::NOT_IMPLEMENTED,

src/database/engine/open.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
4444
} else if config.rocksdb_secondary {
4545
let secondary_path = config
4646
.rocksdb_secondary_path
47-
.as_deref()
47+
.as_ref()
4848
.unwrap_or(path);
4949
Db::open_cf_descriptors_as_secondary(&db_opts, path, secondary_path, cfds)
5050
} else {

src/database/engine/replication.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
time::{SystemTime, UNIX_EPOCH},
99
};
1010

11-
use rocksdb::Checkpoint;
11+
use rocksdb::checkpoint::Checkpoint;
1212
use tuwunel_core::{Err, Result, implement};
1313

1414
use super::Engine;
@@ -219,6 +219,35 @@ pub fn wal_updates_since(&self, since: u64) -> Result<rocksdb::DBWALIterator> {
219219
self.db.get_updates_since(since).map_err(map_err)
220220
}
221221

222+
/// Newtype wrapper making `DBWALIterator` safe to send across threads.
223+
///
224+
/// `DBWALIterator` holds a `*mut rocksdb_wal_iterator_t` raw pointer which
225+
/// is not auto-`Send`. RocksDB WAL iterators are not concurrently shared;
226+
/// this iterator is consumed by exactly one thread at a time, so sending
227+
/// ownership across a thread boundary is safe.
228+
struct SendWalIter(rocksdb::DBWALIterator);
229+
230+
// SAFETY: DBWALIterator is not auto-Send due to its raw pointer, but the
231+
// underlying RocksDB iterator is safe to use from whichever single thread
232+
// owns it at any given time. We never share it across threads simultaneously.
233+
unsafe impl Send for SendWalIter {}
234+
235+
impl Iterator for SendWalIter {
236+
type Item = Result<WalFrame>;
237+
238+
fn next(&mut self) -> Option<Self::Item> {
239+
self.0.next().map(|result| {
240+
result
241+
.map(|(seq, batch)| {
242+
let data = batch.data().to_vec();
243+
let count = batch_count_from_bytes(&data);
244+
WalFrame::data(seq, count, data)
245+
})
246+
.map_err(map_err)
247+
})
248+
}
249+
}
250+
222251
/// Return a higher-level iterator of [`WalFrame`]s starting at `since`.
223252
///
224253
/// Wraps `wal_updates_since` and maps each rocksdb batch into a `WalFrame`,
@@ -233,15 +262,7 @@ pub fn wal_frame_iter(
233262
since: u64,
234263
) -> Result<Box<dyn Iterator<Item = Result<WalFrame>> + Send>> {
235264
let iter = self.db.get_updates_since(since).map_err(map_err)?;
236-
Ok(Box::new(iter.map(|result| {
237-
result
238-
.map(|(seq, batch)| {
239-
let data = batch.data().to_vec();
240-
let count = batch_count_from_bytes(&data);
241-
WalFrame::data(seq, count, data)
242-
})
243-
.map_err(map_err)
244-
})))
265+
Ok(Box::new(SendWalIter(iter)))
245266
}
246267

247268
/// Returns `true` if `err` indicates the requested WAL sequence is older

src/router/router.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tuwunel_service::Services;
1010
pub(crate) fn build(services: &Arc<Services>) -> (Router, Guard) {
1111
let router = Router::<state::State>::new();
1212
let (state, guard) = state::create(services.clone());
13-
let router = tuwunel_api::router::build(router, &services.server)
13+
let router = tuwunel_api::router::build(router, &services.server, state)
1414
.route("/", get(it_works))
1515
.fallback(not_found)
1616
.with_state(state);

0 commit comments

Comments
 (0)