Skip to content

Commit 62df82f

Browse files
arya2 and upbqdn authored
fix(state): Restore initial sync performance by avoiding RocksDB merge operations when the on-disk database format is up-to-date (#9973)
* Adds a `finished_format_upgrades` field to `DiskDb` and sets it to true if no db format upgrade is required or after required db format upgrades have been applied
* Updates `ZebraDb::write_block()` to insert new address balance locations instead of merging the changes once any db format upgrades have finished.
* Apply suggestions from code review

  Co-authored-by: Marek <mail@marek.onl>
* Removes `#[inline]` attribute from private generic functions that should be inlined by the compiler anyway

  Adds a comment explaining why we prefer insertions when updating address balance locations over merge operations when there's no risk that insertions could overwrite concurrently updated entries.

---------

Co-authored-by: Marek <mail@marek.onl>
1 parent 6fe583f commit 62df82f

File tree

6 files changed

+251
-87
lines changed

6 files changed

+251
-87
lines changed

zebra-state/src/service/finalized_state/disk_db.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use std::{
1616
fs,
1717
ops::RangeBounds,
1818
path::Path,
19-
sync::Arc,
19+
sync::{
20+
atomic::{self, AtomicBool},
21+
Arc,
22+
},
2023
};
2124

2225
use itertools::Itertools;
@@ -90,6 +93,10 @@ pub struct DiskDb {
9093
/// If true, the database files are deleted on drop.
9194
ephemeral: bool,
9295

96+
/// A boolean flag indicating whether the db format change task has finished
97+
/// applying any format changes that may have been required.
98+
finished_format_upgrades: Arc<AtomicBool>,
99+
93100
// Owned State
94101
//
95102
// Everything contained in this state must be shared by all clones, or read-only.
@@ -623,6 +630,19 @@ impl DiskDb {
623630
total_size_on_disk
624631
}
625632

633+
/// Sets `finished_format_upgrades` to true to indicate that Zebra has
634+
/// finished applying any required db format upgrades.
635+
pub fn mark_finished_format_upgrades(&self) {
636+
self.finished_format_upgrades
637+
.store(true, atomic::Ordering::SeqCst);
638+
}
639+
640+
/// Returns true if the `finished_format_upgrades` flag has been set to true to
641+
/// indicate that Zebra has finished applying any required db format upgrades.
642+
pub fn finished_format_upgrades(&self) -> bool {
643+
self.finished_format_upgrades.load(atomic::Ordering::SeqCst)
644+
}
645+
626646
/// When called with a secondary DB instance, tries to catch up with the primary DB instance
627647
pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
628648
self.db.try_catch_up_with_primary()
@@ -932,6 +952,7 @@ impl DiskDb {
932952
network: network.clone(),
933953
ephemeral: config.ephemeral,
934954
db: Arc::new(db),
955+
finished_format_upgrades: Arc::new(AtomicBool::new(false)),
935956
};
936957

937958
db.assert_default_cf_is_empty();

zebra-state/src/service/finalized_state/disk_format/transparent.rs

Lines changed: 68 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
66
//! each time the database format (column, serialization, etc) changes.
77
8-
use std::{cmp::max, fmt::Debug};
8+
use std::{cmp::max, collections::HashMap, fmt::Debug};
99

1010
use zebra_chain::{
1111
amount::{self, Amount, Constraint, NegativeAllowed, NonNegative},
@@ -173,7 +173,7 @@ impl<C: Constraint + Copy + std::fmt::Debug> AddressBalanceLocationInner<C> {
173173
/// the first [`transparent::Output`] sent to an address.
174174
///
175175
/// The returned value has a zero initial balance and received balance.
176-
fn new(first_output: OutputLocation) -> Self {
176+
pub(crate) fn new(first_output: OutputLocation) -> Self {
177177
Self {
178178
balance: Amount::zero(),
179179
received: 0,
@@ -212,6 +212,38 @@ impl<C: Constraint + Copy + std::fmt::Debug> AddressBalanceLocationInner<C> {
212212
pub fn height_mut(&mut self) -> &mut Height {
213213
&mut self.location.transaction_location.height
214214
}
215+
216+
/// Updates the current balance by adding the supplied output's value.
217+
#[allow(clippy::unwrap_in_result)]
218+
pub fn receive_output(
219+
&mut self,
220+
unspent_output: &transparent::Output,
221+
) -> Result<(), amount::Error> {
222+
self.balance = (self
223+
.balance
224+
.zatoshis()
225+
.checked_add(unspent_output.value().zatoshis()))
226+
.expect("ops handling taddr balances must not overflow")
227+
.try_into()?;
228+
self.received = self.received.saturating_add(unspent_output.value().into());
229+
Ok(())
230+
}
231+
232+
/// Updates the current balance by subtracting the supplied output's value.
233+
#[allow(clippy::unwrap_in_result)]
234+
pub fn spend_output(
235+
&mut self,
236+
spent_output: &transparent::Output,
237+
) -> Result<(), amount::Error> {
238+
self.balance = (self
239+
.balance
240+
.zatoshis()
241+
.checked_sub(spent_output.value().zatoshis()))
242+
.expect("ops handling taddr balances must not underflow")
243+
.try_into()?;
244+
245+
Ok(())
246+
}
215247
}
216248

217249
impl<C: Constraint + Copy + std::fmt::Debug> std::ops::Add for AddressBalanceLocationInner<C> {
@@ -238,49 +270,51 @@ impl<C: Constraint + Copy + std::fmt::Debug> std::ops::Add for AddressBalanceLoc
238270
}
239271
}
240272

273+
impl From<AddressBalanceLocationInner<NonNegative>> for AddressBalanceLocation {
274+
fn from(value: AddressBalanceLocationInner<NonNegative>) -> Self {
275+
Self(value)
276+
}
277+
}
278+
279+
impl From<AddressBalanceLocationInner<NegativeAllowed>> for AddressBalanceLocationChange {
280+
fn from(value: AddressBalanceLocationInner<NegativeAllowed>) -> Self {
281+
Self(value)
282+
}
283+
}
284+
241285
/// Represents a change in the [`AddressBalanceLocation`] of a transparent address
242286
/// in the finalized state.
243287
pub struct AddressBalanceLocationChange(AddressBalanceLocationInner<NegativeAllowed>);
244288

289+
/// Represents a set of updates to address balance locations in the database.
290+
pub enum AddressBalanceLocationUpdates {
291+
/// A set of [`AddressBalanceLocationChange`]s that should be merged into the existing values in the database.
292+
Merge(HashMap<transparent::Address, AddressBalanceLocationChange>),
293+
/// A set of full [`AddressBalanceLocation`]s that should be inserted as the new values in the database.
294+
Insert(HashMap<transparent::Address, AddressBalanceLocation>),
295+
}
296+
297+
impl From<HashMap<transparent::Address, AddressBalanceLocation>> for AddressBalanceLocationUpdates {
298+
fn from(value: HashMap<transparent::Address, AddressBalanceLocation>) -> Self {
299+
Self::Insert(value)
300+
}
301+
}
302+
303+
impl From<HashMap<transparent::Address, AddressBalanceLocationChange>>
304+
for AddressBalanceLocationUpdates
305+
{
306+
fn from(value: HashMap<transparent::Address, AddressBalanceLocationChange>) -> Self {
307+
Self::Merge(value)
308+
}
309+
}
310+
245311
impl AddressBalanceLocationChange {
246312
/// Creates a new [`AddressBalanceLocationChange`].
247313
///
248314
/// See [`AddressBalanceLocationInner::new`] for more details.
249315
pub fn new(location: AddressLocation) -> Self {
250316
Self(AddressBalanceLocationInner::new(location))
251317
}
252-
253-
/// Updates the current balance by adding the supplied output's value.
254-
#[allow(clippy::unwrap_in_result)]
255-
pub fn receive_output(
256-
&mut self,
257-
unspent_output: &transparent::Output,
258-
) -> Result<(), amount::Error> {
259-
self.balance = (self
260-
.balance
261-
.zatoshis()
262-
.checked_add(unspent_output.value().zatoshis()))
263-
.expect("adding two Amounts is always within an i64")
264-
.try_into()?;
265-
self.received = self.received.saturating_add(unspent_output.value().into());
266-
Ok(())
267-
}
268-
269-
/// Updates the current balance by subtracting the supplied output's value.
270-
#[allow(clippy::unwrap_in_result)]
271-
pub fn spend_output(
272-
&mut self,
273-
spent_output: &transparent::Output,
274-
) -> Result<(), amount::Error> {
275-
self.balance = (self
276-
.balance
277-
.zatoshis()
278-
.checked_sub(spent_output.value().zatoshis()))
279-
.expect("subtracting two Amounts is always within an i64")
280-
.try_into()?;
281-
282-
Ok(())
283-
}
284318
}
285319

286320
impl std::ops::Deref for AddressBalanceLocationChange {

zebra-state/src/service/finalized_state/disk_format/upgrade.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,9 +381,18 @@ impl DbFormatChange {
381381
initial_tip_height: Option<Height>,
382382
cancel_receiver: &Receiver<CancelFormatChange>,
383383
) -> Result<(), CancelFormatChange> {
384+
// Mark the database as having finished applying any format upgrades if there are no
385+
// format upgrades that need to be applied.
386+
if !self.is_upgrade() {
387+
db.mark_finished_format_upgrades();
388+
}
389+
384390
match self {
385391
// Perform any required upgrades, then mark the state as upgraded.
386-
Upgrade { .. } => self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?,
392+
Upgrade { .. } => {
393+
self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?;
394+
db.mark_finished_format_upgrades();
395+
}
387396

388397
NewlyCreated { .. } => {
389398
Self::mark_as_newly_created(db);

zebra-state/src/service/finalized_state/zebra_db.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,18 @@ impl ZebraDb {
179179
self.format_change_handle = Some(format_change_handle);
180180
}
181181

182+
/// Sets `finished_format_upgrades` to true on the inner [`DiskDb`] to indicate that Zebra has
183+
/// finished applying any required db format upgrades.
184+
pub fn mark_finished_format_upgrades(&self) {
185+
self.db.mark_finished_format_upgrades();
186+
}
187+
188+
/// Returns true if the `finished_format_upgrades` flag has been set to true on the inner [`DiskDb`] to
189+
/// indicate that Zebra has finished applying any required db format upgrades.
190+
pub fn finished_format_upgrades(&self) -> bool {
191+
self.db.finished_format_upgrades()
192+
}
193+
182194
/// Returns config for this database.
183195
pub fn config(&self) -> &Config {
184196
&self.config

zebra-state/src/service/finalized_state/zebra_db/block.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::{
3737
disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
3838
disk_format::{
3939
block::TransactionLocation,
40-
transparent::{AddressBalanceLocationChange, OutputLocation},
40+
transparent::{AddressBalanceLocationUpdates, OutputLocation},
4141
},
4242
zebra_db::{metrics::block_precommit_metrics, ZebraDb},
4343
FromDisk, RawBytes,
@@ -517,18 +517,36 @@ impl ZebraDb {
517517
.collect();
518518

519519
// Get the current address balances, before the transactions in this block
520-
let address_balances: HashMap<transparent::Address, AddressBalanceLocationChange> =
520+
521+
fn read_addr_locs<T, F: Fn(&transparent::Address) -> Option<T>>(
522+
changed_addresses: HashSet<transparent::Address>,
523+
f: F,
524+
) -> HashMap<transparent::Address, T> {
521525
changed_addresses
522526
.into_iter()
523-
.filter_map(|address| {
524-
// # Correctness
525-
//
526-
// Address balances are updated with the `fetch_add_balance_and_received` merge operator, so
527-
// the values must represent the changes to the balance, not the final balance.
528-
let addr_loc = self.address_balance_location(&address)?.into_new_change();
529-
Some((address.clone(), addr_loc))
530-
})
531-
.collect();
527+
.filter_map(|address| Some((address.clone(), f(&address)?)))
528+
.collect()
529+
}
530+
531+
// # Performance
532+
//
533+
// It's better to update entries in RocksDB with insertions over merge operations when there is no risk that
534+
// insertions may overwrite values that are updated concurrently in database format upgrades, because inserted values
535+
// are quicker to read and require less background compaction.
536+
//
537+
// Reading entries that have been updated with merge ops often requires reading the latest fully-merged value,
538+
// reading all of the pending merge operands (potentially hundreds), and applying pending merge operands to the
539+
// fully-merged value, so it's much faster to read entries that have been updated with insertions than it
540+
// is to read entries that have been updated with merge operations.
541+
let address_balances: AddressBalanceLocationUpdates = if self.finished_format_upgrades() {
542+
AddressBalanceLocationUpdates::Insert(read_addr_locs(changed_addresses, |addr| {
543+
self.address_balance_location(addr)
544+
}))
545+
} else {
546+
AddressBalanceLocationUpdates::Merge(read_addr_locs(changed_addresses, |addr| {
547+
Some(self.address_balance_location(addr)?.into_new_change())
548+
}))
549+
};
532550

533551
let mut batch = DiskWriteBatch::new();
534552

@@ -602,7 +620,7 @@ impl DiskWriteBatch {
602620
transparent::OutPoint,
603621
OutputLocation,
604622
>,
605-
address_balances: HashMap<transparent::Address, AddressBalanceLocationChange>,
623+
address_balances: AddressBalanceLocationUpdates,
606624
value_pool: ValueBalance<NonNegative>,
607625
prev_note_commitment_trees: Option<NoteCommitmentTrees>,
608626
) -> Result<(), BoxError> {

0 commit comments

Comments
 (0)