diff --git a/Cargo.lock b/Cargo.lock index 74ccc1f..f973aeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,15 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -291,6 +300,9 @@ name = "bitflags" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +dependencies = [ + "serde", +] [[package]] name = "blake3" @@ -667,6 +679,7 @@ checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" dependencies = [ "const-oid", "der_derive", + "pem-rfc7468", "zeroize", ] @@ -738,6 +751,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -773,6 +787,12 @@ dependencies = [ "litrs", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dtoa" version = "1.0.10" @@ -805,6 +825,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] + [[package]] name = "embedded-io" version = "0.4.0" @@ -880,6 +909,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" 
+dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "event-listener" version = "5.4.0" @@ -918,6 +958,7 @@ dependencies = [ "p2panda-stream", "p2panda-sync", "serde", + "sqlx", "tempfile", "thiserror 2.0.12", "tokio", @@ -1055,6 +1096,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -1280,6 +1332,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "heck" version = "0.5.0" @@ -1371,6 +1432,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1396,6 +1466,15 @@ version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a8575493d277c9092b988c780c94737fb9fd8651a1001e16bee3eccfc1baedb" +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -1876,7 +1955,7 @@ dependencies = [ "futures-buffered", "futures-lite", "genawaiter", - "hashlink", + "hashlink 0.9.1", "hex", "iroh", "iroh-base", @@ -2139,6 +2218,9 @@ name = "lazy_static" version = "1.5.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin", +] [[package]] name = "libc" @@ -2146,6 +2228,23 @@ version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +[[package]] +name = "libm" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" + +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.9.3" @@ -2224,6 +2323,16 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "md5" version = "0.7.0" @@ -2536,6 +2645,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand 0.8.5", + "smallvec", + "zeroize", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2551,6 +2677,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + 
"num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2558,6 +2695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2730,7 +2868,10 @@ name = "p2panda-store" version = "0.3.0" source = "git+https://github.com/p2panda/p2panda?rev=79b7682deb5f253224745b7ad9a7faab90e89e87#79b7682deb5f253224745b7ad9a7faab90e89e87" dependencies = [ + "ciborium", + "hex", "p2panda-core", + "sqlx", "thiserror 2.0.12", "trait-variant", ] @@ -2810,6 +2951,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2917,6 +3067,17 @@ dependencies = [ "z32", ] +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -2927,6 +3088,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "pnet_base" version = "0.34.0" @@ -3503,6 +3670,26 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rsa" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + 
"signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rtnetlink" version = "0.13.1" @@ -3901,6 +4088,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ + "digest", "rand_core 0.6.4", ] @@ -3960,12 +4148,210 @@ dependencies = [ "der", ] +[[package]] +name = "sqlx" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4410e73b3c0d8442c5f99b425d7a435b5ee0ae4167b3196771dd3f7a01be745f" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a007b6936676aa9ab40207cde35daab0a04b823be8ae004368c0793b96a61e0" +dependencies = [ + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.2", + "hashlink 0.10.0", + "indexmap", + "log", + "memchr", + "once_cell", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3112e2ad78643fef903618d78cf0aec1cb3134b019730edb039b69eaf531f310" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.100", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e9f90acc5ab146a99bf5061a7eb4976b573f560bc898ef3bf8435448dd5e7ad" +dependencies = [ + "dotenvy", + "either", + "heck", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + 
"sqlx-sqlite", + "syn 2.0.100", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" +dependencies = [ + "atoi", + "base64", + "bitflags 2.9.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.12", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" +dependencies = [ + "atoi", + "base64", + "bitflags 2.9.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.12", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f85ca71d3a5b24e64e1d08dd8fe36c6c95c339a896cc33068148906784620540" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "struct_iterable" version = "0.1.1" @@ -4575,6 +4961,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -4590,6 +4982,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -4672,6 +5070,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -4712,6 +5116,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -4834,6 +5244,16 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" 
+version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +dependencies = [ + "redox_syscall", + "wasite", +] + [[package]] name = "widestring" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 00c1aef..074899e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ mime = "0.3.17" p2panda-blobs = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" } p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" } p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" } -p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" } +p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87", features = ["sqlite"] } p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" } p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87", features = [ "mdns", @@ -32,6 +32,7 @@ tokio-util = "0.7.13" tokio-utils = "0.1.2" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +sqlx = "0.8.3" [dev-dependencies] tempfile = "3.17.1" diff --git a/migrations/20250331144110_acked-operations.sql b/migrations/20250331144110_acked-operations.sql new file mode 100644 index 0000000..f23feef --- /dev/null +++ b/migrations/20250331144110_acked-operations.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS acked_v1 ( + public_key TEXT NOT NULL, + log_id TEXT NOT NULL, + seq_num TEXT NOT NULL, + PRIMARY KEY (public_key, log_id) +); \ No newline at end of file diff --git a/src/api.rs b/src/api.rs index b784752..6a14d25 100644 --- a/src/api.rs +++ b/src/api.rs @@ -13,8 +13,7 @@ use crate::{
topic::{Topic, TopicMap}, }; -pub struct NodeApi -{ +pub struct NodeApi { pub node: Node, pub topic_map: TopicMap, pub subscriptions: HashMap<[u8; 32], Topic>, @@ -171,30 +170,26 @@ impl Serialize for ApiError { #[cfg(test)] mod tests { use p2panda_core::PrivateKey; - use p2panda_store::MemoryStore; use crate::api::NodeApi; use crate::extensions::{LogId, NodeExtensions}; use crate::stream::{EventData, StreamEvent}; - use crate::topic::TopicMap; + use crate::topic::{Topic, TopicMap}; use super::Node; #[tokio::test] async fn subscribe_publish_persisted() { let private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); let (node, mut stream_rx, _system_rx) = Node::new( "my_network".to_string(), private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await .unwrap(); @@ -237,17 +232,14 @@ mod tests { #[tokio::test] async fn subscribe_publish_ephemeral() { let node_private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); - let (node, _stream_rx, _system_rx) = Node::new( + let (node, _stream_rx, _system_rx) = Node::::new( "my_network".to_string(), node_private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await .unwrap(); @@ -265,34 +257,28 @@ mod tests { #[tokio::test] async fn two_peers_subscribe() { let node_a_private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); - let (node_a, _node_a_stream_rx, _system_rx) = Node::new( + let (node_a, _stream_rx, _system_rx) = Node::::new( "my_network".to_string(), node_a_private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await 
.unwrap(); let mut node_a_api = NodeApi::new(node_a, topic_map); let node_b_private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); - let (node_b, mut node_b_stream_rx, _system_rx) = Node::new( + let (node_b, mut node_b_stream_rx, _system_rx) = Node::::new( "my_network".to_string(), node_b_private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await .unwrap(); @@ -339,34 +325,28 @@ mod tests { #[tokio::test] async fn two_peers_sync() { let node_a_private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); let (node_a, mut node_a_stream_rx, _system_rx) = Node::new( "my_network".to_string(), node_a_private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await .unwrap(); let mut node_a_api = NodeApi::new(node_a, topic_map); let node_b_private_key = PrivateKey::new(); - let store = MemoryStore::::new(); - let blobs_root_dir = tempfile::tempdir().unwrap().into_path(); let topic_map = TopicMap::new(); let (node_b, mut node_b_stream_rx, _system_rx) = Node::new( "my_network".to_string(), node_b_private_key.clone(), + topic_map.clone(), + None, None, None, - store, - blobs_root_dir, - topic_map.clone(), ) .await .unwrap(); diff --git a/src/node.rs b/src/node.rs index 83f4932..240e403 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::path::PathBuf; -use anyhow::{Result, anyhow}; +use anyhow::{anyhow, Result}; use futures_util::future::{MapErr, Shared}; use futures_util::{FutureExt, TryFutureExt}; use iroh_io::AsyncSliceReader; @@ -11,9 +11,10 @@ use p2panda_discovery::mdns::LocalDiscovery; use p2panda_net::{ Network, NetworkBuilder, NetworkId, RelayUrl, SyncConfiguration, SystemEvent, TopicId, }; 
-use p2panda_store::{LogId, MemoryStore}; -use p2panda_sync::TopicQuery; +use p2panda_store::sqlite::store::{connection_pool, create_database, run_pending_migrations}; +use p2panda_store::{LogId, SqliteStore}; use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap}; +use p2panda_sync::TopicQuery; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::pin; @@ -23,15 +24,36 @@ use tokio_stream::StreamExt; use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error}; +use crate::stream::{StreamController, StreamControllerError, StreamEvent, ToStreamController}; + use super::{ actor::{NodeActor, ToNodeActor}, operation::encode_gossip_message, - stream::{StreamController, StreamControllerError, StreamEvent, ToStreamController}, }; +// @TODO(glyph): Can we move this into p2panda-store? +async fn initialise_database(url: &str) -> Result> +where + L: LogId + Send + Sync + Serialize + for<'a> Deserialize<'a> + 'static, + E: Extensions + Extension + Extension + Send + Sync + 'static, +{ + create_database(url).await?; + + let pool = connection_pool(url, 1).await?; + + if let Err(err) = run_pending_migrations(&pool).await { + pool.close().await; + return Err(anyhow!("database migration failed: {err}")); + } + + let store = SqliteStore::new(pool); + + Ok(store) +} + pub struct Node { pub private_key: PrivateKey, - pub store: MemoryStore, + pub store: SqliteStore, pub network: Network, blobs: Blobs, #[allow(dead_code)] @@ -51,11 +73,10 @@ where pub async fn new + 'static>( network_name: String, private_key: PrivateKey, - bootstrap_node_id: Option, - relay_url: Option, - store: MemoryStore, - blobs_root_dir: PathBuf, topic_map: TM, + relay_url: Option, + bootstrap_node_id: Option, + app_data_dir: Option, ) -> Result<( Self, mpsc::Receiver>, @@ -65,6 +86,18 @@ where let rt = tokio::runtime::Handle::current(); + // Instantiate the SQLite store.
+ // + // This takes care of creating the database if it doesn't exist, setting up a connection + // pool and running any pending migrations. + let store = if let Some(path) = app_data_dir.as_ref() { + let path = path.display().to_string(); + initialise_database(&path).await? + } else { + let url = "sqlite://toolkitty?mode=memory&cache=private".to_string(); + initialise_database(&url).await? + }; + let (stream, stream_tx, stream_rx) = StreamController::new(store.clone()); let (ephemeral_tx, mut ephemeral_rx) = mpsc::channel(1024); @@ -120,9 +153,15 @@ where network_builder = network_builder.bootstrap(); } - let blobs_store = BlobsStore::load(blobs_root_dir).await?; - let (network, blobs) = Blobs::from_builder(network_builder, blobs_store).await?; + let blobs_store = match app_data_dir { Some(app_data_dir) => BlobsStore::load(app_data_dir).await?, None => { + let temp_dir = tempfile::tempdir()?; + BlobsStore::load(temp_dir.into_path()).await? + } + }; + let (network, blobs) = Blobs::from_builder(network_builder, blobs_store).await?; let system_events_rx = network.events().await?; let (network_actor_tx, network_actor_rx) = mpsc::channel(64); diff --git a/src/stream.rs b/src/stream/controller.rs similarity index 69% rename from src/stream.rs rename to src/stream/controller.rs index 14e8e90..7077e73 100644 --- a/src/stream.rs +++ b/src/stream/controller.rs @@ -1,18 +1,19 @@ use std::collections::HashMap; -use std::future::Future; -use std::sync::Arc; use p2panda_core::{Body, Extension, Extensions, Hash, Header, PruneFlag, PublicKey}; -use p2panda_store::{LogStore, MemoryStore, OperationStore}; +use p2panda_store::MemoryStore; use p2panda_stream::IngestExt; use serde::Serialize; use thiserror::Error; -use tokio::sync::{RwLock, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, error}; +use super::{StreamEvent, StreamMemoryStore}; +use 
super::store::StreamControllerStore; + // use super::extensions::{Extensions, LogId, LogPath, Stream, StreamOwner, StreamRootHash}; #[allow(clippy::large_enum_variant, dead_code)] @@ -176,67 +177,6 @@ where } } -#[derive(Clone, Debug, PartialEq)] -pub struct StreamEvent { - pub header: Option>, - pub data: EventData, -} - -impl StreamEvent { - pub fn from_operation(header: Header, body: Body) -> Self { - Self { - header: Some(header), - data: EventData::Application(body.to_bytes()), - } - } - - pub fn from_bytes(payload: Vec) -> Self { - Self { - header: None, - data: EventData::Ephemeral(payload), - } - } - - #[allow(dead_code)] - pub fn from_error(error: StreamError, header: Header) -> Self { - Self { - header: Some(header), - data: EventData::Error(error), - } - } -} - -#[allow(dead_code)] -#[derive(Clone, Debug, PartialEq, Serialize)] -#[serde(untagged)] -pub enum EventData { - Application(Vec), - Ephemeral(Vec), - Error(StreamError), -} - -#[allow(dead_code)] -#[derive(Clone, Debug, Error)] -pub enum StreamError { - #[error(transparent)] - IngestError(p2panda_stream::operation::IngestError), -} - -impl PartialEq for StreamError { - fn eq(&self, other: &Self) -> bool { - self.to_string() == other.to_string() - } -} - -impl Serialize for StreamError { - fn serialize(&self, serializer: S) -> Result - where - S: serde::ser::Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - #[derive(Debug, Error)] pub enum StreamControllerError { #[error("tried do ack unknown operation {0}")] @@ -255,118 +195,6 @@ impl Serialize for StreamControllerError { } } -type Operation = (Header, Option, Vec); - -trait StreamControllerStore -where - E: p2panda_core::Extensions, -{ - type Error; - - /// Mark operation as acknowledged. - fn ack(&self, operation_id: Hash) -> impl Future>; - - /// Return all operations from given logs which have not yet been acknowledged. 
- fn unacked( - &self, - logs: HashMap>, - ) -> impl Future>, Self::Error>>; -} - -#[derive(Clone, Debug)] -struct StreamMemoryStore { - operation_store: MemoryStore, - - /// Log-height of latest ack per log. - acked: Arc>>, -} - -impl StreamMemoryStore { - pub fn new(operation_store: MemoryStore) -> Self { - Self { - operation_store, - acked: Arc::new(RwLock::new(HashMap::new())), - } - } -} - -impl StreamControllerStore for StreamMemoryStore -where - L: p2panda_store::LogId + Send + Sync, - E: p2panda_core::Extensions + Extension + Send + Sync, -{ - type Error = StreamControllerError; - - async fn ack(&self, operation_id: Hash) -> Result<(), Self::Error> { - let Ok(Some((header, _))) = self.operation_store.get_operation(operation_id).await else { - return Err(StreamControllerError::AckedUnknownOperation(operation_id)); - }; - - let mut acked = self.acked.write().await; - - let log_id: Option = header.extension(); - let Some(log_id) = log_id else { - return Err(StreamControllerError::MissingLogId(operation_id)); - }; - - // Remember the "acknowledged" log-height for this log. - acked.insert((header.public_key, log_id), header.seq_num); - - Ok(()) - } - - async fn unacked( - &self, - logs: HashMap>, - ) -> Result>, Self::Error> { - let acked = self.acked.read().await; - - let mut result = Vec::new(); - for (public_key, log_ids) in logs { - for log_id in log_ids { - match acked.get(&(public_key, log_id.clone())) { - Some(ack_log_height) => { - let Ok(operations) = self - .operation_store - // Get all operations from > ack_log_height - .get_log(&public_key, &log_id, Some(*ack_log_height + 1)) - .await; - - if let Some(operations) = operations { - for (header, body) in operations { - // @TODO(adz): Getting the encoded header bytes through encoding - // like this feels redundant and should be possible to retreive - // just from calling "get_log". 
- let header_bytes = header.to_bytes(); - result.push((header, body, header_bytes)); - } - } - } - None => { - let Ok(operations) = self - .operation_store - // Get all operations from > ack_log_height - .get_log(&public_key, &log_id, Some(0)) - .await; - - if let Some(operations) = operations { - for (header, body) in operations { - // @TODO(adz): Getting the encoded header bytes through encoding - // like this feels redundant and should be possible to retreive - // just from calling "get_log". - let header_bytes = header.to_bytes(); - result.push((header, body, header_bytes)); - } - } - } - } - } - } - - Ok(result) - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/stream/mod.rs b/src/stream/mod.rs new file mode 100644 index 0000000..1c5ec01 --- /dev/null +++ b/src/stream/mod.rs @@ -0,0 +1,70 @@ +mod controller; +mod store; + +pub use controller::*; +pub use store::memory::StreamMemoryStore; + +use p2panda_core::{Body, Header}; +use serde::Serialize; +use thiserror::Error; + +#[derive(Clone, Debug, PartialEq)] +pub struct StreamEvent { + pub header: Option>, + pub data: EventData, +} + +impl StreamEvent { + pub fn from_operation(header: Header, body: Body) -> Self { + Self { + header: Some(header), + data: EventData::Application(body.to_bytes()), + } + } + + pub fn from_bytes(payload: Vec) -> Self { + Self { + header: None, + data: EventData::Ephemeral(payload), + } + } + + #[allow(dead_code)] + pub fn from_error(error: StreamError, header: Header) -> Self { + Self { + header: Some(header), + data: EventData::Error(error), + } + } +} + +#[allow(dead_code)] +#[derive(Clone, Debug, PartialEq, Serialize)] +#[serde(untagged)] +pub enum EventData { + Application(Vec), + Ephemeral(Vec), + Error(StreamError), +} + +#[allow(dead_code)] +#[derive(Clone, Debug, Error)] +pub enum StreamError { + #[error(transparent)] + IngestError(p2panda_stream::operation::IngestError), +} + +impl PartialEq for StreamError { + fn eq(&self, other: &Self) -> 
bool { + self.to_string() == other.to_string() + } +} + +impl Serialize for StreamError { + fn serialize(&self, serializer: S) -> Result + where + S: serde::ser::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} diff --git a/src/stream/store/memory.rs b/src/stream/store/memory.rs new file mode 100644 index 0000000..7249275 --- /dev/null +++ b/src/stream/store/memory.rs @@ -0,0 +1,104 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use p2panda_core::{Extension, Hash, PublicKey}; +use p2panda_store::{LogStore, MemoryStore, OperationStore}; +use tokio::sync::RwLock; + +use crate::stream::StreamControllerError; + +use super::{Operation, StreamControllerStore}; + +#[derive(Clone, Debug)] +pub struct StreamMemoryStore { + operation_store: MemoryStore, + + /// Log-height of latest ack per log. + acked: Arc>>, +} + +impl StreamMemoryStore { + pub fn new(operation_store: MemoryStore) -> Self { + Self { + operation_store, + acked: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl StreamControllerStore for StreamMemoryStore +where + L: p2panda_store::LogId + Send + Sync, + E: p2panda_core::Extensions + Extension + Send + Sync, +{ + type Error = StreamControllerError; + + async fn ack(&self, operation_id: Hash) -> Result<(), Self::Error> { + let Ok(Some((header, _))) = self.operation_store.get_operation(operation_id).await else { + return Err(StreamControllerError::AckedUnknownOperation(operation_id)); + }; + + let mut acked = self.acked.write().await; + + let log_id: Option = header.extension(); + let Some(log_id) = log_id else { + return Err(StreamControllerError::MissingLogId(operation_id)); + }; + + // Remember the "acknowledged" log-height for this log. 
+ acked.insert((header.public_key, log_id), header.seq_num); + + Ok(()) + } + + async fn unacked( + &self, + logs: HashMap>, + ) -> Result>, Self::Error> { + let acked = self.acked.read().await; + + let mut result = Vec::new(); + for (public_key, log_ids) in logs { + for log_id in log_ids { + match acked.get(&(public_key, log_id.clone())) { + Some(ack_log_height) => { + let Ok(operations) = self + .operation_store + // Get all operations from > ack_log_height + .get_log(&public_key, &log_id, Some(*ack_log_height + 1)) + .await; + + if let Some(operations) = operations { + for (header, body) in operations { + // @TODO(adz): Getting the encoded header bytes through encoding + // like this feels redundant and should be possible to retreive + // just from calling "get_log". + let header_bytes = header.to_bytes(); + result.push((header, body, header_bytes)); + } + } + } + None => { + let Ok(operations) = self + .operation_store + // Get all operations from > ack_log_height + .get_log(&public_key, &log_id, Some(0)) + .await; + + if let Some(operations) = operations { + for (header, body) in operations { + // @TODO(adz): Getting the encoded header bytes through encoding + // like this feels redundant and should be possible to retreive + // just from calling "get_log". + let header_bytes = header.to_bytes(); + result.push((header, body, header_bytes)); + } + } + } + } + } + } + + Ok(result) + } +} diff --git a/src/stream/store/mod.rs b/src/stream/store/mod.rs new file mode 100644 index 0000000..a3a3329 --- /dev/null +++ b/src/stream/store/mod.rs @@ -0,0 +1,25 @@ +use std::collections::HashMap; +use std::future::Future; + +pub mod memory; +pub mod sqlite; + +use p2panda_core::{Body, Hash, Header, PublicKey}; + +type Operation = (Header, Option, Vec); + +pub trait StreamControllerStore +where + E: p2panda_core::Extensions, +{ + type Error; + + /// Mark operation as acknowledged. 
+ fn ack(&self, operation_id: Hash) -> impl Future>; + + /// Return all operations from given logs which have not yet been acknowledged. + fn unacked( + &self, + logs: HashMap>, + ) -> impl Future>, Self::Error>>; +} diff --git a/src/stream/store/sqlite.rs b/src/stream/store/sqlite.rs new file mode 100644 index 0000000..222ce66 --- /dev/null +++ b/src/stream/store/sqlite.rs @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! SQLite persistent storage. +use std::collections::HashMap; +use std::hash::{DefaultHasher, Hash as StdHash, Hasher}; + +use p2panda_store::{LogStore, OperationStore}; +use sqlx::migrate::{MigrateDatabase, MigrateError}; +use sqlx::prelude::FromRow; +use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; +use sqlx::{Error as SqlxError, Sqlite, query_as}; +use sqlx::{migrate, query}; +use thiserror::Error; + +use p2panda_core::{Extension, Hash, PublicKey}; + +use crate::stream::StreamControllerError; + +use super::{Operation, StreamControllerStore}; + +#[derive(Debug, Error)] +pub enum SqliteStoreError { + #[error(transparent)] + ControllerError(#[from] StreamControllerError), + + #[error("an error occurred with the sqlite database: {0}")] + Database(#[from] SqlxError), +} + +impl From for SqliteStoreError { + fn from(error: MigrateError) -> Self { + Self::Database(SqlxError::Migrate(Box::new(error))) + } +} + +/// Re-export of SQLite connection pool type. +pub type Pool = SqlitePool; + +/// SQLite-based persistent store. +#[derive(Clone, Debug)] +pub struct StreamSqliteStore { + operation_store: p2panda_store::SqliteStore, + + /// Log-height of latest ack per log. + pool: Pool, +} + +impl StreamSqliteStore { + /// Create a new `SqliteStore` using the provided db `Pool`. + pub fn new(pool: Pool, operation_store: p2panda_store::SqliteStore) -> Self { + Self { + pool, + operation_store, + } + } +} + +/// Create the database if it doesn't already exist. 
+pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> { + if !Sqlite::database_exists(url).await? { + Sqlite::create_database(url).await? + } + + Ok(()) +} + +/// Drop the database if it exists. +pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> { + if Sqlite::database_exists(url).await? { + Sqlite::drop_database(url).await? + } + + Ok(()) +} + +/// Create a connection pool. +pub async fn connection_pool(url: &str, max_connections: u32) -> Result { + let pool: Pool = SqlitePoolOptions::new() + .max_connections(max_connections) + .connect(url) + .await?; + + Ok(pool) +} + +/// Run any pending database migrations from inside the application. +pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> { + migrate!().run(pool).await?; + + Ok(()) +} + +fn calculate_hash(t: &T) -> u64 { + let mut s = DefaultHasher::new(); + t.hash(&mut s); + s.finish() +} + +#[derive(FromRow, Debug, Clone, PartialEq, Eq)] +pub struct LogHeight(String); + +impl From for u64 { + fn from(row: LogHeight) -> Self { + row.0.parse().unwrap() + } +} + +impl StreamControllerStore for StreamSqliteStore +where + L: p2panda_store::LogId + Send + Sync, + E: p2panda_core::Extensions + Extension + Send + Sync, +{ + type Error = SqliteStoreError; + + async fn ack(&self, operation_id: Hash) -> Result<(), Self::Error> { + let Ok(Some((header, _))) = self.operation_store.get_operation(operation_id).await else { + return Err(StreamControllerError::AckedUnknownOperation(operation_id).into()); + }; + + let log_id: Option = header.extension(); + let Some(log_id) = log_id else { + return Err(StreamControllerError::MissingLogId(operation_id).into()); + }; + + // Remember the "acknowledged" log-height for this log. + query( + " + INSERT INTO + operations_v1 ( + public_key, + log_id, + seq_num + ) + VALUES + (?, ?, ?) 
+ ", + ) + .bind(header.public_key.to_hex()) + .bind(calculate_hash(&log_id).to_string()) + .bind(header.seq_num.to_string()) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn unacked( + &self, + logs: HashMap>, + ) -> Result>, Self::Error> { + let mut result = Vec::new(); + for (public_key, log_ids) in logs { + for log_id in log_ids { + let ack_log_height = query_as::<_, LogHeight>( + " + SELECT + seq_num + FROM + acked_v1 + WHERE + public_key = ?, + log_id = ? + ", + ) + .bind(public_key.to_string()) + .bind(calculate_hash(&log_id).to_string()) + .fetch_optional(&self.pool) + .await?; + + match ack_log_height { + Some(ack_log_height) => { + let ack_log_height: u64 = ack_log_height.into(); + let Ok(operations) = self + .operation_store + // Get all operations from > ack_log_height + .get_log(&public_key, &log_id, Some(ack_log_height + 1)) + .await + else { + todo!() + }; + + if let Some(operations) = operations { + for (header, body) in operations { + // @TODO(adz): Getting the encoded header bytes through encoding + // like this feels redundant and should be possible to retreive + // just from calling "get_log". + let header_bytes = header.to_bytes(); + result.push((header, body, header_bytes)); + } + } + } + None => { + let Ok(operations) = self + .operation_store + // Get all operations from > ack_log_height + .get_log(&public_key, &log_id, Some(0)) + .await + else { + todo!() + }; + + if let Some(operations) = operations { + for (header, body) in operations { + // @TODO(adz): Getting the encoded header bytes through encoding + // like this feels redundant and should be possible to retreive + // just from calling "get_log". + let header_bytes = header.to_bytes(); + result.push((header, body, header_bytes)); + } + } + } + } + } + } + + Ok(result) + } +}