Skip to content

Commit ad6ded4

Browse files
authored
Merge pull request zingolabs#1836 from Oscar-Pepper/add_performance_level_to_sync_config
Add performance level to sync config
2 parents eae3488 + f165e2e commit ad6ded4

File tree

19 files changed

+253
-196
lines changed

19 files changed

+253
-196
lines changed

libtonode-tests/tests/concrete.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1424,7 +1424,7 @@ tmQuMoTTjU3GFfTjrhPiBYihbTVfYmPk5Gr"
14241424
mod slow {
14251425
use std::num::NonZeroU32;
14261426

1427-
use pepper_sync::sync::{SyncConfig, TransparentAddressDiscovery};
1427+
use pepper_sync::sync::{PerformanceLevel, SyncConfig, TransparentAddressDiscovery};
14281428
use pepper_sync::wallet::{
14291429
NoteInterface, OrchardNote, OutgoingNoteInterface, OutputInterface, SaplingNote,
14301430
TransparentCoin,
@@ -1780,6 +1780,7 @@ mod slow {
17801780
WalletSettings {
17811781
sync_config: SyncConfig {
17821782
transparent_address_discovery: TransparentAddressDiscovery::minimal(),
1783+
performance_level: PerformanceLevel::High,
17831784
},
17841785
min_confirmations: NonZeroU32::try_from(1).unwrap(),
17851786
},

libtonode-tests/tests/sync.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{num::NonZeroU32, time::Duration};
22

33
use bip0039::Mnemonic;
4-
use pepper_sync::sync::{SyncConfig, TransparentAddressDiscovery};
4+
use pepper_sync::sync::{PerformanceLevel, SyncConfig, TransparentAddressDiscovery};
55
use tempfile::TempDir;
66
use testvectors::seeds::HOSPITAL_MUSEUM_SEED;
77
use zingolib::{
@@ -33,6 +33,7 @@ async fn sync_mainnet_test() {
3333
WalletSettings {
3434
sync_config: SyncConfig {
3535
transparent_address_discovery: TransparentAddressDiscovery::minimal(),
36+
performance_level: PerformanceLevel::High,
3637
},
3738
min_confirmations: NonZeroU32::try_from(1).unwrap(),
3839
},
@@ -101,6 +102,7 @@ async fn sync_status() {
101102
WalletSettings {
102103
sync_config: SyncConfig {
103104
transparent_address_discovery: TransparentAddressDiscovery::minimal(),
105+
performance_level: PerformanceLevel::High,
104106
},
105107
min_confirmations: NonZeroU32::try_from(1).unwrap(),
106108
},

pepper-sync/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ pub use sync::sync_status;
6060

6161
use zcash_protocol::ShieldedProtocol;
6262

63-
pub(crate) const MAX_BATCH_OUTPUTS: usize = 2usize.pow(12);
64-
6563
pub(crate) trait SyncDomain {
6664
const SHIELDED_PROTOCOL: ShieldedProtocol;
6765
}

pepper-sync/src/scan.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ pub(crate) async fn scan<P>(
113113
consensus_parameters: &P,
114114
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
115115
scan_task: ScanTask,
116+
max_batch_outputs: usize,
116117
) -> Result<ScanResults, ScanError>
117118
where
118119
P: consensus::Parameters + Sync + Send + 'static,
@@ -159,6 +160,7 @@ where
159160
&consensus_parameters_clone,
160161
&ufvks_clone,
161162
initial_scan_data,
163+
max_batch_outputs / 8,
162164
)
163165
})
164166
.await
@@ -196,8 +198,16 @@ where
196198

197199
let (sapling_located_trees, orchard_located_trees) = tokio::task::spawn_blocking(move || {
198200
(
199-
witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions),
200-
witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions),
201+
witness::build_located_trees(
202+
sapling_initial_position,
203+
sapling_leaves_and_retentions,
204+
max_batch_outputs / 8,
205+
),
206+
witness::build_located_trees(
207+
orchard_initial_position,
208+
orchard_leaves_and_retentions,
209+
max_batch_outputs / 8,
210+
),
201211
)
202212
})
203213
.await

pepper-sync/src/scan/compact_blocks.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use zcash_primitives::{block::BlockHash, zip32::AccountId};
1717
use zcash_protocol::consensus::{self, BlockHeight};
1818

1919
use crate::{
20-
MAX_BATCH_OUTPUTS,
2120
client::{self, FetchRequest},
2221
error::{ContinuityError, ScanError, ServerError},
2322
keys::{KeyId, ScanningKeyOps, ScanningKeys},
@@ -34,13 +33,12 @@ use super::{DecryptedNoteData, InitialScanData, ScanData};
3433

3534
mod runners;
3635

37-
const TRIAL_DECRYPT_TASK_SIZE: usize = MAX_BATCH_OUTPUTS / 8;
38-
3936
pub(super) fn scan_compact_blocks<P>(
4037
compact_blocks: Vec<CompactBlock>,
4138
consensus_parameters: &P,
4239
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
4340
initial_scan_data: InitialScanData,
41+
trial_decrypt_task_size: usize,
4442
) -> Result<ScanData, ScanError>
4543
where
4644
P: consensus::Parameters + Sync + Send + 'static,
@@ -52,7 +50,12 @@ where
5250
)?;
5351

5452
let scanning_keys = ScanningKeys::from_account_ufvks(ufvks.clone());
55-
let mut runners = trial_decrypt(consensus_parameters, &scanning_keys, &compact_blocks)?;
53+
let mut runners = trial_decrypt(
54+
consensus_parameters,
55+
&scanning_keys,
56+
&compact_blocks,
57+
trial_decrypt_task_size,
58+
)?;
5659

5760
let mut wallet_blocks: BTreeMap<BlockHeight, WalletBlock> = BTreeMap::new();
5861
let mut nullifiers = NullifierMap::new();
@@ -178,11 +181,12 @@ fn trial_decrypt<P>(
178181
consensus_parameters: &P,
179182
scanning_keys: &ScanningKeys,
180183
compact_blocks: &[CompactBlock],
184+
trial_decrypt_task_size: usize,
181185
) -> Result<BatchRunners<(), ()>, ScanError>
182186
where
183187
P: consensus::Parameters + Send + 'static,
184188
{
185-
let mut runners = BatchRunners::<(), ()>::for_keys(TRIAL_DECRYPT_TASK_SIZE, scanning_keys);
189+
let mut runners = BatchRunners::<(), ()>::for_keys(trial_decrypt_task_size, scanning_keys);
186190
for block in compact_blocks {
187191
runners
188192
.add_block(consensus_parameters, block.clone())

pepper-sync/src/scan/task.rs

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ use zcash_primitives::{transaction::TxId, zip32::AccountId};
2222
use zcash_protocol::consensus::{self, BlockHeight};
2323

2424
use crate::{
25-
MAX_BATCH_OUTPUTS,
2625
client::{self, FetchRequest},
2726
error::{ScanError, ServerError, SyncError},
2827
keys::transparent::TransparentAddressId,
29-
sync,
28+
sync::{self, PerformanceLevel},
3029
wallet::{
3130
ScanTarget, WalletBlock,
3231
traits::{SyncBlocks, SyncNullifiers, SyncWallet},
@@ -88,9 +87,16 @@ where
8887
}
8988
}
9089

91-
pub(crate) fn launch(&mut self) {
92-
self.spawn_batcher();
93-
self.spawn_workers();
90+
pub(crate) fn launch(&mut self, performance_level: PerformanceLevel) {
91+
let max_batch_outputs = match performance_level {
92+
PerformanceLevel::Low => 2usize.pow(11),
93+
PerformanceLevel::Medium => 2usize.pow(12),
94+
PerformanceLevel::High => 2usize.pow(12),
95+
PerformanceLevel::Maximum => 2usize.pow(13),
96+
};
97+
98+
self.spawn_batcher(max_batch_outputs);
99+
self.spawn_workers(max_batch_outputs);
94100
}
95101

96102
pub(crate) fn worker_poolsize(&self) -> usize {
@@ -100,13 +106,13 @@ where
100106
/// Spawns the batcher.
101107
///
102108
/// When the batcher is running it will wait for a scan task.
103-
pub(crate) fn spawn_batcher(&mut self) {
109+
pub(crate) fn spawn_batcher(&mut self, max_batch_outputs: usize) {
104110
tracing::debug!("Spawning batcher");
105111
let mut batcher = Batcher::new(
106112
self.consensus_parameters.clone(),
107113
self.fetch_request_sender.clone(),
108114
);
109-
batcher.run();
115+
batcher.run(max_batch_outputs);
110116
self.batcher = Some(batcher);
111117
}
112118

@@ -132,7 +138,7 @@ where
132138
/// Spawns a worker.
133139
///
134140
/// When the worker is running it will wait for a scan task.
135-
pub(crate) fn spawn_worker(&mut self) {
141+
pub(crate) fn spawn_worker(&mut self, max_batch_outputs: usize) {
136142
tracing::debug!("Spawning worker {}", self.unique_id);
137143
let mut worker = ScanWorker::new(
138144
self.unique_id,
@@ -141,17 +147,17 @@ where
141147
self.fetch_request_sender.clone(),
142148
self.ufvks.clone(),
143149
);
144-
worker.run();
150+
worker.run(max_batch_outputs);
145151
self.workers.push(worker);
146152
self.unique_id += 1;
147153
}
148154

149155
/// Spawns the initial pool of workers.
150156
///
151157
/// Poolsize is set by [`self::MAX_WORKER_POOLSIZE`].
152-
pub(crate) fn spawn_workers(&mut self) {
158+
pub(crate) fn spawn_workers(&mut self, max_batch_outputs: usize) {
153159
for _ in 0..MAX_WORKER_POOLSIZE {
154-
self.spawn_worker();
160+
self.spawn_worker(max_batch_outputs);
155161
}
156162
}
157163

@@ -192,6 +198,7 @@ where
192198
&mut self,
193199
wallet: &mut W,
194200
shutdown_mempool: Arc<AtomicBool>,
201+
performance_level: PerformanceLevel,
195202
) -> Result<(), SyncError<W::Error>>
196203
where
197204
W: SyncWallet + SyncBlocks + SyncNullifiers,
@@ -227,7 +234,7 @@ where
227234
}
228235

229236
// scan ranges with `Verify` priority
230-
self.update_batcher(wallet)
237+
self.update_batcher(wallet, performance_level)
231238
.map_err(SyncError::WalletError)?;
232239
}
233240
ScannerState::Scan => {
@@ -236,7 +243,7 @@ where
236243
.expect("batcher should be running")
237244
.update_batch_store();
238245
self.update_workers();
239-
self.update_batcher(wallet)
246+
self.update_batcher(wallet, performance_level)
240247
.map_err(SyncError::WalletError)?;
241248
}
242249
ScannerState::Shutdown => {
@@ -268,15 +275,21 @@ where
268275
}
269276
}
270277

271-
fn update_batcher<W>(&mut self, wallet: &mut W) -> Result<(), W::Error>
278+
fn update_batcher<W>(
279+
&mut self,
280+
wallet: &mut W,
281+
performance_level: PerformanceLevel,
282+
) -> Result<(), W::Error>
272283
where
273284
W: SyncWallet + SyncBlocks + SyncNullifiers,
274285
{
275286
let batcher = self.batcher.as_ref().expect("batcher should be running");
276287
if !batcher.is_batching() {
277-
if let Some(scan_task) =
278-
sync::state::create_scan_task(&self.consensus_parameters, wallet)?
279-
{
288+
if let Some(scan_task) = sync::state::create_scan_task(
289+
&self.consensus_parameters,
290+
wallet,
291+
performance_level,
292+
)? {
280293
batcher.add_scan_task(scan_task);
281294
} else if wallet.get_sync_state()?.scan_complete() {
282295
self.state.shutdown();
@@ -320,7 +333,7 @@ where
320333
///
321334
/// Waits for a scan task and then fetches compact blocks to form fixed output batches. The scan task is split if
322335
/// needed and the compact blocks are added to each scan task and sent to the scan workers for scanning.
323-
fn run(&mut self) {
336+
fn run(&mut self, max_batch_outputs: usize) {
324337
let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::<ScanTask>(1);
325338
let (batch_sender, batch_receiver) = mpsc::channel::<ScanTask>(1);
326339

@@ -405,7 +418,7 @@ where
405418
.vtx
406419
.iter()
407420
.fold(0, |acc, transaction| acc + transaction.actions.len());
408-
if sapling_output_count + orchard_output_count > MAX_BATCH_OUTPUTS {
421+
if sapling_output_count + orchard_output_count > max_batch_outputs {
409422
let (full_batch, new_batch) = scan_task
410423
.clone()
411424
.split(
@@ -535,7 +548,7 @@ where
535548
/// Runs the worker in a new tokio task.
536549
///
537550
/// Waits for a scan task and then calls [`crate::scan::scan`] on the given range.
538-
fn run(&mut self) {
551+
fn run(&mut self, max_batch_outputs: usize) {
539552
let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::<ScanTask>(1);
540553

541554
let is_scanning = self.is_scanning.clone();
@@ -552,6 +565,7 @@ where
552565
&consensus_parameters,
553566
&ufvks,
554567
scan_task,
568+
max_batch_outputs,
555569
)
556570
.await;
557571

0 commit comments

Comments
 (0)