Skip to content

Commit 3a6b798

Browse files
authored
Merge pull request #21 from rklaehn/persistence
feat(iroh): add persistence to a local iroh-blobs store
2 parents 77062ce + 3ca1920 commit 3a6b798

File tree

16 files changed

+1055
-849
lines changed

16 files changed

+1055
-849
lines changed

Cargo.lock

Lines changed: 738 additions & 681 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ resolver = "2"
55
[workspace.dependencies]
66
anyhow = "1"
77
async-trait = "0.1"
8-
axum = "0.8"
8+
axum = { version = "0.8", features = ["macros"] }
99
axum-extra = { version = "0.10", features = ["typed-header"] }
1010
azure_core = "0.19"
1111
azure_identity = "0.19"
1212
azure_storage = "0.19"
1313
azure_storage_blobs = "0.19"
14-
bao-tree = "0.16"
1514
blake3 = "1.5"
1615
bytes = "1.10.1"
1716
chrono = "0.4"
@@ -28,7 +27,6 @@ http-body-util = "0.1.3"
2827
iroh = "0.95"
2928
iroh-base = "0.95"
3029
iroh-blobs = "0.97"
31-
iroh-tickets = "0.2"
3230
itertools = "0.12"
3331
jsonwebtoken = "9"
3432
log = "0.4"
@@ -53,3 +51,4 @@ utoipa = { version = "5", features = ["axum_extras"] }
5351
utoipa-axum = "0.2"
5452
utoipa-swagger-ui = { version = "9", features = ["axum"] }
5553
uuid = { version = "1.18.1", features = ["serde", "v4"] }
54+
irpc = "0.11.0"

core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ anyhow.workspace = true
1111
api-utils = { path = "../crates/api-utils" }
1212
async-trait.workspace = true
1313
axum.workspace = true
14-
bao-tree.workspace = true
1514
bytes.workspace = true
1615
chrono.workspace = true
1716
cid.workspace = true

core/src/crp.rs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::pin::Pin;
1+
use std::{fmt::Debug, pin::Pin, sync::Arc};
22

33
use anyhow::Result;
44
use async_trait::async_trait;
@@ -39,7 +39,7 @@ impl std::fmt::Display for ProviderType {
3939

4040
/// CID Route Provider (CRP) Trait
4141
#[async_trait]
42-
pub trait Crp: Send + Sync {
42+
pub trait Crp: Send + Sync + Debug {
4343
fn provider_id(&self) -> String;
4444
fn provider_type(&self) -> ProviderType;
4545
async fn reindex(&self, cx: &Context) -> Result<()>;
@@ -53,10 +53,31 @@ pub trait Crp: Send + Sync {
5353
}
5454
}
5555

56+
#[async_trait]
57+
impl Crp for Arc<dyn Crp> {
58+
fn provider_id(&self) -> String {
59+
self.as_ref().provider_id()
60+
}
61+
fn provider_type(&self) -> ProviderType {
62+
self.as_ref().provider_type()
63+
}
64+
async fn reindex(&self, cx: &Context) -> Result<()> {
65+
self.as_ref().reindex(cx).await
66+
}
67+
68+
fn capabilities<'a>(&'a self) -> CrpCapabilities<'a> {
69+
self.as_ref().capabilities()
70+
}
71+
72+
fn cid_filter(&self) -> CidFilter {
73+
self.as_ref().cid_filter()
74+
}
75+
}
76+
5677
/// All capabilities a CRP may have represented as self-referential trait objects.
5778
pub struct CrpCapabilities<'a> {
5879
pub route_resolver: Option<&'a dyn RouteResolver>,
59-
pub size_resolver: Option<&'a dyn SizeResolver>,
80+
pub blob_writer: Option<&'a dyn BlobWriter>,
6081
}
6182

6283
/// A RouteResolver can dereference a route, turning it into a stream of bytes, accepting
@@ -78,14 +99,19 @@ pub trait RouteResolver {
7899
>;
79100
}
80101

