Skip to content

Commit d6dd673

Browse files
committed
Move shared_core to own module and add as feature
1 parent b52b25a commit d6dd673

File tree

3 files changed

+242
-227
lines changed

3 files changed

+242
-227
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
6161
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
6262

6363
[features]
64-
default = ["tokio", "sparse"]
65-
replication = ["dep:async-broadcast", "dep:async-lock"]
64+
default = ["tokio", "sparse", "replication"]
65+
replication = ["dep:async-broadcast"]
66+
shared-core = ["replication", "dep:async-lock"]
6667
sparse = ["random-access-disk/sparse"]
6768
tokio = ["random-access-disk/tokio"]
6869
async-std = ["random-access-disk/async-std"]

src/replication/mod.rs

Lines changed: 7 additions & 225 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,20 @@
11
//! External interface for replication
22
pub mod events;
3+
#[cfg(feature = "shared-core")]
4+
pub mod shared_core;
5+
6+
#[cfg(feature = "shared-core")]
7+
pub use shared_core::SharedCore;
38

49
use crate::{
5-
AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock,
6-
RequestSeek, RequestUpgrade,
10+
AppendOutcome, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, RequestSeek,
11+
RequestUpgrade,
712
};
813

914
pub use events::Event;
1015

1116
use async_broadcast::Receiver;
12-
use async_lock::Mutex;
13-
1417
use std::future::Future;
15-
use std::sync::Arc;
16-
/// Hypercore that can have multiple owners
17-
#[derive(Debug, Clone)]
18-
pub struct SharedCore(pub Arc<Mutex<Hypercore>>);
19-
20-
impl From<Hypercore> for SharedCore {
21-
fn from(core: Hypercore) -> Self {
22-
SharedCore(Arc::new(Mutex::new(core)))
23-
}
24-
}
25-
impl SharedCore {
26-
/// Create a shared core from a [`Hypercore`]
27-
pub fn from_hypercore(core: Hypercore) -> Self {
28-
SharedCore(Arc::new(Mutex::new(core)))
29-
}
30-
}
3118

