Skip to content

Commit 49ae44c

Browse files
committed
wallet: check blockfilters in parallel
Any operation that requires cs_wallet like GetLastBlockHeight() and SyncTransaction must be done on the main thread because cs_wallet is a RecursiveMutex and ScanForWalletTransactions is also called from AttachChain() which already LOCKs cs_wallet. Block hashes are read and pushed to m_blocks while block filters are being check for other blocks. When more block hashes cannot be read, the main thread will use ThreadPool::ProcessTask() to join other workers and process queued jobs, instead of wasting CPU styles constanly checking if a future is ready. Benchmarks show considerable improvement (approx 5x with 16 threads).
1 parent f4d5ae6 commit 49ae44c

File tree

3 files changed

+142
-47
lines changed

3 files changed

+142
-47
lines changed

src/wallet/scan.cpp

Lines changed: 127 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <wallet/scan.h>
99
#include <wallet/wallet.h>
1010

11+
#include <algorithm>
12+
#include <future>
13+
1114
using interfaces::FoundBlock;
1215

1316
namespace wallet {
@@ -97,14 +100,13 @@ class FastWalletRescanFilter
97100
};
98101
} // namespace
99102

100-
bool ChainScanner::ReadBlockHash(std::pair<uint256, int>& out, ScanResult& result) {
103+
bool ChainScanner::ReadBlockHash(ScanResult& result) {
101104
bool block_still_active = false;
102105
bool next_block = false;
103106
int block_height = m_next_block_height;
104107
uint256 block_hash = m_next_block_hash;
105108
m_wallet.chain().findBlock(block_hash, FoundBlock().inActiveChain(block_still_active).nextBlock(FoundBlock().inActiveChain(next_block).hash(m_next_block_hash)));
106-
out.first = block_hash;
107-
out.second = block_height;
109+
m_blocks.emplace_back(block_hash, block_height);
108110
++m_next_block_height;
109111
if (!block_still_active) {
110112
// Abort scan if current block is no longer active, to prevent
@@ -121,22 +123,93 @@ bool ChainScanner::ReadBlockHash(std::pair<uint256, int>& out, ScanResult& resul
121123
if (block_height >= WITH_LOCK(m_wallet.cs_wallet, return m_wallet.GetLastBlockHeight())) {
122124
return false;
123125
}
124-
return next_block;
126+
if (!next_block) return false;
127+
return true;
125128
}
126129

127-
bool ChainScanner::FilterBlock(const std::unique_ptr<FastWalletRescanFilter>& filter, const std::pair<uint256, int>& block) {
128-
auto matches_block{filter->MatchesBlock(block.first)};
129-
if (matches_block.has_value()) {
130-
if (*matches_block) {
131-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", block.second, block.first.ToString());
132-
return true;
133-
} else {
134-
return false;
130+
enum FilterRes {
131+
FILTER_NO_MATCH,
132+
FILTER_MATCH,
133+
FILTER_NO_FILTER
134+
};
135+
136+
std::optional<std::pair<size_t, size_t>> ChainScanner::FilterBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result) {
137+
if (!filter) {
138+
// Slow scan: no filter, scan all blocks
139+
m_continue = ReadBlockHash(result);
140+
return std::make_pair<size_t, size_t>(0, m_blocks.size());
141+
}
142+
filter->UpdateIfNeeded();
143+
auto* thread_pool = m_wallet.m_thread_pool;
144+
// ThreadPool pointer should never be null here
145+
// during normal operation because it should
146+
// have been passed from the WalletContext to
147+
// the CWallet constructor
148+
assert(thread_pool != nullptr);
149+
std::optional<std::pair<size_t, size_t>> range;
150+
size_t workers_count = thread_pool->WorkersCount();
151+
size_t i = 0;
152+
size_t completed = 0;
153+
std::vector<std::future<FilterRes>> futures;
154+
futures.reserve(workers_count);
155+
156+
while ((m_continue && completed < m_max_blockqueue_size) || i < m_blocks.size() || !futures.empty()) {
157+
const bool result_ready = futures.size() > 0 && futures[0].wait_for(std::chrono::seconds::zero()) == std::future_status::ready;
158+
if (result_ready) {
159+
const auto& scan_res{futures[0].get()};
160+
completed++;
161+
futures.erase(futures.begin());
162+
if (scan_res == FILTER_NO_MATCH) {
163+
if (range.has_value()) {
164+
break;
165+
}
166+
continue;
167+
}
168+
int current_block_index = completed - 1;
169+
if (scan_res == FilterRes::FILTER_MATCH) {
170+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
171+
} else { // FILTER_NO_FILTER
172+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
173+
}
174+
175+
if (!range.has_value()) range = std::make_pair(current_block_index, current_block_index + 1);
176+
else range->second = current_block_index + 1;
135177
}
136-
} else {
137-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", block.second, block.first.ToString());
138-
return true;
178+
179+
const size_t job_gap = workers_count - futures.size();
180+
if (job_gap > 0 && i < m_blocks.size()) {
181+
for (size_t j = 0; j < job_gap && i < m_blocks.size(); ++j, ++i) {
182+
auto block = m_blocks[i];
183+
futures.emplace_back(thread_pool->Submit([&filter, block = std::move(block)]() {
184+
const auto matches_block{filter->MatchesBlock(block.first)};
185+
if (matches_block.has_value()) {
186+
if (*matches_block) {
187+
return FilterRes::FILTER_MATCH;
188+
} else {
189+
return FilterRes::FILTER_NO_MATCH;
190+
}
191+
} else {
192+
return FilterRes::FILTER_NO_FILTER;
193+
}
194+
}));
195+
}
196+
}
197+
198+
// If m_max_blockqueue_size blocks have been filtered,
199+
// stop reading more blocks for now, to give the
200+
// main scanning loop a chance to update progress
201+
// and erase some blocks from the queue.
202+
if (m_continue && completed < m_max_blockqueue_size) m_continue = ReadBlockHash(result);
203+
else if (!futures.empty()) thread_pool->ProcessTask();
204+
}
205+
206+
for (auto& fut : futures) {
207+
// Wait for all remaining futures to complete
208+
// to avoid data race on FastWalletRescanFilter::m_filter_set
209+
fut.wait();
139210
}
211+
212+
return range;
140213
}
141214

142215
bool ChainScanner::ScanBlock(const std::pair<uint256, int>& data, bool save_progress) {
@@ -190,52 +263,63 @@ ScanResult ChainScanner::Scan() {
190263
if (m_max_height) chain.findAncestorByHeight(tip_hash, *m_max_height, FoundBlock().hash(end_hash));
191264

192265
ScanResult result;
266+
193267
double progress_begin = chain.guessVerificationProgress(m_start_block);
194268
double progress_end = chain.guessVerificationProgress(end_hash);
195269
double progress_current = progress_begin;
270+
271+
auto start_block = m_start_block;
272+
auto& block_hash = start_block;
196273
int block_height = m_start_height;
197274
m_next_block_height = m_start_height;
198275
m_next_block_hash = m_start_block;
199-
std::pair<uint256, int> current_block;
200-
while (!m_wallet.fAbortRescan && !chain.shutdownRequested()) {
201-
bool fContinue = ReadBlockHash(current_block, result);
202-
auto& block_hash = current_block.first;
203-
block_height = current_block.second;
204-
bool fetch_block{true};
205-
if (fast_rescan_filter) {
206-
fast_rescan_filter->UpdateIfNeeded();
207-
fetch_block = FilterBlock(fast_rescan_filter, current_block);
208-
}
209276

210-
if (progress_end - progress_begin > 0.0) {
211-
m_wallet.m_scanning_progress = (progress_current - progress_begin) / (progress_end - progress_begin);
212-
} else { // avoid divide-by-zero for single block scan range (i.e. start and stop hashes are equal)
213-
m_wallet.m_scanning_progress = 0;
214-
}
215-
if (block_height % 100 == 0 && progress_end - progress_begin > 0.0) {
216-
m_wallet.ShowProgress(strprintf("[%s] %s", m_wallet.DisplayName(), _("Rescanning…")), std::max(1, std::min(99, (int)(m_wallet.m_scanning_progress * 100))));
277+
size_t start_index = 0;
278+
size_t end_index = 0;
279+
while((m_continue || !m_blocks.empty()) && !m_wallet.fAbortRescan && !chain.shutdownRequested()) {
280+
auto range = FilterBlocks(fast_rescan_filter, result);
281+
// If no blocks to scan, mark current batch as scanned
282+
start_index = range.has_value() ? range->first : m_blocks.size();
283+
end_index = range.has_value() ? range->second : m_blocks.size();
284+
if (start_index > 0) {
285+
// Some blocks at the start of the batch were skipped.
286+
// Update last scanned block to indicate that these
287+
// blocks have been scanned.
288+
block_hash = m_blocks[start_index - 1].first;
289+
block_height = m_blocks[start_index - 1].second;
290+
result.last_scanned_block = block_hash;
291+
result.last_scanned_height = block_height;
217292
}
218293

219-
bool next_interval = m_reserver.now() >= current_time + INTERVAL_TIME;
220-
if (next_interval) {
221-
current_time = m_reserver.now();
222-
m_wallet.WalletLogPrintf("Still rescanning. At block %d. Progress=%f\n", block_height, progress_current);
223-
}
294+
for (size_t i = start_index; i < end_index; ++i) {
295+
block_hash = m_blocks[i].first;
296+
block_height = m_blocks[i].second;
297+
if (progress_end - progress_begin > 0.0) {
298+
m_wallet.m_scanning_progress = (progress_current - progress_begin) / (progress_end - progress_begin);
299+
} else { // avoid divide-by-zero for single block scan range (i.e. start and stop hashes are equal)
300+
m_wallet.m_scanning_progress = 0;
301+
}
302+
if (block_height % 100 == 0 && progress_end - progress_begin > 0.0) {
303+
m_wallet.ShowProgress(strprintf("[%s] %s", m_wallet.DisplayName(), _("Rescanning…")), std::max(1, std::min(99, (int)(m_wallet.m_scanning_progress * 100))));
304+
}
305+
306+
bool next_interval = m_reserver.now() >= current_time + INTERVAL_TIME;
307+
if (next_interval) {
308+
current_time = m_reserver.now();
309+
m_wallet.WalletLogPrintf("Still rescanning. At block %d. Progress=%f\n", block_height, progress_current);
310+
}
224311

225-
if (fetch_block) {
226-
if (ScanBlock(current_block, m_save_progress && next_interval)) {
312+
if (ScanBlock(m_blocks[i], m_save_progress && next_interval)) {
227313
result.last_scanned_block = block_hash;
228314
result.last_scanned_height = block_height;
229315
} else {
230316
// could not scan block, keep scanning but record this block as the most recent failure
231317
result.last_failed_block = block_hash;
232318
result.status = ScanResult::FAILURE;
233319
}
234-
} else {
235-
result.last_scanned_block = block_hash;
236-
result.last_scanned_height = block_height;
237320
}
238-
if (!fContinue) break;
321+
322+
m_blocks.erase(m_blocks.begin(), m_blocks.begin() + end_index);
239323

240324
progress_current = chain.guessVerificationProgress(block_hash);
241325

src/wallet/scan.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,17 @@ class ChainScanner {
3434
int m_next_block_height;
3535
uint256 m_next_block_hash;
3636

37-
bool ReadBlockHash(std::pair<uint256, int>& out, ScanResult& result);
38-
bool FilterBlock(const std::unique_ptr<FastWalletRescanFilter>& filter, const std::pair<uint256, int>& block);
37+
/// Queued block hashes and heights to filter and scan
38+
std::vector<std::pair<uint256, int>> m_blocks;
39+
size_t m_max_blockqueue_size{1000};
40+
bool m_continue{true};
41+
42+
bool ReadBlockHash(ScanResult& result);
43+
/**
44+
* @return a pair of indexes into the blocks array that specify a range of blocks
45+
* to scan (end index excluded) or std::nullopt if no blocks should be scanned.
46+
*/
47+
std::optional<std::pair<size_t, size_t>> FilterBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result);
3948
bool ScanBlock(const std::pair<uint256, int>& data, bool save_progress);
4049

4150
public:
@@ -47,7 +56,9 @@ class ChainScanner {
4756
m_start_height(start_height),
4857
m_max_height(max_height),
4958
m_fUpdate(fUpdate),
50-
m_save_progress(save_progress) {}
59+
m_save_progress(save_progress) {
60+
m_blocks.reserve(m_max_blockqueue_size);
61+
}
5162

5263
ScanResult Scan();
5364

src/wallet/wallet.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ class CWallet final : public WalletStorage, public interfaces::Chain::Notificati
410410
std::unique_ptr<WalletDatabase> m_database;
411411

412412
/** Thread pool for wallet operations. */
413-
[[maybe_unused]] ThreadPool* m_thread_pool;
413+
ThreadPool* m_thread_pool;
414414

415415
/**
416416
* The following is used to keep track of how far behind the wallet is

0 commit comments

Comments
 (0)