Skip to content

Commit fdea0e1

Browse files
committed
Merge branch 'feat/dash-spv-client-interface' into fix/reverse-hash
2 parents 1d765dd + dc5fed4 commit fdea0e1

File tree

14 files changed

+386
-52
lines changed

14 files changed

+386
-52
lines changed

dash-spv-ffi/src/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,8 +889,13 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
889889
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
890890
let run_token = shutdown_token_sync.clone();
891891
let (abort_handle, abort_registration) = AbortHandle::new_pair();
892+
<<<<<<< HEAD
892893
let mut run_future = Box::pin(Abortable::new(
893894
spv_client.run(command_receiver, run_token),
895+
=======
896+
let mut monitor_future = Box::pin(Abortable::new(
897+
spv_client.monitor_network(command_receiver, run_token),
898+
>>>>>>> feat/dash-spv-client-interface
894899
abort_registration,
895900
));
896901
let result = tokio::select! {

dash-spv/examples/filter_sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4848
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
4949
let shutdown_token = CancellationToken::new();
5050

51-
client.run_until_shutdown(command_receiver, shutdown_token).await?;
51+
client.run(command_receiver, shutdown_token).await?;
5252

5353
println!("Done!");
5454
Ok(())

dash-spv/examples/simple_sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5151
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
5252
let shutdown_token = CancellationToken::new();
5353

54-
client.run_until_shutdown(command_receiver, shutdown_token).await?;
54+
client.run(command_receiver, shutdown_token).await?;
5555

5656
println!("Done!");
5757
Ok(())

dash-spv/examples/spv_with_wallet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5252
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
5353
let shutdown_token = CancellationToken::new();
5454

55-
client.run_until_shutdown(command_receiver, shutdown_token).await?;
55+
client.run(command_receiver, shutdown_token).await?;
5656

5757
println!("Done!");
5858
Ok(())

dash-spv/src/client/sync_coordinator.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ impl<
6868
///
6969
/// This is the sole network message receiver to prevent race conditions.
7070
/// All sync operations coordinate through this monitoring loop.
71-
pub async fn run(
71+
pub async fn monitor_network(
7272
&mut self,
73-
mut receiver: UnboundedReceiver<DashSpvClientCommand>,
73+
mut command_receiver: UnboundedReceiver<DashSpvClientCommand>,
7474
token: CancellationToken,
7575
) -> Result<()> {
7676
let running = self.running.read().await;
@@ -482,7 +482,7 @@ impl<
482482
}
483483

484484
tokio::select! {
485-
received = receiver.recv() => {
485+
received = command_receiver.recv() => {
486486
match received {
487487
None => {tracing::warn!("DashSpvClientCommand channel closed.");},
488488
Some(command) => {
@@ -570,15 +570,15 @@ impl<
570570
Ok(())
571571
}
572572

573-
pub async fn run_until_shutdown(
573+
pub async fn run(
574574
mut self,
575-
receiver: UnboundedReceiver<DashSpvClientCommand>,
575+
command_receiver: UnboundedReceiver<DashSpvClientCommand>,
576576
shutdown_token: CancellationToken,
577577
) -> Result<()> {
578578
let client_token = shutdown_token.clone();
579579

580580
let client_task = tokio::spawn(async move {
581-
let result = self.run(receiver, client_token).await;
581+
let result = self.monitor_network(command_receiver, client_token).await;
582582
if let Err(e) = &result {
583583
tracing::error!("Error running client: {}", e);
584584
}
@@ -589,7 +589,9 @@ impl<
589589
});
590590

591591
let shutdown_task = tokio::spawn(async move {
592-
let _ = tokio::signal::ctrl_c().await;
592+
if let Err(e) = tokio::signal::ctrl_c().await {
593+
tracing::error!("Error waiting for ctrl_c: {}", e);
594+
}
593595
tracing::debug!("Shutdown signal received");
594596
shutdown_token.cancel();
595597
});

dash-spv/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ async fn run_client<S: dash_spv::storage::StorageManager + Send + Sync + 'static
633633
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
634634
let shutdown_token = CancellationToken::new();
635635

636-
client.run_until_shutdown(command_receiver, shutdown_token).await?;
636+
client.run(command_receiver, shutdown_token).await?;
637637

638638
Ok(())
639639
}

dash-spv/src/storage/disk/filters.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::ops::Range;
55
use dashcore::hash_types::FilterHeader;
66
use dashcore_hashes::Hash;
77

8-
use crate::error::StorageResult;
8+
use crate::error::{StorageError, StorageResult};
9+
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
910

1011
use super::manager::DiskStorageManager;
1112
use super::segments::SegmentState;
@@ -216,6 +217,17 @@ impl DiskStorageManager {
216217
}
217218
tokio::fs::create_dir_all(&filters_dir).await?;
218219

220+
// Remove trusted checkpoint predecessor filter header metadata if present
221+
let metadata_path =
222+
self.base_path.join(format!("state/{}.dat", CHECKPOINT_PREV_FILTER_HEADER_KEY));
223+
if metadata_path.exists() {
224+
if let Err(e) = tokio::fs::remove_file(&metadata_path).await {
225+
if e.kind() != std::io::ErrorKind::NotFound {
226+
return Err(StorageError::Io(e));
227+
}
228+
}
229+
}
230+
219231
// Restart background worker for future operations
220232
self.start_worker().await;
221233

dash-spv/src/storage/memory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::ops::Range;
77
use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid};
88

99
use crate::error::{StorageError, StorageResult};
10+
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
1011
use crate::storage::{MasternodeState, StorageManager, StorageStats};
1112
use crate::types::{ChainState, MempoolState, UnconfirmedTransaction};
1213

@@ -310,6 +311,7 @@ impl StorageManager for MemoryStorageManager {
310311
async fn clear_filters(&mut self) -> StorageResult<()> {
311312
self.filter_headers.clear();
312313
self.filters.clear();
314+
self.metadata.remove(CHECKPOINT_PREV_FILTER_HEADER_KEY);
313315
Ok(())
314316
}
315317

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
//! Common metadata keys stored by storage backends.
2+
3+
/// Metadata key storing the filter header for the block immediately before a trusted checkpoint.
4+
pub const CHECKPOINT_PREV_FILTER_HEADER_KEY: &str = "checkpoint_prev_filter_header_v1";

dash-spv/src/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
pub mod disk;
44
pub mod memory;
5+
pub mod metadata_keys;
56
pub mod sync_state;
67
pub mod sync_storage;
78
pub mod types;

0 commit comments

Comments
 (0)