Skip to content
This repository was archived by the owner on Sep 12, 2025. It is now read-only.

Commit a877707

Browse files
committed
Initial dedicated intergrating testing app
Added tests over passes of fixed length, including turning off receiver and transmitter. Added Terminate API for shutting down listener Iterating on fixing multi-pass scenario More fixing attempts
1 parent 3427eac commit a877707

File tree

13 files changed

+937
-10
lines changed

13 files changed

+937
-10
lines changed

Cargo.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ members = [
77
"local-storage",
88
"messages",
99
"myceli",
10+
"testing/testing-scenarios",
1011
"transports"
1112
]
1213

@@ -57,4 +58,5 @@ tracing-subscriber = { version = "0.3.14", default-features = false, features =
5758
ipfs-unixfs = { path = "ipfs-unixfs" }
5859
local-storage = { path = "local-storage" }
5960
messages = { path = "messages" }
61+
myceli = { path = "myceli" }
6062
transports = { path = "transports" }

local-storage/src/provider.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,9 @@ impl StorageProvider for SqliteStorageProvider {
262262
WITH RECURSIVE cids(x) AS (
263263
VALUES(?1)
264264
UNION
265-
SELECT block_cid FROM links JOIN cids ON root_cid=x
265+
SELECT block_cid FROM links JOIN cids ON root_cid=x WHERE block_id IS NOT null
266266
)
267-
SELECT x FROM cids;
267+
SELECT cid FROM blocks WHERE cid in cids
268268
",
269269
)?
270270
.query_map([cid], |row| {
@@ -540,4 +540,41 @@ pub mod tests {
540540
0
541541
);
542542
}
543+
544+
#[test]
545+
pub fn test_verify_get_all_cids() {
546+
let harness = TestHarness::new();
547+
548+
let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00"));
549+
let cid_str = cid.to_string();
550+
let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11"));
551+
let child_cid_str = block_cid.to_string();
552+
553+
let other_child_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11"));
554+
555+
let block = StoredBlock {
556+
cid: cid_str.to_string(),
557+
data: vec![],
558+
links: vec![block_cid.to_string(), other_child_cid.to_string()],
559+
};
560+
561+
let child_block = StoredBlock {
562+
cid: block_cid.to_string(),
563+
data: b"101293910101".to_vec(),
564+
links: vec![],
565+
};
566+
567+
harness.provider.import_block(&block).unwrap();
568+
569+
let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
570+
assert_eq!(dag_cids, vec![cid_str.to_string()]);
571+
572+
harness.provider.import_block(&child_block).unwrap();
573+
574+
let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
575+
assert_eq!(
576+
dag_cids,
577+
vec![cid_str.to_string(), child_cid_str.to_string()]
578+
);
579+
}
543580
}

local-storage/src/storage.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ impl Storage {
3838
Ok(blocks)
3939
});
4040
let blocks = blocks?;
41+
info!("FileBuilder found {} blocks in {path:?}", blocks.len());
4142
let mut root_cid: Option<String> = None;
4243