81-
/// A SizeResolver can return the length in bytes of the blob a CID points at.
82-
/// This is useful both as a preflight check before downloading a CID,
83-
/// and as a fast means of checking if a CRP has the CID in the first place.
102+
/// A RouteResolver can dereference a route, turning it into a stream of bytes, accepting
103+
/// authentication data.
84104
#[async_trait]
85-
pub trait SizeResolver {
86-
async fn get_size(
105+
pub trait BlobWriter: Send + Sync {
106+
/// Puts a blob into the CRP, given optional authentication data, a CID, and the data bytes.
107+
///
108+
/// Note that this assumes that the data fits in memory, which is probably the case for most
109+
/// data that eqty wants to write. If this becomes a problem, we will add a second method that
110+
/// takes a stream of bytes instead.
111+
async fn put_blob(
87112
&self,
113+
auth: Option<bytes::Bytes>,
88114
cid: &Cid,
89-
auth: Vec<u8>,
90-
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>>;
115+
data: &[u8],
116+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
91117
}

core/src/db.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ mod tests {
362362
Context,
363363
};
364364

365+
#[derive(Debug)]
365366
struct StubAzureProvider {}
366367

367368
#[async_trait]
@@ -381,7 +382,7 @@ mod tests {
381382
fn capabilities(&self) -> CrpCapabilities<'_> {
382383
CrpCapabilities {
383384
route_resolver: None,
384-
size_resolver: None,
385+
blob_writer: None,
385386
}
386387
}
387388

crps/azure/src/container.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Crp for Container {
4848
fn capabilities<'a>(&'a self) -> CrpCapabilities<'a> {
4949
CrpCapabilities {
5050
route_resolver: Some(self),
51-
size_resolver: None, // TODO
51+
blob_writer: None, // TODO
5252
}
5353
}
5454

crps/iroh/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ edition = "2021"
99

1010
anyhow.workspace = true
1111
async-trait.workspace = true
12-
bao-tree.workspace = true
1312
bytes.workspace = true
1413
cid.workspace = true
1514
cid-router-core = { path = "../../core" }
1615
futures.workspace = true
1716
iroh.workspace = true
1817
iroh-base.workspace = true
1918
iroh-blobs.workspace = true
20-
iroh-tickets.workspace = true
2119
n0-future.workspace = true
2220
serde.workspace = true
2321
serde_json.workspace = true
2422
tokio.workspace = true
23+
irpc.workspace = true
24+
log.workspace = true

crps/iroh/src/iroh.rs

Lines changed: 43 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,42 @@
1-
use std::{pin::Pin, str::FromStr};
1+
use std::{io, path::PathBuf, pin::Pin};
22

33
use anyhow::Result;
44
use async_trait::async_trait;
5-
use bao_tree::io::BaoContentItem;
65
use bytes::Bytes;
6+
use cid::Cid;
77
use cid_router_core::{
88
cid_filter::{CidFilter, CodeFilter},
9-
crp::{Crp, CrpCapabilities, ProviderType, RouteResolver},
9+
crp::{BlobWriter, Crp, CrpCapabilities, ProviderType, RouteResolver},
1010
routes::Route,
1111
Context,
1212
};
13-
use futures::{Stream, StreamExt};
14-
use iroh::{Endpoint, EndpointAddr, EndpointId};
15-
use iroh_blobs::{get::request::GetBlobItem, ticket::BlobTicket, Hash};
13+
use futures::Stream;
14+
use iroh_blobs::Hash;
15+
use log::info;
1616
use serde::{Deserialize, Serialize};
17-
use serde_json::Value;
1817

19-
#[derive(Debug)]
18+
#[derive(Debug, Clone)]
2019
pub struct IrohCrp {
21-
addr: EndpointAddr,
22-
endpoint: Endpoint,
20+
store: iroh_blobs::store::fs::FsStore,
2321
}
2422

2523
#[derive(Debug, Clone, Serialize, Deserialize)]
2624
pub struct IrohCrpConfig {
27-
pub node_addr_ref: IrohNodeAddrRef,
28-
}
29-
30-
#[derive(Debug, Clone, Serialize, Deserialize)]
31-
#[serde(rename_all = "snake_case")]
32-
pub enum IrohNodeAddrRef {
33-
EndpointId(String),
34-
EndpointTicket(String),
35-
Ticket(String),
25+
/// Path to the directory where blobs are stored
26+
pub path: PathBuf,
3627
}
3728

3829
impl IrohCrp {
39-
pub async fn new_from_config(config: Value) -> Result<Self> {
40-
let IrohCrpConfig { node_addr_ref } = serde_json::from_value(config)?;
41-
42-
let endpoint_addr = match node_addr_ref {
43-
IrohNodeAddrRef::EndpointId(node_id) => {
44-
let endpoint_id = EndpointId::from_str(&node_id)?;
45-
EndpointAddr::from(endpoint_id)
46-
}
47-
IrohNodeAddrRef::EndpointTicket(ticket) => {
48-
let ticket = iroh_tickets::endpoint::EndpointTicket::from_str(&ticket)?;
49-
ticket.endpoint_addr().to_owned()
50-
}
51-
IrohNodeAddrRef::Ticket(ticket) => {
52-
let ticket = BlobTicket::from_str(&ticket)?;
53-
ticket.addr().clone()
54-
}
30+
pub async fn new_from_config(config: IrohCrpConfig) -> io::Result<Self> {
31+
let path = if config.path.is_absolute() {
32+
config.path
33+
} else {
34+
std::env::current_dir()?.join(config.path)
5535
};
56-
57-
let endpoint = Endpoint::bind().await?;
58-
59-
Ok(Self {
60-
addr: endpoint_addr,
61-
endpoint,
62-
})
36+
let store = iroh_blobs::store::fs::FsStore::load(path)
37+
.await
38+
.map_err(|e| io::Error::other(e))?;
39+
Ok(Self { store })
6340
}
6441
}
6542

@@ -75,13 +52,13 @@ impl Crp for IrohCrp {
7552

7653
async fn reindex(&self, _cx: &Context) -> anyhow::Result<()> {
7754
// TODO: Implement reindexing logic
78-
todo!();
55+
Ok(())
7956
}
8057

8158
fn capabilities<'a>(&'a self) -> CrpCapabilities<'a> {
8259
CrpCapabilities {
8360
route_resolver: Some(self),
84-
size_resolver: None, // TODO
61+
blob_writer: Some(self),
8562
}
8663
}
8764

@@ -90,6 +67,24 @@ impl Crp for IrohCrp {
9067
}
9168
}
9269

70+
#[async_trait]
71+
impl BlobWriter for IrohCrp {
72+
async fn put_blob(
73+
&self,
74+
_auth: Option<Bytes>,
75+
cid: &Cid,
76+
data: &[u8],
77+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
78+
let blobs = self.store.blobs().clone();
79+
let data = Bytes::copy_from_slice(data);
80+
if cid.hash().code() != 0x1e {
81+
return Err("Unsupported CID hash code; only blake3 is supported".into());
82+
}
83+
blobs.add_bytes(data).with_tag().await.map_err(Box::new)?;
84+
Ok(())
85+
}
86+
}
87+
9388
#[async_trait]
9489
impl RouteResolver for IrohCrp {
9590
async fn get_bytes(
@@ -105,39 +100,13 @@ impl RouteResolver for IrohCrp {
105100
>,
106101
Box<dyn std::error::Error + Send + Sync>,
107102
> {
108-
let Self { addr, .. } = self;
103+
info!("get_bytes for route: {:?}", route);
109104
let cid = route.cid;
110-
111105
let hash = cid.hash().digest();
112106
let hash: [u8; 32] = hash.try_into()?;
113107
let hash = Hash::from_bytes(hash);
114-
115-
let conn = self
116-
.endpoint
117-
.connect(addr.clone(), iroh_blobs::ALPN)
118-
.await?;
119-
120-
println!("get {:?} from {}", hash, addr.id.fmt_short());
121-
122-
let res = iroh_blobs::get::request::get_blob(conn, hash);
123-
let res = res
124-
.take_while(|item| n0_future::future::ready(!matches!(item, GetBlobItem::Done(_))))
125-
.filter_map(|item| {
126-
n0_future::future::ready(match item {
127-
GetBlobItem::Item(item) => match item {
128-
BaoContentItem::Leaf(leaf) => Some(Ok(leaf.data)),
129-
// TODO - I don't think this is right. returning None here
130-
// will likely end the stream prematurely
131-
BaoContentItem::Parent(_parent) => None,
132-
},
133-
// This is filtered out, only for compiler happiness
134-
GetBlobItem::Done(_stats) => None,
135-
GetBlobItem::Error(err) => Some(Err(
136-
Box::new(err) as Box<dyn std::error::Error + Send + Sync>
137-
)),
138-
})
139-
});
140-
141-
Ok(Box::pin(res))
108+
let data = self.store.blobs().get_bytes(hash).await.map_err(Box::new)?;
109+
let stream = futures::stream::once(async move { Ok(data) });
110+
Ok(Box::pin(stream))
142111
}
143112
}

crps/iroh/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
mod iroh;
22

3-
pub use iroh::{IrohCrp, IrohCrpConfig, IrohNodeAddrRef};
3+
pub use iroh::{IrohCrp, IrohCrpConfig};

server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ api-utils = { path = "../crates/api-utils" }
1212
async-trait.workspace = true
1313
axum.workspace = true
1414
axum-extra.workspace = true
15-
bao-tree.workspace = true
1615
bytes.workspace = true
1716
chrono.workspace = true
1817
cid.workspace = true
18+
blake3.workspace = true
1919
cid-router-core = { path = "../core" }
2020
clap.workspace = true
2121
crp-azure = { path = "../crps/azure" }

0 commit comments

Comments
 (0)