Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dash-spv-ffi/include/dash_spv_ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id,
bool is_ours,
void *user_data);

typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height,
uint32_t header_height,
double percentage,
void *user_data);

Comment on lines +152 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

New FilterHeadersProgress callback shape and placement look correct.

Signature matches Rust (u32, u32, f64, void*), and appending the field at the end of FFIEventCallbacks preserves ABI for rebuilt consumers.

Please also sync the generated header into the Swift SDK to keep it in lockstep:

Also applies to: 166-167


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Verify Swift header is in sync with generated one
GEN="dash-spv-ffi/include/dash_spv_ffi.h"
SWIFT="swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h"
if [[ -f "$GEN" && -f "$SWIFT" ]]; then
  if cmp -s "$GEN" "$SWIFT"; then
    echo "Headers are in sync ✅"
  else
    echo "Headers differ ❌"; diff -u "$SWIFT" "$GEN" | sed -n '1,200p'
    echo "Run ./sync-headers.sh to update."
    exit 1
  fi
else
  echo "One or both header paths missing. Please check repository layout."
  exit 1
fi

Length of output: 1048


Sync Swift header with generated header (run ./sync-headers.sh)

dash-spv-ffi/include/dash_spv_ffi.h adds the FilterHeadersProgress typedef and the on_filter_headers_progress field; swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h is out of sync — run ./sync-headers.sh to update. Signature matches Rust (u32, u32, f64, void*) and appending the field at the end of FFIEventCallbacks preserves ABI.

🤖 Prompt for AI Agents
In dash-spv-ffi/include/dash_spv_ffi.h around lines 152-156 the new typedef
FilterHeadersProgressCallback and the on_filter_headers_progress callback field
were added but the Swift copy
swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h is out of sync;
run ./sync-headers.sh to regenerate the Swift header so it includes the typedef
(uint32_t, uint32_t, double, void*) and append the on_filter_headers_progress
field at the end of the FFIEventCallbacks struct to preserve ABI compatibility.

typedef struct FFIEventCallbacks {
BlockCallback on_block;
TransactionCallback on_transaction;
Expand All @@ -158,6 +163,7 @@ typedef struct FFIEventCallbacks {
MempoolRemovedCallback on_mempool_transaction_removed;
CompactFilterMatchedCallback on_compact_filter_matched;
WalletTransactionCallback on_wallet_transaction;
FilterHeadersProgressCallback on_filter_headers_progress;
void *user_data;
} FFIEventCallbacks;

Expand Down
26 changes: 26 additions & 0 deletions dash-spv-ffi/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub type WalletTransactionCallback = Option<
),
>;

pub type FilterHeadersProgressCallback = Option<
extern "C" fn(filter_height: u32, header_height: u32, percentage: f64, user_data: *mut c_void),
>;

#[repr(C)]
pub struct FFIEventCallbacks {
pub on_block: BlockCallback,
Expand All @@ -145,6 +149,7 @@ pub struct FFIEventCallbacks {
pub on_mempool_transaction_removed: MempoolRemovedCallback,
pub on_compact_filter_matched: CompactFilterMatchedCallback,
pub on_wallet_transaction: WalletTransactionCallback,
pub on_filter_headers_progress: FilterHeadersProgressCallback,
pub user_data: *mut c_void,
}

Expand Down Expand Up @@ -173,6 +178,7 @@ impl Default for FFIEventCallbacks {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: std::ptr::null_mut(),
}
}
Expand Down Expand Up @@ -383,3 +389,23 @@ impl FFIEventCallbacks {
}
}
}

