diff --git a/Cargo.lock b/Cargo.lock index 7d7420020..41d2a1bb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1468,6 +1468,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -2475,6 +2486,18 @@ dependencies = [ "windows-link", ] +[[package]] +name = "libredox" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" +dependencies = [ + "bitflags", + "libc", + "plain", + "redox_syscall 0.7.3", +] + [[package]] name = "libz-sys" version = "1.1.24" @@ -3164,7 +3187,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -3317,6 +3340,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plain" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" + [[package]] name = "png" version = "0.18.1" @@ -3680,6 +3709,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.12.3" @@ -3759,6 +3797,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", + "futures-util", "h2", "hickory-resolver", "http", @@ -3783,12 +3822,14 @@ 
dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", ] @@ -4769,6 +4810,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.26.0" @@ -5362,9 +5414,11 @@ dependencies = [ "serde_html_form", "serde_json", "sha1", + "tar", "tokio", "tracing", "tuwunel_core", + "tuwunel_database", "tuwunel_service", "url", ] @@ -5437,6 +5491,7 @@ version = "1.5.1" dependencies = [ "async-channel", "const-str", + "crc32fast", "criterion", "ctor", "futures", @@ -5528,6 +5583,7 @@ dependencies = [ "serde_yaml", "sha2", "similar", + "tar", "termimad", "tokio", "tracing", @@ -5850,6 +5906,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" @@ -6394,6 +6463,16 @@ dependencies = [ "time", ] +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "xml5ever" version = "0.18.1" diff --git a/Cargo.toml b/Cargo.toml index 3cec668f4..9cbfbc363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ features = [ "http2", 
"json", "matched-path", + "query", "tokio", "tracing", ] @@ -109,6 +110,9 @@ version = "1.11" [workspace.dependencies.bytesize] version = "2.3" +[workspace.dependencies.crc32fast] +version = "1" + [workspace.dependencies.cargo_toml] version = "0.22" default-features = false @@ -319,6 +323,7 @@ features = [ "rustls", "rustls-native-certs", "socks", + "stream", ] [workspace.dependencies.ring] @@ -481,6 +486,9 @@ features = [ "printing", ] +[workspace.dependencies.tar] +version = "0.4" + [workspace.dependencies.termimad] version = "0.34" default-features = false diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index cd1266a7a..ced9aa334 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -95,9 +95,11 @@ serde_html_form.workspace = true serde_json.workspace = true serde.workspace = true sha1.workspace = true +tar.workspace = true tokio.workspace = true tracing.workspace = true tuwunel-core.workspace = true +tuwunel-database.workspace = true tuwunel-service.workspace = true url.workspace = true diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 1f08ede48..497431eff 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -23,6 +23,7 @@ pub(super) mod read_marker; pub(super) mod redact; pub(super) mod register; pub(super) mod relations; +pub(super) mod replication; pub(super) mod report; pub(super) mod room; pub(super) mod search; @@ -70,6 +71,7 @@ pub(super) use read_marker::*; pub(super) use redact::*; pub(super) use register::*; pub(super) use relations::*; +pub(super) use replication::*; pub(super) use report::*; pub(super) use room::*; pub(super) use search::*; diff --git a/src/api/client/replication.rs b/src/api/client/replication.rs new file mode 100644 index 000000000..7340f7107 --- /dev/null +++ b/src/api/client/replication.rs @@ -0,0 +1,341 @@ +//! Primary-side HTTP handlers for WAL-based RocksDB replication. +//! +//! Endpoints (all protected by `check_replication_token` middleware): +//! 
- `GET /_tuwunel/replication/status` — current sequence number + role +//! - `GET /_tuwunel/replication/wal?since=N` — streaming WAL frame feed +//! - `GET /_tuwunel/replication/checkpoint` — full database checkpoint as tar +//! - `POST /_tuwunel/replication/promote` — promote secondary to primary +//! - `POST /_tuwunel/replication/demote` — demote primary back to secondary + +use std::time::Duration; + +use axum::{ + Json, + body::Body, + extract::{Query, State}, + http::{StatusCode, header}, + response::{IntoResponse, Response}, +}; +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tuwunel_core::Result; +use tuwunel_database::{WalFrame, is_wal_gap_error}; + +/// Query parameters for `GET /_tuwunel/replication/wal`. +#[derive(Debug, Deserialize)] +pub(crate) struct WalParams { + /// Last sequence number the secondary successfully applied. + /// Omit (or pass `0`) to start from the current WAL head. + pub since: Option, +} + +/// `GET /_tuwunel/replication/status` +/// +/// Returns the primary's current WAL sequence number and role. +pub(crate) async fn replication_status( + State(services): State, +) -> impl IntoResponse { + let db = services.db.clone(); + let seq = tokio::task::spawn_blocking(move || db.latest_wal_sequence()) + .await + .unwrap_or(0); + + let role = if services.server.config.rocksdb_primary_url.is_some() + && !services.replication.is_promoted() + { + "secondary" + } else { + "primary" + }; + + axum::Json(serde_json::json!({ + "role": role, + "latest_sequence": seq, + })) +} + +/// `GET /_tuwunel/replication/wal?since=N` +/// +/// Streams WAL frames to the secondary. The response body is a sequence of +/// length-prefixed [`WalFrame`] wire encodings (see `engine/replication.rs`). +/// +/// Returns: +/// - `200 OK` with a streaming `application/octet-stream` body on success. +/// - `410 Gone` when the requested `since` sequence is older than the oldest +/// retained WAL segment. 
The secondary must full-resync from a checkpoint. +pub(crate) async fn replication_wal( + State(services): State, + Query(params): Query, +) -> Response { + let since = params.since.unwrap_or(0); + let db = services.db.clone(); + let interval_ms = services.server.config.rocksdb_replication_interval_ms; + + // Eagerly check for a WAL gap before opening the streaming response. + let gap_check: Result<()> = tokio::task::spawn_blocking({ + let db = db.clone(); + move || db.wal_frame_iter(since).map(drop) + }) + .await + .expect("spawn_blocking panicked in gap check"); + + if let Err(ref e) = gap_check { + if is_wal_gap_error(e) { + return ( + StatusCode::GONE, + "WAL gap: secondary must re-sync from a fresh checkpoint", + ) + .into_response(); + } + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("WAL iterator error: {e}"), + ) + .into_response(); + } + + // Channel that bridges the blocking WAL reader with the async HTTP body. + let (mut tx, rx) = futures::channel::mpsc::channel::(256); + + tokio::spawn(async move { + let mut seq = since; + + loop { + // Drain all available WAL frames in a blocking thread. + let result = tokio::task::spawn_blocking({ + let db = db.clone(); + move || -> (Vec, u64) { + let mut frames: Vec = Vec::new(); + let mut next_seq = seq; + match db.wal_frame_iter(seq) { + | Ok(iter) => { + for item in iter { + if let Ok(frame) = item { + next_seq = frame.next_resume_seq(); + frames.push(Bytes::from(frame.encode())); + } + } + }, + | Err(_) => {}, + } + (frames, next_seq) + } + }) + .await; + + let (frames, next_seq) = match result { + | Ok(pair) => pair, + | Err(_) => break, // spawn_blocking panicked + }; + + let advanced = next_seq != seq; + seq = next_seq; + + for encoded in frames { + if tx.send(encoded).await.is_err() { + return; // client disconnected + } + } + + // Always emit a heartbeat so the secondary can tell the primary is alive. + // When no data was produced, sleep first to avoid a busy-loop. 
+ if !advanced { + tokio::time::sleep(Duration::from_millis(interval_ms)).await; + } + + let hb_seq = { + let db = db.clone(); + tokio::task::spawn_blocking(move || db.latest_wal_sequence()) + .await + .unwrap_or(seq) + }; + let hb = WalFrame::heartbeat(hb_seq); + if tx.send(Bytes::from(hb.encode())).await.is_err() { + return; // client disconnected + } + } + }); + + let stream = rx.map(Ok::<_, std::convert::Infallible>); + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/octet-stream") + .header("x-tuwunel-role", "primary") + .body(Body::from_stream(stream)) + .expect("Failed to build WAL streaming response") +} + +/// `GET /_tuwunel/replication/checkpoint` +/// +/// Creates a RocksDB checkpoint of the primary's database and streams it as a +/// tar archive. The `X-Tuwunel-Checkpoint-Sequence` response header carries +/// the WAL sequence number at checkpoint creation time; the secondary uses +/// this as its initial `?since=` value when it begins WAL streaming. +/// +/// The caller is responsible for pausing WAL consumption while restoring the +/// checkpoint and then resuming from `X-Tuwunel-Checkpoint-Sequence`. +pub(crate) async fn replication_checkpoint( + State(services): State, +) -> Response { + let db = services.db.clone(); + + // Build the checkpoint and tar it in a blocking thread. + let result = tokio::task::spawn_blocking(move || -> Result<(Bytes, u64)> { + let tmp = tempfile_checkpoint_dir()?; + let checkpoint_path = tmp.path().join("checkpoint"); + + let seq = db.create_checkpoint(&checkpoint_path)?; + + // Build tar archive in memory. 
+ let mut archive_bytes: Vec = Vec::new(); + { + let mut builder = tar::Builder::new(&mut archive_bytes); + builder + .append_dir_all("checkpoint", &checkpoint_path) + .map_err(|e| tuwunel_core::err!(Database("{e}")))?; + builder + .finish() + .map_err(|e| tuwunel_core::err!(Database("{e}")))?; + } + + Ok((Bytes::from(archive_bytes), seq)) + }) + .await; + + match result { + | Ok(Ok((bytes, seq))) => Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/x-tar") + .header("x-tuwunel-checkpoint-sequence", seq.to_string()) + .body(Body::from(bytes)) + .expect("Failed to build checkpoint response"), + + | Ok(Err(e)) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Checkpoint creation failed: {e}"), + ) + .into_response(), + + | Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Spawn_blocking panicked: {e}"), + ) + .into_response(), + } +} + +/// `POST /_tuwunel/replication/promote` +/// +/// Promotes this secondary to a standalone primary by stopping the replication +/// worker. After this call returns the instance accepts writes and no longer +/// tails the primary's WAL. The caller is responsible for updating the VIP or +/// load balancer to route client traffic to this node. +/// +/// Returns: +/// - `200 OK` with `{"status":"promoted"}` on success. +/// - `409 Conflict` if this instance is already a primary (no `rocksdb_primary_url` +/// was configured, or it was already promoted). 
+pub(crate) async fn replication_promote( + State(services): State, +) -> impl IntoResponse { + if services.replication.is_promoted() { + return ( + StatusCode::CONFLICT, + axum::Json(serde_json::json!({"error": "already promoted"})), + ) + .into_response(); + } + + if services.server.config.rocksdb_primary_url.is_none() { + return ( + StatusCode::CONFLICT, + axum::Json(serde_json::json!({"error": "not a secondary; no rocksdb_primary_url configured"})), + ) + .into_response(); + } + + services.replication.promote(); + + axum::Json(serde_json::json!({"status": "promoted"})).into_response() +} + +/// Request body for `POST /_tuwunel/replication/demote`. +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct DemoteBody { + /// URL of the new primary to replicate from (e.g. `http://host:8008`). + pub primary_url: String, +} + +/// `POST /_tuwunel/replication/demote` +/// +/// Demotes this promoted primary back to a secondary that replicates from +/// `primary_url`. Resets the resume cursor and triggers a fresh checkpoint +/// bootstrap from the new primary — the worker restarts replication without +/// requiring a process restart. +/// +/// Typical use case: the original primary comes back online after a failover +/// and needs to re-join the cluster as a secondary under the newly promoted +/// node. +/// +/// Returns: +/// - `200 OK` with `{"status":"demoted","primary_url":"..."}` on success. +/// - `400 Bad Request` if `primary_url` is missing or empty. +/// - `409 Conflict` if this instance is not currently promoted (i.e. it is +/// already actively replicating or was never a secondary). 
+pub(crate) async fn replication_demote( + State(services): State, + Json(body): Json, +) -> impl IntoResponse { + if body.primary_url.is_empty() { + return ( + StatusCode::BAD_REQUEST, + axum::Json(serde_json::json!({"error": "primary_url is required"})), + ) + .into_response(); + } + + match services.replication.demote(body.primary_url.clone()).await { + | Ok(()) => axum::Json(serde_json::json!({ + "status": "demoted", + "primary_url": body.primary_url, + })) + .into_response(), + | Err(e) => ( + StatusCode::CONFLICT, + axum::Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +/// Creates a temporary directory that is automatically removed on drop. +/// +/// We use a simple wrapper around `std::fs::create_dir_all` on a +/// `tempfile::TempDir` equivalent so we don't add a `tempfile` dependency. +/// Instead, we create a uniquely-named subdirectory in the OS temp dir and +/// delete it ourselves. +fn tempfile_checkpoint_dir() -> Result { + use std::time::{SystemTime, UNIX_EPOCH}; + + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let dir = + std::env::temp_dir().join(format!("tuwunel-checkpoint-{ts}-{}", std::process::id())); + std::fs::create_dir_all(&dir).map_err(|e| tuwunel_core::err!(Database("{e}")))?; + Ok(TempDir(dir)) +} + +struct TempDir(std::path::PathBuf); + +impl TempDir { + fn path(&self) -> &std::path::Path { &self.0 } +} + +impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } +} diff --git a/src/api/router.rs b/src/api/router.rs index 7263f4ea0..8835761b7 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -1,6 +1,7 @@ mod args; mod auth; mod handler; +mod replication_auth; mod request; mod response; pub mod state; @@ -9,6 +10,7 @@ use std::str::FromStr; use axum::{ Router, + middleware, response::{IntoResponse, Redirect}, routing::{any, get, post}, }; @@ -17,11 +19,15 @@ use tuwunel_core::{Server, err}; use 
self::handler::RouterExt; pub(super) use self::{ - args::Args as Ruma, auth::auth_uiaa, response::RumaResponse, state::State, + args::Args as Ruma, + auth::auth_uiaa, + replication_auth::check_replication_token, + response::RumaResponse, + state::State, }; use crate::{client, server}; -pub fn build(router: Router, server: &Server) -> Router { +pub fn build(router: Router, server: &Server, state: State) -> Router { let config = &server.config; let mut router = router .ruma_route(&client::get_timezone_key_route) @@ -247,6 +253,27 @@ pub fn build(router: Router, server: &Server) -> Router { .route("/_tuwunel/local_user_count", any(federation_disabled)); } + // Replication endpoints — protected by shared-secret token auth middleware. + // The middleware extracts `State` from request extensions (set by with_state). + router = router.merge( + Router::::new() + .route("/_tuwunel/replication/status", get(client::replication_status)) + .route("/_tuwunel/replication/wal", get(client::replication_wal)) + .route( + "/_tuwunel/replication/checkpoint", + get(client::replication_checkpoint), + ) + .route( + "/_tuwunel/replication/promote", + post(client::replication_promote), + ) + .route( + "/_tuwunel/replication/demote", + post(client::replication_demote), + ) + .layer(middleware::from_fn_with_state(state, check_replication_token)), + ); + if config.allow_legacy_media { router = router .ruma_route(&client::get_media_config_legacy_route) diff --git a/src/api/router/replication_auth.rs b/src/api/router/replication_auth.rs new file mode 100644 index 000000000..ba8bb3527 --- /dev/null +++ b/src/api/router/replication_auth.rs @@ -0,0 +1,56 @@ +use axum::{ + body::Body, + extract::{Request, State}, + http::StatusCode, + middleware::Next, + response::{IntoResponse, Response}, +}; + +pub(super) const TOKEN_HEADER: &str = "x-tuwunel-replication-token"; + +/// Axum middleware that validates the `X-Tuwunel-Replication-Token` header +/// against `config.rocksdb_replication_token`. 
+/// +/// Returns: +/// - `501 Not Implemented` if replication is not configured on this instance. +/// - `401 Unauthorized` if the token is missing or incorrect. +/// - Passes through to the handler if the token matches. +pub(crate) async fn check_replication_token( + State(services): State, + request: Request, + next: Next, +) -> Response { + + let Some(ref expected) = services.server.config.rocksdb_replication_token else { + return ( + StatusCode::NOT_IMPLEMENTED, + "Replication is not configured on this instance", + ) + .into_response(); + }; + + let provided = request + .headers() + .get(TOKEN_HEADER) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + // Constant-time comparison to avoid timing side-channels. + if !constant_time_eq(provided.as_bytes(), expected.as_bytes()) { + return (StatusCode::UNAUTHORIZED, "Invalid replication token").into_response(); + } + + next.run(request).await +} + +/// Byte-by-byte constant-time equality check that does not short-circuit. +fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { + if a.len() != b.len() { + // Still consume O(min(a,b)) time to avoid leaking which was shorter. + a.iter() + .zip(b.iter()) + .fold(0u8, |acc, (x, y)| acc | (x ^ y)); + return false; + } + a.iter().zip(b.iter()).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0 +} diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 67514f960..46b3863bb 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1344,6 +1344,35 @@ pub struct Config { #[serde(default)] pub rocksdb_secondary: bool, + /// Path for the secondary instance's own RocksDB log files. Required when + /// `rocksdb_secondary` is true and the primary DB is not on a shared + /// filesystem. Must be a writable directory local to this host. + #[serde(default)] + pub rocksdb_secondary_path: Option, + + /// URL of the primary instance for WAL-streaming replication. + /// Example: `https://primary.example.com` + /// Required on secondary instances that use WAL streaming. 
+	#[serde(default)]
+	pub rocksdb_primary_url: Option<String>,
+
+	/// Shared secret token for replication endpoint authentication.
+	/// Both primary and secondary must have the same value.
+	/// Leave unset to disable the replication HTTP endpoints entirely.
+	#[serde(default)]
+	pub rocksdb_replication_token: Option<String>,
+
+	/// How long (in seconds) the primary retains WAL segments beyond what
+	/// local recovery requires. Gives the secondary a window to reconnect
+	/// after downtime without needing a full re-sync. Default: 86400 (24h).
+	#[serde(default = "default_rocksdb_wal_ttl_seconds")]
+	pub rocksdb_wal_ttl_seconds: u64,
+
+	/// Interval in milliseconds at which the secondary polls for new WAL
+	/// frames when caught up with the primary. Default: 250ms.
+	#[serde(default = "default_rocksdb_replication_interval_ms")]
+	pub rocksdb_replication_interval_ms: u64,
+
 	/// Enables idle CPU priority for compaction thread. This is not enabled by
 	/// default to prevent compaction from falling too far behind on busy
 	/// systems.
@@ -3397,6 +3426,10 @@ fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_wal_ttl_seconds() -> u64 { 86400 } + +fn default_rocksdb_replication_interval_ms() -> u64 { 250 } + // I know, it's a great name #[must_use] #[inline] diff --git a/src/database/Cargo.toml b/src/database/Cargo.toml index 89c63ca97..d338ab696 100644 --- a/src/database/Cargo.toml +++ b/src/database/Cargo.toml @@ -55,6 +55,7 @@ zstd_compression = [ [dependencies] async-channel.workspace = true const-str.workspace = true +crc32fast.workspace = true ctor.workspace = true futures.workspace = true log.workspace = true diff --git a/src/database/engine.rs b/src/database/engine.rs index 787ad294f..395946eb9 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -9,6 +9,11 @@ mod logger; mod memory_usage; mod open; mod repair; +pub mod replication; +pub use replication::{ + WalFrame, batch_count_from_bytes, is_wal_gap_error, FRAME_HEADER_LEN, FRAME_TYPE_DATA, + FRAME_TYPE_HEARTBEAT, +}; use std::{ ffi::CStr, diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 11da8a46e..361ca8890 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -57,6 +57,7 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul // Files opts.set_table_cache_num_shard_bits(7); opts.set_wal_size_limit_mb(1024); + opts.set_wal_ttl_seconds(config.rocksdb_wal_ttl_seconds); opts.set_max_total_wal_size(1024 * 1024 * 512); opts.set_writable_file_max_buffer_size(1024 * 1024 * 2); if !config.rocksdb_allow_fallocate { diff --git a/src/database/engine/open.rs b/src/database/engine/open.rs index 5491cfc28..0ba3f7d07 100644 --- a/src/database/engine/open.rs +++ b/src/database/engine/open.rs @@ -42,7 +42,11 @@ pub(crate) async fn open(ctx: Arc, desc: &[Descriptor]) -> Result, +} + +impl WalFrame { + /// Create a heartbeat frame carrying the primary's current sequence. 
+	pub fn heartbeat(primary_sequence: u64) -> Self {
+		Self {
+			frame_type: FRAME_TYPE_HEARTBEAT,
+			sequence: primary_sequence,
+			count: 0,
+			timestamp_ms: now_ms(),
+			crc32: 0,
+			batch_data: Vec::new(),
+		}
+	}
+
+	/// Create a data frame from a WAL batch. CRC is computed automatically.
+	pub fn data(sequence: u64, count: u64, batch_data: Vec<u8>) -> Self {
+		let crc32 = crc32fast::hash(&batch_data);
+		Self {
+			frame_type: FRAME_TYPE_DATA,
+			sequence,
+			count,
+			timestamp_ms: now_ms(),
+			crc32,
+			batch_data,
+		}
+	}
+
+	/// Returns the sequence number the secondary should use as its next
+	/// `?since=` argument after successfully applying this frame.
+	/// For heartbeats, returns `sequence` unchanged (cursor must not advance
+	/// based on heartbeats alone).
+	#[inline]
+	pub fn next_resume_seq(&self) -> u64 {
+		if self.frame_type == FRAME_TYPE_DATA {
+			self.sequence.saturating_add(self.count)
+		} else {
+			self.sequence
+		}
+	}
+
+	/// Encode the frame to bytes for writing to the HTTP stream.
+	pub fn encode(&self) -> Vec<u8> {
+		let batch_len = self.batch_data.len() as u32;
+		let mut buf = Vec::with_capacity(FRAME_HEADER_LEN + self.batch_data.len());
+		buf.push(self.frame_type);
+		buf.extend_from_slice(&self.sequence.to_le_bytes());
+		buf.extend_from_slice(&self.count.to_le_bytes());
+		buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
+		buf.extend_from_slice(&self.crc32.to_le_bytes());
+		buf.extend_from_slice(&batch_len.to_le_bytes());
+		buf.extend_from_slice(&self.batch_data);
+		buf
+	}
+
+	/// Attempt to decode a frame from the start of `buf`.
+	///
+	/// Returns `(frame, bytes_consumed)` on success. Returns `Err` if the
+	/// buffer is too short to contain a complete frame, or if the CRC does
+	/// not match.
+ pub fn decode(buf: &[u8]) -> Result<(Self, usize)> { + if buf.len() < FRAME_HEADER_LEN { + return Err!( + "WAL frame header truncated: {} bytes < {FRAME_HEADER_LEN} required", + buf.len() + ); + } + + let frame_type = buf[0]; + let sequence = u64::from_le_bytes(buf[1..9].try_into().expect("8 bytes")); + let count = u64::from_le_bytes(buf[9..17].try_into().expect("8 bytes")); + let timestamp_ms = u64::from_le_bytes(buf[17..25].try_into().expect("8 bytes")); + let crc32 = u32::from_le_bytes(buf[25..29].try_into().expect("4 bytes")); + let batch_len = u32::from_le_bytes(buf[29..33].try_into().expect("4 bytes")) as usize; + + let total = FRAME_HEADER_LEN + batch_len; + if buf.len() < total { + return Err!( + "WAL frame body truncated: need {total} bytes, have {}", + buf.len() + ); + } + + let batch_data = buf[FRAME_HEADER_LEN..total].to_vec(); + + if frame_type == FRAME_TYPE_DATA && !batch_data.is_empty() { + let actual = crc32fast::hash(&batch_data); + if actual != crc32 { + return Err!( + "WAL frame CRC mismatch: stored {crc32:#010x}, computed {actual:#010x}" + ); + } + } + + Ok(( + Self { frame_type, sequence, count, timestamp_ms, crc32, batch_data }, + total, + )) + } +} + +/// Extract the operation count from a raw WriteBatch byte slice. +/// +/// RocksDB `WriteBatch` layout: `[8 bytes sequence][4 bytes count][records…]`. +/// The count at bytes 8–11 is the number of operations in the batch, which +/// equals how many sequence numbers the batch consumes. Returns 0 if the +/// slice is too short to contain the count. +#[inline] +pub fn batch_count_from_bytes(data: &[u8]) -> u64 { + if data.len() < 12 { + return 0; + } + u32::from_le_bytes(data[8..12].try_into().expect("4 bytes")) as u64 +} + +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +// ── Engine methods ───────────────────────────────────────────────────────────── + +/// Create a RocksDB checkpoint at `dest`. 
+///
+/// A checkpoint is a consistent point-in-time snapshot consisting of
+/// hard-links to existing SST files. It is created atomically and returns
+/// the sequence number at checkpoint creation time.
+#[implement(Engine)]
+pub fn create_checkpoint(&self, dest: &Path) -> Result<u64> {
+	let checkpoint = Checkpoint::new(&self.db).map_err(map_err)?;
+	checkpoint.create_checkpoint(dest).map_err(map_err)?;
+	Ok(self.db.latest_sequence_number())
+}
+
+/// Prevent RocksDB from deleting obsolete files.
+///
+/// Call this before initiating a checkpoint transfer or live-file listing
+/// to ensure files are not removed while they are being transferred. Must
+/// be paired with a subsequent `enable_file_deletions` call.
+#[implement(Engine)]
+pub fn disable_file_deletions(&self) -> Result {
+	self.db.disable_file_deletions().map_err(map_err)
+}
+
+/// Re-enable file deletion after a `disable_file_deletions` call.
+#[implement(Engine)]
+pub fn enable_file_deletions(&self) -> Result {
+	self.db.enable_file_deletions().map_err(map_err)
+}
+
+/// Returns the current latest WAL sequence number of this instance.
+///
+/// Used by the primary to populate heartbeat frames so the secondary knows
+/// the primary is alive and can gauge replication lag.
+#[implement(Engine)]
+pub fn latest_wal_sequence(&self) -> u64 { self.db.latest_sequence_number() }
+
+/// Return a WAL iterator starting at `since`.
+///
+/// Yields batches whose sequence number is >= `since`. If `since` is older
+/// than the oldest retained WAL segment, this returns `Err` — call
+/// `is_wal_gap_error` on the result to distinguish this case from other
+/// errors.
+#[implement(Engine)]
+pub fn wal_updates_since(&self, since: u64) -> Result<rocksdb::DBWALIterator> {
+	self.db.get_updates_since(since).map_err(map_err)
+}
+
+/// Newtype wrapper making `DBWALIterator` safe to send across threads.
+///
+/// `DBWALIterator` holds a `*mut rocksdb_wal_iterator_t` raw pointer which
+/// is not auto-`Send`.
+/// RocksDB WAL iterators are not concurrently shared;
+/// this iterator is consumed by exactly one thread at a time, so sending
+/// ownership across a thread boundary is safe.
+struct SendWalIter(rocksdb::DBWALIterator);
+
+// SAFETY: DBWALIterator is not auto-Send due to its raw pointer, but the
+// underlying RocksDB iterator is safe to use from whichever single thread
+// owns it at any given time. We never share it across threads simultaneously.
+unsafe impl Send for SendWalIter {}
+
+impl Iterator for SendWalIter {
+	type Item = Result<WalFrame>;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		self.0.next().map(|result| {
+			result
+				.map(|(seq, batch)| {
+					let data = batch.data().to_vec();
+					let count = batch_count_from_bytes(&data);
+					WalFrame::data(seq, count, data)
+				})
+				.map_err(map_err)
+		})
+	}
+}
+
+/// Return a higher-level iterator of [`WalFrame`]s starting at `since`.
+///
+/// Wraps `wal_updates_since` and maps each rocksdb batch into a `WalFrame`,
+/// hiding the internal `DBWALIterator` / `WriteBatch` types from callers
+/// that only have access to `tuwunel-database` (not `rust-rocksdb` directly).
+///
+/// Returns `Err` immediately if the sequence is too old; call
+/// `is_wal_gap_error` to distinguish a gap from other errors.
+#[implement(Engine)]
+pub fn wal_frame_iter(
+	&self,
+	since: u64,
+) -> Result<Box<dyn Iterator<Item = Result<WalFrame>> + Send>> {
+	let iter = self.db.get_updates_since(since).map_err(map_err)?;
+	Ok(Box::new(SendWalIter(iter)))
+}
+
+/// Returns `true` if `err` indicates the requested WAL sequence is older
+/// than any retained segment on this instance.
+///
+/// The primary uses this to return HTTP 410 rather than 500.
+pub fn is_wal_gap_error(err: &tuwunel_core::Error) -> bool { + let msg = err.to_string().to_lowercase(); + msg.contains("too old") + || msg.contains("older than") + || msg.contains("sequence not") + || msg.contains("data loss") + || msg.contains("not available") +} + +// ── Tests ────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn heartbeat_round_trip() { + let frame = WalFrame::heartbeat(12345); + let encoded = frame.encode(); + let (decoded, consumed) = WalFrame::decode(&encoded).unwrap(); + assert_eq!(consumed, encoded.len()); + assert_eq!(decoded.frame_type, FRAME_TYPE_HEARTBEAT); + assert_eq!(decoded.sequence, 12345); + assert_eq!(decoded.count, 0); + assert!(decoded.batch_data.is_empty()); + // Heartbeat does not advance resume cursor + assert_eq!(decoded.next_resume_seq(), 12345); + } + + #[test] + fn data_frame_round_trip() { + let data = b"test writebatch payload bytes".to_vec(); + let frame = WalFrame::data(1000, 50, data.clone()); + let encoded = frame.encode(); + let (decoded, consumed) = WalFrame::decode(&encoded).unwrap(); + assert_eq!(consumed, encoded.len()); + assert_eq!(decoded.frame_type, FRAME_TYPE_DATA); + assert_eq!(decoded.sequence, 1000); + assert_eq!(decoded.count, 50); + assert_eq!(decoded.next_resume_seq(), 1050); + assert_eq!(decoded.batch_data, data); + } + + #[test] + fn data_frame_zero_count() { + // A batch with count=0 should not advance the cursor beyond sequence. + let frame = WalFrame::data(500, 0, b"payload".to_vec()); + assert_eq!(frame.next_resume_seq(), 500); + } + + #[test] + fn crc_mismatch_rejected() { + let frame = WalFrame::data(1000, 1, b"payload data".to_vec()); + let mut encoded = frame.encode(); + // Corrupt one byte in the batch_data region (after the 33-byte header). 
+ let last = encoded.len() - 1; + encoded[last] ^= 0xFF; + assert!(WalFrame::decode(&encoded).is_err()); + } + + #[test] + fn truncated_header_rejected() { + let frame = WalFrame::heartbeat(1); + let encoded = frame.encode(); + assert!(WalFrame::decode(&encoded[..FRAME_HEADER_LEN - 1]).is_err()); + } + + #[test] + fn truncated_body_rejected() { + let frame = WalFrame::data(1, 1, b"hello world test".to_vec()); + let mut encoded = frame.encode(); + encoded.truncate(encoded.len() - 3); + assert!(WalFrame::decode(&encoded).is_err()); + } + + #[test] + fn multiple_frames_in_buffer() { + let f1 = WalFrame::data(100, 5, b"batch one".to_vec()); + let f2 = WalFrame::heartbeat(105); + let f3 = WalFrame::data(105, 3, b"batch two".to_vec()); + let mut buf = f1.encode(); + buf.extend_from_slice(&f2.encode()); + buf.extend_from_slice(&f3.encode()); + + let (d1, c1) = WalFrame::decode(&buf).unwrap(); + let (d2, c2) = WalFrame::decode(&buf[c1..]).unwrap(); + let (d3, c3) = WalFrame::decode(&buf[c1 + c2..]).unwrap(); + + assert_eq!(d1.sequence, 100); + assert_eq!(d2.frame_type, FRAME_TYPE_HEARTBEAT); + assert_eq!(d3.sequence, 105); + assert_eq!(c1 + c2 + c3, buf.len()); + } + + #[test] + fn batch_count_from_bytes_valid() { + let mut fake = vec![0u8; 16]; + fake[8..12].copy_from_slice(&7u32.to_le_bytes()); + assert_eq!(batch_count_from_bytes(&fake), 7); + } + + #[test] + fn batch_count_from_bytes_too_short() { + assert_eq!(batch_count_from_bytes(&[0u8; 5]), 0); + assert_eq!(batch_count_from_bytes(&[]), 0); + } +} diff --git a/src/database/maps.rs b/src/database/maps.rs index 5ec270bbd..34f070b84 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -186,6 +186,10 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "registrationtoken_info", ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "replication_meta", + ..descriptor::RANDOM_SMALL + }, Descriptor { name: "roomid_knockedcount", ..descriptor::RANDOM_SMALL diff --git a/src/database/mod.rs b/src/database/mod.rs index 
5d799b053..349bb8242 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -26,6 +26,10 @@ use std::{ops::Index, sync::Arc}; use log as _; use tuwunel_core::{Result, Server, err}; +pub use self::engine::{ + WalFrame, is_wal_gap_error, FRAME_HEADER_LEN, FRAME_TYPE_DATA, FRAME_TYPE_HEARTBEAT, +}; + pub use self::{ de::{Ignore, IgnoreAll}, deserialized::Deserialized, @@ -80,6 +84,72 @@ impl Database { #[inline] #[must_use] pub fn is_secondary(&self) -> bool { self.engine.is_secondary() } + + /// Returns the primary's current latest WAL sequence number. + /// + /// Used by replication status endpoints and heartbeat frames. + #[inline] + #[must_use] + pub fn latest_wal_sequence(&self) -> u64 { self.engine.latest_wal_sequence() } + + /// Return a WAL frame iterator starting at `since`. + /// + /// See `Engine::wal_frame_iter` for semantics. Returns `Err` if `since` + /// is older than the oldest retained WAL segment. + pub fn wal_frame_iter( + &self, + since: u64, + ) -> Result> + Send>> { + self.engine.wal_frame_iter(since) + } + + /// Create a RocksDB checkpoint at `dest`. + /// + /// Returns the WAL sequence number at checkpoint creation time. + pub fn create_checkpoint(&self, dest: &std::path::Path) -> Result { + self.engine.create_checkpoint(dest) + } + + /// Apply a raw WriteBatch (from the primary's WAL stream) to this database. + /// + /// Used by the secondary replication worker to replay incoming batches. + pub fn write_raw_batch(&self, data: &[u8]) -> Result { + use rocksdb::{WriteBatch, WriteOptions}; + let batch = WriteBatch::from_data(data); + let opts = WriteOptions::default(); + self.engine + .db + .write_opt(&batch, &opts) + .map_err(crate::util::map_err) + } + + /// Read the secondary's persisted WAL resume cursor from the + /// `replication_meta` column family. + /// + /// Returns `Ok(0)` when no cursor has been written yet (fresh secondary). 
+ pub fn get_replication_resume_seq(&self) -> Result { + use tuwunel_core::utils::result::NotFound; + + let map = &self["replication_meta"]; + let result = map.get_blocking(b"primary_resume_seq"); + if result.is_not_found() { + return Ok(0); + } + let handle = result?; + if handle.len() >= 8 { + Ok(u64::from_le_bytes(handle[..8].try_into().expect("8 bytes"))) + } else { + Ok(0) + } + } + + /// Persist the secondary's WAL resume cursor to the `replication_meta` + /// column family so it survives restarts. + pub fn set_replication_resume_seq(&self, seq: u64) -> Result { + let map = &self["replication_meta"]; + map.insert(b"primary_resume_seq", seq.to_le_bytes()); + Ok(()) + } } impl Index<&str> for Database { diff --git a/src/router/router.rs b/src/router/router.rs index 0b9f0929a..15781a8f3 100644 --- a/src/router/router.rs +++ b/src/router/router.rs @@ -10,7 +10,7 @@ use tuwunel_service::Services; pub(crate) fn build(services: &Arc) -> (Router, Guard) { let router = Router::::new(); let (state, guard) = state::create(services.clone()); - let router = tuwunel_api::router::build(router, &services.server) + let router = tuwunel_api::router::build(router, &services.server, state) .route("/", get(it_works)) .fallback(not_found) .with_state(state); diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index 1e23bc912..044072ed0 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -108,6 +108,7 @@ ruma.workspace = true rustls.workspace = true rustyline-async.workspace = true rustyline-async.optional = true +tar.workspace = true serde_html_form.workspace = true serde_json.workspace = true serde.workspace = true diff --git a/src/service/mod.rs b/src/service/mod.rs index 0038985af..0efd3018e 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -23,6 +23,7 @@ pub mod oauth; pub mod presence; pub mod pusher; pub mod registration_tokens; +pub mod replication; pub mod resolver; pub mod rooms; pub mod sending; diff --git 
a/src/service/replication/mod.rs b/src/service/replication/mod.rs new file mode 100644 index 000000000..b2a96a478 --- /dev/null +++ b/src/service/replication/mod.rs @@ -0,0 +1,383 @@ +//! Secondary replication service. +//! +//! When `config.rocksdb_primary_url` is set, this service continuously tails +//! the primary's WAL stream and applies incoming batches to the local +//! (secondary) database. On startup it bootstraps from a checkpoint if no +//! resume cursor is persisted. On failover the secondary can be promoted to +//! primary by calling `POST /_tuwunel/replication/promote`. A promoted node +//! (or any standalone primary) can be demoted back to a secondary by calling +//! `POST /_tuwunel/replication/demote` with a new primary URL. +//! +//! ## Normal operation +//! +//! ```text +//! startup +//! -> load resume_seq from replication_meta CF +//! -> if resume_seq == 0: bootstrap (GET /checkpoint, restore, set resume_seq) +//! -> connect to GET /wal?since= +//! -> stream: for each frame apply batch, advance resume_seq, persist cursor +//! -> on disconnect / error: exponential backoff, reconnect +//! -> on 410 Gone (WAL gap): stop with error (manual restore required) +//! -> on promote(): enter standby loop, instance becomes standalone primary +//! -> on demote(url): exit standby, bootstrap from new primary, resume stream +//! ``` + +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use tokio::sync::{Notify, RwLock}; +use tuwunel_core::{Result, err, error, info, warn}; +use tuwunel_database::{Database, WalFrame, is_wal_gap_error}; + +use crate::service::{Args, make_name}; + +/// Minimum retry delay after a transient connection error. +const BACKOFF_MIN_MS: u64 = 500; +/// Maximum retry delay (caps the exponential backoff). 
+const BACKOFF_MAX_MS: u64 = 30_000; + +pub struct Service { + db: Arc, + server: Arc, + /// HTTP client used for all primary connections. + client: reqwest::Client, + /// Set to true when `promote()` is called; worker enters standby mode. + promoted: AtomicBool, + /// Wakes any blocking select in the streaming loop immediately on promotion. + promote_notify: Notify, + /// Runtime-overridden primary URL set by `demote()`. Takes precedence over + /// `config.rocksdb_primary_url` when set. + dynamic_primary_url: RwLock>, + /// Wakes the standby loop immediately when `demote()` is called. + demote_notify: Notify, +} + +#[async_trait] +impl crate::Service for Service { + fn build(args: &Args<'_>) -> Result> { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(10)) + .build() + .map_err(|e| err!(Database("Failed to build replication HTTP client: {e}")))?; + + Ok(Arc::new(Self { + db: args.db.clone(), + server: args.server.clone(), + client, + promoted: AtomicBool::new(false), + promote_notify: Notify::new(), + dynamic_primary_url: RwLock::new(None), + demote_notify: Notify::new(), + })) + } + + /// Worker loop: manages transitions between secondary (replicating), + /// standby (promoted primary), and secondary-again (after demote). + /// + /// The worker runs until server shutdown regardless of role transitions so + /// that `demote()` can restart replication without restarting the process. + async fn worker(self: Arc) -> Result { + if self.db.is_secondary() { + // RocksDB opened in native secondary mode (read-only) -- WAL streaming + // replication requires a writable database. Operator should use either + // rocksdb_secondary OR rocksdb_primary_url, not both. + warn!( + "rocksdb_primary_url is set but database is in RocksDB native secondary \ + mode (read-only); WAL streaming replication requires a writable database. \ + Replication worker will not run." 
+ ); + return Ok(()); + } + + loop { + if !self.server.running() { + return Ok(()); + } + + // Resolve effective primary URL: dynamic (set by demote) takes + // precedence over the static config value. + let primary_url = { + let dynamic = self.dynamic_primary_url.read().await; + dynamic + .clone() + .or_else(|| self.server.config.rocksdb_primary_url.clone()) + }; + + // If no primary URL is configured, wait for a demote() call (which + // sets a dynamic URL) or server shutdown. This keeps the worker alive + // on nodes that start as standalone primaries so they can be demoted + // without a process restart. + let Some(primary_url) = primary_url else { + tokio::select! { + () = self.server.until_shutdown() => return Ok(()), + () = self.demote_notify.notified() => continue, + } + }; + + // If currently promoted, enter standby and wait for a demote signal. + if self.promoted.load(Ordering::Acquire) { + info!("In standalone primary mode; waiting for demote or shutdown."); + tokio::select! { + () = self.server.until_shutdown() => return Ok(()), + () = self.demote_notify.notified() => { + info!("Demote received; resuming replication from {primary_url}"); + continue; + }, + } + } + + // Bootstrap if no cursor is saved (first run or after demote reset). + let resume_seq = self.db.get_replication_resume_seq()?; + if resume_seq == 0 { + info!("No resume cursor found; bootstrapping from primary checkpoint"); + self.bootstrap(&primary_url).await?; + } + + info!("Replication worker starting; primary = {primary_url}"); + + let mut backoff_ms = BACKOFF_MIN_MS; + + while self.server.running() && !self.promoted.load(Ordering::Acquire) { + match self.run_stream(&primary_url).await { + | Ok(()) => { + // run_stream returns Ok on clean shutdown or promotion. 
+ if self.promoted.load(Ordering::Acquire) { + break; // fall through to standby at top of outer loop + } + return Ok(()); // server is stopping + }, + | Err(ref e) if is_wal_gap_error(e) => { + error!( + "WAL gap: primary no longer has WAL history for our resume \ + position. Manual intervention required: stop this secondary, \ + restore a fresh checkpoint over the database directory, then \ + restart. Stopping replication worker." + ); + return Err(err!(Database("WAL gap; manual checkpoint restore required"))); + }, + | Err(ref e) => { + if self.promoted.load(Ordering::Acquire) { + break; + } + error!("Replication stream error: {e}; reconnecting in {backoff_ms}ms"); + }, + } + + // Exponential backoff with cap — also wakes on promotion or shutdown. + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}, + () = self.server.until_shutdown() => return Ok(()), + () = self.promote_notify.notified() => break, + } + backoff_ms = (backoff_ms * 2).min(BACKOFF_MAX_MS); + } + } + } + + fn name(&self) -> &str { make_name(std::module_path!()) } +} + +impl Service { + /// Promote this secondary to a standalone primary. + /// + /// Stops the replication worker immediately. The caller is responsible for + /// updating the VIP / load balancer to route client traffic to this node. + pub fn promote(&self) { + self.promoted.store(true, Ordering::Release); + self.promote_notify.notify_waiters(); + info!("Promotion requested; stopping replication worker."); + } + + /// Returns true if this instance has been promoted to primary. + pub fn is_promoted(&self) -> bool { self.promoted.load(Ordering::Acquire) } + + /// Demote this promoted primary back to a secondary replicating from + /// `new_primary_url`. + /// + /// Resets the resume cursor so the worker performs a clean checkpoint + /// bootstrap from the new primary (whose WAL history will differ from + /// ours). 
The caller is responsible for ensuring the VIP / load balancer + /// has been updated to route writes to the new primary before calling this. + /// + /// Returns `Err` if the instance is not currently promoted. + pub async fn demote(&self, new_primary_url: String) -> Result<()> { + if !self.promoted.load(Ordering::Acquire) { + return Err(err!(Request(Conflict( + "This instance is not currently promoted; cannot demote." + )))); + } + + // Reset cursor so the worker bootstraps a fresh checkpoint from the new + // primary rather than trying to resume from our own WAL position. + self.db.set_replication_resume_seq(0)?; + + // Store the new primary URL and clear the promoted flag before notifying + // the worker so it sees a consistent state on wake-up. + *self.dynamic_primary_url.write().await = Some(new_primary_url.clone()); + self.promoted.store(false, Ordering::Release); + self.demote_notify.notify_waiters(); + + info!("Demotion requested; will replicate from {new_primary_url}"); + Ok(()) + } + + /// Stream WAL frames from the primary until disconnect, promotion, or error. + async fn run_stream(&self, primary_url: &str) -> Result { + let resume_seq = self.db.get_replication_resume_seq()?; + let url = format!("{primary_url}/_tuwunel/replication/wal?since={resume_seq}"); + + let resp = self + .authed_get(&url) + .await + .map_err(|e| err!(Database("GET {url}: {e}")))?; + + if resp.status() == reqwest::StatusCode::GONE { + return Err(err!(Database("WAL gap: 410 Gone from primary"))); + } + + if !resp.status().is_success() { + return Err(err!(Database( + "Primary returned {} for WAL stream", + resp.status() + ))); + } + + info!("WAL stream connected; starting from seq {resume_seq}"); + + let mut byte_stream = resp.bytes_stream(); + let mut buf: Vec = Vec::new(); + + while self.server.running() && !self.promoted.load(Ordering::Acquire) { + tokio::select! 
{ + chunk = byte_stream.next() => { + let Some(chunk) = chunk else { + return Err(err!(Database("Primary closed WAL stream"))); + }; + let chunk = chunk.map_err(|e| err!(Database("WAL stream read: {e}")))?; + buf.extend_from_slice(&chunk); + self.drain_frames(&mut buf)?; + }, + () = self.server.until_shutdown() => return Ok(()), + () = self.promote_notify.notified() => return Ok(()), + } + } + + Ok(()) + } + + /// Parse and apply as many complete frames as possible from `buf`. + fn drain_frames(&self, buf: &mut Vec) -> Result { + let mut offset = 0; + loop { + match WalFrame::decode(&buf[offset..]) { + | Ok((frame, consumed)) => { + self.apply_frame(&frame)?; + offset += consumed; + }, + | Err(_) => break, + } + } + buf.drain(..offset); + Ok(()) + } + + /// Apply a single frame to the local database. + fn apply_frame(&self, frame: &WalFrame) -> Result { + use tuwunel_database::FRAME_TYPE_DATA; + + if frame.frame_type == FRAME_TYPE_DATA && !frame.batch_data.is_empty() { + self.db.write_raw_batch(&frame.batch_data)?; + } + + let next = frame.next_resume_seq(); + if next > 0 { + self.db.set_replication_resume_seq(next)?; + } + Ok(()) + } + + /// Full sync: download a checkpoint tar from the primary and restore it. 
+ async fn bootstrap(&self, primary_url: &str) -> Result { + let url = format!("{primary_url}/_tuwunel/replication/checkpoint"); + info!("Downloading checkpoint from {url}"); + + let resp = self + .authed_get(&url) + .await + .map_err(|e| err!(Database("GET {url}: {e}")))?; + + if !resp.status().is_success() { + return Err(err!(Database( + "Primary returned {} for checkpoint", + resp.status() + ))); + } + + let seq: u64 = resp + .headers() + .get("x-tuwunel-checkpoint-sequence") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let tar_bytes: Bytes = resp + .bytes() + .await + .map_err(|e| err!(Database("Reading checkpoint body: {e}")))?; + + let db_path = self.server.config.database_path.clone(); + let parent = db_path.parent().unwrap_or(&db_path).to_owned(); + let staging = parent.join("_replication_staging"); + let backup = parent.join("_replication_backup"); + + if staging.exists() { + std::fs::remove_dir_all(&staging) + .map_err(|e| err!(Database("Removing old staging dir: {e}")))?; + } + std::fs::create_dir_all(&staging) + .map_err(|e| err!(Database("Creating staging dir: {e}")))?; + + let cursor = std::io::Cursor::new(&tar_bytes[..]); + let mut archive = tar::Archive::new(cursor); + archive + .unpack(&staging) + .map_err(|e| err!(Database("Unpacking checkpoint tar: {e}")))?; + + let checkpoint_src = staging.join("checkpoint"); + + if backup.exists() { + std::fs::remove_dir_all(&backup) + .map_err(|e| err!(Database("Removing old backup: {e}")))?; + } + if db_path.exists() { + std::fs::rename(&db_path, &backup) + .map_err(|e| err!(Database("Moving db to backup: {e}")))?; + } + std::fs::rename(&checkpoint_src, &db_path) + .map_err(|e| err!(Database("Moving checkpoint to db_path: {e}")))?; + + let _ = std::fs::remove_dir_all(&staging); + + self.db.set_replication_resume_seq(seq)?; + + info!("Checkpoint bootstrap complete; resume_seq = {seq}"); + Ok(()) + } + + /// Send an authenticated GET request to the primary. 
+ async fn authed_get(&self, url: &str) -> reqwest::Result { + let mut req = self.client.get(url); + if let Some(ref token) = self.server.config.rocksdb_replication_token { + req = req.header("x-tuwunel-replication-token", token.as_str()); + } + req.send().await + } +} diff --git a/src/service/services.rs b/src/service/services.rs index 0ab666893..ab29c3843 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -12,7 +12,7 @@ use crate::{ account_data, admin, appservice, client, config, deactivate, emergency, federation, globals, key_backups, manager::Manager, - media, membership, oauth, presence, pusher, registration_tokens, resolver, + media, membership, oauth, presence, pusher, registration_tokens, replication, resolver, rooms::{self, retention}, sending, server_keys, service::{Args, Service}, @@ -63,6 +63,7 @@ pub struct Services { pub oauth: Arc, pub retention: Arc, pub registration_tokens: Arc, + pub replication: Arc, manager: Mutex>>, pub server: Arc, @@ -123,6 +124,7 @@ pub async fn build(server: Arc) -> Result> { oauth: oauth::Service::build(&args)?, retention: retention::Service::build(&args)?, registration_tokens: registration_tokens::Service::build(&args)?, + replication: replication::Service::build(&args)?, manager: Mutex::new(None), server, @@ -184,6 +186,7 @@ pub(crate) fn services(&self) -> impl Iterator> + Send { cast!(self.oauth), cast!(self.retention), cast!(self.registration_tokens), + cast!(self.replication), ] .into_iter() } diff --git a/tuwunel-example.toml b/tuwunel-example.toml index dc97b8d84..c2df5bb23 100644 --- a/tuwunel-example.toml +++ b/tuwunel-example.toml @@ -1126,6 +1126,35 @@ # #rocksdb_secondary = false +# Path for the secondary instance's own RocksDB log files. Required when +# `rocksdb_secondary` is true and the primary DB is not on a shared +# filesystem. Must be a writable directory local to this host. +# +#rocksdb_secondary_path = false + +# URL of the primary instance for WAL-streaming replication. 
+# Example: `https://primary.example.com` +# Required on secondary instances that use WAL streaming. +# +#rocksdb_primary_url = + +# Shared secret token for replication endpoint authentication. +# Both primary and secondary must have the same value. +# Leave unset to disable the replication HTTP endpoints entirely. +# +#rocksdb_replication_token = + +# How long (in seconds) the primary retains WAL segments beyond what +# local recovery requires. Gives the secondary a window to reconnect +# after downtime without needing a full re-sync. Default: 86400 (24h). +# +#rocksdb_wal_ttl_seconds = + +# Interval in milliseconds at which the secondary polls for new WAL +# frames when caught up with the primary. Default: 250ms. +# +#rocksdb_replication_interval_ms = + # Enables idle CPU priority for compaction thread. This is not enabled by # default to prevent compaction from falling too far behind on busy # systems.