422 changes: 421 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -15,7 +15,7 @@ mime = "0.3.17"
p2panda-blobs = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" }
p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" }
p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" }
p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" }
p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87", features = ["sqlite"] }
p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87" }
p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "79b7682deb5f253224745b7ad9a7faab90e89e87", features = [
"mdns",
@@ -32,6 +32,7 @@ tokio-util = "0.7.13"
tokio-utils = "0.1.2"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
sqlx = "0.8.3"

[dev-dependencies]
tempfile = "3.17.1"
6 changes: 6 additions & 0 deletions migrations/20250331144110_acked-operations.sql
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS acked_v1 (
public_key TEXT NOT NULL,
log_id TEXT NOT NULL,
seq_num TEXT NOT NULL,
PRIMARY KEY (public_key, log_id)
);
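The composite primary key on (public_key, log_id) allows at most one row per log, so recording a newer acknowledgement replaces the stored seq_num. A minimal sketch of how an ack might be written through sqlx; the function name and bindings are illustrative, not part of this change:

use sqlx::sqlite::SqlitePool;

// Illustrative only: record that operations up to `seq_num` in the given log
// have been acknowledged, overwriting any earlier acknowledgement.
async fn ack_operations(
    pool: &SqlitePool,
    public_key: &str,
    log_id: &str,
    seq_num: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO acked_v1 (public_key, log_id, seq_num)
         VALUES (?1, ?2, ?3)
         ON CONFLICT (public_key, log_id) DO UPDATE SET seq_num = excluded.seq_num",
    )
    .bind(public_key)
    .bind(log_id)
    .bind(seq_num)
    .execute(pool)
    .await?;
    Ok(())
}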
54 changes: 17 additions & 37 deletions src/api.rs
@@ -13,8 +13,7 @@ use crate::{
topic::{Topic, TopicMap},
};

pub struct NodeApi<E>
{
pub struct NodeApi<E> {
pub node: Node<Topic, LogId, E>,
pub topic_map: TopicMap,
pub subscriptions: HashMap<[u8; 32], Topic>,
@@ -171,30 +170,26 @@ impl Serialize for ApiError {
#[cfg(test)]
mod tests {
use p2panda_core::PrivateKey;
use p2panda_store::MemoryStore;

use crate::api::NodeApi;

use crate::extensions::{LogId, NodeExtensions};
use crate::stream::{EventData, StreamEvent};
use crate::topic::TopicMap;
use crate::topic::{Topic, TopicMap};

use super::Node;

#[tokio::test]
async fn subscribe_publish_persisted() {
let private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node, mut stream_rx, _system_rx) = Node::new(
"my_network".to_string(),
private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
@@ -237,17 +232,14 @@ mod tests {
#[tokio::test]
async fn subscribe_publish_ephemeral() {
let node_private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node, _stream_rx, _system_rx) = Node::new(
let (node, _stream_rx, _system_rx) = Node::<Topic, LogId, NodeExtensions>::new(
"my_network".to_string(),
node_private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
@@ -265,34 +257,28 @@
#[tokio::test]
async fn two_peers_subscribe() {
let node_a_private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node_a, _node_a_stream_rx, _system_rx) = Node::new(
let (node_a, _stream_rx, _system_rx) = Node::<Topic, LogId, NodeExtensions>::new(
"my_network".to_string(),
node_a_private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
let mut node_a_api = NodeApi::new(node_a, topic_map);

let node_b_private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node_b, mut node_b_stream_rx, _system_rx) = Node::new(
let (node_b, mut node_b_stream_rx, _system_rx) = Node::<Topic, LogId, NodeExtensions>::new(
"my_network".to_string(),
node_b_private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
@@ -339,34 +325,28 @@
#[tokio::test]
async fn two_peers_sync() {
let node_a_private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node_a, mut node_a_stream_rx, _system_rx) = Node::new(
"my_network".to_string(),
node_a_private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
let mut node_a_api = NodeApi::new(node_a, topic_map);

let node_b_private_key = PrivateKey::new();
let store = MemoryStore::<LogId, NodeExtensions>::new();
let blobs_root_dir = tempfile::tempdir().unwrap().into_path();
let topic_map = TopicMap::new();
let (node_b, mut node_b_stream_rx, _system_rx) = Node::new(
"my_network".to_string(),
node_b_private_key.clone(),
topic_map.clone(),
None,
None,
None,
store,
blobs_root_dir,
topic_map.clone(),
)
.await
.unwrap();
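Taken together, the test changes follow the reworked `Node::new` signature in src/node.rs below: callers no longer construct a store or a blobs directory themselves, and the optional networking arguments now trail the topic map. A hypothetical call that opts into on-disk persistence (the `app_data_dir` binding is illustrative):

let (node, stream_rx, system_rx) = Node::<Topic, LogId, NodeExtensions>::new(
    "my_network".to_string(),
    private_key.clone(),
    topic_map.clone(),
    None,               // relay_url
    None,               // bootstrap_node_id
    Some(app_data_dir), // enables the on-disk SQLite and blobs stores
)
.await?;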
61 changes: 50 additions & 11 deletions src/node.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use futures_util::future::{MapErr, Shared};
use futures_util::{FutureExt, TryFutureExt};
use iroh_io::AsyncSliceReader;
@@ -11,9 +11,10 @@ use p2panda_discovery::mdns::LocalDiscovery;
use p2panda_net::{
Network, NetworkBuilder, NetworkId, RelayUrl, SyncConfiguration, SystemEvent, TopicId,
};
use p2panda_store::{LogId, MemoryStore};
use p2panda_sync::TopicQuery;
use p2panda_store::sqlite::store::{connection_pool, create_database, run_pending_migrations};
use p2panda_store::{LogId, SqliteStore};
use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
use p2panda_sync::TopicQuery;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::pin;
@@ -23,15 +24,36 @@ use tokio_stream::StreamExt;
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error};

use crate::stream::{StreamController, StreamControllerError, StreamEvent, ToStreamController};

use super::{
actor::{NodeActor, ToNodeActor},
operation::encode_gossip_message,
stream::{StreamController, StreamControllerError, StreamEvent, ToStreamController},
};

// @TODO(glyph): Can we move this into p2panda-store?
async fn initialise_database<L, E>(url: &str) -> Result<SqliteStore<L, E>>
where
L: LogId + Send + Sync + Serialize + for<'a> Deserialize<'a> + 'static,
E: Extensions + Extension<L> + Extension<PruneFlag> + Send + Sync + 'static,
{
create_database(url).await?;

// A single connection is used: with an in-memory database
// (`mode=memory&cache=private`), every extra connection in the pool would
// otherwise see its own, separate database.
let pool = connection_pool(url, 1).await?;

if run_pending_migrations(&pool).await.is_err() {
pool.close().await;
panic!("Database migration failed");
}

let store = SqliteStore::new(pool);

Ok(store)
}

pub struct Node<T, L, E> {
pub private_key: PrivateKey,
pub store: MemoryStore<L, E>,
pub store: SqliteStore<L, E>,
pub network: Network<T>,
blobs: Blobs<T, BlobsStore>,
#[allow(dead_code)]
@@ -51,11 +73,10 @@
pub async fn new<TM: TopicLogMap<T, L> + 'static>(
network_name: String,
private_key: PrivateKey,
bootstrap_node_id: Option<PublicKey>,
relay_url: Option<RelayUrl>,
store: MemoryStore<L, E>,
blobs_root_dir: PathBuf,
topic_map: TM,
relay_url: Option<RelayUrl>,
bootstrap_node_id: Option<PublicKey>,
app_data_dir: Option<PathBuf>,
) -> Result<(
Self,
mpsc::Receiver<StreamEvent<E>>,
@@ -65,6 +86,18 @@

let rt = tokio::runtime::Handle::current();

// Instantiate the SQLite store.
//
// This takes care of creating the database if it doesn't exist, setting up a connection
// pool and running any pending migrations.
let store = if let Some(path) = app_data_dir.as_ref() {
let path = path.display().to_string();
initialise_database(&path).await?
} else {
// With `mode=memory`, the "toolkitty" name is only an identifier for a named
// in-memory database; nothing is written to disk.
let url = "sqlite://toolkitty?mode=memory&cache=private".to_string();
initialise_database(&url).await?
};

let (stream, stream_tx, stream_rx) = StreamController::new(store.clone());
let (ephemeral_tx, mut ephemeral_rx) = mpsc::channel(1024);

@@ -120,9 +153,15 @@
network_builder = network_builder.bootstrap();
}

let blobs_store = BlobsStore::load(blobs_root_dir).await?;
let (network, blobs) = Blobs::from_builder(network_builder, blobs_store).await?;
let blobs_store = match app_data_dir {
Some(app_data_dir) => BlobsStore::load(app_data_dir).await?,
None => {
let temp_dir = tempfile::tempdir()?;
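// `into_path` disables the automatic cleanup of the temporary directory,
// keeping the ephemeral blobs store alive for the lifetime of the process.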
BlobsStore::load(temp_dir.into_path()).await?
}
};

let (network, blobs) = Blobs::from_builder(network_builder, blobs_store).await?;
let system_events_rx = network.events().await?;

let (network_actor_tx, network_actor_rx) = mpsc::channel(64);