Skip to content

Commit 48154b8

Browse files
committed
wallet: check blockfilters in parallel
This commit implements parallel block filter checking using the wallet threadpool. The main thread reads block hashes and queues them while worker threads check filters in parallel. Synchronization: - Operations requiring cs_wallet (GetLastBlockHeight, SyncTransaction) remain on the main thread since cs_wallet is a RecursiveMutex and ScanForWalletTransactions is called from AttachChain which locks cs_wallet - Main thread uses ThreadPool::ProcessTask() to join workers when the block queue is full, avoiding busy-waiting Batching: - Up to m_max_blockqueue_size (1000) blocks are queued for filtering - When queue is full, main loop processes filtered blocks before reading more Thread safety: - All futures (at most `workers_count`) are waited on before returning to avoid data races on `FastWalletRescanFilter::m_filter_set`. Benchmarks show considerable improvement (approx 5x with 16 threads).
1 parent 555ea01 commit 48154b8

File tree

2 files changed

+90
-17
lines changed

2 files changed

+90
-17
lines changed

src/wallet/scan.cpp

Lines changed: 89 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <wallet/scan.h>
99
#include <wallet/wallet.h>
1010

11+
#include <algorithm>
12+
#include <future>
13+
1114
using interfaces::FoundBlock;
1215

1316
namespace wallet {
@@ -130,30 +133,100 @@ std::optional<std::pair<uint256, int>> ChainScanner::ReadNextBlock(ScanResult& r
130133
return std::make_pair(block_hash, block_height);
131134
}
132135

133-
std::optional<std::pair<size_t, size_t>> ChainScanner::ReadNextBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result) {
134-
auto next_block = ReadNextBlock(result);
135-
if (next_block) m_blocks.emplace_back(*next_block);
136-
else return std::nullopt;
136+
enum FilterRes {
137+
FILTER_NO_MATCH,
138+
FILTER_MATCH,
139+
FILTER_NO_FILTER
140+
};
137141

142+
std::optional<std::pair<size_t, size_t>> ChainScanner::ReadNextBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result) {
138143
if (!filter) {
139-
// Slow scan: scan all blocks.
140-
return std::make_pair<size_t, size_t>(0, 1);
144+
// Slow scan: no filter, scan all blocks
145+
auto next_block = ReadNextBlock(result);
146+
if (next_block) m_blocks.emplace_back(*next_block);
147+
return std::make_pair<size_t, size_t>(0, m_blocks.size());
141148
}
142-
143149
filter->UpdateIfNeeded();
150+
auto* thread_pool = m_wallet.m_thread_pool;
151+
// ThreadPool pointer should never be null here
152+
// during normal operation because it should
153+
// have been passed from the WalletContext to
154+
// the CWallet constructor
155+
assert(thread_pool != nullptr);
156+
std::optional<std::pair<size_t, size_t>> range;
157+
size_t workers_count = thread_pool->WorkersCount();
158+
size_t i = 0;
159+
size_t completed = 0;
160+
std::vector<std::future<FilterRes>> futures;
161+
futures.reserve(workers_count);
162+
163+
while ((m_next_block && completed < m_max_blockqueue_size) || i < m_blocks.size() || !futures.empty()) {
164+
const bool result_ready = futures.size() > 0 && futures[0].wait_for(std::chrono::seconds::zero()) == std::future_status::ready;
165+
if (result_ready) {
166+
const auto& scan_res{futures[0].get()};
167+
completed++;
168+
futures.erase(futures.begin());
169+
if (scan_res == FILTER_NO_MATCH) {
170+
if (range.has_value()) {
171+
break;
172+
}
173+
continue;
174+
}
175+
int current_block_index = completed - 1;
176+
if (scan_res == FilterRes::FILTER_MATCH) {
177+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
178+
} else { // FILTER_NO_FILTER
179+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
180+
}
144181

145-
const auto& [block_hash, block_height] = m_blocks[0];
146-
auto matches_block{filter->MatchesBlock(block_hash)};
182+
if (!range.has_value()) range = std::make_pair(current_block_index, current_block_index + 1);
183+
else range->second = current_block_index + 1;
184+
}
185+
186+
// Submit jobs to the threadpool in batches of at most `workers_count` size.
187+
// This prevents over-submission: if we queued all jobs upfront and the filtered
188+
// block range is smaller than expected, worker threads would process blocks
189+
// that get discarded, wasting CPU cycles.
190+
const size_t job_gap = workers_count - futures.size();
191+
if (job_gap > 0 && i < m_blocks.size()) {
192+
for (size_t j = 0; j < job_gap && i < m_blocks.size(); ++j, ++i) {
193+
auto block = m_blocks[i];
194+
futures.emplace_back(*thread_pool->Submit([&filter, block = std::move(block)]() {
195+
const auto matches_block{filter->MatchesBlock(block.first)};
196+
if (matches_block.has_value()) {
197+
if (*matches_block) {
198+
return FilterRes::FILTER_MATCH;
199+
} else {
200+
return FilterRes::FILTER_NO_MATCH;
201+
}
202+
} else {
203+
return FilterRes::FILTER_NO_FILTER;
204+
}
205+
}));
206+
}
207+
}
208+
209+
// If m_max_blockqueue_size blocks have been filtered,
210+
// stop reading more blocks for now, to give the
211+
// main scanning loop a chance to update progress
212+
// and erase some blocks from the queue.
213+
if (m_next_block && completed < m_max_blockqueue_size) {
214+
auto next_block = ReadNextBlock(result);
215+
if (next_block) m_blocks.emplace_back(*next_block);
216+
}
217+
else if (!futures.empty()) {
218+
// Join work processing instead of waiting idly.
219+
thread_pool->ProcessTask();
220+
}
221+
}
147222

148-
if (matches_block.has_value() && *matches_block) {
149-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", block_height, block_hash.ToString());
150-
return std::make_pair<size_t, size_t>(0, 1);
151-
} else if (!matches_block.has_value()) {
152-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", block_height, block_hash.ToString());
153-
return std::make_pair<size_t, size_t>(0, 1);
223+
for (auto& fut : futures) {
224+
// Wait for all remaining futures to complete
225+
// to avoid data race on FastWalletRescanFilter::m_filter_set
226+
fut.wait();
154227
}
155228

156-
return std::nullopt;
229+
return range;
157230
}
158231

159232
void ChainScanner::UpdateProgress(int block_height) {

src/wallet/wallet.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ class CWallet final : public WalletStorage, public interfaces::Chain::Notificati
408408
std::unique_ptr<WalletDatabase> m_database;
409409

410410
/** Thread pool for wallet operations. */
411-
[[maybe_unused]] ThreadPool* m_thread_pool;
411+
ThreadPool* m_thread_pool;
412412

413413
/**
414414
* The following is used to keep track of how far behind the wallet is

0 commit comments

Comments
 (0)