Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 1 addition & 30 deletions crates/pathfinder/src/consensus/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ mod persist_proposals;
#[cfg(test)]
mod test_helpers;

use std::num::NonZeroU32;
use std::path::{Path, PathBuf};

use anyhow::Context;
use p2p::consensus::{Client, Event, HeightAndRound};
use p2p_proto::consensus::ProposalPart;
use pathfinder_common::{ChainId, ContractAddress, ProposalCommitment};
use pathfinder_consensus::{ConsensusCommand, ConsensusEvent, NetworkMessage};
use pathfinder_storage::pruning::BlockchainHistoryMode;
use pathfinder_storage::{JournalMode, Storage, TriePruneMode};
use pathfinder_storage::Storage;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};

Expand All @@ -45,9 +42,6 @@ pub fn start(
// TODO determine sufficient buffer size. 1 is not enough.
let (tx_to_p2p, rx_from_consensus) = mpsc::channel::<P2PTaskEvent>(10);

let consensus_storage =
open_consensus_storage(data_directory).expect("Consensus storage cannot be opened");

let consensus_p2p_event_processing_handle = p2p_task::spawn(
chain_id,
config.my_validator_address,
Expand All @@ -56,7 +50,6 @@ pub fn start(
p2p_event_rx,
tx_to_consensus,
rx_from_consensus,
consensus_storage.clone(),
);

let (info_watch_tx, consensus_info_watch) = watch::channel(None);
Expand All @@ -68,7 +61,6 @@ pub fn start(
tx_to_p2p,
rx_from_p2p,
info_watch_tx,
consensus_storage,
storage,
data_directory,
inject_failure_config,
Expand All @@ -81,27 +73,6 @@ pub fn start(
}
}

fn open_consensus_storage(data_directory: &Path) -> anyhow::Result<Storage> {
let storage_manager =
pathfinder_storage::StorageBuilder::file(data_directory.join("consensus.sqlite")) // TODO: https://github.com/eqlabs/pathfinder/issues/3047
.journal_mode(JournalMode::WAL)
.trie_prune_mode(Some(TriePruneMode::Archive))
.blockchain_history_mode(Some(BlockchainHistoryMode::Archive))
.migrate()?;
let available_parallelism = std::thread::available_parallelism()?;
let consensus_storage = storage_manager
.create_pool(NonZeroU32::new(5 + available_parallelism.get() as u32).unwrap())?;
let mut db_conn = consensus_storage
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
.transaction()
.context("Creating database transaction")?;
db_tx.ensure_consensus_proposals_table_exists()?;
db_tx.commit()?;
Ok(consensus_storage)
}

/// Events handled by the consensus task.
enum ConsensusTaskEvent {
/// The consensus engine informs us about an event that it wants us to
Expand Down
3 changes: 1 addition & 2 deletions crates/pathfinder/src/consensus/inner/consensus_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub fn spawn(
mut rx_from_p2p: mpsc::Receiver<ConsensusTaskEvent>,
info_watch_tx: watch::Sender<Option<ConsensusInfo>>,
storage: Storage,
fake_proposals_storage: Storage,
data_directory: &Path,
// Does nothing in production builds. Used for integration testing only.
inject_failure: crate::config::integration_testing::InjectFailureConfig,
Expand Down Expand Up @@ -160,7 +159,7 @@ pub fn spawn(
{round}",
);

let fake_proposals_storage = fake_proposals_storage.clone();
let fake_proposals_storage = storage.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to work, but @CHr15F0x has been complaining that making proposals from the main DB is problematic (AFAIK because the state diffs have to be stored only after the proposal is accepted by consensus, and computing them before that, i.e. here, interferes with that). Has that been solved?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, i don't remember exactly but unless we have a decently working real proposal creation algo I'd keep this PR/issue parked. Currently using the prod db as a scratchpad for fake proposal creation seems fishy.

let (wire_proposal, finalized_block) =
util::task::spawn_blocking(move |_| {
create_empty_proposal(
Expand Down
30 changes: 10 additions & 20 deletions crates/pathfinder/src/consensus/inner/p2p_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub fn spawn(
mut p2p_event_rx: mpsc::UnboundedReceiver<Event>,
tx_to_consensus: mpsc::Sender<ConsensusTaskEvent>,
mut rx_from_consensus: mpsc::Receiver<P2PTaskEvent>,
consensus_storage: Storage,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
// Cache for finalized blocks that we created from our proposals and are
// waiting to be committed to the database once consensus is reached.
Expand Down Expand Up @@ -120,7 +119,6 @@ pub fn spawn(
let vcache = validator_cache.clone();
let dex = deferred_executions.clone();
let storage = storage.clone();
let consensus_storage2 = consensus_storage.clone();
let mut batch_execution_manager_inner = batch_execution_manager.clone();
let result = util::task::spawn_blocking(move |_| {
handle_incoming_proposal_part(
Expand All @@ -131,7 +129,6 @@ pub fn spawn(
vcache,
dex,
storage,
consensus_storage2,
&mut batch_execution_manager_inner,
)
})
Expand Down Expand Up @@ -190,9 +187,9 @@ pub fn spawn(
hash {proposal_commitment}"
);

let consensus_storage2 = consensus_storage.clone();
let storage2 = storage.clone();
let duplicate_encountered = util::task::spawn_blocking(move |_| {
let mut db_conn = consensus_storage2
let mut db_conn = storage2
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
Expand Down Expand Up @@ -225,12 +222,11 @@ pub fn spawn(
proposal.round.as_u32().expect("Valid round"),
);

let consensus_storage2 = consensus_storage.clone();
let consensus_storage3 = consensus_storage.clone();
let storage2 = storage.clone();
let proposal_parts = util::task::spawn_blocking(move |_| {
let proposal_parts = if let Some(proposal_parts) =
query_own_proposal_parts(
consensus_storage2,
&storage2,
height_and_round,
&validator_address,
)? {
Expand All @@ -255,7 +251,7 @@ pub fn spawn(
// For now we just choose the proposal from the previous round, and
// the rest are kept for debugging
// purposes.
let mut db_conn = consensus_storage3
let mut db_conn = storage2
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
Expand Down Expand Up @@ -371,7 +367,6 @@ pub fn spawn(
height_and_round,
value,
storage.clone(),
consensus_storage.clone(),
&mut my_finalized_blocks_cache,
validator_cache.clone(),
&mut batch_execution_manager,
Expand Down Expand Up @@ -418,7 +413,6 @@ async fn finalize_and_commit_block(
height_and_round: HeightAndRound,
value: ConsensusValue,
storage: Storage,
consensus_storage: Storage,
my_finalized_blocks_cache: &mut HashMap<HeightAndRound, FinalizedBlock>,
mut validator_cache: ValidatorCache,
batch_execution_manager: &mut BatchExecutionManager,
Expand All @@ -441,7 +435,6 @@ async fn finalize_and_commit_block(
};

let storage2 = storage.clone();
let consensus_storage2 = consensus_storage.clone();
util::task::spawn_blocking(move |_| {
let finalized_block = match finalized_block {
Either::Left(block) => block,
Expand All @@ -450,9 +443,7 @@ async fn finalize_and_commit_block(

assert_eq!(value.0 .0, finalized_block.header.state_diff_commitment.0);

commit_finalized_block(storage2, finalized_block.clone())?;
// Necessary for proper fake proposal creation at next heights.
commit_finalized_block(consensus_storage2, finalized_block)?;
commit_finalized_block(storage2, finalized_block)?;

anyhow::Ok(())
})
Expand Down Expand Up @@ -482,7 +473,7 @@ async fn finalize_and_commit_block(
);

util::task::spawn_blocking(move |_| {
let mut db_conn = consensus_storage
let mut db_conn = storage
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
Expand Down Expand Up @@ -672,10 +663,9 @@ fn handle_incoming_proposal_part(
mut validator_cache: ValidatorCache,
deferred_executions: Arc<Mutex<HashMap<HeightAndRound, DeferredExecution>>>,
storage: Storage,
consensus_storage: Storage,
batch_execution_manager: &mut BatchExecutionManager,
) -> anyhow::Result<Option<ProposalCommitmentWithOrigin>> {
let mut db_conn = consensus_storage
let mut db_conn = storage
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
Expand Down Expand Up @@ -981,11 +971,11 @@ fn consensus_vote_to_p2p_vote(
}

fn query_own_proposal_parts(
consensus_storage: Storage,
storage: &Storage,
height_and_round: HeightAndRound,
validator_address: &ContractAddress,
) -> anyhow::Result<Option<Vec<ProposalPart>>> {
let mut db_conn = consensus_storage
let mut db_conn = storage
.connection()
.context("Creating database connection")?;
let db_tx = db_conn
Expand Down
Binary file modified crates/rpc/fixtures/mainnet.sqlite
Binary file not shown.
14 changes: 0 additions & 14 deletions crates/storage/src/connection/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,6 @@ use pathfinder_common::ContractAddress;
use crate::prelude::*;

impl Transaction<'_> {
pub fn ensure_consensus_proposals_table_exists(&self) -> anyhow::Result<()> {
self.inner().execute(
r"CREATE TABLE IF NOT EXISTS consensus_proposals (
height INTEGER NOT NULL,
round INTEGER NOT NULL,
proposer BLOB NOT NULL,
parts BLOB NOT NULL,
UNIQUE(height, round, proposer)
)",
[],
)?;
Ok(())
}

pub fn persist_consensus_proposal_parts(
&self,
height: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod revision_0071;
mod revision_0072;
pub(crate) mod revision_0073;
mod revision_0074;
mod revision_0075;

pub(crate) use base::base_schema;

Expand Down Expand Up @@ -76,6 +77,7 @@ pub fn migrations() -> &'static [MigrationFn] {
revision_0072::migrate,
revision_0073::migrate,
revision_0074::migrate,
revision_0075::migrate,
]
}

Expand Down
19 changes: 19 additions & 0 deletions crates/storage/src/schema/revision_0075.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use anyhow::Context;

pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This certainly works, but @kkovaacs was concerned about tying the P2P-specific tables to the main DB, and consequently to releases (major version for each DB change).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. I think our consensus implementation is still too much in flux to have that in our 'main' DB. If we use the 'main' DB for consensus then pretty much every release we do will have to be a 'breaking' release and each and every change in consensus will require a migration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

tracing::info!("Creating consensus proposals table");

tx.execute(
r"CREATE TABLE IF NOT EXISTS consensus_proposals (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose IF NOT EXISTS is unnecessary here - but that's just a detail...

height INTEGER NOT NULL,
round INTEGER NOT NULL,
proposer BLOB NOT NULL,
parts BLOB NOT NULL,
UNIQUE(height, round, proposer)
)",
[],
)
.context("Creating consensus_proposals table")?;

Ok(())
}