impl FFIEventCallbacks {
pub fn call_filter_headers_progress(
&self,
filter_height: u32,
header_height: u32,
percentage: f64,
) {
if let Some(callback) = self.on_filter_headers_progress {
tracing::info!(
"📊 Calling filter headers progress callback: filter_height={}, header_height={}, pct={:.2}",
filter_height, header_height, percentage
);
callback(filter_height, header_height, percentage, self.user_data);
tracing::info!("✅ Filter headers progress callback completed");
} else {
tracing::debug!("Filter headers progress callback not set");
}
}
}
22 changes: 22 additions & 0 deletions dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ impl FFIDashSpvClient {
confirmed, unconfirmed, total);
callbacks.call_balance_update(confirmed, unconfirmed);
}
dash_spv::types::SpvEvent::FilterHeadersProgress { filter_header_height, header_height, percentage } => {
tracing::info!("📊 Filter headers progress event: filter={}, header={}, pct={:.2}",
filter_header_height, header_height, percentage);
callbacks
.call_filter_headers_progress(
filter_header_height,
header_height,
percentage,
);
}
dash_spv::types::SpvEvent::TransactionDetected { ref txid, confirmed, ref addresses, amount, block_height, .. } => {
tracing::info!("💸 Transaction detected: txid={}, confirmed={}, amount={}, addresses={:?}, height={:?}",
txid, confirmed, amount, addresses, block_height);
Expand Down Expand Up @@ -717,6 +727,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
let inner = client.inner.clone();
let runtime = client.runtime.clone();
let sync_callbacks = client.sync_callbacks.clone();
// Shared flag to coordinate internal threads during sync
let sync_running = Arc::new(AtomicBool::new(true));

// Take progress receiver from client
let progress_receiver = {
Expand Down Expand Up @@ -772,6 +784,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
// Spawn sync task in a separate thread with safe callback access
let runtime_handle = runtime.handle().clone();
let sync_callbacks_clone = sync_callbacks.clone();
let sync_running_for_join = sync_running.clone();
let sync_handle = std::thread::spawn(move || {
// Run monitoring loop
let monitor_result = runtime_handle.block_on(async move {
Expand All @@ -792,6 +805,9 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
res
});

// Signal background handlers to stop
sync_running_for_join.store(false, Ordering::Relaxed);

// Send completion callback and cleanup
{
let mut cb_guard = sync_callbacks_clone.lock().unwrap();
Expand Down Expand Up @@ -843,6 +859,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
FFIErrorCode::Success as i32
}

// Note: filter headers progress is forwarded via FFIEventCallbacks.on_filter_headers_progress

/// Cancels the sync operation.
///
/// **Note**: This function currently only stops the SPV client and clears sync callbacks,
Expand Down Expand Up @@ -1021,6 +1039,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_set_event_callbacks(
tracing::info!(" Block callback: {}", callbacks.on_block.is_some());
tracing::info!(" Transaction callback: {}", callbacks.on_transaction.is_some());
tracing::info!(" Balance update callback: {}", callbacks.on_balance_update.is_some());
tracing::info!(
" Filter headers progress callback: {}",
callbacks.on_filter_headers_progress.is_some()
);

let mut event_callbacks = client.event_callbacks.lock().unwrap();
*event_callbacks = callbacks;
Expand Down
8 changes: 7 additions & 1 deletion dash-spv-ffi/tests/integration/test_full_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ mod tests {
on_block: Some(on_block),
on_transaction: Some(on_transaction),
on_balance_update: Some(on_balance),
on_mempool_transaction_added: None,
on_mempool_transaction_confirmed: None,
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: &ctx as *const _ as *mut c_void,
};

Expand Down Expand Up @@ -536,4 +542,4 @@ mod tests {
ctx.cleanup();
}
}
}
}
2 changes: 2 additions & 0 deletions dash-spv-ffi/tests/test_event_callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn test_event_callbacks_setup() {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data,
};

Expand Down Expand Up @@ -266,6 +267,7 @@ fn test_enhanced_event_callbacks() {
on_mempool_transaction_removed: None,
on_compact_filter_matched: Some(test_compact_filter_matched_callback),
on_wallet_transaction: Some(test_wallet_transaction_callback),
on_filter_headers_progress: None,
user_data: Arc::as_ptr(&event_data) as *mut c_void,
};

Expand Down
1 change: 1 addition & 0 deletions dash-spv-ffi/tests/unit/test_async_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ mod tests {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: &event_data as *const _ as *mut c_void,
};

Expand Down
19 changes: 19 additions & 0 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,25 @@ impl<
{
tracing::error!("Sequential sync manager error handling message: {}", e);
}

// Additionally forward compact filters to the block processor so it can
// perform wallet matching and emit CompactFilterMatched events.
if let NetworkMessage::CFilter(ref cfilter_msg) = message {
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
let task = crate::client::BlockProcessingTask::ProcessCompactFilter {
filter: dashcore::bip158::BlockFilter {
content: cfilter_msg.filter.clone(),
},
block_hash: cfilter_msg.block_hash,
response_tx,
};
if let Err(e) = self.block_processor_tx.send(task) {
tracing::warn!(
"Failed to forward CFilter to block processor for event emission: {}",
e
);
}
}
}
NetworkMessage::Block(_) => {
// Blocks can be large - avoid cloning unless necessary
Expand Down
51 changes: 47 additions & 4 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,10 @@ impl<
// Track masternode sync completion for ChainLock validation
let mut masternode_engine_updated = false;

// Last emitted heights for filter headers progress to avoid duplicate events
let mut last_emitted_header_height: u32 = 0;
let mut last_emitted_filter_header_height: u32 = 0;

loop {
// Check if we should stop
let running = self.running.read().await;
Expand Down Expand Up @@ -862,10 +866,14 @@ impl<

// Emit detailed progress update
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
let current_height = {
// Storage tip is the headers vector index (0-based).
let current_storage_tip = {
let storage = self.storage.lock().await;
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
};
// Convert to absolute blockchain height: base + storage_tip
let sync_base_height = { self.state.read().await.sync_base_height };
let current_height = sync_base_height + current_storage_tip;
let peer_best = self
.network
.get_peer_best_height()
Expand All @@ -875,9 +883,9 @@ impl<
.unwrap_or(current_height);

// Calculate headers downloaded this second
if current_height > last_height {
headers_this_second = current_height - last_height;
last_height = current_height;
if current_storage_tip > last_height {
headers_this_second = current_storage_tip - last_height;
last_height = current_storage_tip;
}

let headers_per_second = headers_this_second as f64;
Expand Down Expand Up @@ -928,6 +936,34 @@ impl<
last_rate_calc = Instant::now();
}

// Emit filter headers progress only when heights change
let (abs_header_height, filter_header_height) = {
let storage = self.storage.lock().await;
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
let filter_tip =
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
};
if abs_header_height != last_emitted_header_height
|| filter_header_height != last_emitted_filter_header_height
{
if abs_header_height > 0 {
let pct = if filter_header_height <= abs_header_height {
(filter_header_height as f64 / abs_header_height as f64 * 100.0)
.min(100.0)
} else {
0.0
};
self.emit_event(SpvEvent::FilterHeadersProgress {
filter_header_height,
header_height: abs_header_height,
percentage: pct,
});
}
last_emitted_header_height = abs_header_height;
last_emitted_filter_header_height = filter_header_height;
}

last_status_update = Instant::now();
}

Expand Down Expand Up @@ -2374,6 +2410,13 @@ impl<
}
}

tracing::debug!(
"get_stats: header_height={}, filter_height={}, peers={}",
stats.header_height,
stats.filter_height,
stats.connected_peers
);

Ok(stats)
}

Expand Down
Loading
Loading