From b51fe12a54cc95e0ee3c0ffef88816e963761eda Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Oct 2025 17:06:28 +0200 Subject: [PATCH 1/6] Support FoundationDB for consensus Implements a consensus backend to talk to FoundationDB. Very much untested and still panics sometimes. Signed-off-by: Moritz Hoffmann --- Cargo.lock | 121 +++++- misc/python/materialize/cli/run.py | 21 +- src/persist-client/src/lib.rs | 4 +- src/persist/Cargo.toml | 1 + src/persist/src/cfg.rs | 20 + src/persist/src/foundationdb.rs | 653 +++++++++++++++++++++++++++++ src/persist/src/lib.rs | 1 + src/persist/src/location.rs | 36 +- 8 files changed, 842 insertions(+), 15 deletions(-) create mode 100644 src/persist/src/foundationdb.rs diff --git a/Cargo.lock b/Cargo.lock index 6a1fd2b62ec7d..d6e9d2b936112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,6 +571,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "async-signal" version = "0.2.10" @@ -1427,6 +1438,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +dependencies = [ + "bitflags 2.9.4", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.106", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1440,7 +1471,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.106", ] @@ -3427,6 +3458,69 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "foundationdb" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514aeffe12bbcf2f64a746793cc1c2602006c705d3fc6285df024303d008cccf" +dependencies = [ + "async-recursion", + "async-trait", + "foundationdb-gen", + "foundationdb-macros", + "foundationdb-sys", + "foundationdb-tuple", + "futures", + "memchr", + "rand 0.8.5", + "serde", + "serde_bytes", + "serde_json", + "static_assertions", + "uuid", +] + +[[package]] +name = "foundationdb-gen" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef9d854866df33e1f4099769e2b9fa8bf8cf3bca707029ae6298d0e61bcae358" +dependencies = [ + "xml-rs", +] + +[[package]] +name = "foundationdb-macros" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be610412e5a92d89855fb15b099a57792b7dbdcf8ac74c5a0e24d9b7b1b6f7f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", + "try_map", +] + +[[package]] +name = "foundationdb-sys" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bae14dba30b8dcc4905a9189ebb18bc9db9744ef0ad8f2b94ef00d21e176964" +dependencies = [ + "bindgen 0.70.1", + "libc", +] + +[[package]] +name = "foundationdb-tuple" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1832c1fbe592de718893f7c3b48179a47757f8974d1498fece997454c2b0fa" +dependencies = [ + "memchr", + "uuid", +] + [[package]] name = "funty" version = "2.0.0" @@ -4946,7 +5040,7 @@ version = "0.17.3+10.4.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "cef2a00ee60fe526157c9023edab23943fae1ce2ab6f4abb2a807c1746835de9" dependencies = [ - "bindgen", + "bindgen 0.72.1", "bzip2-sys", "cc", "libc", @@ -7078,6 +7172,7 @@ dependencies = [ "deadpool-postgres", "differential-dataflow", "fail", + "foundationdb", "futures-util", "itertools 0.14.0", "md-5", @@ -10677,6 +10772,12 @@ version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -12758,6 +12859,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +[[package]] +name = "try_map" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1626d07cb5c1bb2cf17d94c0be4852e8a7c02b041acec9a8c5bdda99f9d580" + [[package]] name = "tungstenite" version = "0.24.0" @@ -13591,7 +13698,7 @@ dependencies = [ "axum", "axum-core", "base16ct", - "bindgen", + "bindgen 0.72.1", "bit-set", "bit-vec", "bitflags 2.9.4", @@ -13755,6 +13862,12 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "xml-rs" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd8403733700263c6eb89f192880191f1b83e332f7a20371ddcf421c4a337c7" + [[package]] name = "xmlparser" version = "0.13.5" @@ -13839,7 +13952,7 @@ version = "2.0.16+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" dependencies = [ - "bindgen", + "bindgen 0.72.1", "cc", "pkg-config", ] diff --git a/misc/python/materialize/cli/run.py b/misc/python/materialize/cli/run.py index 5575cfaf0077a..c045e5b20aa88 100644 --- a/misc/python/materialize/cli/run.py +++ b/misc/python/materialize/cli/run.py @@ -117,6 +117,11 @@ def main() -> int: help="Postgres/CockroachDB connection string", default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES), ) + parser.add_argument( + "--consensus", + help="Postgres/CockroachDB consensus connection string", + default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES), + ) parser.add_argument( "--blob", help="Blob storage connection string", @@ -266,11 +271,11 @@ def main() -> int: if args.program == "environmentd": _handle_lingering_services(kill=args.reset) scratch = MZ_ROOT / "scratch" - dbconn = _connect_sql(args.postgres) - for schema in ["consensus", "tsoracle", "storage"]: - if args.reset: - _run_sql(dbconn, f"DROP SCHEMA IF EXISTS {schema} CASCADE") - _run_sql(dbconn, f"CREATE SCHEMA IF NOT EXISTS {schema}") + # dbconn = _connect_sql(args.postgres) + # for schema in ["consensus", "tsoracle", "storage"]: + # if args.reset: + # _run_sql(dbconn, f"DROP SCHEMA IF EXISTS {schema} CASCADE") + # _run_sql(dbconn, f"CREATE SCHEMA IF NOT EXISTS {schema}") # Keep this after clearing out Postgres. Otherwise there is a race # where a ctrl-c could leave persist with references in Postgres to # files that have been deleted. 
There's no race if we reset in the @@ -303,6 +308,10 @@ def main() -> int: print(f"persist-blob-url: {args.blob}") print(f"listeners config path: {args.listeners_config_path}") + if args.consensus is not None: + consensus = args.consensus + else: + consensus = args.postgres command += [ f"--listeners-config-path={args.listeners_config_path}", "--orchestrator=process", @@ -311,7 +320,7 @@ def main() -> int: f"--orchestrator-process-prometheus-service-discovery-directory={MZDATA}/prometheus", f"--orchestrator-process-scratch-directory={scratch}", "--secrets-controller=local-file", - f"--persist-consensus-url={args.postgres}?options=--search_path=consensus", + f"--persist-consensus-url={args.consensus}?options=--search_path=consensus", f"--persist-blob-url={args.blob}", f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle", f"--environment-id={environment_id}", diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 288983df7eda5..2704434c07a18 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -916,9 +916,11 @@ mod tests { } pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient { + let mut location = PersistLocation::new_in_mem(); + location.consensus_uri = "foundationdb://".parse().unwrap(); let cache = new_test_client_cache(dyncfgs); cache - .open(PersistLocation::new_in_mem()) + .open(location) .await .expect("client construction failed") } diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 79d447d10f1ce..337293acc9636 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -39,6 +39,7 @@ bytes = "1.10.1" deadpool-postgres = "0.10.3" differential-dataflow = "0.17.0" fail = { version = "0.5.1", features = ["failpoints"] } +foundationdb = { version = "0.9.2", features = ["fdb-7_3"] } futures-util = "0.3.31" itertools = "0.14.0" md-5 = "0.10.6" diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs index 7323be2c37b0a..e6474d28beaa5 100644 --- a/src/persist/src/cfg.rs +++ b/src/persist/src/cfg.rs @@ -23,6 +23,7 @@ use mz_postgres_client::metrics::PostgresClientMetrics; use crate::azure::{AzureBlob, AzureBlobConfig}; use crate::file::{FileBlob, FileBlobConfig}; +use crate::foundationdb::{FdbConsensus, FdbConsensusConfig, ValidatingConsensus}; use crate::location::{Blob, Consensus, Determinate, ExternalError}; use crate::mem::{MemBlob, MemBlobConfig, MemConsensus}; use crate::metrics::S3BlobMetrics; @@ -212,6 +213,8 @@ impl BlobConfig { /// Config for an implementation of [Consensus]. #[derive(Debug, Clone)] pub enum ConsensusConfig { + /// Config for [FdbConsensus]. + FoundationDB(FdbConsensusConfig), /// Config for [PostgresConsensus]. Postgres(PostgresConsensusConfig), /// Config for [MemConsensus], only available in testing. @@ -222,6 +225,17 @@ impl ConsensusConfig { /// Opens the associated implementation of [Consensus]. 
pub async fn open(self) -> Result, ExternalError> { match self { + ConsensusConfig::FoundationDB(config) => { + Ok(Arc::new(FdbConsensus::open(config).await?)) + } + // ConsensusConfig::FoundationDB(config) => { + // let inner = FdbConsensus::open(config).await?; + // inner.drop_and_recreate().await?; + // Ok(Arc::new(ValidatingConsensus { + // inner, + // validator: MemConsensus::default(), + // })) + // } ConsensusConfig::Postgres(config) => { Ok(Arc::new(PostgresConsensus::open(config).await?)) } @@ -237,6 +251,12 @@ impl ConsensusConfig { dyncfg: Arc, ) -> Result { let config = match url.scheme() { + "fdb" | "foundationdb" => { + let network = FdbConsensusConfig::get_network(); + Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new( + network.into(), + )?)) + } "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres( PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?, )), diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs new file mode 100644 index 0000000000000..5065002c6d92d --- /dev/null +++ b/src/persist/src/foundationdb.rs @@ -0,0 +1,653 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Implementation of [Consensus] backed by FoundationDB. +//! +//! We're storing the consensus data in a subspace at `/mz/consensus`. Each key maps to a subspace +//! with the following structure: +//! ./seqno/ -> +//! ./data// -> + +use crate::error::Error; +use crate::location::{ + CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo, + VersionedData, +}; +use anyhow::anyhow; +use async_stream::try_stream; +use async_trait::async_trait; +use bytes::Bytes; +use foundationdb::api::NetworkAutoStop; +use foundationdb::directory::{Directory, DirectoryLayer, DirectoryOutput}; +use foundationdb::options::StreamingMode; +use foundationdb::tuple::{ + PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack, +}; +use foundationdb::{ + Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption, + Transaction, +}; +use futures_util::future::FutureExt; +use std::io::Write; +use std::sync::{Arc, OnceLock}; +use tracing::info; + +impl From for ExternalError { + fn from(x: FdbError) -> Self { + if x.is_retryable() { + ExternalError::Indeterminate(Indeterminate::new(x.into())) + } else { + ExternalError::Determinate(Determinate::new(x.into())) + } + } +} + +impl From for ExternalError { + fn from(x: FdbBindingError) -> Self { + ExternalError::Determinate(Determinate::new(x.into())) + } +} + +// impl From for ExternalError { +// fn from(x: TransactionCommitError) -> Self { +// match x { +// TransactionCommitError::Retryable(e) => { +// ExternalError::Indeterminate(Indeterminate::new(e.into())) +// } +// TransactionCommitError::NonRetryable(e) => { +// ExternalError::Determinate(Determinate::new(e.into())) +// } +// } + +// impl From for ExternalError { +// fn from(x: DirectoryError) -> Self { +// ExternalError::Determinate(Determinate { +// inner: anyhow::Error::new(x), +// }) +// } +// } + +static CELL: OnceLock> = OnceLock::new(); + +pub fn get_network() -> Arc { + CELL.get_or_init(|| unsafe { foundationdb::boot() }.into()) + .clone() +} + +/// Configuration to connect 
to a Postgres backed implementation of [Consensus]. +#[derive(Clone)] +pub struct FdbConsensusConfig { + network: Arc, +} + +impl std::fmt::Debug for FdbConsensusConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FdbConsensusConfig").finish_non_exhaustive() + } +} + +// impl From for PostgresClientConfig { +// fn from(config: FdbConsensusConfig) -> Self { +// let network = unsafe { foundationdb::boot() }; +// PostgresClientConfig::new(config.url, config.knobs, config.metrics) +// } +// } + +impl FdbConsensusConfig { + /// Returns a new [FdbConsensusConfig] for use in production. + pub fn new(network: Arc) -> Result { + Ok(FdbConsensusConfig { network }) + } + + pub fn new_for_test() -> Result { + let network = unsafe { foundationdb::boot() }; + Self::new(network.into()) + } + + pub fn get_network() -> Arc { + get_network() + } +} + +/// Implementation of [Consensus] over a Postgres database. +pub struct FdbConsensus { + // content_subspace: DirectoryOutput, + seqno: DirectoryOutput, + data: DirectoryOutput, + db: Database, + _network: Arc, +} + +enum FdbTransactError { + FdbError(FdbError), + ExternalError(ExternalError), +} + +impl From for FdbTransactError { + fn from(value: FdbError) -> Self { + Self::FdbError(value) + } +} + +impl From for FdbTransactError { + fn from(value: ExternalError) -> Self { + Self::ExternalError(value) + } +} + +impl From for ExternalError { + fn from(value: FdbTransactError) -> Self { + match value { + FdbTransactError::FdbError(e) => e.into(), + FdbTransactError::ExternalError(e) => e, + } + } +} + +impl TransactError for FdbTransactError { + fn try_into_fdb_error(self) -> Result { + match self { + Self::FdbError(e) => Ok(e), + other => Err(other), + } + } +} + +impl TuplePack for SeqNo { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + self.0.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for SeqNo { + fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> { + u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v))) + } +} + +impl std::fmt::Debug for FdbConsensus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FdbConsensus").finish_non_exhaustive() + } +} + +impl FdbConsensus { + /// Open a Postgres [Consensus] instance with `config`, for the collection + /// named `shard`. + pub async fn open(config: FdbConsensusConfig) -> Result { + let db = Database::new(None)?; + let directory = DirectoryLayer::default(); + let path = vec!["seqno".to_owned()]; + let seqno = db + .run(async |trx, _maybe_commited| { + Ok(directory.create_or_open(&trx, &path, None, None).await) + }) + .await? + .expect("valid directory"); + let path = vec!["data".to_owned()]; + let data = db + .run(async |trx, _maybe_commited| { + Ok(directory.create_or_open(&trx, &path, None, None).await) + }) + .await? 
+ .expect("valid directory"); + Ok(FdbConsensus { + seqno, + data, + db, + _network: config.network, + }) + } + + /// Drops and recreates the `consensus` table in Postgres + /// + /// ONLY FOR TESTING + pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> { + self.db + .run(async |trx, _maybe_commited| { + self.seqno.remove(&trx, &[]).await?; + self.data.remove(&trx, &[]).await?; + Ok(()) + }) + .await?; + Ok(()) + } + + async fn head_trx( + &self, + trx: &Transaction, + key: &str, + ) -> Result, FdbTransactError> { + let seqno_key = self.seqno.subspace(&key).expect("valid directory"); + let data_key = self.data.subspace(&key).expect("valid directory"); + + let seq_no = trx.get(seqno_key.bytes(), false).await?; + if let Some(seq_no) = &seq_no { + // println!("key: {key}, seqno bytes: {:?}", seq_no); + let seqno: SeqNo = unpack(seq_no).expect("valid data"); + + let seq_no_space = data_key.subspace(&seqno); + let data = trx + .get(&seq_no_space.bytes(), false) + .await? + .expect("valid data"); + let data = unpack::>(&data).expect("valid data"); + // println!( + // "key: {key}, key_value bytes: {} seqno: {:?}", + // data.len(), + // seqno + // ); + Ok(Some(VersionedData { + seqno, + data: Bytes::from(data), + })) + } else { + Ok(None) + } + } + async fn compare_and_set_trx( + &self, + trx: &Transaction, + key: &str, + expected: &Option, + new: &VersionedData, + ) -> Result { + let seqno_key = self.seqno.subspace(&key).expect("valid directory"); + let data_key = self.data.subspace(&key).expect("valid directory"); + + let seq_no = trx + .get(seqno_key.bytes(), false) + .await? + .map(|data| unpack(&data).expect("valid data")); + + if expected.is_some() && (expected != &seq_no) { + return Ok(CaSResult::ExpectationMismatch); + } + + trx.set(seqno_key.bytes(), &pack(&new.seqno)); + // println!( + // "cas seqno_key: {:?}", + // foundationdb::tuple::Bytes::from(seqno_key.bytes()) + // ); + + let data_seqno_key = data_key.subspace(&new.seqno); + trx.set(&data_seqno_key.bytes(), &pack(&new.data.as_ref())); + // println!( + // "cas data_seqno_key: {:?}", + // foundationdb::tuple::Bytes::from(data_seqno_key.bytes()) + // ); + let written = trx.get(&data_seqno_key.bytes(), false).await?; + let unpacked: Vec = unpack(&written.unwrap()).expect("valid data"); + assert_eq!(&*unpacked, &*new.data); + Ok(CaSResult::Committed) + } + async fn scan_trx( + &self, + trx: &Transaction, + key: &str, + from: &SeqNo, + limit: &usize, + ) -> Result, FdbTransactError> { + let mut limit = *limit; + let data_key = self.data.subspace(&key).expect("valid directory"); + let seqno_start = data_key.pack(&from); + let seqno_end = data_key.pack(&0xff); + + // let output = trx.get_range(&data_key.range().into(), 1, false).await?; + // for key_value in &output { + // println!("entry: all {:?}", key_value); + // } + + let mut range = RangeOption::from((seqno_start, seqno_end)); + range.limit = Some(limit); + + let mut entries = Vec::new(); + + // println!("Scanning range begin: {:?}", range.begin); + // println!("Scanning range end: {:?}", range.end); + + loop { + let output = trx.get_range(&range, 1, false).await?; + for key_value in &output { + // println!("entry: {:?}", key_value); + let seqno = data_key.unpack(key_value.key()).expect("valid data"); + let value: Vec = unpack(key_value.value()).expect("valid data"); + // println!( + // "key: {key}, key_value bytes: {} seqno: {:?}", + // value.len(), + // seqno + // ); + entries.push(VersionedData { + seqno: seqno, + data: Bytes::from(value), + }); + } + + limit = 
limit.saturating_sub(output.len()); + + if let Some(last) = output.last() + && limit > 0 + { + range.begin = KeySelector::first_greater_than(last.key().to_vec()); + range.limit = Some(limit); + } else { + break; + } + } + + entries.sort_by_key(|e| e.seqno); + Ok(entries) + } + async fn truncate_trx( + &self, + trx: &Transaction, + key: &str, + seqno: &SeqNo, + ) -> Result<(), FdbTransactError> { + let seqno_key = self.seqno.subspace(&key).expect("valid directory"); + + let seq_no = trx.get(seqno_key.bytes(), false).await?; + if let Some(seq_no) = &seq_no { + let current_seqno: SeqNo = unpack(seq_no).expect("valid data"); + if current_seqno < *seqno { + return Err(ExternalError::Determinate( + anyhow!("upper bound too high for truncate: {:?}", seqno).into(), + ) + .into()); + } + } else { + return Err( + ExternalError::Determinate(anyhow!("no entries for key: {}", key).into()).into(), + ); + } + + let key_space_start = self + .data + .subspace(&(key, &SeqNo::minimum())) + .expect("valid directory"); + let key_space_end = self.data.subspace(&(key, seqno)).expect("valid directory"); + + trx.clear_range(&key_space_start.bytes(), &key_space_end.bytes()); + Ok(()) + } +} + +#[async_trait] +impl Consensus for FdbConsensus { + fn list_keys(&self) -> ResultStream<'_, String> { + Box::pin(try_stream! { + let keys: Vec = self + .db + .run(async |trx, _maybe_commited| { + let mut range = RangeOption::from(self.seqno.range().expect("valid directory")); + let mut keys = Vec::new(); + loop { + let values = trx.get_range(&range, 1, false).await?; + for value in &values { + // println!("entry: {:?}", value); + let key: String = self.seqno.unpack(value.key()).expect("valid directory").expect("valid data"); + keys.push(key); + } + if let Some(last) = values.last() { + range.begin = KeySelector::first_greater_than(last.key().to_vec()); + } else { + break; + } + } + Ok(keys) + }).await?; + + for shard in keys { + yield shard; + } + }) + } + + async fn head(&self, key: &str) -> Result, ExternalError> { + // info!("FdbConsensus::head({})", key); + let ok = self + .db + .transact_boxed( + key, + |trx, key| self.head_trx(trx, key).boxed(), + TransactOption::default(), + ) + .await?; + info!( + "FdbConsensus::head({}) -> {:?}", + key, + ok.as_ref().map(|ok| ok.seqno) + ); + Ok(ok) + } + + async fn compare_and_set( + &self, + key: &str, + expected: Option, + new: VersionedData, + ) -> Result { + info!( + "FdbConsensus::compare_and_set({}, {:?}, <{} bytes at seqno {}>)", + key, + expected, + new.data.len(), + new.seqno + ); + if let Some(expected) = expected { + if new.seqno <= expected { + return Err(Error::from( + format!("new seqno must be strictly greater than expected. 
Got new: {:?} expected: {:?}", + new.seqno, expected)).into()); + } + } + if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") { + return Err(ExternalError::from(anyhow!( + "sequence numbers must fit within [0, i64::MAX], received: {:?}", + new.seqno + ))); + } + + let ok = self + .db + .transact_boxed( + (key, expected, &new), + |trx, (key, expected, new)| { + self.compare_and_set_trx(trx, key, expected, new).boxed() + }, + TransactOption::default(), + ) + .await?; + Ok(ok) + } + + async fn scan( + &self, + key: &str, + from: SeqNo, + limit: usize, + ) -> Result, ExternalError> { + info!("FdbConsensus::scan({}, {:?}, {})", key, from, limit); + let ok = self + .db + .transact_boxed( + (key, from, limit), + |trx, (key, from, limit)| self.scan_trx(trx, key, from, limit).boxed(), + TransactOption::default(), + ) + .await?; + Ok(ok) + } + + async fn truncate(&self, key: &str, seqno: SeqNo) -> Result { + info!("FdbConsensus::truncate({}, {:?})", key, seqno); + self.db + .transact_boxed( + (key, seqno), + |trx, (key, seqno)| self.truncate_trx(trx, key, seqno).boxed(), + TransactOption::default(), + ) + .await?; + Ok(0) + } + + fn truncate_counts(&self) -> bool { + false + } +} + +#[derive(Debug)] +pub struct ValidatingConsensus { + pub inner: A, + pub validator: B, +} + +#[async_trait] +impl Consensus for ValidatingConsensus { + fn list_keys(&self) -> ResultStream<'_, String> { + self.inner.list_keys() + } + + async fn head(&self, key: &str) -> Result, ExternalError> { + let inner = self.inner.head(key).await?; + let valid = self.validator.head(key).await?; + assert_eq!(inner, valid, "mismatched head for key {}", key); + Ok(inner) + } + + async fn compare_and_set( + &self, + key: &str, + expected: Option, + new: VersionedData, + ) -> Result { + let inner = self + .inner + .compare_and_set(key, expected.clone(), new.clone()) + .await?; + let valid = self.validator.compare_and_set(key, expected, new).await?; + assert_eq!(inner, valid, "mismatched cas for key {}", key); + Ok(inner) + } + + async fn scan( + &self, + key: &str, + from: SeqNo, + limit: usize, + ) -> Result, ExternalError> { + let inner = self.inner.scan(key, from, limit).await?; + let valid = self.validator.scan(key, from, limit).await?; + for inner in &inner { + println!( + "inner scan: seqno: {:?}, {} bytes", + inner.seqno, + inner.data.len() + ); + } + for valid in &valid { + println!( + "valid scan: seqno: {:?}, {} bytes", + valid.seqno, + valid.data.len() + ); + } + for (a, b) in inner.iter().zip(valid.iter()) { + assert_eq!(a.seqno, b.seqno); + assert_eq!(&*a.data, &*b.data); + } + assert_eq!(inner.len(), valid.len()); + Ok(inner) + } + + async fn truncate(&self, key: &str, seqno: SeqNo) -> Result { + let inner = self.inner.truncate(key, seqno).await?; + let valid = self.validator.truncate(key, seqno).await?; + self.scan(key, SeqNo::minimum(), 100000000).await?; + if self.truncate_counts() { + assert_eq!(inner, valid, "mismatched truncate counts for key {}", key); + } + Ok(inner) + } + + fn truncate_counts(&self) -> bool { + self.inner.truncate_counts() && self.validator.truncate_counts() + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use crate::location::tests::consensus_impl_test; + + use super::*; + + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn fdb_consensus() -> Result<(), ExternalError> { + let config = FdbConsensusConfig::new_for_test()?; + + { + 
let fdb = FdbConsensus::open(config.clone()).await?; + fdb.drop_and_recreate().await?; + } + + consensus_impl_test(|| FdbConsensus::open(config.clone())).await?; + + // and now verify the implementation-specific `drop_and_recreate` works as intended + let consensus = FdbConsensus::open(config.clone()).await?; + let key = Uuid::new_v4().to_string(); + let mut state = VersionedData { + seqno: SeqNo(5), + data: Bytes::from("abc"), + }; + + assert_eq!( + consensus.compare_and_set(&key, None, state.clone()).await, + Ok(CaSResult::Committed), + ); + state.seqno = SeqNo(6); + assert_eq!( + consensus + .compare_and_set(&key, Some(SeqNo(5)), state.clone()) + .await, + Ok(CaSResult::Committed), + ); + state.seqno = SeqNo(129 + 5); + assert_eq!( + consensus + .compare_and_set(&key, Some(SeqNo(6)), state.clone()) + .await, + Ok(CaSResult::Committed), + ); + + assert_eq!(consensus.head(&key).await, Ok(Some(state.clone()))); + + println!("--- SCANNING ---"); + + for data in consensus.scan(&key, SeqNo(129), 10).await? { + println!( + "scan data: seqno: {:?}, {} bytes", + data.seqno, + data.data.len() + ); + } + + consensus.drop_and_recreate().await?; + + assert_eq!(consensus.head(&key).await, Ok(None)); + + Ok(()) + } +} diff --git a/src/persist/src/lib.rs b/src/persist/src/lib.rs index 339790d9734f9..3af83904c04ca 100644 --- a/src/persist/src/lib.rs +++ b/src/persist/src/lib.rs @@ -21,6 +21,7 @@ pub mod azure; pub mod cfg; pub mod error; pub mod file; +mod foundationdb; pub mod generated; pub mod indexed; pub mod intercept; diff --git a/src/persist/src/location.rs b/src/persist/src/location.rs index 05d66cc53e00b..03f9c14b59602 100644 --- a/src/persist/src/location.rs +++ b/src/persist/src/location.rs @@ -83,6 +83,11 @@ impl SeqNo { pub fn minimum() -> Self { SeqNo(0) } + + /// A maximum value. + pub fn maximum() -> Self { + SeqNo(u64::MAX) + } } impl RustType for SeqNo { @@ -115,6 +120,12 @@ impl std::error::Error for Determinate { } } +impl From for Determinate { + fn from(inner: anyhow::Error) -> Self { + Self::new(inner) + } +} + impl Determinate { /// Return a new Determinate wrapping the given error. /// @@ -429,6 +440,11 @@ pub trait Consensus: std::fmt::Debug + Send + Sync { /// `seqno` is greater than the current sequence number, or if there is no /// data at this key. async fn truncate(&self, key: &str, seqno: SeqNo) -> Result; + + /// Returns true if [`truncate`] returns the number of versions deleted. + fn truncate_counts(&self) -> bool { + true + } } #[async_trait] @@ -625,7 +641,7 @@ pub mod tests { use anyhow::anyhow; use futures_util::TryStreamExt; - use mz_ore::assert_err; + use mz_ore::{assert_err, assert_ok}; use uuid::Uuid; use crate::location::Blob; @@ -1012,7 +1028,11 @@ pub mod tests { ); // Can remove the previous write with the appropriate truncation. - assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(6)).await); + } // Verify that the old write is indeed deleted. assert_eq!( @@ -1022,7 +1042,11 @@ pub mod tests { // Truncate is idempotent and can be repeated. The return value // indicates we didn't do any work though. - assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(6)).await); + } // Make sure entries under different keys don't clash. 
let other_key = Uuid::new_v4().to_string(); @@ -1079,7 +1103,11 @@ pub mod tests { consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await, Ok(CaSResult::Committed), ); - assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(12)).await); + } // Sequence numbers used within Consensus have to be within [0, i64::MAX]. From 6157d1456b43edb8857affd5ca18bfd6b0a30134 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Oct 2025 17:18:26 +0200 Subject: [PATCH 2/6] wip Signed-off-by: Moritz Hoffmann --- src/persist/src/foundationdb.rs | 53 ++++++++++++++------------------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs index 5065002c6d92d..a0c589ba03297 100644 --- a/src/persist/src/foundationdb.rs +++ b/src/persist/src/foundationdb.rs @@ -36,7 +36,6 @@ use foundationdb::{ use futures_util::future::FutureExt; use std::io::Write; use std::sync::{Arc, OnceLock}; -use tracing::info; impl From for ExternalError { fn from(x: FdbError) -> Self { @@ -236,11 +235,8 @@ impl FdbConsensus { // println!("key: {key}, seqno bytes: {:?}", seq_no); let seqno: SeqNo = unpack(seq_no).expect("valid data"); - let seq_no_space = data_key.subspace(&seqno); - let data = trx - .get(&seq_no_space.bytes(), false) - .await? - .expect("valid data"); + let seq_no_space = data_key.pack(&seqno); + let data = trx.get(&seq_no_space, false).await?.expect("valid data"); let data = unpack::>(&data).expect("valid data"); // println!( // "key: {key}, key_value bytes: {} seqno: {:?}", @@ -280,13 +276,13 @@ impl FdbConsensus { // foundationdb::tuple::Bytes::from(seqno_key.bytes()) // ); - let data_seqno_key = data_key.subspace(&new.seqno); - trx.set(&data_seqno_key.bytes(), &pack(&new.data.as_ref())); + let data_seqno_key = data_key.pack(&new.seqno); + trx.set(&data_seqno_key, &pack(&new.data.as_ref())); // println!( // "cas data_seqno_key: {:?}", // foundationdb::tuple::Bytes::from(data_seqno_key.bytes()) // ); - let written = trx.get(&data_seqno_key.bytes(), false).await?; + let written = trx.get(&data_seqno_key, false).await?; let unpacked: Vec = unpack(&written.unwrap()).expect("valid data"); assert_eq!(&*unpacked, &*new.data); Ok(CaSResult::Committed) @@ -370,14 +366,11 @@ impl FdbConsensus { ExternalError::Determinate(anyhow!("no entries for key: {}", key).into()).into(), ); } + let data_key = self.data.subspace(&key).expect("valid directory"); + let key_space_start = data_key.pack(&SeqNo::minimum()); + let key_space_end = data_key.pack(&seqno); - let key_space_start = self - .data - .subspace(&(key, &SeqNo::minimum())) - .expect("valid directory"); - let key_space_end = self.data.subspace(&(key, seqno)).expect("valid directory"); - - trx.clear_range(&key_space_start.bytes(), &key_space_end.bytes()); + trx.clear_range(&key_space_start, &key_space_end); Ok(()) } } @@ -423,11 +416,11 @@ impl Consensus for FdbConsensus { TransactOption::default(), ) .await?; - info!( - "FdbConsensus::head({}) -> {:?}", - key, - ok.as_ref().map(|ok| ok.seqno) - ); + // info!( + // "FdbConsensus::head({}) -> {:?}", + // key, + // ok.as_ref().map(|ok| ok.seqno) + // ); Ok(ok) } @@ -437,13 +430,13 @@ impl Consensus for FdbConsensus { expected: Option, new: VersionedData, ) -> Result { - info!( - "FdbConsensus::compare_and_set({}, {:?}, <{} bytes at seqno {}>)", - key, - expected, - new.data.len(), - 
new.seqno - ); + // info!( + // "FdbConsensus::compare_and_set({}, {:?}, <{} bytes at seqno {}>)", + // key, + // expected, + // new.data.len(), + // new.seqno + // ); if let Some(expected) = expected { if new.seqno <= expected { return Err(Error::from( @@ -477,7 +470,7 @@ impl Consensus for FdbConsensus { from: SeqNo, limit: usize, ) -> Result, ExternalError> { - info!("FdbConsensus::scan({}, {:?}, {})", key, from, limit); + // info!("FdbConsensus::scan({}, {:?}, {})", key, from, limit); let ok = self .db .transact_boxed( @@ -490,7 +483,7 @@ impl Consensus for FdbConsensus { } async fn truncate(&self, key: &str, seqno: SeqNo) -> Result { - info!("FdbConsensus::truncate({}, {:?})", key, seqno); + // info!("FdbConsensus::truncate({}, {:?})", key, seqno); self.db .transact_boxed( (key, seqno), From e7a17218f31a45a9ff0b733785c272ca65164046 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Oct 2025 17:29:45 +0200 Subject: [PATCH 3/6] Fix panic! Signed-off-by: Moritz Hoffmann --- src/persist/src/foundationdb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs index a0c589ba03297..c3e29bf8a9a10 100644 --- a/src/persist/src/foundationdb.rs +++ b/src/persist/src/foundationdb.rs @@ -297,7 +297,7 @@ impl FdbConsensus { let mut limit = *limit; let data_key = self.data.subspace(&key).expect("valid directory"); let seqno_start = data_key.pack(&from); - let seqno_end = data_key.pack(&0xff); + let seqno_end = data_key.pack(&SeqNo::maximum()); // let output = trx.get_range(&data_key.range().into(), 1, false).await?; // for key_value in &output { From c354c8f6d5ab21618c66ae29cca16fa23f2ed483 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 14 Oct 2025 10:37:08 +0200 Subject: [PATCH 4/6] testdrive against fdb Signed-off-by: Moritz Hoffmann --- Cargo.lock | 1 + ci/builder/Dockerfile | 6 + ci/test/lint-main/checks/check-copyright.sh | 1 + ci/test/pipeline.template.yml | 13 + deny.toml | 3 + misc/foundationdb/fdb.cluster | 2 + misc/foundationdb/foundationdb.conf | 47 ++ misc/images/ubuntu-base/Dockerfile | 3 + misc/python/materialize/cli/run.py | 12 +- .../mzcompose/services/foundationdb.py | 115 +++++ .../mzcompose/services/materialized.py | 13 + src/persist-client/src/lib.rs | 4 +- src/persist/Cargo.toml | 2 +- src/persist/src/cfg.rs | 19 +- src/persist/src/foundationdb.rs | 440 +++++++----------- src/workspace-hack/Cargo.toml | 2 + test/testdrive/mzcompose.py | 31 +- 17 files changed, 423 insertions(+), 291 deletions(-) create mode 100644 misc/foundationdb/fdb.cluster create mode 100644 misc/foundationdb/foundationdb.conf create mode 100644 misc/python/materialize/mzcompose/services/foundationdb.py diff --git a/Cargo.lock b/Cargo.lock index d6e9d2b936112..fab1c961864ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13780,6 +13780,7 @@ dependencies = [ "postgres", "postgres-types", "predicates 3.1.3", + "prettyplease", "proc-macro2", "proptest", "proptest-derive", diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index 0a40b9c6b907b..5605814d06145 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -352,6 +352,12 @@ RUN curl -fsSL https://amazon-inspector-sbomgen.s3.amazonaws.com/1.8.1/linux/$AR && mv inspector-sbomgen-1.8.1/linux/$ARCH_GO/inspector-sbomgen /usr/local/bin \ && chmod +x /usr/local/bin/inspector-sbomgen +RUN arch_fdb=$(echo "$ARCH_GCC" | sed -e "s/x86_64/amd64/") \ + && curl -fsSL 
https://github.com/apple/foundationdb/releases/download/7.3.71/foundationdb-clients_7.3.71-1_$arch_fdb.deb > foundationdb-clients.deb \ + && if [ $ARCH_GCC = X64_64 ]; then echo '695193b8c6f8af9ec083221611b5f2925ef7a5e3c5e3c1d0af65d0dfbe99d13d foundationdb-clients.deb' | sha256sum --check; fi \ + && if [ $ARCH_GCC = aarch64 ]; then echo 'db1bbb72d57685a1c212c6456e6f0dfd1266c5c10c4adbc98d23a5d91bdbaff7 foundationdb-clients.deb' | sha256sum --check; fi \ + && dpkg -i foundationdb-clients.deb + # Hardcode some known SSH hosts, or else SSH will ask whether the host is # trustworthy on the first connection. diff --git a/ci/test/lint-main/checks/check-copyright.sh b/ci/test/lint-main/checks/check-copyright.sh index 3f4d960f1688d..18e7634280054 100755 --- a/ci/test/lint-main/checks/check-copyright.sh +++ b/ci/test/lint-main/checks/check-copyright.sh @@ -49,6 +49,7 @@ copyright_files=$(grep -vE \ -e '^ci/test/lint-deps/' \ -e '^misc/bazel/c_deps/patches/snappy-config.patch' \ -e '^misc/completions/.*' \ + -e '^misc/foundationdb/.*' \ -e '^misc/mcp-materialize/uv.lock' \ -e '^misc/mcp-materialize-agents/uv.lock' \ -e '^misc/mcp-materialize-agents/mcp_materialize_agents/system_prompt.md' \ diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml index 26382112bec8e..76c63ba91023d 100644 --- a/ci/test/pipeline.template.yml +++ b/ci/test/pipeline.template.yml @@ -297,6 +297,19 @@ steps: agents: queue: hetzner-aarch64-8cpu-16gb + - id: testdrive-fdb + label: "Testdrive (FoundationDB)" + depends_on: build-aarch64 + timeout_in_minutes: 40 + inputs: [test/testdrive] + parallelism: 20 + plugins: + - ./ci/plugins/mzcompose: + composition: testdrive + args: [--foundationdb] + agents: + queue: hetzner-aarch64-8cpu-16gb + - id: cluster-tests label: "Cluster tests" depends_on: build-aarch64 diff --git a/deny.toml b/deny.toml index c8895ff5a81c6..6fabe1e92494e 100644 --- a/deny.toml +++ b/deny.toml @@ -148,7 +148,9 @@ skip = [ # Used by tower-lsp { name = "dashmap", version = "5.5.3" }, # Used by bindgen + { name = "bindgen", version = "0.70.1" }, { name = "itertools", version = "0.13.0" }, + { name = "rustc-hash", version = "1.1.0" }, # Used by pprof { name = "nix", version = "0.26.4" }, # Used by dynfmt @@ -205,6 +207,7 @@ name = "log" wrappers = [ "azure_svc_blobstorage", "apache-avro", + "bindgen", "buildid", "cookie_store", "deadpool-postgres", diff --git a/misc/foundationdb/fdb.cluster b/misc/foundationdb/fdb.cluster new file mode 100644 index 0000000000000..9e55dcb40a6e7 --- /dev/null +++ b/misc/foundationdb/fdb.cluster @@ -0,0 +1,2 @@ +# FoundationDB in Docker +docker:docker@foundationdb:4500 diff --git a/misc/foundationdb/foundationdb.conf b/misc/foundationdb/foundationdb.conf new file mode 100644 index 0000000000000..212ee71321249 --- /dev/null +++ b/misc/foundationdb/foundationdb.conf @@ -0,0 +1,47 @@ +## foundationdb.conf +## +## Configuration file for FoundationDB server processes +## Full documentation is available at +## https://apple.github.io/foundationdb/configuration.html#the-configuration-file + +[fdbmonitor] +user = foundationdb +group = foundationdb + +[general] +restart-delay = 60 +## by default, restart-backoff = restart-delay-reset-interval = restart-delay +# initial-restart-delay = 0 +# restart-backoff = 60 +# restart-delay-reset-interval = 60 +cluster-file = /etc/foundationdb/fdb.cluster +# delete-envvars = +# kill-on-configuration-change = true + +## Default parameters for individual fdbserver processes +[fdbserver] +command = /usr/sbin/fdbserver +public-address = 
auto:$ID +listen-address = public +datadir = /var/lib/foundationdb/data/$ID +logdir = /var/log/foundationdb +# logsize = 10MiB +# maxlogssize = 100MiB +# machine-id = +# datacenter-id = +# class = +# memory = 8GiB +# storage-memory = 1GiB +# cache-memory = 2GiB +# metrics-cluster = +# metrics-prefix = + +## An individual fdbserver process with id 4500 +## Parameters set here override defaults from the [fdbserver] section +[fdbserver.4500] + +[backup_agent] +command = /usr/lib/foundationdb/backup_agent/backup_agent +logdir = /var/log/foundationdb + +[backup_agent.1] diff --git a/misc/images/ubuntu-base/Dockerfile b/misc/images/ubuntu-base/Dockerfile index e0424a199b394..cfaf73f5407ba 100644 --- a/misc/images/ubuntu-base/Dockerfile +++ b/misc/images/ubuntu-base/Dockerfile @@ -24,3 +24,6 @@ RUN sed -i -e 's#http://archive\.ubuntu\.com#http://us-east-1.ec2.archive.ubuntu -e 's#http://ports\.ubuntu\.com#http://us-east-1.ec2.ports.ubuntu.com#' /etc/apt/sources.list.d/ubuntu.sources RUN apt-get update --fix-missing && TZ=UTC DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends eatmydata + +COPY --from=foundationdb/foundationdb:7.3.71 /usr/lib/libfdb_c.so /usr/lib/ +COPY --from=foundationdb/foundationdb:7.3.71 /usr/bin/fdbcli /usr/bin/ diff --git a/misc/python/materialize/cli/run.py b/misc/python/materialize/cli/run.py index c045e5b20aa88..292ce045de319 100644 --- a/misc/python/materialize/cli/run.py +++ b/misc/python/materialize/cli/run.py @@ -271,11 +271,11 @@ def main() -> int: if args.program == "environmentd": _handle_lingering_services(kill=args.reset) scratch = MZ_ROOT / "scratch" - # dbconn = _connect_sql(args.postgres) - # for schema in ["consensus", "tsoracle", "storage"]: - # if args.reset: - # _run_sql(dbconn, f"DROP SCHEMA IF EXISTS {schema} CASCADE") - # _run_sql(dbconn, f"CREATE SCHEMA IF NOT EXISTS {schema}") + dbconn = _connect_sql(args.postgres) + for schema in ["consensus", "tsoracle", "storage"]: + if args.reset: + _run_sql(dbconn, f"DROP SCHEMA IF EXISTS {schema} CASCADE") + _run_sql(dbconn, f"CREATE SCHEMA IF NOT EXISTS {schema}") # Keep this after clearing out Postgres. Otherwise there is a race # where a ctrl-c could leave persist with references in Postgres to # files that have been deleted. There's no race if we reset in the @@ -320,7 +320,7 @@ def main() -> int: f"--orchestrator-process-prometheus-service-discovery-directory={MZDATA}/prometheus", f"--orchestrator-process-scratch-directory={scratch}", "--secrets-controller=local-file", - f"--persist-consensus-url={args.consensus}?options=--search_path=consensus", + f"--persist-consensus-url={consensus}?options=--search_path=consensus", f"--persist-blob-url={args.blob}", f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle", f"--environment-id={environment_id}", diff --git a/misc/python/materialize/mzcompose/services/foundationdb.py b/misc/python/materialize/mzcompose/services/foundationdb.py new file mode 100644 index 0000000000000..e0d28cc6c66b5 --- /dev/null +++ b/misc/python/materialize/mzcompose/services/foundationdb.py @@ -0,0 +1,115 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +from materialize import MZ_ROOT +from materialize.mzcompose.service import ( + Service, + ServiceConfig, +) + + +class FoundationDB(Service): + def __init__( + self, + name: str = "foundationdb", + image: str | None = None, + ports: list[str] = ["4500"], + environment: list[str] = [ + "FDB_NETWORKING_MODE=container", + ], + volumes: list[str] = [], + restart: str = "no", + version: str = "7.3.71", + ) -> None: + # command: list[str] = [ + # "postgres", + # "-c", + # "wal_level=logical", + # "-c", + # f"max_wal_senders={max_wal_senders}", + # "-c", + # f"max_replication_slots={max_replication_slots}", + # "-c", + # "max_connections=5000", + # ] + extra_command + + # if setup_materialize: + # path = os.path.relpath( + # MZ_ROOT / "misc" / "postgres" / "setup_materialize.sql", + # loader.composition_path, + # ) + # volumes = volumes + [ + # f"{path}:/docker-entrypoint-initdb.d/z_setup_materialize.sql" + # ] + # + # environment = environment + ["PGPORT=26257"] + + env_extra = [ + f"FDB_COORDINATOR_PORT={ports[0]}", + f"FDB_PORT={ports[0]}", + ] + + # command = dedent( + # """ + # /usr/bin/tini -g -- /var/fdb/scripts/fdb.bash & + # sleep 5 + # fdbcli -C /etc/foundationdb/fdb.cluster --exec "configure new single memory" + # fdbcli -C /etc/foundationdb/fdb.cluster --exec "status" + # wait + # """ + # ) + + if image is None: + image = f"foundationdb/foundationdb:{version}" + + config: ServiceConfig = {"image": image} + + volumes += [f"{MZ_ROOT}/misc/foundationdb/:/etc/foundationdb/"] + + config.update( + { + "image": image, + # "allow_host_ports": True, + # "command": ["bash", "-c", command], + "ports": ports, + "environment": env_extra + environment, + # "healthcheck": { + # "test": [ + # "CMD", + # "fdbcli", + # "--exec", + # "configure single memory ; status", + # ], + # "interval": "1s", + # "start_period": "30s", + # }, + "restart": restart, + "volumes": volumes, + } + ) + super().__init__(name=name, config=config) + + +# class PostgresMetadata(Postgres): +# def __init__(self, restart: str = "no") -> None: +# super().__init__( +# name="postgres-metadata", +# setup_materialize=True, +# ports=["26257"], +# restart=restart, +# ) + + +# CockroachOrPostgresMetadata = ( +# Cockroach if os.getenv("BUILDKITE_TAG", "") != "" else PostgresMetadata +# ) +# +# METADATA_STORE: str = ( +# "cockroach" if CockroachOrPostgresMetadata == Cockroach else "postgres-metadata" +# ) diff --git a/misc/python/materialize/mzcompose/services/materialized.py b/misc/python/materialize/mzcompose/services/materialized.py index fecad42f19b12..200e316cdda4d 100644 --- a/misc/python/materialize/mzcompose/services/materialized.py +++ b/misc/python/materialize/mzcompose/services/materialized.py @@ -100,6 +100,7 @@ def __init__( networks: ( dict[str, dict[str, list[str]]] | dict[str, dict[str, str]] | None ) = None, + consensus_foundationdb: bool = False, ) -> None: if name is None: name = "materialized" @@ -259,6 +260,10 @@ def __init__( # v0.92.0). 
f"MZ_ADAPTER_STASH_URL=postgres://root@{address}:26257?options=--search_path=adapter", ] + if consensus_foundationdb: + command += [ + "--persist-consensus-url=foundationdb:?options=--search_path=consensus", + ] command += [ "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0", @@ -336,6 +341,14 @@ def __init__( volumes += [f"{os.getcwd()}/license_key:/license_key/license_key"] + if ( + image_version is None or image_version >= "v0.160.0-dev" + ) and consensus_foundationdb: + print("Using foundationdb for consensus") + volumes += [ + f"{MZ_ROOT}/misc/foundationdb/fdb.cluster:/etc/foundationdb/fdb.cluster" + ] + if use_default_volumes: volumes += DEFAULT_MZ_VOLUMES volumes += volumes_extra diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 2704434c07a18..288983df7eda5 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -916,11 +916,9 @@ mod tests { } pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient { - let mut location = PersistLocation::new_in_mem(); - location.consensus_uri = "foundationdb://".parse().unwrap(); let cache = new_test_client_cache(dyncfgs); cache - .open(location) + .open(PersistLocation::new_in_mem()) .await .expect("client construction failed") } diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 337293acc9636..3f35e0e203ebc 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -39,7 +39,7 @@ bytes = "1.10.1" deadpool-postgres = "0.10.3" differential-dataflow = "0.17.0" fail = { version = "0.5.1", features = ["failpoints"] } -foundationdb = { version = "0.9.2", features = ["fdb-7_3"] } +foundationdb = { version = "0.9.2", features = ["embedded-fdb-include", "fdb-7_3"] } futures-util = "0.3.31" itertools = "0.14.0" md-5 = "0.10.6" diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs index e6474d28beaa5..0dfe5d1dd3bd8 100644 --- a/src/persist/src/cfg.rs +++ b/src/persist/src/cfg.rs @@ -23,7 +23,7 @@ use mz_postgres_client::metrics::PostgresClientMetrics; use crate::azure::{AzureBlob, AzureBlobConfig}; use crate::file::{FileBlob, FileBlobConfig}; -use crate::foundationdb::{FdbConsensus, FdbConsensusConfig, ValidatingConsensus}; +use crate::foundationdb::{FdbConsensus, FdbConsensusConfig}; use crate::location::{Blob, Consensus, Determinate, ExternalError}; use crate::mem::{MemBlob, MemBlobConfig, MemConsensus}; use crate::metrics::S3BlobMetrics; @@ -228,14 +228,6 @@ impl ConsensusConfig { ConsensusConfig::FoundationDB(config) => { Ok(Arc::new(FdbConsensus::open(config).await?)) } - // ConsensusConfig::FoundationDB(config) => { - // let inner = FdbConsensus::open(config).await?; - // inner.drop_and_recreate().await?; - // Ok(Arc::new(ValidatingConsensus { - // inner, - // validator: MemConsensus::default(), - // })) - // } ConsensusConfig::Postgres(config) => { Ok(Arc::new(PostgresConsensus::open(config).await?)) } @@ -251,12 +243,9 @@ impl ConsensusConfig { dyncfg: Arc, ) -> Result { let config = match url.scheme() { - "fdb" | "foundationdb" => { - let network = FdbConsensusConfig::get_network(); - Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new( - network.into(), - )?)) - } + "fdb" | "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new( + url.clone(), + )?)), "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres( PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?, )), diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs index c3e29bf8a9a10..87801f3959614 100644 --- 
a/src/persist/src/foundationdb.rs +++ b/src/persist/src/foundationdb.rs @@ -11,31 +11,37 @@ //! //! We're storing the consensus data in a subspace at `/mz/consensus`. Each key maps to a subspace //! with the following structure: -//! ./seqno/ -> -//! ./data// -> +//! ./seqno/ -> +//! ./data// -> + +use std::io::Write; +use std::sync::OnceLock; -use crate::error::Error; -use crate::location::{ - CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo, - VersionedData, -}; use anyhow::anyhow; use async_stream::try_stream; use async_trait::async_trait; use bytes::Bytes; use foundationdb::api::NetworkAutoStop; -use foundationdb::directory::{Directory, DirectoryLayer, DirectoryOutput}; -use foundationdb::options::StreamingMode; +use foundationdb::directory::{ + Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace, +}; use foundationdb::tuple::{ - PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, unpack, + PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, + unpack, }; use foundationdb::{ Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption, Transaction, }; use futures_util::future::FutureExt; -use std::io::Write; -use std::sync::{Arc, OnceLock}; +use mz_ore::url::SensitiveUrl; +use url::Url; + +use crate::error::Error; +use crate::location::{ + CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo, + VersionedData, +}; impl From for ExternalError { fn from(x: FdbError) -> Self { @@ -53,76 +59,42 @@ impl From for ExternalError { } } -// impl From for ExternalError { -// fn from(x: TransactionCommitError) -> Self { -// match x { -// TransactionCommitError::Retryable(e) => { -// ExternalError::Indeterminate(Indeterminate::new(e.into())) -// } -// TransactionCommitError::NonRetryable(e) => { -// ExternalError::Determinate(Determinate::new(e.into())) -// } -// } - -// impl From for ExternalError { -// fn from(x: DirectoryError) -> Self { -// ExternalError::Determinate(Determinate { -// inner: anyhow::Error::new(x), -// }) -// } -// } - -static CELL: OnceLock> = OnceLock::new(); - -pub fn get_network() -> Arc { - CELL.get_or_init(|| unsafe { foundationdb::boot() }.into()) - .clone() -} +/// FoundationDB network singleton. +/// +/// Normally, we'd need to drop this to clean up the network, but since we +/// never expect to exit normally, it's fine to leak it. +static FDB_NETWORK: OnceLock = OnceLock::new(); -/// Configuration to connect to a Postgres backed implementation of [Consensus]. -#[derive(Clone)] -pub struct FdbConsensusConfig { - network: Arc, +fn init_network() -> &'static NetworkAutoStop { + FDB_NETWORK.get_or_init(|| unsafe { foundationdb::boot() }) } -impl std::fmt::Debug for FdbConsensusConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FdbConsensusConfig").finish_non_exhaustive() - } +/// Configuration to connect to a FoundationDB backed implementation of [Consensus]. +#[derive(Clone, Debug)] +pub struct FdbConsensusConfig { + url: SensitiveUrl, } -// impl From for PostgresClientConfig { -// fn from(config: FdbConsensusConfig) -> Self { -// let network = unsafe { foundationdb::boot() }; -// PostgresClientConfig::new(config.url, config.knobs, config.metrics) -// } -// } - impl FdbConsensusConfig { /// Returns a new [FdbConsensusConfig] for use in production. 
- pub fn new(network: Arc) -> Result { - Ok(FdbConsensusConfig { network }) + pub fn new(url: SensitiveUrl) -> Result { + Ok(FdbConsensusConfig { url }) } pub fn new_for_test() -> Result { - let network = unsafe { foundationdb::boot() }; - Self::new(network.into()) - } - - pub fn get_network() -> Arc { - get_network() + Self::new(SensitiveUrl(Url::parse("foundationdb:").unwrap())) } } -/// Implementation of [Consensus] over a Postgres database. +/// Implementation of [Consensus] over a Foundation database. pub struct FdbConsensus { - // content_subspace: DirectoryOutput, - seqno: DirectoryOutput, - data: DirectoryOutput, + seqno: DirectorySubspace, + data: DirectorySubspace, db: Database, - _network: Arc, } +/// An error that can occur during a FoundationDB transaction. +/// This is either a FoundationDB error or an external error. enum FdbTransactError { FdbError(FdbError), ExternalError(ExternalError), @@ -140,6 +112,12 @@ impl From for FdbTransactError { } } +impl From for FdbTransactError { + fn from(value: PackError) -> Self { + ExternalError::Determinate(anyhow::Error::new(value).into()).into() + } +} + impl From for ExternalError { fn from(value: FdbTransactError) -> Self { match value { @@ -149,6 +127,12 @@ impl From for ExternalError { } } +impl From for ExternalError { + fn from(e: DirectoryError) -> Self { + ExternalError::Determinate(anyhow!("directory error: {e:?}").into()) + } +} + impl TransactError for FdbTransactError { fn try_into_fdb_error(self) -> Result { match self { @@ -176,41 +160,87 @@ impl<'de> TupleUnpack<'de> for SeqNo { impl std::fmt::Debug for FdbConsensus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FdbConsensus").finish_non_exhaustive() + f.debug_struct("FdbConsensus") + .field("seqno", &self.seqno) + .field("data", &self.data) + .finish_non_exhaustive() } } impl FdbConsensus { - /// Open a Postgres [Consensus] instance with `config`, for the collection - /// named `shard`. + /// Open a FoundationDB [Consensus] instance with `config`. pub async fn open(config: FdbConsensusConfig) -> Result { - let db = Database::new(None)?; + let mut prefix = Vec::new(); + for (key, value) in config.url.query_pairs() { + match &*key { + "options" => { + if let Some(path) = value.strip_prefix("--search_path=") { + prefix = path.split('/').map(|s| s.to_owned()).collect(); + } else { + return Err(ExternalError::from(anyhow!( + "unrecognized FoundationDB URL options parameter: {value}", + ))); + } + } + key => { + return Err(ExternalError::from(anyhow!( + "unrecognized FoundationDB URL query parameter: {key}: {value}", + ))); + } + } + } + let path = if config.url.0.cannot_be_a_base() { + None + } else { + Some(config.url.0.path()) + }; + + let _ = init_network(); + + let db = Database::new(path)?; let directory = DirectoryLayer::default(); - let path = vec!["seqno".to_owned()]; + let path: Vec<_> = prefix + .iter() + .cloned() + .chain(std::iter::once("seqno".to_owned())) + .collect(); let seqno = db .run(async |trx, _maybe_commited| { Ok(directory.create_or_open(&trx, &path, None, None).await) }) - .await? 
- .expect("valid directory"); - let path = vec!["data".to_owned()]; + .await??; + let seqno = match seqno { + DirectoryOutput::DirectorySubspace(subspace) => subspace, + DirectoryOutput::DirectoryPartition(_partition) => { + return Err(ExternalError::from(anyhow!( + "consensus seqno cannot be a partition" + ))); + } + }; + let path: Vec<_> = prefix + .into_iter() + .chain(std::iter::once("data".to_owned())) + .collect(); let data = db .run(async |trx, _maybe_commited| { Ok(directory.create_or_open(&trx, &path, None, None).await) }) - .await? - .expect("valid directory"); - Ok(FdbConsensus { - seqno, - data, - db, - _network: config.network, - }) + .await??; + let data = match data { + DirectoryOutput::DirectorySubspace(subspace) => subspace, + DirectoryOutput::DirectoryPartition(_partition) => { + return Err(ExternalError::from(anyhow!( + "consensus data cannot be a partition" + ))); + } + }; + Ok(FdbConsensus { seqno, data, db }) } - /// Drops and recreates the `consensus` table in Postgres + /// Drops and recreates the `consensus` data in FoundationDB. /// /// ONLY FOR TESTING + #[cfg(test)] pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> { self.db .run(async |trx, _maybe_commited| { @@ -225,28 +255,27 @@ impl FdbConsensus { async fn head_trx( &self, trx: &Transaction, - key: &str, + seqno_key: &Subspace, + data_key: &Subspace, ) -> Result, FdbTransactError> { - let seqno_key = self.seqno.subspace(&key).expect("valid directory"); - let data_key = self.data.subspace(&key).expect("valid directory"); - - let seq_no = trx.get(seqno_key.bytes(), false).await?; - if let Some(seq_no) = &seq_no { - // println!("key: {key}, seqno bytes: {:?}", seq_no); - let seqno: SeqNo = unpack(seq_no).expect("valid data"); - - let seq_no_space = data_key.pack(&seqno); - let data = trx.get(&seq_no_space, false).await?.expect("valid data"); - let data = unpack::>(&data).expect("valid data"); - // println!( - // "key: {key}, key_value bytes: {} seqno: {:?}", - // data.len(), - // seqno - // ); - Ok(Some(VersionedData { - seqno, - data: Bytes::from(data), - })) + let seqno = trx.get(seqno_key.bytes(), false).await?; + if let Some(seqno) = &seqno { + let seqno: SeqNo = unpack(seqno)?; + + let seqno_space = data_key.pack(&seqno); + let data = trx.get(&seqno_space, false).await?; + if let Some(data) = data { + let data = unpack::>(&data)?; + Ok(Some(VersionedData { + seqno, + data: Bytes::from(data), + })) + } else { + Err(ExternalError::Determinate( + anyhow!("inconsistent state: seqno present without data").into(), + ) + .into()) + } } else { Ok(None) } @@ -254,77 +283,50 @@ impl FdbConsensus { async fn compare_and_set_trx( &self, trx: &Transaction, - key: &str, + seqno_key: &Subspace, + data_key: &Subspace, expected: &Option, new: &VersionedData, ) -> Result { - let seqno_key = self.seqno.subspace(&key).expect("valid directory"); - let data_key = self.data.subspace(&key).expect("valid directory"); - - let seq_no = trx + let seqno = trx .get(seqno_key.bytes(), false) .await? 
@@ -254,77 +283,50 @@ impl FdbConsensus {
     async fn compare_and_set_trx(
         &self,
         trx: &Transaction,
-        key: &str,
+        seqno_key: &Subspace,
+        data_key: &Subspace,
         expected: &Option<SeqNo>,
         new: &VersionedData,
     ) -> Result<CaSResult, FdbTransactError> {
-        let seqno_key = self.seqno.subspace(&key).expect("valid directory");
-        let data_key = self.data.subspace(&key).expect("valid directory");
-
-        let seq_no = trx
+        let seqno = trx
             .get(seqno_key.bytes(), false)
             .await?
-            .map(|data| unpack(&data).expect("valid data"));
+            .map(|data| unpack(&data))
+            .transpose()?;
 
-        if expected.is_some() && (expected != &seq_no) {
+        if expected.is_some() && (expected != &seqno) {
             return Ok(CaSResult::ExpectationMismatch);
         }
 
         trx.set(seqno_key.bytes(), &pack(&new.seqno));
-        // println!(
-        //     "cas seqno_key: {:?}",
-        //     foundationdb::tuple::Bytes::from(seqno_key.bytes())
-        // );
         let data_seqno_key = data_key.pack(&new.seqno);
         trx.set(&data_seqno_key, &pack(&new.data.as_ref()));
-        // println!(
-        //     "cas data_seqno_key: {:?}",
-        //     foundationdb::tuple::Bytes::from(data_seqno_key.bytes())
-        // );
-        let written = trx.get(&data_seqno_key, false).await?;
-        let unpacked: Vec<u8> = unpack(&written.unwrap()).expect("valid data");
-        assert_eq!(&*unpacked, &*new.data);
 
         Ok(CaSResult::Committed)
     }
 
     async fn scan_trx(
         &self,
         trx: &Transaction,
-        key: &str,
+        data_key: &Subspace,
         from: &SeqNo,
         limit: &usize,
     ) -> Result<Vec<VersionedData>, FdbTransactError> {
         let mut limit = *limit;
-        let data_key = self.data.subspace(&key).expect("valid directory");
         let seqno_start = data_key.pack(&from);
         let seqno_end = data_key.pack(&SeqNo::maximum());
 
-        // let output = trx.get_range(&data_key.range().into(), 1, false).await?;
-        // for key_value in &output {
-        //     println!("entry: all {:?}", key_value);
-        // }
-
-        let mut range = RangeOption::from((seqno_start, seqno_end));
+        let mut range = RangeOption::from(seqno_start..=seqno_end);
         range.limit = Some(limit);
         let mut entries = Vec::new();
 
-        // println!("Scanning range begin: {:?}", range.begin);
-        // println!("Scanning range end: {:?}", range.end);
-
         loop {
             let output = trx.get_range(&range, 1, false).await?;
             for key_value in &output {
-                // println!("entry: {:?}", key_value);
-                let seqno = data_key.unpack(key_value.key()).expect("valid data");
-                let value: Vec<u8> = unpack(key_value.value()).expect("valid data");
-                // println!(
-                //     "key: {key}, key_value bytes: {} seqno: {:?}",
-                //     value.len(),
-                //     seqno
-                // );
+                let seqno = data_key.unpack(key_value.key())?;
+                let value: Vec<u8> = unpack(key_value.value())?;
                 entries.push(VersionedData {
-                    seqno: seqno,
+                    seqno,
                     data: Bytes::from(value),
                 });
             }
@@ -347,28 +349,24 @@ impl FdbConsensus {
     async fn truncate_trx(
         &self,
         trx: &Transaction,
-        key: &str,
-        seqno: &SeqNo,
+        seqno_key: &Subspace,
+        data_key: &Subspace,
+        until: &SeqNo,
     ) -> Result<(), FdbTransactError> {
-        let seqno_key = self.seqno.subspace(&key).expect("valid directory");
-
-        let seq_no = trx.get(seqno_key.bytes(), false).await?;
-        if let Some(seq_no) = &seq_no {
-            let current_seqno: SeqNo = unpack(seq_no).expect("valid data");
-            if current_seqno < *seqno {
+        let seqno = trx.get(seqno_key.bytes(), false).await?;
+        if let Some(seqno) = &seqno {
+            let current_seqno: SeqNo = unpack(seqno)?;
+            if current_seqno < *until {
                 return Err(ExternalError::Determinate(
-                    anyhow!("upper bound too high for truncate: {:?}", seqno).into(),
+                    anyhow!("upper bound too high for truncate: {until}").into(),
                 )
                 .into());
             }
         } else {
-            return Err(
-                ExternalError::Determinate(anyhow!("no entries for key: {}", key).into()).into(),
-            );
+            return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into());
         }
 
-        let data_key = self.data.subspace(&key).expect("valid directory");
         let key_space_start = data_key.pack(&SeqNo::minimum());
-        let key_space_end = data_key.pack(&seqno);
+        let key_space_end = data_key.pack(&until);
 
         trx.clear_range(&key_space_start, &key_space_end);
         Ok(())
     }
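The paging loop in scan_trx above is the subtle part. A pure-Rust model with stand-in types (the real code pages via `trx.get_range` and, presumably, restarts the range just past the last key of the previous page, as `list_keys` below does with `KeySelector`):

    // rows: (seqno, data) pairs already sorted by seqno.
    fn scan(rows: &[(u64, String)], from: u64, limit: usize) -> Vec<(u64, String)> {
        let mut out: Vec<(u64, String)> = Vec::new();
        let mut cursor = from;
        while out.len() < limit {
            let page: Vec<&(u64, String)> = rows
                .iter()
                .filter(|(seqno, _)| *seqno >= cursor)
                .take(2) // pretend the server returns two rows per page
                .collect();
            let Some(last) = page.last() else { break };
            cursor = last.0 + 1; // resume strictly after the last key seen
            out.extend(page.into_iter().cloned());
        }
        out.truncate(limit);
        out
    }

    fn main() {
        let rows = vec![(1, "a".into()), (2, "b".into()), (3, "c".into())];
        assert_eq!(scan(&rows, 2, 10).len(), 2); // seqnos 2 and 3
    }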
@@ -382,13 +380,12 @@ impl Consensus for FdbConsensus {
         let keys: Vec<String> = self
             .db
             .run(async |trx, _maybe_commited| {
-                let mut range = RangeOption::from(self.seqno.range().expect("valid directory"));
+                let mut range = RangeOption::from(self.seqno.range());
                 let mut keys = Vec::new();
                 loop {
                     let values = trx.get_range(&range, 1, false).await?;
                     for value in &values {
-                        // println!("entry: {:?}", value);
-                        let key: String = self.seqno.unpack(value.key()).expect("valid directory").expect("valid data");
+                        let key: String = self.seqno.unpack(value.key()).map_err(FdbBindingError::PackError)?;
                         keys.push(key);
                     }
                     if let Some(last) = values.last() {
@@ -407,20 +404,17 @@ impl Consensus for FdbConsensus {
     }
 
     async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
-        // info!("FdbConsensus::head({})", key);
+        let seqno_key = self.seqno.subspace(&key);
+        let data_key = self.data.subspace(&key);
+
         let ok = self
             .db
             .transact_boxed(
-                key,
-                |trx, key| self.head_trx(trx, key).boxed(),
+                (&seqno_key, &data_key),
+                |trx, (seqno_key, data_key)| self.head_trx(trx, seqno_key, data_key).boxed(),
                 TransactOption::default(),
             )
             .await?;
-        // info!(
-        //     "FdbConsensus::head({}) -> {:?}",
-        //     key,
-        //     ok.as_ref().map(|ok| ok.seqno)
-        // );
         Ok(ok)
     }
 
@@ -430,13 +424,6 @@ impl Consensus for FdbConsensus {
         expected: Option<SeqNo>,
         new: VersionedData,
     ) -> Result<CaSResult, ExternalError> {
-        // info!(
-        //     "FdbConsensus::compare_and_set({}, {:?}, <{} bytes at seqno {}>)",
-        //     key,
-        //     expected,
-        //     new.data.len(),
-        //     new.seqno
-        // );
         if let Some(expected) = expected {
             if new.seqno <= expected {
                 return Err(Error::from(
@@ -451,12 +438,16 @@ impl Consensus for FdbConsensus {
             )));
         }
 
+        let seqno_key = self.seqno.subspace(&key);
+        let data_key = self.data.subspace(&key);
+
         let ok = self
             .db
             .transact_boxed(
-                (key, expected, &new),
-                |trx, (key, expected, new)| {
-                    self.compare_and_set_trx(trx, key, expected, new).boxed()
+                (expected, &new),
+                |trx, (expected, new)| {
+                    self.compare_and_set_trx(trx, &seqno_key, &data_key, expected, new)
+                        .boxed()
                 },
                 TransactOption::default(),
             )
             .await?;
         Ok(ok)
     }
 
@@ -470,12 +461,12 @@ impl Consensus for FdbConsensus {
     async fn scan(
         &self,
         key: &str,
         from: SeqNo,
         limit: usize,
     ) -> Result<Vec<VersionedData>, ExternalError> {
-        // info!("FdbConsensus::scan({}, {:?}, {})", key, from, limit);
+        let data_key = self.data.subspace(&key);
         let ok = self
             .db
             .transact_boxed(
-                (key, from, limit),
-                |trx, (key, from, limit)| self.scan_trx(trx, key, from, limit).boxed(),
+                (&data_key, from, limit),
+                |trx, (data_key, from, limit)| self.scan_trx(trx, data_key, from, limit).boxed(),
                 TransactOption::default(),
             )
             .await?;
         Ok(ok)
     }
 
@@ -483,12 +474,16 @@ impl Consensus for FdbConsensus {
     async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
-        // info!("FdbConsensus::truncate({}, {:?})", key, seqno);
+        let seqno_key = self.seqno.subspace(&key);
+        let data_key = self.data.subspace(&key);
+
         self.db
             .transact_boxed(
-                (key, seqno),
-                |trx, (key, seqno)| self.truncate_trx(trx, key, seqno).boxed(),
-                TransactOption::default(),
+                (&seqno_key, &data_key, seqno),
+                |trx, (seqno_key, data_key, seqno)| {
+                    self.truncate_trx(trx, seqno_key, data_key, seqno).boxed()
+                },
+                TransactOption::idempotent(),
             )
             .await?;
         Ok(0)
     }
@@ -499,85 +494,6 @@ impl Consensus for FdbConsensus {
     }
 }
 
-#[derive(Debug)]
-pub struct ValidatingConsensus<A, B> {
-    pub inner: A,
-    pub validator: B,
-}
-
-#[async_trait]
-impl<A: Consensus, B: Consensus> Consensus for ValidatingConsensus<A, B> {
-    fn list_keys(&self) -> ResultStream<'_, String> {
-        self.inner.list_keys()
-    }
-
-    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
-        let inner = self.inner.head(key).await?;
-        let valid = self.validator.head(key).await?;
-        assert_eq!(inner, valid, "mismatched head for key {}", key);
-        Ok(inner)
-    }
-
-    async fn compare_and_set(
-        &self,
-        key: &str,
-        expected: Option<SeqNo>,
-        new: VersionedData,
-    ) -> Result<CaSResult, ExternalError> {
-        let inner = self
-            .inner
-            .compare_and_set(key, expected.clone(), new.clone())
-            .await?;
-        let valid = self.validator.compare_and_set(key, expected, new).await?;
-        assert_eq!(inner, valid, "mismatched cas for key {}", key);
-        Ok(inner)
-    }
-
-    async fn scan(
-        &self,
-        key: &str,
-        from: SeqNo,
-        limit: usize,
-    ) -> Result<Vec<VersionedData>, ExternalError> {
-        let inner = self.inner.scan(key, from, limit).await?;
-        let valid = self.validator.scan(key, from, limit).await?;
-        for inner in &inner {
-            println!(
-                "inner scan: seqno: {:?}, {} bytes",
-                inner.seqno,
-                inner.data.len()
-            );
-        }
-        for valid in &valid {
-            println!(
-                "valid scan: seqno: {:?}, {} bytes",
-                valid.seqno,
-                valid.data.len()
-            );
-        }
-        for (a, b) in inner.iter().zip(valid.iter()) {
-            assert_eq!(a.seqno, b.seqno);
-            assert_eq!(&*a.data, &*b.data);
-        }
-        assert_eq!(inner.len(), valid.len());
-        Ok(inner)
-    }
-
-    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
-        let inner = self.inner.truncate(key, seqno).await?;
-        let valid = self.validator.truncate(key, seqno).await?;
-        self.scan(key, SeqNo::minimum(), 100000000).await?;
-        if self.truncate_counts() {
-            assert_eq!(inner, valid, "mismatched truncate counts for key {}", key);
-        }
-        Ok(inner)
-    }
-
-    fn truncate_counts(&self) -> bool {
-        self.inner.truncate_counts() && self.validator.truncate_counts()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use uuid::Uuid;
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 634be3dfb58b1..47f59f981f7f6 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -105,6 +105,7 @@ portable-atomic = { version = "1.11.1", features = ["require-cas"] }
 postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4"] }
 postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
 predicates = { version = "3.1.3" }
+prettyplease = { version = "0.2.37", default-features = false, features = ["verbatim"] }
 proc-macro2 = { version = "1.0.101", features = ["span-locations"] }
 proptest = { version = "1.7.0" }
 prost = { version = "0.13.5", features = ["no-recursion-limit", "prost-derive"] }
@@ -251,6 +252,7 @@ portable-atomic = { version = "1.11.1", features = ["require-cas"] }
 postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4"] }
 postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
 predicates = { version = "3.1.3" }
+prettyplease = { version = "0.2.37", default-features = false, features = ["verbatim"] }
 proc-macro2 = { version = "1.0.101", features = ["span-locations"] }
 proptest = { version = "1.7.0" }
 proptest-derive = { version = "0.5.1", default-features = false, features = ["boxed_union"] }
diff --git a/test/testdrive/mzcompose.py b/test/testdrive/mzcompose.py
index ea7a85f8c0e0e..6009c048123a5 100644
--- a/test/testdrive/mzcompose.py
+++ b/test/testdrive/mzcompose.py
@@ -24,6 +24,7 @@
 )
 from materialize.mzcompose.services.azurite import Azurite
 from materialize.mzcompose.services.fivetran_destination import FivetranDestination
+from materialize.mzcompose.services.foundationdb import FoundationDB
 from materialize.mzcompose.services.kafka import Kafka
 from materialize.mzcompose.services.materialized import Materialized
 from materialize.mzcompose.services.minio import Minio
@@ -48,6 +49,7 @@
     Materialized(external_blob_store=True, sanity_restart=False),
     FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
     Testdrive(external_blob_store=True),
+    FoundationDB(),
 ]
 
 
@@ -105,6 +107,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
     )
 
+    parser.add_argument(
+        "--foundationdb",
+        action="store_true",
+        help="Use FoundationDB for internal metadata storage instead of Postgres",
+    )
+
     parser.add_argument(
         "files",
         nargs="*",
@@ -138,8 +146,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         additional_system_parameter_defaults=additional_system_parameter_defaults,
         default_replication_factor=1,
         sanity_restart=False,
+        consensus_foundationdb=args.foundationdb,
     )
-
     testdrive = Testdrive(
         kafka_default_partitions=args.kafka_default_partitions,
         aws_region=args.aws_region,
@@ -150,10 +158,25 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         blob_store_is_azure=args.azurite,
         fivetran_destination=True,
         fivetran_destination_files_path="/share/tmp",
-        entrypoint_extra=[
-            f"--var=uses-redpanda={args.redpanda}",
-        ],
+        entrypoint_extra=(
+            [f"--var=uses-redpanda={args.redpanda}"]
+            + (["--consistency-checks=disable"] if args.foundationdb else [])
+        ),
     )
+    if args.foundationdb:
+        c.up("foundationdb")
+        c.run(
+            "foundationdb",
+            "-C",
+            "/etc/foundationdb/fdb.cluster",
+            "--exec",
+            "configure new single memory",
+            entrypoint="fdbcli",
+        )
 
     with c.override(testdrive, materialized):
         c.up(*dependencies, Service("testdrive", idle=True))
From 08651ec83c62f3e341e648607859560817963aae Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Tue, 14 Oct 2025 16:58:50 +0200
Subject: [PATCH 5/6] Fix compare_and_append implementation

Signed-off-by: Moritz Hoffmann
---
 src/persist/src/foundationdb.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs
index 87801f3959614..818908ae5074a 100644
--- a/src/persist/src/foundationdb.rs
+++ b/src/persist/src/foundationdb.rs
@@ -294,7 +294,7 @@ impl FdbConsensus {
             .map(|data| unpack(&data))
             .transpose()?;
 
-        if expected.is_some() && (expected != &seqno) {
+        if expected != &seqno {
             return Ok(CaSResult::ExpectationMismatch);
         }
 

From 72eeacb22b9ebcded6a25fb372f019b09888d205 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Wed, 15 Oct 2025 11:45:53 +0200
Subject: [PATCH 6/6] fdb maelstrom

Signed-off-by: Moritz Hoffmann
---
 src/persist-client/src/internal/metrics.rs |  4 +++
 src/persist/src/cfg.rs                     |  2 +-
 src/persist/src/foundationdb.rs            | 29 +++++++++++++---------
 src/persist/src/location.rs                |  6 ++++-
 src/persist/src/unreliable.rs              |  4 +++
 test/persist/mzcompose.py                  | 24 ++++++++++++++++--
 6 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index 6ecfe64660651..28d28a9434fd9 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -3012,6 +3012,10 @@ impl Consensus for MetricsConsensus {
             .inc_by(u64::cast_from(deleted));
         Ok(deleted)
     }
+
+    fn truncate_counts(&self) -> bool {
+        self.consensus.truncate_counts()
+    }
 }
 
 /// A standard set of metrics for an async task. Call [TaskMetrics::instrument_task] to instrument
diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs
index 0dfe5d1dd3bd8..324c58783617e 100644
--- a/src/persist/src/cfg.rs
+++ b/src/persist/src/cfg.rs
@@ -213,7 +213,7 @@ impl BlobConfig {
 /// Config for an implementation of [Consensus].
 #[derive(Debug, Clone)]
 pub enum ConsensusConfig {
-    /// Config for [FdbConsensus].
+    /// Config for FoundationDB.
     FoundationDB(FdbConsensusConfig),
     /// Config for [PostgresConsensus].
     Postgres(PostgresConsensusConfig),
diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs
index 818908ae5074a..415abaef93e79 100644
--- a/src/persist/src/foundationdb.rs
+++ b/src/persist/src/foundationdb.rs
@@ -11,8 +11,8 @@
 //!
 //! We're storing the consensus data in a subspace at `/mz/consensus`. Each key maps to a subspace
 //! with the following structure:
-//!   ./seqno/<key> -> <seqno>
-//!   ./data/<key>/<seqno> -> <data>
+//! * `./seqno/<key> -> <seqno>`
+//! * `./data/<key>/<seqno> -> <data>`
 
 use std::io::Write;
 use std::sync::OnceLock;
@@ -304,13 +305,15 @@ impl FdbConsensus {
         trx.set(&data_seqno_key, &pack(&new.data.as_ref()));
         Ok(CaSResult::Committed)
     }
+
     async fn scan_trx(
         &self,
         trx: &Transaction,
         data_key: &Subspace,
         from: &SeqNo,
         limit: &usize,
-    ) -> Result<Vec<VersionedData>, FdbTransactError> {
+        entries: &mut Vec<VersionedData>,
+    ) -> Result<(), FdbTransactError> {
         let mut limit = *limit;
         let seqno_start = data_key.pack(&from);
         let seqno_end = data_key.pack(&SeqNo::maximum());
@@ -318,10 +320,11 @@ impl FdbConsensus {
 
         let mut range = RangeOption::from(seqno_start..=seqno_end);
         range.limit = Some(limit);
-        let mut entries = Vec::new();
+        entries.clear();
 
         loop {
             let output = trx.get_range(&range, 1, false).await?;
+            entries.reserve(output.len());
             for key_value in &output {
                 let seqno = data_key.unpack(key_value.key())?;
                 let value: Vec<u8> = unpack(key_value.value())?;
@@ -342,9 +345,7 @@ impl FdbConsensus {
                 break;
             }
         }
-
-        entries.sort_by_key(|e| e.seqno);
-        Ok(entries)
+        Ok(())
     }
 
     async fn truncate_trx(
         &self,
@@ -462,15 +463,19 @@ impl Consensus for FdbConsensus {
         limit: usize,
     ) -> Result<Vec<VersionedData>, ExternalError> {
         let data_key = self.data.subspace(&key);
-        let ok = self
-            .db
+        let mut entries = Vec::new();
+        self.db
             .transact_boxed(
-                (&data_key, from, limit),
-                |trx, (data_key, from, limit)| self.scan_trx(trx, data_key, from, limit).boxed(),
+                (&data_key, from, limit, &mut entries),
+                |trx, (data_key, from, limit, entries)| {
+                    self.scan_trx(trx, data_key, from, limit, entries).boxed()
+                },
                 TransactOption::default(),
            )
             .await?;
-        Ok(ok)
+
+        entries.sort_by_key(|e| e.seqno);
+        Ok(entries)
     }
 
     async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
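The `entries.clear()` above matters because FoundationDB may run a transaction closure several times before it commits; any buffer that outlives a single attempt must be reset at the top of every attempt. A stand-in sketch of the hazard this hunk removes:

    // A transaction runner that, like FDB, may retry the closure.
    fn run_transaction<T>(out: &mut Vec<T>, mut attempt: impl FnMut(&mut Vec<T>)) {
        for _ in 0..2 {
            // pretend the first attempt conflicted and was retried
            out.clear(); // without this, retried attempts double-append
            attempt(out);
        }
    }

    fn main() {
        let mut entries = Vec::new();
        run_transaction(&mut entries, |out| out.extend([1, 2, 3]));
        assert_eq!(entries, vec![1, 2, 3]); // not 1, 2, 3, 1, 2, 3
    }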
diff --git a/src/persist/src/location.rs b/src/persist/src/location.rs
index 03f9c14b59602..4243f1c51d859 100644
--- a/src/persist/src/location.rs
+++ b/src/persist/src/location.rs
@@ -441,7 +441,7 @@ pub trait Consensus: std::fmt::Debug + Send + Sync {
     /// data at this key.
     async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
 
-    /// Returns true if [`truncate`] returns the number of versions deleted.
+    /// Returns true if [`Self::truncate`] returns the number of versions deleted.
     fn truncate_counts(&self) -> bool {
         true
     }
@@ -508,6 +508,10 @@ impl Consensus for Tasked {
         )
         .await?
     }
+
+    fn truncate_counts(&self) -> bool {
+        self.0.truncate_counts()
+    }
 }
 
 /// Metadata about a particular blob stored by persist
diff --git a/src/persist/src/unreliable.rs b/src/persist/src/unreliable.rs
index efa32fa0cceee..552c651e0f3d4 100644
--- a/src/persist/src/unreliable.rs
+++ b/src/persist/src/unreliable.rs
@@ -228,6 +228,10 @@ impl Consensus for UnreliableConsensus {
             .run_op("truncate", || self.consensus.truncate(key, seqno))
             .await
     }
+
+    fn truncate_counts(&self) -> bool {
+        self.consensus.truncate_counts()
+    }
 }
 
 #[cfg(test)]
diff --git a/test/persist/mzcompose.py b/test/persist/mzcompose.py
index af3c4227a960d..78a602cdf9e5b 100644
--- a/test/persist/mzcompose.py
+++ b/test/persist/mzcompose.py
@@ -13,20 +13,29 @@
 
 import argparse
 
+from materialize import MZ_ROOT
 from materialize.mzcompose.composition import (
     Composition,
     WorkflowArgumentParser,
 )
 from materialize.mzcompose.service import Service
 from materialize.mzcompose.services.cockroach import Cockroach
+from materialize.mzcompose.services.foundationdb import FoundationDB
 from materialize.mzcompose.services.postgres import PostgresMetadata
 
 SERVICES = [
     Cockroach(setup_materialize=True, in_memory=True),
     PostgresMetadata(),
+    FoundationDB(),
     Service(
         "maelstrom-persist",
-        {"mzbuild": "maelstrom-persist", "volumes": ["./maelstrom:/store"]},
+        {
+            "mzbuild": "maelstrom-persist",
+            "volumes": [
+                "./maelstrom:/store",
+                f"{MZ_ROOT}/misc/foundationdb/fdb.cluster:/etc/foundationdb/fdb.cluster",
+            ],
+        },
     ),
 ]
 
@@ -50,7 +59,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     parser.add_argument(
         "--consensus",
         type=str,
-        choices=["mem", "cockroach", "maelstrom", "postgres"],
+        choices=["mem", "cockroach", "maelstrom", "postgres", "foundationdb"],
         default="maelstrom",
     )
     parser.add_argument(
@@ -76,6 +85,17 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             "postgres://root@postgres-metadata:26257?options=--search_path=consensus"
         )
         c.up("postgres-metadata")
+    elif args.consensus == "foundationdb":
+        consensus_uri = "foundationdb:"
+        c.up("foundationdb")
+        c.run(
+            "foundationdb",
+            "-C",
+            "/etc/foundationdb/fdb.cluster",
+            "--exec",
+            "configure new single memory",
+            entrypoint="fdbcli",
+        )
     else:
         # empty consensus uri defaults to Maelstrom consensus implementation
         consensus_uri = ""
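A design note on `truncate_counts`: because it is a trait default, every wrapper consensus (metrics, unreliable, `Tasked`) must forward it explicitly, which is what the hunks above do. A minimal sketch of why, with a stand-in trait rather than the real persist one:

    trait Consensus {
        fn truncate(&self, seqno: u64) -> usize;
        // FDB's clear_range cannot report how many keys it removed, so the
        // FoundationDB impl returns 0 from truncate and opts out here.
        fn truncate_counts(&self) -> bool {
            true
        }
    }

    struct Wrapper<C>(C);

    impl<C: Consensus> Consensus for Wrapper<C> {
        fn truncate(&self, seqno: u64) -> usize {
            self.0.truncate(seqno)
        }
        // Without this forwarding method the default `true` would reassert
        // itself and count-based assertions would misfire over FoundationDB.
        fn truncate_counts(&self) -> bool {
            self.0.truncate_counts()
        }
    }

    struct Fdb;

    impl Consensus for Fdb {
        fn truncate(&self, _seqno: u64) -> usize {
            0 // cleared, but unable to count
        }
        fn truncate_counts(&self) -> bool {
            false
        }
    }

    fn main() {
        assert!(!Wrapper(Fdb).truncate_counts());
    }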