Skip to content

Commit 8d92b8b

Browse files
committed
wallet: check blockfilters in parallel
Any operation that requires cs_wallet like GetLastBlockHeight() and SyncTransaction must be done on the main thread because cs_wallet is a RecursiveMutex and ScanForWalletTransactions is also called from AttachChain() which already LOCKs cs_wallet. Block hashes are read and pushed to m_blocks while block filters are being check for other blocks. When more block hashes cannot be read, the main thread will use ThreadPool::ProcessTask() to join other workers and process queued jobs, instead of wasting CPU styles constanly checking if a future is ready. Benchmarks show considerable improvement (approx 5x with 16 threads).
1 parent ec6c582 commit 8d92b8b

File tree

2 files changed

+134
-46
lines changed

2 files changed

+134
-46
lines changed

src/wallet/scan.cpp

Lines changed: 120 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <wallet/scan.h>
99
#include <wallet/wallet.h>
1010

11+
#include <algorithm>
12+
#include <future>
13+
1114
using interfaces::FoundBlock;
1215

1316
namespace wallet {
@@ -97,14 +100,13 @@ class FastWalletRescanFilter
97100
};
98101
} // namespace
99102

100-
bool ChainScanner::ReadBlockHash(std::pair<uint256, int>& out, ScanResult& result) {
103+
bool ChainScanner::ReadBlockHash(ScanResult& result) {
101104
bool block_still_active = false;
102105
bool next_block = false;
103106
int block_height = m_next_block_height;
104107
uint256 block_hash = m_next_block_hash;
105108
m_wallet.chain().findBlock(block_hash, FoundBlock().inActiveChain(block_still_active).nextBlock(FoundBlock().inActiveChain(next_block).hash(m_next_block_hash)));
106-
out.first = block_hash;
107-
out.second = block_height;
109+
m_blocks.emplace_back(block_hash, block_height);
108110
++m_next_block_height;
109111
if (!block_still_active) {
110112
// Abort scan if current block is no longer active, to prevent
@@ -121,22 +123,86 @@ bool ChainScanner::ReadBlockHash(std::pair<uint256, int>& out, ScanResult& resul
121123
if (block_height >= WITH_LOCK(m_wallet.cs_wallet, return m_wallet.GetLastBlockHeight())) {
122124
return false;
123125
}
124-
return next_block;
126+
if (!next_block) return false;
127+
return true;
125128
}
126129

127-
bool ChainScanner::FilterBlock(const std::unique_ptr<FastWalletRescanFilter>& filter, const std::pair<uint256, int>& block) {
128-
auto matches_block{filter->MatchesBlock(block.first)};
129-
if (matches_block.has_value()) {
130-
if (*matches_block) {
131-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", block.second, block.first.ToString());
132-
return true;
133-
} else {
134-
return false;
130+
enum FilterRes {
131+
FILTER_NO_MATCH,
132+
FILTER_MATCH,
133+
FILTER_NO_FILTER
134+
};
135+
136+
std::optional<std::pair<size_t, size_t>> ChainScanner::FilterBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result) {
137+
if (!filter) {
138+
// Slow scan: no filter, scan all blocks
139+
m_continue = ReadBlockHash(result);
140+
return std::make_pair<size_t, size_t>(0, m_blocks.size());
141+
}
142+
filter->UpdateIfNeeded();
143+
auto* thread_pool = m_wallet.m_thread_pool;
144+
// ThreadPool pointer should never be null here
145+
// during normal operation because it should
146+
// have been passed from the WalletContext to
147+
// the CWallet constructor
148+
assert(thread_pool != nullptr);
149+
std::optional<std::pair<size_t, size_t>> range;
150+
size_t workers_count = thread_pool->WorkersCount();
151+
size_t i = 0;
152+
size_t completed = 0;
153+
std::vector<std::future<FilterRes>> futures;
154+
futures.reserve(workers_count);
155+
156+
while ((m_continue && completed < m_max_blockqueue_size) || i < m_blocks.size() || !futures.empty()) {
157+
const bool result_ready = futures.size() > 0 && futures[0].wait_for(std::chrono::seconds::zero()) == std::future_status::ready;
158+
if (result_ready) {
159+
const auto& scan_res{futures[0].get()};
160+
completed++;
161+
futures.erase(futures.begin());
162+
if (scan_res == FILTER_NO_MATCH) {
163+
if (range.has_value()) {
164+
return range;
165+
}
166+
continue;
167+
}
168+
int current_block_index = completed - 1;
169+
if (scan_res == FilterRes::FILTER_MATCH) {
170+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (filter matched)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
171+
} else { // FILTER_NO_FILTER
172+
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", m_blocks[current_block_index].second, m_blocks[current_block_index].first.ToString());
173+
}
174+
175+
if (!range.has_value()) range = std::make_pair(current_block_index, current_block_index + 1);
176+
else range->second = current_block_index + 1;
135177
}
136-
} else {
137-
LogDebug(BCLog::SCAN, "Fast rescan: inspect block %d [%s] (WARNING: block filter not found!)\n", block.second, block.first.ToString());
138-
return true;
178+
179+
const size_t job_gap = workers_count - futures.size();
180+
if (job_gap > 0 && i < m_blocks.size()) {
181+
for (size_t j = 0; j < job_gap && i < m_blocks.size(); ++j, ++i) {
182+
auto block = m_blocks[i];
183+
futures.emplace_back(thread_pool->Submit([&filter, block = std::move(block)]() {
184+
const auto matches_block{filter->MatchesBlock(block.first)};
185+
if (matches_block.has_value()) {
186+
if (*matches_block) {
187+
return FilterRes::FILTER_MATCH;
188+
} else {
189+
return FilterRes::FILTER_NO_MATCH;
190+
}
191+
} else {
192+
return FilterRes::FILTER_NO_FILTER;
193+
}
194+
}));
195+
}
196+
}
197+
198+
// If m_max_blockqueue_size blocks have been filtered,
199+
// stop reading more blocks for now, to give the
200+
// main scanning loop a chance to update progress
201+
// and erase some blocks from the queue.
202+
if (m_continue && completed < m_max_blockqueue_size) m_continue = ReadBlockHash(result);
203+
else if (!futures.empty()) thread_pool->ProcessTask();
139204
}
205+
return range;
140206
}
141207

142208
bool ChainScanner::ScanBlock(const std::pair<uint256, int>& data, bool save_progress) {
@@ -190,52 +256,63 @@ ScanResult ChainScanner::Scan() {
190256
if (m_max_height) chain.findAncestorByHeight(tip_hash, *m_max_height, FoundBlock().hash(end_hash));
191257

192258
ScanResult result;
259+
193260
double progress_begin = chain.guessVerificationProgress(m_start_block);
194261
double progress_end = chain.guessVerificationProgress(end_hash);
195262
double progress_current = progress_begin;
263+
264+
auto start_block = m_start_block;
265+
auto& block_hash = start_block;
196266
int block_height = m_start_height;
197267
m_next_block_height = m_start_height;
198268
m_next_block_hash = m_start_block;
199-
std::pair<uint256, int> current_block;
200-
while (!m_wallet.fAbortRescan && !chain.shutdownRequested()) {
201-
bool fContinue = ReadBlockHash(current_block, result);
202-
auto& block_hash = current_block.first;
203-
block_height = current_block.second;
204-
bool fetch_block{true};
205-
if (fast_rescan_filter) {
206-
fast_rescan_filter->UpdateIfNeeded();
207-
fetch_block = FilterBlock(fast_rescan_filter, current_block);
208-
}
209269

210-
if (progress_end - progress_begin > 0.0) {
211-
m_wallet.m_scanning_progress = (progress_current - progress_begin) / (progress_end - progress_begin);
212-
} else { // avoid divide-by-zero for single block scan range (i.e. start and stop hashes are equal)
213-
m_wallet.m_scanning_progress = 0;
214-
}
215-
if (block_height % 100 == 0 && progress_end - progress_begin > 0.0) {
216-
m_wallet.ShowProgress(strprintf("[%s] %s", m_wallet.DisplayName(), _("Rescanning…")), std::max(1, std::min(99, (int)(m_wallet.m_scanning_progress * 100))));
270+
size_t start_index = 0;
271+
size_t end_index = 0;
272+
while((m_continue || !m_blocks.empty()) && !m_wallet.fAbortRescan && !chain.shutdownRequested()) {
273+
auto range = FilterBlocks(fast_rescan_filter, result);
274+
// If no blocks to scan, mark current batch as scanned
275+
start_index = range.has_value() ? range->first : m_blocks.size();
276+
end_index = range.has_value() ? range->second : m_blocks.size();
277+
if (start_index > 0) {
278+
// Some blocks at the start of the batch were skipped.
279+
// Update last scanned block to indicate that these
280+
// blocks have been scanned.
281+
block_hash = m_blocks[start_index - 1].first;
282+
block_height = m_blocks[start_index - 1].second;
283+
result.last_scanned_block = block_hash;
284+
result.last_scanned_height = block_height;
217285
}
218286

219-
bool next_interval = m_reserver.now() >= current_time + INTERVAL_TIME;
220-
if (next_interval) {
221-
current_time = m_reserver.now();
222-
m_wallet.WalletLogPrintf("Still rescanning. At block %d. Progress=%f\n", block_height, progress_current);
223-
}
287+
for (size_t i = start_index; i < end_index; ++i) {
288+
block_hash = m_blocks[i].first;
289+
block_height = m_blocks[i].second;
290+
if (progress_end - progress_begin > 0.0) {
291+
m_wallet.m_scanning_progress = (progress_current - progress_begin) / (progress_end - progress_begin);
292+
} else { // avoid divide-by-zero for single block scan range (i.e. start and stop hashes are equal)
293+
m_wallet.m_scanning_progress = 0;
294+
}
295+
if (block_height % 100 == 0 && progress_end - progress_begin > 0.0) {
296+
m_wallet.ShowProgress(strprintf("[%s] %s", m_wallet.DisplayName(), _("Rescanning…")), std::max(1, std::min(99, (int)(m_wallet.m_scanning_progress * 100))));
297+
}
298+
299+
bool next_interval = m_reserver.now() >= current_time + INTERVAL_TIME;
300+
if (next_interval) {
301+
current_time = m_reserver.now();
302+
m_wallet.WalletLogPrintf("Still rescanning. At block %d. Progress=%f\n", block_height, progress_current);
303+
}
224304

225-
if (fetch_block) {
226-
if (ScanBlock(current_block, m_save_progress && next_interval)) {
305+
if (ScanBlock(m_blocks[i], m_save_progress && next_interval)) {
227306
result.last_scanned_block = block_hash;
228307
result.last_scanned_height = block_height;
229308
} else {
230309
// could not scan block, keep scanning but record this block as the most recent failure
231310
result.last_failed_block = block_hash;
232311
result.status = ScanResult::FAILURE;
233312
}
234-
} else {
235-
result.last_scanned_block = block_hash;
236-
result.last_scanned_height = block_height;
237313
}
238-
if (!fContinue) break;
314+
315+
m_blocks.erase(m_blocks.begin(), m_blocks.begin() + end_index);
239316

240317
progress_current = chain.guessVerificationProgress(block_hash);
241318

src/wallet/scan.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,17 @@ class ChainScanner {
3434
int m_next_block_height;
3535
uint256 m_next_block_hash;
3636

37-
bool ReadBlockHash(std::pair<uint256, int>& out, ScanResult& result);
38-
bool FilterBlock(const std::unique_ptr<FastWalletRescanFilter>& filter, const std::pair<uint256, int>& block);
37+
/// Queued block hashes and heights to filter and scan
38+
std::vector<std::pair<uint256, int>> m_blocks;
39+
size_t m_max_blockqueue_size{1000};
40+
bool m_continue{true};
41+
42+
bool ReadBlockHash(ScanResult& result);
43+
/**
44+
* @return a pair of indexes into the blocks array that specify a range of blocks
45+
* to scan (end index excluded) or std::nullopt if no blocks should be scanned.
46+
*/
47+
std::optional<std::pair<size_t, size_t>> FilterBlocks(const std::unique_ptr<FastWalletRescanFilter>& filter, ScanResult& result);
3948
bool ScanBlock(const std::pair<uint256, int>& data, bool save_progress);
4049

4150
public:
@@ -47,7 +56,9 @@ class ChainScanner {
4756
m_start_height(start_height),
4857
m_max_height(max_height),
4958
m_fUpdate(fUpdate),
50-
m_save_progress(save_progress) {}
59+
m_save_progress(save_progress) {
60+
m_blocks.reserve(m_max_blockqueue_size);
61+
}
5162

5263
ScanResult Scan();
5364

0 commit comments

Comments
 (0)