diff --git a/Cargo.lock b/Cargo.lock index 6a1fd2b62ec7d..fab1c961864ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,6 +571,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "async-signal" version = "0.2.10" @@ -1427,6 +1438,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +dependencies = [ + "bitflags 2.9.4", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.106", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1440,7 +1471,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.106", ] @@ -3427,6 +3458,69 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "foundationdb" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514aeffe12bbcf2f64a746793cc1c2602006c705d3fc6285df024303d008cccf" +dependencies = [ + "async-recursion", + "async-trait", + "foundationdb-gen", + "foundationdb-macros", + "foundationdb-sys", + "foundationdb-tuple", + "futures", + "memchr", + "rand 0.8.5", + "serde", + "serde_bytes", + "serde_json", + "static_assertions", + "uuid", +] + +[[package]] +name = "foundationdb-gen" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef9d854866df33e1f4099769e2b9fa8bf8cf3bca707029ae6298d0e61bcae358" +dependencies = [ + "xml-rs", +] + +[[package]] +name = "foundationdb-macros" +version = "0.3.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be610412e5a92d89855fb15b099a57792b7dbdcf8ac74c5a0e24d9b7b1b6f7f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", + "try_map", +] + +[[package]] +name = "foundationdb-sys" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bae14dba30b8dcc4905a9189ebb18bc9db9744ef0ad8f2b94ef00d21e176964" +dependencies = [ + "bindgen 0.70.1", + "libc", +] + +[[package]] +name = "foundationdb-tuple" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1832c1fbe592de718893f7c3b48179a47757f8974d1498fece997454c2b0fa" +dependencies = [ + "memchr", + "uuid", +] + [[package]] name = "funty" version = "2.0.0" @@ -4946,7 +5040,7 @@ version = "0.17.3+10.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cef2a00ee60fe526157c9023edab23943fae1ce2ab6f4abb2a807c1746835de9" dependencies = [ - "bindgen", + "bindgen 0.72.1", "bzip2-sys", "cc", "libc", @@ -7078,6 +7172,7 @@ dependencies = [ "deadpool-postgres", "differential-dataflow", "fail", + "foundationdb", "futures-util", "itertools 0.14.0", "md-5", @@ -10677,6 +10772,12 @@ version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -12758,6 +12859,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +[[package]] +name = "try_map" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fb1626d07cb5c1bb2cf17d94c0be4852e8a7c02b041acec9a8c5bdda99f9d580" + [[package]] name = "tungstenite" version = "0.24.0" @@ -13591,7 +13698,7 @@ dependencies = [ "axum", "axum-core", "base16ct", - "bindgen", + "bindgen 0.72.1", "bit-set", "bit-vec", "bitflags 2.9.4", @@ -13673,6 +13780,7 @@ dependencies = [ "postgres", "postgres-types", "predicates 3.1.3", + "prettyplease", "proc-macro2", "proptest", "proptest-derive", @@ -13755,6 +13863,12 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "xml-rs" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd8403733700263c6eb89f192880191f1b83e332f7a20371ddcf421c4a337c7" + [[package]] name = "xmlparser" version = "0.13.5" @@ -13839,7 +13953,7 @@ version = "2.0.16+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" dependencies = [ - "bindgen", + "bindgen 0.72.1", "cc", "pkg-config", ] diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index 0a40b9c6b907b..5605814d06145 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -352,6 +352,12 @@ RUN curl -fsSL https://amazon-inspector-sbomgen.s3.amazonaws.com/1.8.1/linux/$AR && mv inspector-sbomgen-1.8.1/linux/$ARCH_GO/inspector-sbomgen /usr/local/bin \ && chmod +x /usr/local/bin/inspector-sbomgen +RUN arch_fdb=$(echo "$ARCH_GCC" | sed -e "s/x86_64/amd64/") \ + && curl -fsSL https://github.com/apple/foundationdb/releases/download/7.3.71/foundationdb-clients_7.3.71-1_$arch_fdb.deb > foundationdb-clients.deb \ + && if [ $ARCH_GCC = X64_64 ]; then echo '695193b8c6f8af9ec083221611b5f2925ef7a5e3c5e3c1d0af65d0dfbe99d13d foundationdb-clients.deb' | sha256sum --check; fi \ + && if [ $ARCH_GCC = aarch64 ]; then echo 'db1bbb72d57685a1c212c6456e6f0dfd1266c5c10c4adbc98d23a5d91bdbaff7 foundationdb-clients.deb' | sha256sum --check; fi \ + && dpkg -i foundationdb-clients.deb + # Hardcode some known SSH hosts, or 
else SSH will ask whether the host is # trustworthy on the first connection. diff --git a/ci/test/lint-main/checks/check-copyright.sh b/ci/test/lint-main/checks/check-copyright.sh index 3f4d960f1688d..18e7634280054 100755 --- a/ci/test/lint-main/checks/check-copyright.sh +++ b/ci/test/lint-main/checks/check-copyright.sh @@ -49,6 +49,7 @@ copyright_files=$(grep -vE \ -e '^ci/test/lint-deps/' \ -e '^misc/bazel/c_deps/patches/snappy-config.patch' \ -e '^misc/completions/.*' \ + -e '^misc/foundationdb/.*' \ -e '^misc/mcp-materialize/uv.lock' \ -e '^misc/mcp-materialize-agents/uv.lock' \ -e '^misc/mcp-materialize-agents/mcp_materialize_agents/system_prompt.md' \ diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml index 26382112bec8e..76c63ba91023d 100644 --- a/ci/test/pipeline.template.yml +++ b/ci/test/pipeline.template.yml @@ -297,6 +297,19 @@ steps: agents: queue: hetzner-aarch64-8cpu-16gb + - id: testdrive-fdb + label: "Testdrive (FoundationDB)" + depends_on: build-aarch64 + timeout_in_minutes: 40 + inputs: [test/testdrive] + parallelism: 20 + plugins: + - ./ci/plugins/mzcompose: + composition: testdrive + args: [--foundationdb] + agents: + queue: hetzner-aarch64-8cpu-16gb + - id: cluster-tests label: "Cluster tests" depends_on: build-aarch64 diff --git a/deny.toml b/deny.toml index c8895ff5a81c6..6fabe1e92494e 100644 --- a/deny.toml +++ b/deny.toml @@ -148,7 +148,9 @@ skip = [ # Used by tower-lsp { name = "dashmap", version = "5.5.3" }, # Used by bindgen + { name = "bindgen", version = "0.70.1" }, { name = "itertools", version = "0.13.0" }, + { name = "rustc-hash", version = "1.1.0" }, # Used by pprof { name = "nix", version = "0.26.4" }, # Used by dynfmt @@ -205,6 +207,7 @@ name = "log" wrappers = [ "azure_svc_blobstorage", "apache-avro", + "bindgen", "buildid", "cookie_store", "deadpool-postgres", diff --git a/misc/foundationdb/fdb.cluster b/misc/foundationdb/fdb.cluster new file mode 100644 index 0000000000000..9e55dcb40a6e7 --- /dev/null 
+++ b/misc/foundationdb/fdb.cluster @@ -0,0 +1,2 @@ +# FoundationDB in Docker +docker:docker@foundationdb:4500 diff --git a/misc/foundationdb/foundationdb.conf b/misc/foundationdb/foundationdb.conf new file mode 100644 index 0000000000000..212ee71321249 --- /dev/null +++ b/misc/foundationdb/foundationdb.conf @@ -0,0 +1,47 @@ +## foundationdb.conf +## +## Configuration file for FoundationDB server processes +## Full documentation is available at +## https://apple.github.io/foundationdb/configuration.html#the-configuration-file + +[fdbmonitor] +user = foundationdb +group = foundationdb + +[general] +restart-delay = 60 +## by default, restart-backoff = restart-delay-reset-interval = restart-delay +# initial-restart-delay = 0 +# restart-backoff = 60 +# restart-delay-reset-interval = 60 +cluster-file = /etc/foundationdb/fdb.cluster +# delete-envvars = +# kill-on-configuration-change = true + +## Default parameters for individual fdbserver processes +[fdbserver] +command = /usr/sbin/fdbserver +public-address = auto:$ID +listen-address = public +datadir = /var/lib/foundationdb/data/$ID +logdir = /var/log/foundationdb +# logsize = 10MiB +# maxlogssize = 100MiB +# machine-id = +# datacenter-id = +# class = +# memory = 8GiB +# storage-memory = 1GiB +# cache-memory = 2GiB +# metrics-cluster = +# metrics-prefix = + +## An individual fdbserver process with id 4500 +## Parameters set here override defaults from the [fdbserver] section +[fdbserver.4500] + +[backup_agent] +command = /usr/lib/foundationdb/backup_agent/backup_agent +logdir = /var/log/foundationdb + +[backup_agent.1] diff --git a/misc/images/ubuntu-base/Dockerfile b/misc/images/ubuntu-base/Dockerfile index e0424a199b394..cfaf73f5407ba 100644 --- a/misc/images/ubuntu-base/Dockerfile +++ b/misc/images/ubuntu-base/Dockerfile @@ -24,3 +24,6 @@ RUN sed -i -e 's#http://archive\.ubuntu\.com#http://us-east-1.ec2.archive.ubuntu -e 's#http://ports\.ubuntu\.com#http://us-east-1.ec2.ports.ubuntu.com#' 
/etc/apt/sources.list.d/ubuntu.sources RUN apt-get update --fix-missing && TZ=UTC DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends eatmydata + +COPY --from=foundationdb/foundationdb:7.3.71 /usr/lib/libfdb_c.so /usr/lib/ +COPY --from=foundationdb/foundationdb:7.3.71 /usr/bin/fdbcli /usr/bin/ diff --git a/misc/python/materialize/cli/run.py b/misc/python/materialize/cli/run.py index 5575cfaf0077a..292ce045de319 100644 --- a/misc/python/materialize/cli/run.py +++ b/misc/python/materialize/cli/run.py @@ -117,6 +117,11 @@ def main() -> int: help="Postgres/CockroachDB connection string", default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES), ) + parser.add_argument( + "--consensus", + help="Postgres/CockroachDB consensus connection string", + default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES), + ) parser.add_argument( "--blob", help="Blob storage connection string", @@ -303,6 +308,10 @@ def main() -> int: print(f"persist-blob-url: {args.blob}") print(f"listeners config path: {args.listeners_config_path}") + if args.consensus is not None: + consensus = args.consensus + else: + consensus = args.postgres command += [ f"--listeners-config-path={args.listeners_config_path}", "--orchestrator=process", @@ -311,7 +320,7 @@ def main() -> int: f"--orchestrator-process-prometheus-service-discovery-directory={MZDATA}/prometheus", f"--orchestrator-process-scratch-directory={scratch}", "--secrets-controller=local-file", - f"--persist-consensus-url={args.postgres}?options=--search_path=consensus", + f"--persist-consensus-url={consensus}?options=--search_path=consensus", f"--persist-blob-url={args.blob}", f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle", f"--environment-id={environment_id}", diff --git a/misc/python/materialize/mzcompose/services/foundationdb.py b/misc/python/materialize/mzcompose/services/foundationdb.py new file mode 100644 index 0000000000000..e0d28cc6c66b5 --- /dev/null +++ 
b/misc/python/materialize/mzcompose/services/foundationdb.py @@ -0,0 +1,115 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from materialize import MZ_ROOT +from materialize.mzcompose.service import ( + Service, + ServiceConfig, +) + + +class FoundationDB(Service): + def __init__( + self, + name: str = "foundationdb", + image: str | None = None, + ports: list[str] = ["4500"], + environment: list[str] = [ + "FDB_NETWORKING_MODE=container", + ], + volumes: list[str] = [], + restart: str = "no", + version: str = "7.3.71", + ) -> None: + # command: list[str] = [ + # "postgres", + # "-c", + # "wal_level=logical", + # "-c", + # f"max_wal_senders={max_wal_senders}", + # "-c", + # f"max_replication_slots={max_replication_slots}", + # "-c", + # "max_connections=5000", + # ] + extra_command + + # if setup_materialize: + # path = os.path.relpath( + # MZ_ROOT / "misc" / "postgres" / "setup_materialize.sql", + # loader.composition_path, + # ) + # volumes = volumes + [ + # f"{path}:/docker-entrypoint-initdb.d/z_setup_materialize.sql" + # ] + # + # environment = environment + ["PGPORT=26257"] + + env_extra = [ + f"FDB_COORDINATOR_PORT={ports[0]}", + f"FDB_PORT={ports[0]}", + ] + + # command = dedent( + # """ + # /usr/bin/tini -g -- /var/fdb/scripts/fdb.bash & + # sleep 5 + # fdbcli -C /etc/foundationdb/fdb.cluster --exec "configure new single memory" + # fdbcli -C /etc/foundationdb/fdb.cluster --exec "status" + # wait + # """ + # ) + + if image is None: + image = f"foundationdb/foundationdb:{version}" + + config: ServiceConfig = {"image": image} + + volumes += [f"{MZ_ROOT}/misc/foundationdb/:/etc/foundationdb/"] + + config.update( + { + 
"image": image, + # "allow_host_ports": True, + # "command": ["bash", "-c", command], + "ports": ports, + "environment": env_extra + environment, + # "healthcheck": { + # "test": [ + # "CMD", + # "fdbcli", + # "--exec", + # "configure single memory ; status", + # ], + # "interval": "1s", + # "start_period": "30s", + # }, + "restart": restart, + "volumes": volumes, + } + ) + super().__init__(name=name, config=config) + + +# class PostgresMetadata(Postgres): +# def __init__(self, restart: str = "no") -> None: +# super().__init__( +# name="postgres-metadata", +# setup_materialize=True, +# ports=["26257"], +# restart=restart, +# ) + + +# CockroachOrPostgresMetadata = ( +# Cockroach if os.getenv("BUILDKITE_TAG", "") != "" else PostgresMetadata +# ) +# +# METADATA_STORE: str = ( +# "cockroach" if CockroachOrPostgresMetadata == Cockroach else "postgres-metadata" +# ) diff --git a/misc/python/materialize/mzcompose/services/materialized.py b/misc/python/materialize/mzcompose/services/materialized.py index fecad42f19b12..200e316cdda4d 100644 --- a/misc/python/materialize/mzcompose/services/materialized.py +++ b/misc/python/materialize/mzcompose/services/materialized.py @@ -100,6 +100,7 @@ def __init__( networks: ( dict[str, dict[str, list[str]]] | dict[str, dict[str, str]] | None ) = None, + consensus_foundationdb: bool = False, ) -> None: if name is None: name = "materialized" @@ -259,6 +260,10 @@ def __init__( # v0.92.0). 
f"MZ_ADAPTER_STASH_URL=postgres://root@{address}:26257?options=--search_path=adapter", ] + if consensus_foundationdb: + command += [ + "--persist-consensus-url=foundationdb:?options=--search_path=consensus", + ] command += [ "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0", @@ -336,6 +341,14 @@ def __init__( volumes += [f"{os.getcwd()}/license_key:/license_key/license_key"] + if ( + image_version is None or image_version >= "v0.160.0-dev" + ) and consensus_foundationdb: + print("Using foundationdb for consensus") + volumes += [ + f"{MZ_ROOT}/misc/foundationdb/fdb.cluster:/etc/foundationdb/fdb.cluster" + ] + if use_default_volumes: volumes += DEFAULT_MZ_VOLUMES volumes += volumes_extra diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 6ecfe64660651..28d28a9434fd9 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -3012,6 +3012,10 @@ impl Consensus for MetricsConsensus { .inc_by(u64::cast_from(deleted)); Ok(deleted) } + + fn truncate_counts(&self) -> bool { + self.consensus.truncate_counts() + } } /// A standard set of metrics for an async task. 
Call [TaskMetrics::instrument_task] to instrument diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 79d447d10f1ce..3f35e0e203ebc 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -39,6 +39,7 @@ bytes = "1.10.1" deadpool-postgres = "0.10.3" differential-dataflow = "0.17.0" fail = { version = "0.5.1", features = ["failpoints"] } +foundationdb = { version = "0.9.2", features = ["embedded-fdb-include", "fdb-7_3"] } futures-util = "0.3.31" itertools = "0.14.0" md-5 = "0.10.6" diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs index 7323be2c37b0a..324c58783617e 100644 --- a/src/persist/src/cfg.rs +++ b/src/persist/src/cfg.rs @@ -23,6 +23,7 @@ use mz_postgres_client::metrics::PostgresClientMetrics; use crate::azure::{AzureBlob, AzureBlobConfig}; use crate::file::{FileBlob, FileBlobConfig}; +use crate::foundationdb::{FdbConsensus, FdbConsensusConfig}; use crate::location::{Blob, Consensus, Determinate, ExternalError}; use crate::mem::{MemBlob, MemBlobConfig, MemConsensus}; use crate::metrics::S3BlobMetrics; @@ -212,6 +213,8 @@ impl BlobConfig { /// Config for an implementation of [Consensus]. #[derive(Debug, Clone)] pub enum ConsensusConfig { + /// Config for FoundationDB. + FoundationDB(FdbConsensusConfig), /// Config for [PostgresConsensus]. Postgres(PostgresConsensusConfig), /// Config for [MemConsensus], only available in testing. @@ -222,6 +225,9 @@ impl ConsensusConfig { /// Opens the associated implementation of [Consensus]. 
pub async fn open(self) -> Result, ExternalError> { match self { + ConsensusConfig::FoundationDB(config) => { + Ok(Arc::new(FdbConsensus::open(config).await?)) + } ConsensusConfig::Postgres(config) => { Ok(Arc::new(PostgresConsensus::open(config).await?)) } @@ -237,6 +243,9 @@ impl ConsensusConfig { dyncfg: Arc, ) -> Result { let config = match url.scheme() { + "fdb" | "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new( + url.clone(), + )?)), "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres( PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?, )), diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs new file mode 100644 index 0000000000000..415abaef93e79 --- /dev/null +++ b/src/persist/src/foundationdb.rs @@ -0,0 +1,567 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Implementation of [Consensus] backed by FoundationDB. +//! +//! We're storing the consensus data in a subspace at `/mz/consensus`. Each key maps to a subspace +//! with the following structure: +//! * `./seqno/ -> ` +//! 
* `./data// -> ` + +use std::io::Write; +use std::sync::OnceLock; + +use anyhow::anyhow; +use async_stream::try_stream; +use async_trait::async_trait; +use bytes::Bytes; +use foundationdb::api::NetworkAutoStop; +use foundationdb::directory::{ + Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace, +}; +use foundationdb::tuple::{ + PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, + unpack, +}; +use foundationdb::{ + Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption, + Transaction, +}; +use futures_util::future::FutureExt; +use mz_ore::url::SensitiveUrl; +use url::Url; + +use crate::error::Error; +use crate::location::{ + CaSResult, Consensus, Determinate, ExternalError, Indeterminate, ResultStream, SeqNo, + VersionedData, +}; + +impl From for ExternalError { + fn from(x: FdbError) -> Self { + if x.is_retryable() { + ExternalError::Indeterminate(Indeterminate::new(x.into())) + } else { + ExternalError::Determinate(Determinate::new(x.into())) + } + } +} + +impl From for ExternalError { + fn from(x: FdbBindingError) -> Self { + ExternalError::Determinate(Determinate::new(x.into())) + } +} + +/// FoundationDB network singleton. +/// +/// Normally, we'd need to drop this to clean up the network, but since we +/// never expect to exit normally, it's fine to leak it. +static FDB_NETWORK: OnceLock = OnceLock::new(); + +fn init_network() -> &'static NetworkAutoStop { + FDB_NETWORK.get_or_init(|| unsafe { foundationdb::boot() }) +} + +/// Configuration to connect to a FoundationDB backed implementation of [Consensus]. +#[derive(Clone, Debug)] +pub struct FdbConsensusConfig { + url: SensitiveUrl, +} + +impl FdbConsensusConfig { + /// Returns a new [FdbConsensusConfig] for use in production. 
+ pub fn new(url: SensitiveUrl) -> Result { + Ok(FdbConsensusConfig { url }) + } + + pub fn new_for_test() -> Result { + Self::new(SensitiveUrl(Url::parse("foundationdb:").unwrap())) + } +} + +/// Implementation of [Consensus] over a Foundation database. +pub struct FdbConsensus { + seqno: DirectorySubspace, + data: DirectorySubspace, + db: Database, +} + +/// An error that can occur during a FoundationDB transaction. +/// This is either a FoundationDB error or an external error. +enum FdbTransactError { + FdbError(FdbError), + ExternalError(ExternalError), +} + +impl From for FdbTransactError { + fn from(value: FdbError) -> Self { + Self::FdbError(value) + } +} + +impl From for FdbTransactError { + fn from(value: ExternalError) -> Self { + Self::ExternalError(value) + } +} + +impl From for FdbTransactError { + fn from(value: PackError) -> Self { + ExternalError::Determinate(anyhow::Error::new(value).into()).into() + } +} + +impl From for ExternalError { + fn from(value: FdbTransactError) -> Self { + match value { + FdbTransactError::FdbError(e) => e.into(), + FdbTransactError::ExternalError(e) => e, + } + } +} + +impl From for ExternalError { + fn from(e: DirectoryError) -> Self { + ExternalError::Determinate(anyhow!("directory error: {e:?}").into()) + } +} + +impl TransactError for FdbTransactError { + fn try_into_fdb_error(self) -> Result { + match self { + Self::FdbError(e) => Ok(e), + other => Err(other), + } + } +} + +impl TuplePack for SeqNo { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + self.0.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for SeqNo { + fn unpack(input: &'de [u8], tuple_depth: TupleDepth) -> PackResult<(&'de [u8], Self)> { + u64::unpack(input, tuple_depth).map(|(rem, v)| (rem, SeqNo(v))) + } +} + +impl std::fmt::Debug for FdbConsensus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FdbConsensus") + .field("seqno", &self.seqno) + .field("data", 
&self.data) + .finish_non_exhaustive() + } +} + +impl FdbConsensus { + /// Open a FoundationDB [Consensus] instance with `config`. + pub async fn open(config: FdbConsensusConfig) -> Result { + let mut prefix = Vec::new(); + for (key, value) in config.url.query_pairs() { + match &*key { + "options" => { + if let Some(path) = value.strip_prefix("--search_path=") { + prefix = path.split('/').map(|s| s.to_owned()).collect(); + } else { + return Err(ExternalError::from(anyhow!( + "unrecognized FoundationDB URL options parameter: {value}", + ))); + } + } + key => { + return Err(ExternalError::from(anyhow!( + "unrecognized FoundationDB URL query parameter: {key}: {value}", + ))); + } + } + } + let path = if config.url.0.cannot_be_a_base() { + None + } else { + Some(config.url.0.path()) + }; + + let _ = init_network(); + + let db = Database::new(path)?; + let directory = DirectoryLayer::default(); + let path: Vec<_> = prefix + .iter() + .cloned() + .chain(std::iter::once("seqno".to_owned())) + .collect(); + let seqno = db + .run(async |trx, _maybe_commited| { + Ok(directory.create_or_open(&trx, &path, None, None).await) + }) + .await??; + let seqno = match seqno { + DirectoryOutput::DirectorySubspace(subspace) => subspace, + DirectoryOutput::DirectoryPartition(_partition) => { + return Err(ExternalError::from(anyhow!( + "consensus seqno cannot be a partition" + ))); + } + }; + let path: Vec<_> = prefix + .into_iter() + .chain(std::iter::once("data".to_owned())) + .collect(); + let data = db + .run(async |trx, _maybe_commited| { + Ok(directory.create_or_open(&trx, &path, None, None).await) + }) + .await??; + let data = match data { + DirectoryOutput::DirectorySubspace(subspace) => subspace, + DirectoryOutput::DirectoryPartition(_partition) => { + return Err(ExternalError::from(anyhow!( + "consensus data cannot be a partition" + ))); + } + }; + Ok(FdbConsensus { seqno, data, db }) + } + + /// Drops and recreates the `consensus` data in FoundationDB. 
+ /// + /// ONLY FOR TESTING + #[cfg(test)] + pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> { + self.db + .run(async |trx, _maybe_commited| { + self.seqno.remove(&trx, &[]).await?; + self.data.remove(&trx, &[]).await?; + Ok(()) + }) + .await?; + Ok(()) + } + + async fn head_trx( + &self, + trx: &Transaction, + seqno_key: &Subspace, + data_key: &Subspace, + ) -> Result, FdbTransactError> { + let seqno = trx.get(seqno_key.bytes(), false).await?; + if let Some(seqno) = &seqno { + let seqno: SeqNo = unpack(seqno)?; + + let seqno_space = data_key.pack(&seqno); + let data = trx.get(&seqno_space, false).await?; + if let Some(data) = data { + let data = unpack::>(&data)?; + Ok(Some(VersionedData { + seqno, + data: Bytes::from(data), + })) + } else { + Err(ExternalError::Determinate( + anyhow!("inconsistent state: seqno present without data").into(), + ) + .into()) + } + } else { + Ok(None) + } + } + async fn compare_and_set_trx( + &self, + trx: &Transaction, + seqno_key: &Subspace, + data_key: &Subspace, + expected: &Option, + new: &VersionedData, + ) -> Result { + let seqno = trx + .get(seqno_key.bytes(), false) + .await? 
+ .map(|data| unpack(&data)) + .transpose()?; + + if expected != &seqno { + return Ok(CaSResult::ExpectationMismatch); + } + + trx.set(seqno_key.bytes(), &pack(&new.seqno)); + + let data_seqno_key = data_key.pack(&new.seqno); + trx.set(&data_seqno_key, &pack(&new.data.as_ref())); + Ok(CaSResult::Committed) + } + + async fn scan_trx( + &self, + trx: &Transaction, + data_key: &Subspace, + from: &SeqNo, + limit: &usize, + entries: &mut Vec, + ) -> Result<(), FdbTransactError> { + let mut limit = *limit; + let seqno_start = data_key.pack(&from); + let seqno_end = data_key.pack(&SeqNo::maximum()); + + let mut range = RangeOption::from(seqno_start..=seqno_end); + range.limit = Some(limit); + + entries.clear(); + + loop { + let output = trx.get_range(&range, 1, false).await?; + entries.reserve(output.len()); + for key_value in &output { + let seqno = data_key.unpack(key_value.key())?; + let value: Vec = unpack(key_value.value())?; + entries.push(VersionedData { + seqno, + data: Bytes::from(value), + }); + } + + limit = limit.saturating_sub(output.len()); + + if let Some(last) = output.last() + && limit > 0 + { + range.begin = KeySelector::first_greater_than(last.key().to_vec()); + range.limit = Some(limit); + } else { + break; + } + } + Ok(()) + } + async fn truncate_trx( + &self, + trx: &Transaction, + seqno_key: &Subspace, + data_key: &Subspace, + until: &SeqNo, + ) -> Result<(), FdbTransactError> { + let seqno = trx.get(seqno_key.bytes(), false).await?; + if let Some(seqno) = &seqno { + let current_seqno: SeqNo = unpack(seqno)?; + if current_seqno < *until { + return Err(ExternalError::Determinate( + anyhow!("upper bound too high for truncate: {until}").into(), + ) + .into()); + } + } else { + return Err(ExternalError::Determinate(anyhow!("no entries for key").into()).into()); + } + let key_space_start = data_key.pack(&SeqNo::minimum()); + let key_space_end = data_key.pack(&until); + + trx.clear_range(&key_space_start, &key_space_end); + Ok(()) + } +} + +#[async_trait] 
+impl Consensus for FdbConsensus { + fn list_keys(&self) -> ResultStream<'_, String> { + Box::pin(try_stream! { + let keys: Vec = self + .db + .run(async |trx, _maybe_commited| { + let mut range = RangeOption::from(self.seqno.range()); + let mut keys = Vec::new(); + loop { + let values = trx.get_range(&range, 1, false).await?; + for value in &values { + let key: String = self.seqno.unpack(value.key()).map_err(FdbBindingError::PackError)?; + keys.push(key); + } + if let Some(last) = values.last() { + range.begin = KeySelector::first_greater_than(last.key().to_vec()); + } else { + break; + } + } + Ok(keys) + }).await?; + + for shard in keys { + yield shard; + } + }) + } + + async fn head(&self, key: &str) -> Result, ExternalError> { + let seqno_key = self.seqno.subspace(&key); + let data_key = self.data.subspace(&key); + + let ok = self + .db + .transact_boxed( + (&seqno_key, &data_key), + |trx, (seqno_key, data_key)| self.head_trx(trx, seqno_key, data_key).boxed(), + TransactOption::default(), + ) + .await?; + Ok(ok) + } + + async fn compare_and_set( + &self, + key: &str, + expected: Option, + new: VersionedData, + ) -> Result { + if let Some(expected) = expected { + if new.seqno <= expected { + return Err(Error::from( + format!("new seqno must be strictly greater than expected. 
Got new: {:?} expected: {:?}", + new.seqno, expected)).into()); + } + } + if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") { + return Err(ExternalError::from(anyhow!( + "sequence numbers must fit within [0, i64::MAX], received: {:?}", + new.seqno + ))); + } + + let seqno_key = self.seqno.subspace(&key); + let data_key = self.data.subspace(&key); + + let ok = self + .db + .transact_boxed( + (expected, &new), + |trx, (expected, new)| { + self.compare_and_set_trx(trx, &seqno_key, &data_key, expected, new) + .boxed() + }, + TransactOption::default(), + ) + .await?; + Ok(ok) + } + + async fn scan( + &self, + key: &str, + from: SeqNo, + limit: usize, + ) -> Result, ExternalError> { + let data_key = self.data.subspace(&key); + let mut entries = Vec::new(); + self.db + .transact_boxed( + (&data_key, from, limit, &mut entries), + |trx, (data_key, from, limit, entries)| { + self.scan_trx(trx, data_key, from, limit, entries).boxed() + }, + TransactOption::default(), + ) + .await?; + + entries.sort_by_key(|e| e.seqno); + Ok(entries) + } + + async fn truncate(&self, key: &str, seqno: SeqNo) -> Result { + let seqno_key = self.seqno.subspace(&key); + let data_key = self.data.subspace(&key); + + self.db + .transact_boxed( + (&seqno_key, &data_key, seqno), + |trx, (seqno_key, data_key, seqno)| { + self.truncate_trx(trx, seqno_key, data_key, seqno).boxed() + }, + TransactOption::idempotent(), + ) + .await?; + Ok(0) + } + + fn truncate_counts(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use crate::location::tests::consensus_impl_test; + + use super::*; + + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn fdb_consensus() -> Result<(), ExternalError> { + let config = FdbConsensusConfig::new_for_test()?; + + { + let fdb = FdbConsensus::open(config.clone()).await?; + 
fdb.drop_and_recreate().await?; + } + + consensus_impl_test(|| FdbConsensus::open(config.clone())).await?; + + // and now verify the implementation-specific `drop_and_recreate` works as intended + let consensus = FdbConsensus::open(config.clone()).await?; + let key = Uuid::new_v4().to_string(); + let mut state = VersionedData { + seqno: SeqNo(5), + data: Bytes::from("abc"), + }; + + assert_eq!( + consensus.compare_and_set(&key, None, state.clone()).await, + Ok(CaSResult::Committed), + ); + state.seqno = SeqNo(6); + assert_eq!( + consensus + .compare_and_set(&key, Some(SeqNo(5)), state.clone()) + .await, + Ok(CaSResult::Committed), + ); + state.seqno = SeqNo(129 + 5); + assert_eq!( + consensus + .compare_and_set(&key, Some(SeqNo(6)), state.clone()) + .await, + Ok(CaSResult::Committed), + ); + + assert_eq!(consensus.head(&key).await, Ok(Some(state.clone()))); + + println!("--- SCANNING ---"); + + for data in consensus.scan(&key, SeqNo(129), 10).await? { + println!( + "scan data: seqno: {:?}, {} bytes", + data.seqno, + data.data.len() + ); + } + + consensus.drop_and_recreate().await?; + + assert_eq!(consensus.head(&key).await, Ok(None)); + + Ok(()) + } +} diff --git a/src/persist/src/lib.rs b/src/persist/src/lib.rs index 339790d9734f9..3af83904c04ca 100644 --- a/src/persist/src/lib.rs +++ b/src/persist/src/lib.rs @@ -21,6 +21,7 @@ pub mod azure; pub mod cfg; pub mod error; pub mod file; +mod foundationdb; pub mod generated; pub mod indexed; pub mod intercept; diff --git a/src/persist/src/location.rs b/src/persist/src/location.rs index 05d66cc53e00b..4243f1c51d859 100644 --- a/src/persist/src/location.rs +++ b/src/persist/src/location.rs @@ -83,6 +83,11 @@ impl SeqNo { pub fn minimum() -> Self { SeqNo(0) } + + /// A maximum value. 
+ pub fn maximum() -> Self { + SeqNo(u64::MAX) + } } impl RustType for SeqNo { @@ -115,6 +120,12 @@ impl std::error::Error for Determinate { } } +impl From for Determinate { + fn from(inner: anyhow::Error) -> Self { + Self::new(inner) + } +} + impl Determinate { /// Return a new Determinate wrapping the given error. /// @@ -429,6 +440,11 @@ pub trait Consensus: std::fmt::Debug + Send + Sync { /// `seqno` is greater than the current sequence number, or if there is no /// data at this key. async fn truncate(&self, key: &str, seqno: SeqNo) -> Result; + + /// Returns true if [`Self::truncate`] returns the number of versions deleted. + fn truncate_counts(&self) -> bool { + true + } } #[async_trait] @@ -492,6 +508,10 @@ impl Consensus for Tasked { ) .await? } + + fn truncate_counts(&self) -> bool { + self.0.truncate_counts() + } } /// Metadata about a particular blob stored by persist @@ -625,7 +645,7 @@ pub mod tests { use anyhow::anyhow; use futures_util::TryStreamExt; - use mz_ore::assert_err; + use mz_ore::{assert_err, assert_ok}; use uuid::Uuid; use crate::location::Blob; @@ -1012,7 +1032,11 @@ pub mod tests { ); // Can remove the previous write with the appropriate truncation. - assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(6)).await); + } // Verify that the old write is indeed deleted. assert_eq!( @@ -1022,7 +1046,11 @@ pub mod tests { // Truncate is idempotent and can be repeated. The return value // indicates we didn't do any work though. - assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(6)).await); + } // Make sure entries under different keys don't clash. 
let other_key = Uuid::new_v4().to_string(); @@ -1079,7 +1107,11 @@ pub mod tests { consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await, Ok(CaSResult::Committed), ); - assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2)); + if consensus.truncate_counts() { + assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2)); + } else { + assert_ok!(consensus.truncate(&key, SeqNo(12)).await); + } // Sequence numbers used within Consensus have to be within [0, i64::MAX]. diff --git a/src/persist/src/unreliable.rs b/src/persist/src/unreliable.rs index efa32fa0cceee..552c651e0f3d4 100644 --- a/src/persist/src/unreliable.rs +++ b/src/persist/src/unreliable.rs @@ -228,6 +228,10 @@ impl Consensus for UnreliableConsensus { .run_op("truncate", || self.consensus.truncate(key, seqno)) .await } + + fn truncate_counts(&self) -> bool { + self.consensus.truncate_counts() + } } #[cfg(test)] diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 634be3dfb58b1..47f59f981f7f6 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -105,6 +105,7 @@ portable-atomic = { version = "1.11.1", features = ["require-cas"] } postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4"] } postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.3" } +prettyplease = { version = "0.2.37", default-features = false, features = ["verbatim"] } proc-macro2 = { version = "1.0.101", features = ["span-locations"] } proptest = { version = "1.7.0" } prost = { version = "0.13.5", features = ["no-recursion-limit", "prost-derive"] } @@ -251,6 +252,7 @@ portable-atomic = { version = "1.11.1", features = ["require-cas"] } postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4"] 
} postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.3" } +prettyplease = { version = "0.2.37", default-features = false, features = ["verbatim"] } proc-macro2 = { version = "1.0.101", features = ["span-locations"] } proptest = { version = "1.7.0" } proptest-derive = { version = "0.5.1", default-features = false, features = ["boxed_union"] } diff --git a/test/persist/mzcompose.py b/test/persist/mzcompose.py index af3c4227a960d..78a602cdf9e5b 100644 --- a/test/persist/mzcompose.py +++ b/test/persist/mzcompose.py @@ -13,20 +13,29 @@ import argparse +from materialize import MZ_ROOT from materialize.mzcompose.composition import ( Composition, WorkflowArgumentParser, ) from materialize.mzcompose.service import Service from materialize.mzcompose.services.cockroach import Cockroach +from materialize.mzcompose.services.foundationdb import FoundationDB from materialize.mzcompose.services.postgres import PostgresMetadata SERVICES = [ Cockroach(setup_materialize=True, in_memory=True), PostgresMetadata(), + FoundationDB(), Service( "maelstrom-persist", - {"mzbuild": "maelstrom-persist", "volumes": ["./maelstrom:/store"]}, + { + "mzbuild": "maelstrom-persist", + "volumes": [ + "./maelstrom:/store", + f"{MZ_ROOT}/misc/foundationdb/fdb.cluster:/etc/foundationdb/fdb.cluster", + ], + }, ), ] @@ -50,7 +59,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( "--consensus", type=str, - choices=["mem", "cockroach", "maelstrom", "postgres"], + choices=["mem", "cockroach", "maelstrom", "postgres", "foundationdb"], default="maelstrom", ) parser.add_argument( @@ -76,6 +85,17 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "postgres://root@postgres-metadata:26257?options=--search_path=consensus" ) c.up("postgres-metadata") + elif args.consensus == 
"foundationdb": + consensus_uri = "foundationdb:" + c.up("foundationdb") + c.run( + "foundationdb", + "-C", + "/etc/foundationdb/fdb.cluster", + "--exec", + "configure new single memory", + entrypoint="fdbcli", + ) else: # empty consensus uri defaults to Maelstrom consensus implementation consensus_uri = "" diff --git a/test/testdrive/mzcompose.py b/test/testdrive/mzcompose.py index ea7a85f8c0e0e..6009c048123a5 100644 --- a/test/testdrive/mzcompose.py +++ b/test/testdrive/mzcompose.py @@ -24,6 +24,7 @@ ) from materialize.mzcompose.services.azurite import Azurite from materialize.mzcompose.services.fivetran_destination import FivetranDestination +from materialize.mzcompose.services.foundationdb import FoundationDB from materialize.mzcompose.services.kafka import Kafka from materialize.mzcompose.services.materialized import Materialized from materialize.mzcompose.services.minio import Minio @@ -48,6 +49,7 @@ Materialized(external_blob_store=True, sanity_restart=False), FivetranDestination(volumes_extra=["tmp:/share/tmp"]), Testdrive(external_blob_store=True), + FoundationDB(), ] @@ -105,6 +107,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "--azurite", action="store_true", help="Use Azurite as blob store instead of S3" ) + parser.add_argument( + "--foundationdb", + action="store_true", + help="Use FoundationDB for internal metadata storage instead of Postgres", + ) + parser.add_argument( "files", nargs="*", @@ -138,8 +146,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: additional_system_parameter_defaults=additional_system_parameter_defaults, default_replication_factor=1, sanity_restart=False, + consensus_foundationdb=args.foundationdb, ) - testdrive = Testdrive( kafka_default_partitions=args.kafka_default_partitions, aws_region=args.aws_region, @@ -150,10 +158,25 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: blob_store_is_azure=args.azurite, 
fivetran_destination=True, fivetran_destination_files_path="/share/tmp", - entrypoint_extra=[ - f"--var=uses-redpanda={args.redpanda}", - ], + entrypoint_extra=( + [ + f"--var=uses-redpanda={args.redpanda}", + ] + + (["--consistency-checks=disable"] + if args.foundationdb + else []) + ), ) + if args.foundationdb: + c.up("foundationdb") + c.run( + "foundationdb", + "-C", + "/etc/foundationdb/fdb.cluster", + "--exec", + "configure new single memory", + entrypoint="fdbcli", + ) with c.override(testdrive, materialized): c.up(*dependencies, Service("testdrive", idle=True))