Skip to content

Commit c146362

Browse files
authored
Merge pull request #142 from cowlicks/replication
Add events needed for replication
2 parents 7f70249 + 5725b38 commit c146362

File tree

11 files changed

+577
-23
lines changed

11 files changed

+577
-23
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ jobs:
3939
cargo check --all-targets --no-default-features --features async-std,sparse
4040
cargo check --all-targets --no-default-features --features async-std,sparse,cache
4141
cargo test --no-default-features --features js_interop_tests,tokio
42+
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
4243
cargo test --no-default-features --features js_interop_tests,tokio,sparse
4344
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
4445
cargo test --no-default-features --features js_interop_tests,async-std
46+
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
4547
cargo test --no-default-features --features js_interop_tests,async-std,sparse
4648
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
4749
cargo test --benches --no-default-features --features tokio
@@ -64,9 +66,11 @@ jobs:
6466
cargo check --all-targets --no-default-features --features async-std,sparse
6567
cargo check --all-targets --no-default-features --features async-std,sparse,cache
6668
cargo test --no-default-features --features tokio
69+
cargo test --no-default-features --features tokio,shared-core
6770
cargo test --no-default-features --features tokio,sparse
6871
cargo test --no-default-features --features tokio,sparse,cache
6972
cargo test --no-default-features --features async-std
73+
cargo test --no-default-features --features async-std,shared-core
7074
cargo test --no-default-features --features async-std,sparse
7175
cargo test --no-default-features --features async-std,sparse,cache
7276
cargo test --benches --no-default-features --features tokio
@@ -89,9 +93,11 @@ jobs:
8993
cargo check --all-targets --no-default-features --features async-std,sparse
9094
cargo check --all-targets --no-default-features --features async-std,sparse,cache
9195
cargo test --no-default-features --features js_interop_tests,tokio
96+
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
9297
cargo test --no-default-features --features js_interop_tests,tokio,sparse
9398
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
9499
cargo test --no-default-features --features js_interop_tests,async-std
100+
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
95101
cargo test --no-default-features --features js_interop_tests,async-std,sparse
96102
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
97103
cargo test --benches --no-default-features --features tokio

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ futures = "0.3"
3939
crc32fast = "1"
4040
intmap = "2"
4141
moka = { version = "0.12", optional = true, features = ["sync"] }
42+
async-broadcast = { version = "0.7.1", optional = true }
43+
async-lock = {version = "3.4.0", optional = true }
4244

4345
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
4446
random-access-disk = { version = "3", default-features = false }
@@ -59,7 +61,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
5961
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
6062

6163
[features]
62-
default = ["tokio", "sparse"]
64+
default = ["tokio", "sparse", "replication"]
65+
replication = ["dep:async-broadcast"]
66+
shared-core = ["replication", "dep:async-lock"]
6367
sparse = ["random-access-disk/sparse"]
6468
tokio = ["random-access-disk/tokio"]
6569
async-std = ["random-access-disk/async-std"]

src/common/node.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,21 @@ pub(crate) struct NodeByteRange {
1414
pub(crate) length: u64,
1515
}
1616

17-
/// Nodes that are persisted to disk.
17+
/// Nodes of the Merkle Tree that are persisted to disk.
1818
// TODO: replace `hash: Vec<u8>` with `hash: Hash`. This requires patching /
1919
// rewriting the Blake2b crate to support `.from_bytes()` to serialize from
2020
// disk.
2121
#[derive(Debug, Clone, PartialEq, Eq)]
2222
pub struct Node {
23+
/// This node's index in the Merkle tree
2324
pub(crate) index: u64,
25+
/// Hash of the data in this node
2426
pub(crate) hash: Vec<u8>,
27+
/// Number of bytes in this [`Node::data`]
2528
pub(crate) length: u64,
29+
/// Index of this nodes parent
2630
pub(crate) parent: u64,
31+
/// Hypercore's data. Can be receieved after the rest of the node, so it's optional.
2732
pub(crate) data: Option<Vec<u8>>,
2833
pub(crate) blank: bool,
2934
}

src/common/peer.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Types needed for passing information with with peers.
22
//! hypercore-protocol-rs uses these types and wraps them
33
//! into wire messages.
4+
45
use crate::Node;
56

67
#[derive(Debug, Clone, PartialEq)]
@@ -20,7 +21,7 @@ pub struct RequestSeek {
2021
}
2122