3219
/// Methods related to just this core's information
3320
pub trait CoreInfo {
@@ -37,22 +24,6 @@ pub trait CoreInfo {
3724
fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send;
3825
}
3926

40-
impl CoreInfo for SharedCore {
41-
fn info(&self) -> impl Future<Output = Info> + Send {
42-
async move {
43-
let core = &self.0.lock().await;
44-
core.info()
45-
}
46-
}
47-
48-
fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send {
49-
async move {
50-
let core = &self.0.lock().await;
51-
core.key_pair().clone()
52-
}
53-
}
54-
}
55-
5627
/// Error for ReplicationMethods trait
5728
#[derive(thiserror::Error, Debug)]
5829
pub enum ReplicationMethodsError {
@@ -85,45 +56,6 @@ pub trait ReplicationMethods: CoreInfo + Send {
8556
fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>>;
8657
}
8758

88-
impl ReplicationMethods for SharedCore {
89-
fn verify_and_apply_proof(
90-
&self,
91-
proof: &Proof,
92-
) -> impl Future<Output = Result<bool, ReplicationMethodsError>> {
93-
async move {
94-
let mut core = self.0.lock().await;
95-
Ok(core.verify_and_apply_proof(proof).await?)
96-
}
97-
}
98-
99-
fn missing_nodes(
100-
&self,
101-
index: u64,
102-
) -> impl Future<Output = Result<u64, ReplicationMethodsError>> {
103-
async move {
104-
let mut core = self.0.lock().await;
105-
Ok(core.missing_nodes(index).await?)
106-
}
107-
}
108-
109-
fn create_proof(
110-
&self,
111-
block: Option<RequestBlock>,
112-
hash: Option<RequestBlock>,
113-
seek: Option<RequestSeek>,
114-
upgrade: Option<RequestUpgrade>,
115-
) -> impl Future<Output = Result<Option<Proof>, ReplicationMethodsError>> {
116-
async move {
117-
let mut core = self.0.lock().await;
118-
Ok(core.create_proof(block, hash, seek, upgrade).await?)
119-
}
120-
}
121-
122-
fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>> {
123-
async move { self.0.lock().await.event_subscribe() }
124-
}
125-
}
126-
12759
/// Error for ReplicationMethods trait
12860
#[derive(thiserror::Error, Debug)]
12961
pub enum CoreMethodsError {
@@ -156,153 +88,3 @@ pub trait CoreMethods: CoreInfo {
15688
batch: B,
15789
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send;
15890
}
159-
160-
impl CoreMethods for SharedCore {
161-
fn has(&self, index: u64) -> impl Future<Output = bool> + Send {
162-
async move {
163-
let core = self.0.lock().await;
164-
core.has(index)
165-
}
166-
}
167-
fn get(
168-
&self,
169-
index: u64,
170-
) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send {
171-
async move {
172-
let mut core = self.0.lock().await;
173-
Ok(core.get(index).await?)
174-
}
175-
}
176-
177-
fn append(
178-
&self,
179-
data: &[u8],
180-
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
181-
async move {
182-
let mut core = self.0.lock().await;
183-
Ok(core.append(data).await?)
184-
}
185-
}
186-
187-
fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>(
188-
&self,
189-
batch: B,
190-
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
191-
async move {
192-
let mut core = self.0.lock().await;
193-
Ok(core.append_batch(batch).await?)
194-
}
195-
}
196-
}
197-
198-
#[cfg(test)]
199-
mod tests {
200-
use events::{Get, Have};
201-
202-
use super::*;
203-
204-
#[async_std::test]
205-
async fn shared_core_methods() -> Result<(), CoreMethodsError> {
206-
let core = crate::core::tests::create_hypercore_with_data(0).await?;
207-
let core = SharedCore::from(core);
208-
209-
let info = core.info().await;
210-
assert_eq!(
211-
info,
212-
crate::core::Info {
213-
length: 0,
214-
byte_length: 0,
215-
contiguous_length: 0,
216-
fork: 0,
217-
writeable: true,
218-
}
219-
);
220-
221-
// key_pair is random, nothing to test here
222-
let _kp = core.key_pair().await;
223-
224-
assert_eq!(core.has(0).await, false);
225-
assert_eq!(core.get(0).await?, None);
226-
let res = core.append(b"foo").await?;
227-
assert_eq!(
228-
res,
229-
AppendOutcome {
230-
length: 1,
231-
byte_length: 3
232-
}
233-
);
234-
assert_eq!(core.has(0).await, true);
235-
assert_eq!(core.get(0).await?, Some(b"foo".into()));
236-
let res = core.append_batch([b"hello", b"world"]).await?;
237-
assert_eq!(
238-
res,
239-
AppendOutcome {
240-
length: 3,
241-
byte_length: 13
242-
}
243-
);
244-
assert_eq!(core.has(2).await, true);
245-
assert_eq!(core.get(2).await?, Some(b"world".into()));
246-
Ok(())
247-
}
248-
249-
#[async_std::test]
250-
async fn test_events() -> Result<(), CoreMethodsError> {
251-
let core = crate::core::tests::create_hypercore_with_data(0).await?;
252-
let core = SharedCore::from(core);
253-
254-
// Check that appending data emits a DataUpgrade and Have event
255-
256-
let mut rx = core.event_subscribe().await;
257-
let handle = async_std::task::spawn(async move {
258-
let mut out = vec![];
259-
loop {
260-
if out.len() == 2 {
261-
return (out, rx);
262-
}
263-
if let Ok(evt) = rx.recv().await {
264-
out.push(evt);
265-
}
266-
}
267-
});
268-
core.append(b"foo").await?;
269-
let (res, mut rx) = handle.await;
270-
assert!(matches!(res[0], Event::DataUpgrade(_)));
271-
assert!(matches!(
272-
res[1],
273-
Event::Have(Have {
274-
start: 0,
275-
length: 1,
276-
drop: false
277-
})
278-
));
279-
// no messages in queue
280-
assert!(rx.is_empty());
281-
282-
// Check that Hypercore::get for missing data emits a Get event
283-
284-
let handle = async_std::task::spawn(async move {
285-
let mut out = vec![];
286-
loop {
287-
if out.len() == 1 {
288-
return (out, rx);
289-
}
290-
if let Ok(evt) = rx.recv().await {
291-
out.push(evt);
292-
}
293-
}
294-
});
295-
assert_eq!(core.get(1).await?, None);
296-
let (res, rx) = handle.await;
297-
assert!(matches!(
298-
res[0],
299-
Event::Get(Get {
300-
index: 1,
301-
get_result: _
302-
})
303-
));
304-
// no messages in queue
305-
assert!(rx.is_empty());
306-
Ok(())
307-
}
308-
}

0 commit comments

Comments
 (0)