Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,748 changes: 739 additions & 1,009 deletions content-discovery/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ missing_debug_implementations = "warn"
unused-async = "warn"

[workspace.dependencies]
iroh = { version ="0.93", features = ["discovery-pkarr-dht"] }
iroh-base = "0.93"
iroh-blobs = { version = "0.95" }
iroh = { version ="0.94", features = ["discovery-pkarr-dht"] }
iroh-base = "0.94"
iroh-blobs = "0.96"
# explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255
tokio = { version = "1.44.1" }
tokio-stream = { version = "0.1.17" }
postcard = { version = "1", default-features = false }
anyhow = { version = "1", default-features = false }
n0-future = { version = "0.1.3" }
n0-future = { version = "0.3" }
futures-buffered = { version = "0.2.11" }
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-discovery-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ anyhow = { workspace = true, features = ["backtrace"] }
futures = { version = "0.3.25" }
clap = { version = "4", features = ["derive"] }
tempfile = { version = "3.4" }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
derive_more = { version = "2", features = ["debug", "display", "from", "try_into"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["io-util", "rt"] }
6 changes: 3 additions & 3 deletions content-discovery/iroh-content-discovery-cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::{fmt::Display, str::FromStr};

use clap::{Parser, Subcommand};
use iroh::NodeId;
use iroh::EndpointId;
use iroh_blobs::{ticket::BlobTicket, Hash, HashAndFormat};

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -69,7 +69,7 @@ impl FromStr for ContentArg {
pub struct AnnounceArgs {
/// trackers to announce to
#[clap(long, required = true)]
pub tracker: Vec<NodeId>,
pub tracker: Vec<EndpointId>,

/// The content to announce.
///
Expand All @@ -87,7 +87,7 @@ pub struct AnnounceArgs {
pub struct QueryArgs {
/// the tracker to query
#[clap(long, required = true)]
pub tracker: Vec<NodeId>,
pub tracker: Vec<EndpointId>,

/// The content to find hosts for.
pub content: ContentArg,
Expand Down
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-discovery-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
};
let content = args.content.hash_and_format();
if let ContentArg::Ticket(ticket) = &args.content {
if ticket.node_addr().node_id != key.public() {
if ticket.addr().id != key.public() {
bail!("ticket does not match the announce secret");
}
}
Expand Down
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ license = "MIT OR Apache-2.0"
iroh-base = { workspace = true }
iroh-blobs = { workspace = true }
serde = { version = "1", features = ["derive"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
derive_more = { version = "2", features = ["debug", "display", "from", "try_into"] }
serde-big-array = "0.5.1"
hex = "0.4.3"
anyhow = { workspace = true, features = ["backtrace"] }
Expand Down
36 changes: 18 additions & 18 deletions content-discovery/iroh-content-discovery/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{future::Future, result};

use iroh::{
endpoint::{ConnectOptions, Connection},
Endpoint, NodeId,
Endpoint, EndpointId,
};
use n0_future::{BufferedStreamExt, Stream, StreamExt};
use snafu::prelude::*;
Expand Down Expand Up @@ -62,9 +62,9 @@ pub enum Error {
backtrace: snafu::Backtrace,
},

#[snafu(display("Failed to get remote node id: {}", source))]
RemoteNodeId {
source: iroh::endpoint::RemoteNodeIdError,
#[snafu(display("Failed to get remote endpoint id: {}", source))]
RemoteEndpointId {
source: iroh::endpoint::RemoteEndpointIdError,
backtrace: snafu::Backtrace,
},
}
Expand All @@ -74,10 +74,10 @@ pub type Result<T> = result::Result<T, Error>;
/// Announce to multiple trackers in parallel.
pub fn announce_all(
endpoint: Endpoint,
trackers: impl IntoIterator<Item = NodeId>,
trackers: impl IntoIterator<Item = EndpointId>,
signed_announce: SignedAnnounce,
announce_parallelism: usize,
) -> impl Stream<Item = (NodeId, Result<()>)> {
) -> impl Stream<Item = (EndpointId, Result<()>)> {
n0_future::stream::iter(trackers)
.map(move |tracker| {
let endpoint = endpoint.clone();
Expand All @@ -91,31 +91,31 @@ pub fn announce_all(

/// Announce to a tracker.
///
/// You can only announce content you yourself claim to have, to avoid spamming other nodes.
/// You can only announce content you yourself claim to have, to avoid spamming other endpoints.
///
/// `endpoint` is the iroh endpoint to use for announcing.
/// `tracker` is the node id of the tracker to announce to. It must understand the [crate::ALPN] protocol.
/// `tracker` is the endpoint id of the tracker to announce to. It must understand the [crate::ALPN] protocol.
/// `content` is the content to announce.
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
pub async fn announce(
endpoint: &Endpoint,
node_id: NodeId,
endpoint_id: EndpointId,
signed_announce: SignedAnnounce,
) -> Result<()> {
let connecting = endpoint
.connect_with_opts(node_id, ALPN, ConnectOptions::default())
.connect_with_opts(endpoint_id, ALPN, ConnectOptions::default())
.await
.context(ConnectSnafu)?;
match connecting.into_0rtt() {
Ok((connection, zero_rtt_accepted)) => {
trace!("connected to tracker using possibly 0-rtt: {node_id}");
trace!("connected to tracker using possibly 0-rtt: {endpoint_id}");
announce_conn(&connection, signed_announce, zero_rtt_accepted).await?;
wait_for_session_ticket(connection);
Ok(())
}
Err(connecting) => {
let connection = connecting.await.context(Connect1RttSnafu)?;
trace!("connected to tracker using 1-rtt: {node_id}");
trace!("connected to tracker using 1-rtt: {endpoint_id}");
announce_conn(&connection, signed_announce, async { true }).await?;
connection.close(0u32.into(), b"");
Ok(())
Expand Down Expand Up @@ -159,23 +159,23 @@ pub async fn announce_conn(
/// A single query to a tracker, using 0-rtt if possible.
pub async fn query(
endpoint: &Endpoint,
node_id: NodeId,
endpoint_id: EndpointId,
args: Query,
) -> Result<Vec<SignedAnnounce>> {
let connecting = endpoint
.connect_with_opts(node_id, ALPN, ConnectOptions::default())
.connect_with_opts(endpoint_id, ALPN, ConnectOptions::default())
.await
.context(ConnectSnafu)?;
let result = match connecting.into_0rtt() {
Ok((connection, zero_rtt_accepted)) => {
trace!("connected to tracker using possibly 0-rtt: {node_id}");
trace!("connected to tracker using possibly 0-rtt: {endpoint_id}");
let res = query_conn(&connection, args, zero_rtt_accepted).await?;
wait_for_session_ticket(connection);
res
}
Err(connecting) => {
let connection = connecting.await.context(Connect1RttSnafu)?;
trace!("connected to tracker using 1-rtt: {node_id}");
trace!("connected to tracker using 1-rtt: {endpoint_id}");
let res = query_conn(&connection, args, async { true }).await?;
connection.close(0u32.into(), b"");
res
Expand All @@ -190,7 +190,7 @@ pub async fn query(
/// use [`query`] instead.
pub fn query_all(
endpoint: Endpoint,
trackers: impl IntoIterator<Item = NodeId>,
trackers: impl IntoIterator<Item = EndpointId>,
args: Query,
query_parallelism: usize,
) -> impl Stream<Item = Result<SignedAnnounce>> {
Expand Down Expand Up @@ -223,7 +223,7 @@ pub async fn query_conn(
let request = postcard::to_stdvec(&request).context(SerializeRequestSnafu)?;
trace!(
"connected to {:?}",
connection.remote_node_id().context(RemoteNodeIdSnafu)?
connection.remote_id().context(RemoteEndpointIdSnafu)?
);
trace!("opened bi stream");
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
Expand Down
12 changes: 4 additions & 8 deletions content-discovery/iroh-content-discovery/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, SystemTime},
};

use iroh::NodeId;
use iroh::EndpointId;
use iroh_blobs::HashAndFormat;
use serde::{Deserialize, Serialize};
use serde_big_array::BigArray;
Expand Down Expand Up @@ -89,7 +89,7 @@ impl From<AbsoluteTime> for SystemTime {
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct Announce {
/// The peer that supposedly has the data.
pub host: NodeId,
pub host: EndpointId,
/// The content that the peer claims to have.
pub content: HashAndFormat,
/// The kind of the announcement.
Expand Down Expand Up @@ -121,12 +121,8 @@ impl Deref for SignedAnnounce {

#[derive(Debug, Snafu)]
pub enum VerifyError {
SignatureError {
source: ed25519_dalek::SignatureError,
},
SerializationError {
source: postcard::Error,
},
SignatureError { source: iroh_base::SignatureError },
SerializationError { source: postcard::Error },
}

impl SignedAnnounce {
Expand Down
8 changes: 4 additions & 4 deletions content-discovery/iroh-content-tracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ anyhow = { workspace = true, features = ["backtrace"] }
# needs to keep updated with the dep of iroh-blobs
bao-tree = { version = "0.15.1", features = ["tokio_fsm"], default-features = false }
bytes = "1"
derive_more = { version = "1", features = ["debug", "display", "from", "try_into"] }
derive_more = { version = "2", features = ["debug", "display", "from", "try_into"] }
dirs-next = "2"
futures = "0.3.25"
hex = "0.4.3"
Expand All @@ -21,14 +21,14 @@ iroh = { workspace = true }
iroh-blobs = { workspace = true }
postcard = { workspace = true, features = ["alloc", "use-std"] }
rand = "0.9.2"
rcgen = "0.12.0"
redb = "1.5.0"
rcgen = "0.14"
redb = "2.6"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.107"
tempfile = "3.4"
tokio = { version = "1", features = ["io-util", "rt"] }
tokio-util = { version = "0.7", features = ["io-util", "io", "rt"] }
toml = "0.7.3"
toml = "0.9"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ttl_cache = "0.5.1"
Expand Down
1 change: 1 addition & 0 deletions content-discovery/iroh-content-tracker/dial.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1761151767.655932,5c6c7ab5fa7f1f337fd19adce7189fbe3ec219cfffac221fd1cc7f158715fbed,0.213470,ok
1 change: 1 addition & 0 deletions content-discovery/iroh-content-tracker/probe.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1761151767.665611,5c6c7ab5fa7f1f337fd19adce7189fbe3ec219cfffac221fd1cc7f158715fbed,1edabea435e688e8227aa2497e1d1c8ff311d6bab560c9f28d70b8d4845c8a84,Complete,0.009327,ok
8 changes: 4 additions & 4 deletions content-discovery/iroh-content-tracker/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use anyhow::Context;
use iroh::NodeId;
use iroh::EndpointId;
use iroh_blobs::{get::Stats, HashAndFormat};
use iroh_content_discovery::protocol::{AnnounceKind, SignedAnnounce};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand All @@ -28,7 +28,7 @@ pub const TRACKER_HOME_ENV_VAR: &str = "IROH_TRACKER_HOME";
/// This should be easy to edit manually when serialized as json or toml.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnnounceData(
pub BTreeMap<HashAndFormat, BTreeMap<AnnounceKind, BTreeMap<NodeId, SignedAnnounce>>>,
pub BTreeMap<HashAndFormat, BTreeMap<AnnounceKind, BTreeMap<EndpointId, SignedAnnounce>>>,
);

pub fn save_to_file(data: impl Serialize, path: &Path) -> anyhow::Result<()> {
Expand Down Expand Up @@ -89,7 +89,7 @@ pub fn load_from_file<T: DeserializeOwned + Default>(path: &Path) -> anyhow::Res

pub fn log_connection_attempt(
path: &Option<PathBuf>,
host: &NodeId,
host: &EndpointId,
t0: Instant,
outcome: &Result<iroh::endpoint::Connection, iroh::endpoint::ConnectError>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn log_connection_attempt(

pub fn log_probe_attempt(
path: &Option<PathBuf>,
host: &NodeId,
host: &EndpointId,
content: &HashAndFormat,
kind: ProbeKind,
t0: Instant,
Expand Down
17 changes: 10 additions & 7 deletions content-discovery/iroh-content-tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ async fn create_endpoint(
ipv6_addr: Option<SocketAddrV6>,
) -> Result<Endpoint, BindError> {
let mut builder = iroh::Endpoint::builder()
.secret_key(key)
.discovery_dht()
.discovery_n0()
.secret_key(key.clone())
.discovery(
iroh::discovery::pkarr::dht::DhtDiscovery::builder()
.secret_key(key)
.build()?,
)
.alpns(vec![ALPN.to_vec()]);
if let Some(ipv4_addr) = ipv4_addr {
builder = builder.bind_addr_v4(ipv4_addr);
Expand Down Expand Up @@ -96,8 +99,8 @@ async fn server(args: Args) -> anyhow::Result<()> {
let db = Tracker::new(options, endpoint.clone())?;
db.dump().await?;
endpoint.online().await;
let addr = endpoint.node_addr();
println!("tracker addr: {}\n", addr.node_id);
let addr = endpoint.addr();
println!("tracker addr: {}\n", addr.id);
info!("listening on {:?}", addr);
// let db2 = db.clone();
let db3 = db.clone();
Expand Down Expand Up @@ -154,8 +157,8 @@ pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<iroh::SecretKe
} else {
let secret_key = SecretKey::generate(&mut rand::rng());
let ckey = ssh_key::private::Ed25519Keypair {
public: secret_key.public().public().into(),
private: secret_key.secret().into(),
public: secret_key.public().as_verifying_key().into(),
private: secret_key.as_signing_key().into(),
};
let ser_key =
ssh_key::private::PrivateKey::from(ckey).to_openssh(ssh_key::LineEnding::default())?;
Expand Down
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-tracker/src/task_map.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{
collections::BTreeMap,
fmt::Debug,
sync::{Arc, Mutex},
};

use derive_more::Debug;
use tokio::task::JoinHandle;
use tokio_util::task::AbortOnDropHandle;

Expand Down
Loading
Loading