2223
#[derive(Debug, Clone, PartialEq)]
23-
/// Request of a DataUpgrade from peer
24+
/// Request for a DataUpgrade from peer
2425
pub struct RequestUpgrade {
2526
/// Hypercore start index
2627
pub start: u64,
@@ -79,7 +80,7 @@ pub struct DataBlock {
7980
pub index: u64,
8081
/// Data block value in bytes
8182
pub value: Vec<u8>,
82-
/// TODO: document
83+
/// Nodes of the merkle tree
8384
pub nodes: Vec<Node>,
8485
}
8586

@@ -104,11 +105,11 @@ pub struct DataSeek {
104105
#[derive(Debug, Clone, PartialEq)]
105106
/// TODO: Document
106107
pub struct DataUpgrade {
107-
/// TODO: Document
108+
/// Starting block of this upgrade response
108109
pub start: u64,
109-
/// TODO: Document
110+
/// Number of blocks in this upgrade response
110111
pub length: u64,
111-
/// TODO: Document
112+
/// The nodes of the merkle tree
112113
pub nodes: Vec<Node>,
113114
/// TODO: Document
114115
pub additional_nodes: Vec<Node>,

src/core.rs

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ pub struct Hypercore {
4848
pub(crate) bitfield: Bitfield,
4949
skip_flush_count: u8, // autoFlush in Javascript
5050
header: Header,
51+
#[cfg(feature = "replication")]
52+
events: crate::replication::events::Events,
5153
}
5254

5355
/// Response from append, matches that of the Javascript result
54-
#[derive(Debug)]
56+
#[derive(Debug, PartialEq)]
5557
pub struct AppendOutcome {
5658
/// Length of the hypercore after append
5759
pub length: u64,
@@ -60,7 +62,7 @@ pub struct AppendOutcome {
6062
}
6163

6264
/// Info about the hypercore
63-
#[derive(Debug)]
65+
#[derive(Debug, PartialEq)]
6466
pub struct Info {
6567
/// Length of the hypercore
6668
pub length: u64,
@@ -247,6 +249,8 @@ impl Hypercore {
247249
bitfield,
248250
header,
249251
skip_flush_count: 0,
252+
#[cfg(feature = "replication")]
253+
events: crate::replication::events::Events::new(),
250254
})
251255
}
252256

@@ -321,6 +325,14 @@ impl Hypercore {
321325
if self.should_flush_bitfield_and_tree_and_oplog() {
322326
self.flush_bitfield_and_tree_and_oplog(false).await?;
323327
}
328+
329+
#[cfg(feature = "replication")]
330+
{
331+
let _ = self.events.send(crate::replication::events::DataUpgrade {});
332+
let _ = self
333+
.events
334+
.send(crate::replication::events::Have::from(&bitfield_update));
335+
}
324336
}
325337

326338
// Return the new value
@@ -330,10 +342,27 @@ impl Hypercore {
330342
})
331343
}
332344

345+
#[cfg(feature = "replication")]
346+
/// Subscribe to core events relevant to replication
347+
pub fn event_subscribe(&self) -> async_broadcast::Receiver<crate::replication::events::Event> {
348+
self.events.channel.new_receiver()
349+
}
350+
351+
/// Check if core has the block at the given `index` locally
352+
#[instrument(ret, skip(self))]
353+
pub fn has(&self, index: u64) -> bool {
354+
self.bitfield.get(index)
355+
}
356+
333357
/// Read value at given index, if any.
334358
#[instrument(err, skip(self))]
335359
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
336360
if !self.bitfield.get(index) {
361+
#[cfg(feature = "replication")]
362+
// if not in this core, emit Event::Get(index)
363+
{
364+
self.events.send_on_get(index);
365+
}
337366
return Ok(None);
338367
}
339368

@@ -522,12 +551,12 @@ impl Hypercore {
522551
self.storage.flush_infos(&outcome.infos_to_flush).await?;
523552
self.header = outcome.header;
524553

525-
if let Some(bitfield_update) = bitfield_update {
554+
if let Some(bitfield_update) = &bitfield_update {
526555
// Write to bitfield
527-
self.bitfield.update(&bitfield_update);
556+
self.bitfield.update(bitfield_update);
528557

529558
// Contiguous length is known only now
530-
update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update);
559+
update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update);
531560
}
532561

533562
// Commit changeset to in-memory tree
@@ -537,6 +566,21 @@ impl Hypercore {
537566
if self.should_flush_bitfield_and_tree_and_oplog() {
538567
self.flush_bitfield_and_tree_and_oplog(false).await?;
539568
}
569+
570+
#[cfg(feature = "replication")]
571+
{
572+
if proof.upgrade.is_some() {
573+
// Notify replicator if we receieved an upgrade
574+
let _ = self.events.send(crate::replication::events::DataUpgrade {});
575+
}
576+
577+
// Notify replicator if we receieved a bitfield update
578+
if let Some(ref bitfield) = bitfield_update {
579+
let _ = self
580+
.events
581+
.send(crate::replication::events::Have::from(bitfield));
582+
}
583+
}
540584
Ok(true)
541585
}
542586

@@ -725,7 +769,7 @@ fn update_contiguous_length(
725769
}
726770

727771
#[cfg(test)]
728-
mod tests {
772+
pub(crate) mod tests {
729773
use super::*;
730774

731775
#[async_std::test]
@@ -1091,7 +1135,9 @@ mod tests {
10911135
Ok(())
10921136
}
10931137

1094-
async fn create_hypercore_with_data(length: u64) -> Result<Hypercore, HypercoreError> {
1138+
pub(crate) async fn create_hypercore_with_data(
1139+
length: u64,
1140+
) -> Result<Hypercore, HypercoreError> {
10951141
let signing_key = generate_signing_key();
10961142
create_hypercore_with_data_and_key_pair(
10971143
length,
@@ -1103,7 +1149,7 @@ mod tests {
11031149
.await
11041150
}
11051151

1106-
async fn create_hypercore_with_data_and_key_pair(
1152+
pub(crate) async fn create_hypercore_with_data_and_key_pair(
11071153
length: u64,
11081154
key_pair: PartialKeypair,
11091155
) -> Result<Hypercore, HypercoreError> {

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![forbid(unsafe_code, bad_style, future_incompatible)]
1+
#![forbid(unsafe_code, future_incompatible)]
22
#![forbid(rust_2018_idioms, rust_2018_compatibility)]
33
#![forbid(missing_debug_implementations)]
44
#![forbid(missing_docs)]
@@ -74,6 +74,8 @@
7474
7575
pub mod encoding;
7676
pub mod prelude;
77+
#[cfg(feature = "replication")]
78+
pub mod replication;
7779

7880
mod bitfield;
7981
mod builder;

0 commit comments

Comments
 (0)