Skip to content

Commit 8163940

Browse files
committed
perf: fix TPS bottleneck in worker pipeline
1 parent 0a2a537 commit 8163940

File tree

8 files changed

+112
-31
lines changed

8 files changed

+112
-31
lines changed

crates/data-chain/src/worker/batch_maker.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ impl BatchMaker {
5454
None
5555
}
5656

57+
/// Add multiple transactions at once, returning any completed batches.
58+
///
59+
/// Convenience method that collects all resulting batches when adding
60+
/// a pre-drained set of transactions in bulk.
61+
pub fn add_transactions(&mut self, txs: Vec<Transaction>) -> Vec<Batch> {
62+
let mut batches = Vec::new();
63+
for tx in txs {
64+
if let Some(batch) = self.add_transaction(tx) {
65+
batches.push(batch);
66+
}
67+
}
68+
batches
69+
}
70+
5771
/// Check if batch should be flushed
5872
fn should_flush(&self) -> bool {
5973
self.pending_size >= self.max_bytes || self.pending_txs.len() >= self.max_txs

crates/data-chain/src/worker/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ impl WorkerConfig {
2222
/// Create a new configuration with defaults
2323
///
2424
/// Default batch thresholds balance throughput with responsive transaction processing:
25-
/// - `max_batch_txs`: 100 transactions triggers immediate batch flush
25+
/// - `max_batch_txs`: 500 transactions triggers immediate batch flush
2626
/// - `flush_interval`: 50ms ensures batches don't wait too long
2727
pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self {
2828
Self {
2929
validator_id,
3030
worker_id,
3131
max_batch_bytes: 1024 * 1024, // 1MB
32-
max_batch_txs: 100, // Flush after 100 txs for responsive batching
32+
max_batch_txs: 500, // Flush after 500 txs for higher throughput
3333
flush_interval: Duration::from_millis(50), // Faster time-based flush
3434
}
3535
}

crates/data-chain/src/worker/core.rs

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,18 @@ impl Worker {
382382
self.check_time_flush().await;
383383
}
384384

385-
// Handle incoming transactions
385+
// Handle incoming transactions - drain all ready TXs at once
386386
tx = self.tx_receiver.recv() => {
387387
if let Some(tx) = tx {
388-
self.handle_transaction(tx).await;
388+
// Drain additional ready transactions to amortize select! overhead
389+
let mut txs = vec![tx];
390+
while let Ok(t) = self.tx_receiver.try_recv() {
391+
txs.push(t);
392+
if txs.len() >= self.config.max_batch_txs {
393+
break;
394+
}
395+
}
396+
self.handle_transactions_batch(txs).await;
389397
} else {
390398
warn!(worker_id = self.config.worker_id, "tx_receiver closed");
391399
self.shutdown = true;
@@ -422,7 +430,7 @@ impl Worker {
422430

423431
/// Handle incoming transaction
424432
async fn handle_transaction(&mut self, tx: Transaction) {
425-
info!(
433+
trace!(
426434
worker_id = self.config.worker_id,
427435
tx_size = tx.len(),
428436
"Worker received transaction from channel"
@@ -457,14 +465,60 @@ impl Worker {
457465
);
458466
self.process_batch(batch).await;
459467
} else {
460-
info!(
468+
trace!(
461469
worker_id = self.config.worker_id,
462470
pending_txs = self.batch_maker.pending_count(),
463471
"Transaction added to batch maker, waiting for more or flush"
464472
);
465473
}
466474
}
467475

476+
/// Handle a batch of incoming transactions drained from the channel.
477+
///
478+
/// Validates each transaction, adds all valid ones to the batch maker,
479+
/// and processes any resulting batches. This amortizes the per-transaction
480+
/// overhead of the select! loop and logging.
481+
async fn handle_transactions_batch(&mut self, txs: Vec<Transaction>) {
482+
let total = txs.len();
483+
debug!(
484+
worker_id = self.config.worker_id,
485+
tx_count = total,
486+
"Processing drained transaction batch"
487+
);
488+
489+
// Validate and collect valid transactions
490+
let mut valid_txs = Vec::with_capacity(total);
491+
for tx in txs {
492+
if let Some(ref validator) = self.validator {
493+
match validator.validate_transaction(&tx).await {
494+
Ok(()) => valid_txs.push(tx),
495+
Err(e) => {
496+
warn!(
497+
worker_id = self.config.worker_id,
498+
error = %e,
499+
"Transaction validation failed, rejecting"
500+
);
501+
}
502+
}
503+
} else {
504+
valid_txs.push(tx);
505+
}
506+
}
507+
508+
// Add all valid transactions and collect any completed batches
509+
let batches = self.batch_maker.add_transactions(valid_txs);
510+
511+
// Process all completed batches
512+
for batch in batches {
513+
info!(
514+
worker_id = self.config.worker_id,
515+
tx_count = batch.transactions.len(),
516+
"Batch ready, processing"
517+
);
518+
self.process_batch(batch).await;
519+
}
520+
}
521+
468522
/// Handle message from Primary
469523
async fn handle_primary_message(&mut self, msg: PrimaryToWorker) {
470524
match msg {
@@ -694,9 +748,8 @@ impl Worker {
694748
let has_pending = self.batch_maker.has_pending();
695749
let elapsed = self.batch_maker.time_since_batch_start();
696750

697-
// Log every call so we can see the tick is working
698751
if has_pending {
699-
info!(
752+
trace!(
700753
worker_id = self.config.worker_id,
701754
should_flush,
702755
has_pending,
@@ -706,7 +759,7 @@ impl Worker {
706759
}
707760

708761
if should_flush && has_pending {
709-
info!(
762+
debug!(
710763
worker_id = self.config.worker_id,
711764
pending_txs = self.batch_maker.pending_count(),
712765
"Time flush triggered, creating batch"
@@ -783,7 +836,7 @@ impl Worker {
783836
if let Some(ref storage) = self.storage {
784837
match storage.put_batch(batch.clone()).await {
785838
Ok(_) => {
786-
info!(
839+
debug!(
787840
worker_id = self.config.worker_id,
788841
digest = %digest.digest,
789842
tx_count = batch.transactions.len(),
@@ -812,20 +865,20 @@ impl Worker {
812865
self.state.store_batch(batch.clone());
813866

814867
// Broadcast to peer Workers
815-
info!(
868+
debug!(
816869
worker_id = self.config.worker_id,
817870
digest = %digest.digest,
818871
"Broadcasting batch to peer Workers..."
819872
);
820873
self.network.broadcast_batch(&batch).await;
821-
info!(
874+
debug!(
822875
worker_id = self.config.worker_id,
823876
digest = %digest.digest,
824877
"Broadcast complete"
825878
);
826879

827880
// Report to Primary
828-
info!(
881+
debug!(
829882
worker_id = self.config.worker_id,
830883
digest = %digest.digest,
831884
"Sending BatchDigest to Primary"
@@ -846,7 +899,7 @@ impl Worker {
846899
"Failed to send BatchDigest to Primary - channel closed"
847900
);
848901
} else {
849-
info!(
902+
debug!(
850903
worker_id = self.config.worker_id,
851904
"BatchDigest sent to Primary successfully"
852905
);

crates/node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ toml = { workspace = true }
5151
# Logging
5252
tracing = { workspace = true }
5353
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }
54+
tracing-appender = "0.2"
5455

5556
# CLI
5657
clap = { version = "4.4", features = ["derive"] }

crates/node/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl NodeConfig {
247247
data_dir: home_dir.join("data"),
248248
genesis_path: None, // Uses default: {home_dir}/config/genesis.json
249249
car_interval_ms: 100,
250-
max_batch_txs: 100,
250+
max_batch_txs: 500,
251251
max_batch_bytes: 1024 * 1024, // 1MB
252252
rpc_enabled: false,
253253
rpc_http_port: DEFAULT_RPC_HTTP_PORT + (index as u16),
@@ -473,7 +473,7 @@ mod tests {
473473
"num_workers": 1,
474474
"data_dir": "/tmp/cipherd-0",
475475
"car_interval_ms": 100,
476-
"max_batch_txs": 100,
476+
"max_batch_txs": 500,
477477
"max_batch_bytes": 1048576
478478
}"#;
479479

@@ -556,7 +556,7 @@ mod tests {
556556
"num_workers": 1,
557557
"data_dir": "/tmp/cipherd-0",
558558
"car_interval_ms": 100,
559-
"max_batch_txs": 100,
559+
"max_batch_txs": 500,
560560
"max_batch_bytes": 1048576
561561
}"#;
562562

crates/node/src/main.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ async fn main() -> Result<()> {
364364
let cli = Cli::parse();
365365

366366
// Initialize tracing based on global flags
367-
init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);
367+
// Hold the guard to keep the non-blocking writer alive for the process lifetime
368+
let _tracing_guard = init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);
368369

369370
let result = match cli.command {
370371
Commands::Init {
@@ -424,7 +425,11 @@ async fn main() -> Result<()> {
424425
Ok(())
425426
}
426427

427-
fn init_tracing(log_level: &str, log_format: &str, no_color: bool) {
428+
fn init_tracing(
429+
log_level: &str,
430+
log_format: &str,
431+
no_color: bool,
432+
) -> tracing_appender::non_blocking::WorkerGuard {
428433
let level = match log_level.to_lowercase().as_str() {
429434
"trace" => Level::TRACE,
430435
"debug" => Level::DEBUG,
@@ -437,16 +442,22 @@ fn init_tracing(log_level: &str, log_format: &str, no_color: bool) {
437442
let filter =
438443
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level.to_string()));
439444

445+
// Use non-blocking writer to avoid blocking the async runtime on log I/O
446+
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());
447+
440448
let subscriber = tracing_subscriber::fmt()
441449
.with_env_filter(filter)
442450
.with_target(true)
443451
.with_thread_ids(true)
444-
.with_ansi(!no_color);
452+
.with_ansi(!no_color)
453+
.with_writer(non_blocking);
445454

446455
match log_format {
447456
"json" => subscriber.json().init(),
448457
_ => subscriber.init(),
449458
}
459+
460+
guard
450461
}
451462

452463
// =============================================================================
@@ -554,7 +565,7 @@ fn cmd_init(
554565
data_dir: data_dir.clone(),
555566
genesis_path: Some(genesis_path.clone()),
556567
car_interval_ms: 100,
557-
max_batch_txs: 100,
568+
max_batch_txs: 500,
558569
max_batch_bytes: 1024 * 1024,
559570
rpc_enabled: true,
560571
rpc_http_port: cipherd::DEFAULT_RPC_HTTP_PORT,
@@ -968,7 +979,7 @@ fn cmd_testnet_init_files(
968979
data_dir: data_dir.clone(),
969980
genesis_path: Some(genesis_path.clone()),
970981
car_interval_ms: 100,
971-
max_batch_txs: 100,
982+
max_batch_txs: 500,
972983
max_batch_bytes: 1024 * 1024,
973984
rpc_enabled: true,
974985
// Each validator gets HTTP and WS ports spaced by 10 to avoid conflicts

crates/node/src/node.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use std::sync::Arc;
5858
use std::time::Duration;
5959
use tokio::sync::mpsc;
6060
use tokio_util::sync::CancellationToken;
61-
use tracing::{debug, error, info, warn};
61+
use tracing::{debug, error, info, trace, warn};
6262

6363
use cipherbft_metrics;
6464

@@ -703,7 +703,9 @@ impl Node {
703703
// to ensure batches are flushed before Cars are created. Without this, there's a race
704704
// condition where Primary creates empty Cars before Worker flushes pending batches.
705705
let worker_config = WorkerConfig::new(self.validator_id, worker_id)
706-
.with_flush_interval(std::time::Duration::from_millis(50));
706+
.with_flush_interval(std::time::Duration::from_millis(50))
707+
.with_max_batch_txs(self.config.max_batch_txs)
708+
.with_max_batch_bytes(self.config.max_batch_bytes);
707709
let mut worker_handle = Worker::spawn_with_storage(
708710
worker_config,
709711
Box::new(worker_network),
@@ -754,12 +756,12 @@ impl Node {
754756
}
755757
} => {
756758
if let Some(tx_bytes) = tx {
757-
info!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
759+
trace!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
758760
if worker_handle.submit_transaction(tx_bytes).await.is_err() {
759761
warn!("Worker {} submit_transaction failed", worker_id);
760762
// Don't break - continue processing other messages
761763
} else {
762-
info!("Worker {} forwarded transaction to batch maker", worker_id);
764+
trace!("Worker {} forwarded transaction to batch maker", worker_id);
763765
}
764766
}
765767
}
@@ -784,7 +786,7 @@ impl Node {
784786
msg = worker_handle.recv_from_worker() => {
785787
match msg {
786788
Some(m) => {
787-
info!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
789+
debug!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
788790
if primary_worker_sender.send(m).await.is_err() {
789791
warn!("Worker {} send to primary failed", worker_id);
790792
break;

crates/rpc/src/adapters.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::collections::HashMap;
3939
use std::sync::atomic::{AtomicU64, Ordering};
4040
use std::sync::Arc;
4141
use tokio::sync::mpsc;
42-
use tracing::{debug, info, trace, warn};
42+
use tracing::{debug, trace, warn};
4343

4444
use cipherbft_execution::precompiles::{CipherBftPrecompileProvider, StakingPrecompile};
4545
use cipherbft_execution::AccountProof;
@@ -1573,7 +1573,7 @@ impl ChannelMempoolApi {
15731573
#[async_trait]
15741574
impl MempoolApi for ChannelMempoolApi {
15751575
async fn submit_transaction(&self, tx_bytes: Bytes) -> RpcResult<B256> {
1576-
info!(
1576+
trace!(
15771577
"ChannelMempoolApi::submit_transaction received {} bytes (chain_id={})",
15781578
tx_bytes.len(),
15791579
self.chain_id
@@ -1616,7 +1616,7 @@ impl MempoolApi for ChannelMempoolApi {
16161616
}
16171617

16181618
// Forward to worker via channel
1619-
info!(
1619+
trace!(
16201620
"Sending transaction {} to worker channel (capacity: {})",
16211621
tx_hash,
16221622
self.tx_sender.capacity()
@@ -1626,7 +1626,7 @@ impl MempoolApi for ChannelMempoolApi {
16261626
RpcError::Execution("Transaction submission failed: worker channel closed".to_string())
16271627
})?;
16281628

1629-
info!(
1629+
trace!(
16301630
"Transaction {} sent to worker channel ({} bytes)",
16311631
tx_hash,
16321632
tx_bytes.len()

0 commit comments

Comments (0)