4344
blocks.iter().for_each(|b| {
@@ -65,6 +66,7 @@ impl Storage {
6566
if blocks.len() == 1 {
6667
if let Some(first) = blocks.first() {
6768
root_cid = Some(first.cid().to_string());
69+
info!("set final root {root_cid:?}");
6870
}
6971
}
7072
if let Some(root_cid) = root_cid {
@@ -139,6 +141,29 @@ impl Storage {
139141
self.provider
140142
.get_dag_blocks_by_window(cid, offset, window_size)
141143
}
144+
145+
pub fn get_last_dag_cid(&self, cid: &str) -> Result<String> {
146+
let dag_cids = self.get_all_dag_cids(cid)?;
147+
match dag_cids.last() {
148+
Some(cid) => Ok(cid.to_owned()),
149+
None => bail!("No last cid found for dag {cid}"),
150+
}
151+
}
152+
153+
// Given a root CID, a number of CIDs, approximate the window we should be in
154+
// pub fn find_dag_window(&self, root: &str, cid_count: u32, window_size: u32) -> Result<u32> {
155+
156+
// let all_cids = self.get_all_dag_cids(root)?;
157+
// let chunks = all_cids.chunks(window_size as usize);
158+
// let mut window_num = 0;
159+
// for c in chunks {
160+
// if c.contains(&child.to_string()) {
161+
// return Ok(window_num);
162+
// }
163+
// window_num += 1;
164+
// }
165+
// bail!("Failed to find child cid {child} in dag {root}");
166+
// }
142167
}
143168

144169
#[cfg(test)]
@@ -169,6 +194,45 @@ pub mod tests {
169194
}
170195
}
171196

197+
fn generate_stored_blocks(num_blocks: u16) -> Result<Vec<StoredBlock>> {
198+
const CHUNK_SIZE: u16 = 20;
199+
let data_size = CHUNK_SIZE * num_blocks;
200+
let mut data = Vec::<u8>::new();
201+
data.resize(data_size.into(), 1);
202+
thread_rng().fill_bytes(&mut data);
203+
204+
let rt = tokio::runtime::Runtime::new().unwrap();
205+
let blocks = rt.block_on(async {
206+
let file: File = FileBuilder::new()
207+
.content_bytes(data)
208+
.name("testfile")
209+
.fixed_chunker(CHUNK_SIZE.into())
210+
.build()
211+
.await
212+
.unwrap();
213+
let blocks: Vec<_> = file.encode().await.unwrap().try_collect().await.unwrap();
214+
blocks
215+
});
216+
let mut stored_blocks = vec![];
217+
218+
blocks.iter().for_each(|b| {
219+
let links = b
220+
.links()
221+
.iter()
222+
.map(|c| c.to_string())
223+
.collect::<Vec<String>>();
224+
let stored = StoredBlock {
225+
cid: b.cid().to_string(),
226+
data: b.data().to_vec(),
227+
links,
228+
};
229+
230+
stored_blocks.push(stored);
231+
});
232+
233+
Ok(stored_blocks)
234+
}
235+
172236
#[test]
173237
pub fn test_import_path_to_storage() {
174238
let harness = TestHarness::new();
@@ -289,6 +353,38 @@ pub mod tests {
289353
assert_eq!(blocks.len(), cids.len());
290354
}
291355

356+
#[test]
357+
pub fn test_get_all_dag_cids() {
358+
let harness = TestHarness::new();
359+
360+
let mut dag_blocks = generate_stored_blocks(50).unwrap();
361+
let total_block_count = dag_blocks.len();
362+
363+
let root = dag_blocks.pop().unwrap();
364+
365+
harness.storage.import_block(&root).unwrap();
366+
367+
let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
368+
assert_eq!(dag_cids.len(), 1);
369+
370+
for _ in (1..10) {
371+
harness
372+
.storage
373+
.import_block(&dag_blocks.pop().unwrap())
374+
.unwrap();
375+
}
376+
377+
let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
378+
assert_eq!(dag_cids.len(), 10);
379+
380+
while let Some(block) = dag_blocks.pop() {
381+
harness.storage.import_block(&block).unwrap()
382+
}
383+
384+
let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
385+
assert_eq!(dag_cids.len(), total_block_count);
386+
}
387+
292388
// TODO: duplicated data is not being handled correctly right now, need to fix this
293389
// #[test]
294390
// pub fn export_from_storage_various_file_sizes_duplicated_data() {

messages/src/api.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,22 @@ pub enum ApplicationAPI {
7171
},
7272
// Resumes the transmission of all dags which may be paused
7373
ResumeTransmitAllDags,
74+
// Resumes the transmission of a dag from a prior session, given the last received CID
75+
// for determining where to restart the transmission
76+
ResumePriorDagTransmit {
77+
cid: String,
78+
num_received_cids: u32,
79+
retries: u8,
80+
},
7481
/// Listens on address for data and writes out files received
7582
Receive {
7683
listen_addr: String,
7784
},
85+
/// Commands a node to request another node at target_addr to resume dag transfer
86+
RequestResumeDagTransfer {
87+
cid: String,
88+
target_addr: String,
89+
},
7890
/// Request Available Blocks
7991
RequestAvailableBlocks,
8092
/// Advertise all available blocks by CID
@@ -103,6 +115,8 @@ pub enum ApplicationAPI {
103115
Version {
104116
version: String,
105117
},
118+
/// Asks IPFS instance to terminate
119+
Terminate,
106120
// TODO: Implement later
107121
// Information about the next pass used for calculating
108122
// data transfer parameters

messages/src/protocol.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ pub enum DataProtocol {
4444
target_addr: String,
4545
retries: u8,
4646
},
47+
// Resumes the transmission of a dag which isn't currently tracked in sessions
48+
// This accounts for resuming after restarting of transmitter
49+
ResumePriorDagTransmit {
50+
cid: String,
51+
num_received_cids: u32,
52+
target_addr: String,
53+
retries: u8,
54+
},
4755
// Resumes the transmission of a dag which may have run out of retries or
4856
// paused due to connectivity lost
4957
ResumeTransmitDag {
@@ -64,4 +72,6 @@ pub enum DataProtocol {
6472
cid: String,
6573
blocks: Vec<String>,
6674
},
75+
// Used by listener to terminate shipper on program exit
76+
Terminate,
6777
}

myceli/src/handlers.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::Result;
1+
use anyhow::{bail, Result};
22
use local_storage::storage::Storage;
33
use messages::{ApplicationAPI, DataProtocol, Message};
44
use std::path::PathBuf;
@@ -63,6 +63,11 @@ pub fn get_missing_dag_blocks_window_protocol(
6363
}))
6464
}
6565

66+
pub fn get_last_dag_cid(cid: &str, storage: Rc<Storage>) -> Result<String> {
67+
let last_dag_cid = storage.get_last_dag_cid(cid)?;
68+
Ok(last_dag_cid)
69+
}
70+
6671
#[cfg(test)]
6772
pub mod tests {
6873
use super::*;

0 commit comments

Comments
 (0)