Skip to content

Commit 3413680

Browse files
authored
perf: fix TPS bottleneck in worker pipeline (#206)
1 parent c11dbf8 commit 3413680

File tree

8 files changed

+114
-32
lines changed

8 files changed

+114
-32
lines changed

crates/data-chain/src/worker/batch_maker.rs

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,20 @@ impl BatchMaker {
5454
None
5555
}
5656

57+
/// Add multiple transactions at once, returning any completed batches.
58+
///
59+
/// Convenience method that collects all resulting batches when adding
60+
/// a pre-drained set of transactions in bulk.
61+
pub fn add_transactions(&mut self, txs: Vec<Transaction>) -> Vec<Batch> {
62+
let mut batches = Vec::new();
63+
for tx in txs {
64+
if let Some(batch) = self.add_transaction(tx) {
65+
batches.push(batch);
66+
}
67+
}
68+
batches
69+
}
70+
5771
/// Check if batch should be flushed
5872
fn should_flush(&self) -> bool {
5973
self.pending_size >= self.max_bytes || self.pending_txs.len() >= self.max_txs

crates/data-chain/src/worker/config.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,14 +22,14 @@ impl WorkerConfig {
2222
/// Create a new configuration with defaults
2323
///
2424
/// Default batch thresholds are tuned for responsive transaction processing:
25-
/// - `max_batch_txs`: 100 transactions triggers immediate batch flush
25+
/// - `max_batch_txs`: 500 transactions triggers immediate batch flush
2626
/// - `flush_interval`: 50ms ensures batches don't wait too long
2727
pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self {
2828
Self {
2929
validator_id,
3030
worker_id,
3131
max_batch_bytes: 1024 * 1024, // 1MB
32-
max_batch_txs: 100, // Flush after 100 txs for responsive batching
32+
max_batch_txs: 500, // Flush after 500 txs for higher throughput
3333
flush_interval: Duration::from_millis(50), // Faster time-based flush
3434
}
3535
}

crates/data-chain/src/worker/core.rs

Lines changed: 67 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -382,10 +382,18 @@ impl Worker {
382382
self.check_time_flush().await;
383383
}
384384

385-
// Handle incoming transactions
385+
// Handle incoming transactions - drain all ready TXs at once
386386
tx = self.tx_receiver.recv() => {
387387
if let Some(tx) = tx {
388-
self.handle_transaction(tx).await;
388+
// Drain additional ready transactions to amortize select! overhead
389+
let mut txs = vec![tx];
390+
while let Ok(t) = self.tx_receiver.try_recv() {
391+
txs.push(t);
392+
if txs.len() >= self.config.max_batch_txs {
393+
break;
394+
}
395+
}
396+
self.handle_transactions_batch(txs).await;
389397
} else {
390398
warn!(worker_id = self.config.worker_id, "tx_receiver closed");
391399
self.shutdown = true;
@@ -420,9 +428,10 @@ impl Worker {
420428
info!(worker_id = self.config.worker_id, "Worker shutting down");
421429
}
422430

423-
/// Handle incoming transaction
431+
/// Handle incoming transaction (used by tests only; production uses handle_transactions_batch)
432+
#[cfg(test)]
424433
async fn handle_transaction(&mut self, tx: Transaction) {
425-
info!(
434+
trace!(
426435
worker_id = self.config.worker_id,
427436
tx_size = tx.len(),
428437
"Worker received transaction from channel"
@@ -457,14 +466,60 @@ impl Worker {
457466
);
458467
self.process_batch(batch).await;
459468
} else {
460-
info!(
469+
trace!(
461470
worker_id = self.config.worker_id,
462471
pending_txs = self.batch_maker.pending_count(),
463472
"Transaction added to batch maker, waiting for more or flush"
464473
);
465474
}
466475
}
467476

477+
/// Handle a batch of incoming transactions drained from the channel.
478+
///
479+
/// Validates each transaction, adds all valid ones to the batch maker,
480+
/// and processes any resulting batches. This amortizes the per-transaction
481+
/// overhead of the select! loop and logging.
482+
async fn handle_transactions_batch(&mut self, txs: Vec<Transaction>) {
483+
let total = txs.len();
484+
debug!(
485+
worker_id = self.config.worker_id,
486+
tx_count = total,
487+
"Processing drained transaction batch"
488+
);
489+
490+
// Validate and collect valid transactions
491+
let mut valid_txs = Vec::with_capacity(total);
492+
for tx in txs {
493+
if let Some(ref validator) = self.validator {
494+
match validator.validate_transaction(&tx).await {
495+
Ok(()) => valid_txs.push(tx),
496+
Err(e) => {
497+
warn!(
498+
worker_id = self.config.worker_id,
499+
error = %e,
500+
"Transaction validation failed, rejecting"
501+
);
502+
}
503+
}
504+
} else {
505+
valid_txs.push(tx);
506+
}
507+
}
508+
509+
// Add all valid transactions and collect any completed batches
510+
let batches = self.batch_maker.add_transactions(valid_txs);
511+
512+
// Process all completed batches
513+
for batch in batches {
514+
info!(
515+
worker_id = self.config.worker_id,
516+
tx_count = batch.transactions.len(),
517+
"Batch ready, processing"
518+
);
519+
self.process_batch(batch).await;
520+
}
521+
}
522+
468523
/// Handle message from Primary
469524
async fn handle_primary_message(&mut self, msg: PrimaryToWorker) {
470525
match msg {
@@ -694,9 +749,8 @@ impl Worker {
694749
let has_pending = self.batch_maker.has_pending();
695750
let elapsed = self.batch_maker.time_since_batch_start();
696751

697-
// Log every call so we can see the tick is working
698752
if has_pending {
699-
info!(
753+
trace!(
700754
worker_id = self.config.worker_id,
701755
should_flush,
702756
has_pending,
@@ -706,7 +760,7 @@ impl Worker {
706760
}
707761

708762
if should_flush && has_pending {
709-
info!(
763+
debug!(
710764
worker_id = self.config.worker_id,
711765
pending_txs = self.batch_maker.pending_count(),
712766
"Time flush triggered, creating batch"
@@ -783,7 +837,7 @@ impl Worker {
783837
if let Some(ref storage) = self.storage {
784838
match storage.put_batch(batch.clone()).await {
785839
Ok(_) => {
786-
info!(
840+
debug!(
787841
worker_id = self.config.worker_id,
788842
digest = %digest.digest,
789843
tx_count = batch.transactions.len(),
@@ -812,20 +866,20 @@ impl Worker {
812866
self.state.store_batch(batch.clone());
813867

814868
// Broadcast to peer Workers
815-
info!(
869+
debug!(
816870
worker_id = self.config.worker_id,
817871
digest = %digest.digest,
818872
"Broadcasting batch to peer Workers..."
819873
);
820874
self.network.broadcast_batch(&batch).await;
821-
info!(
875+
debug!(
822876
worker_id = self.config.worker_id,
823877
digest = %digest.digest,
824878
"Broadcast complete"
825879
);
826880

827881
// Report to Primary
828-
info!(
882+
debug!(
829883
worker_id = self.config.worker_id,
830884
digest = %digest.digest,
831885
"Sending BatchDigest to Primary"
@@ -846,7 +900,7 @@ impl Worker {
846900
"Failed to send BatchDigest to Primary - channel closed"
847901
);
848902
} else {
849-
info!(
903+
debug!(
850904
worker_id = self.config.worker_id,
851905
"BatchDigest sent to Primary successfully"
852906
);

crates/node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ toml = { workspace = true }
5151
# Logging
5252
tracing = { workspace = true }
5353
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }
54+
tracing-appender = "0.2"
5455

5556
# CLI
5657
clap = { version = "4.4", features = ["derive"] }

crates/node/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -247,7 +247,7 @@ impl NodeConfig {
247247
data_dir: home_dir.join("data"),
248248
genesis_path: None, // Uses default: {home_dir}/config/genesis.json
249249
car_interval_ms: 100,
250-
max_batch_txs: 100,
250+
max_batch_txs: 500,
251251
max_batch_bytes: 1024 * 1024, // 1MB
252252
rpc_enabled: false,
253253
rpc_http_port: DEFAULT_RPC_HTTP_PORT + (index as u16),
@@ -473,7 +473,7 @@ mod tests {
473473
"num_workers": 1,
474474
"data_dir": "/tmp/cipherd-0",
475475
"car_interval_ms": 100,
476-
"max_batch_txs": 100,
476+
"max_batch_txs": 500,
477477
"max_batch_bytes": 1048576
478478
}"#;
479479

@@ -556,7 +556,7 @@ mod tests {
556556
"num_workers": 1,
557557
"data_dir": "/tmp/cipherd-0",
558558
"car_interval_ms": 100,
559-
"max_batch_txs": 100,
559+
"max_batch_txs": 500,
560560
"max_batch_bytes": 1048576
561561
}"#;
562562

crates/node/src/main.rs

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -364,7 +364,8 @@ async fn main() -> Result<()> {
364364
let cli = Cli::parse();
365365

366366
// Initialize tracing based on global flags
367-
init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);
367+
// Hold the guard to keep the non-blocking writer alive for the process lifetime
368+
let _tracing_guard = init_tracing(&cli.log_level, &cli.log_format, cli.log_no_color);
368369

369370
let result = match cli.command {
370371
Commands::Init {
@@ -424,7 +425,11 @@ async fn main() -> Result<()> {
424425
Ok(())
425426
}
426427

427-
fn init_tracing(log_level: &str, log_format: &str, no_color: bool) {
428+
fn init_tracing(
429+
log_level: &str,
430+
log_format: &str,
431+
no_color: bool,
432+
) -> tracing_appender::non_blocking::WorkerGuard {
428433
let level = match log_level.to_lowercase().as_str() {
429434
"trace" => Level::TRACE,
430435
"debug" => Level::DEBUG,
@@ -437,16 +442,22 @@ fn init_tracing(log_level: &str, log_format: &str, no_color: bool) {
437442
let filter =
438443
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level.to_string()));
439444

445+
// Use non-blocking writer to avoid blocking the async runtime on log I/O
446+
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());
447+
440448
let subscriber = tracing_subscriber::fmt()
441449
.with_env_filter(filter)
442450
.with_target(true)
443451
.with_thread_ids(true)
444-
.with_ansi(!no_color);
452+
.with_ansi(!no_color)
453+
.with_writer(non_blocking);
445454

446455
match log_format {
447456
"json" => subscriber.json().init(),
448457
_ => subscriber.init(),
449458
}
459+
460+
guard
450461
}
451462

452463
// =============================================================================
@@ -554,7 +565,7 @@ fn cmd_init(
554565
data_dir: data_dir.clone(),
555566
genesis_path: Some(genesis_path.clone()),
556567
car_interval_ms: 100,
557-
max_batch_txs: 100,
568+
max_batch_txs: 500,
558569
max_batch_bytes: 1024 * 1024,
559570
rpc_enabled: true,
560571
rpc_http_port: cipherd::DEFAULT_RPC_HTTP_PORT,
@@ -968,7 +979,7 @@ fn cmd_testnet_init_files(
968979
data_dir: data_dir.clone(),
969980
genesis_path: Some(genesis_path.clone()),
970981
car_interval_ms: 100,
971-
max_batch_txs: 100,
982+
max_batch_txs: 500,
972983
max_batch_bytes: 1024 * 1024,
973984
rpc_enabled: true,
974985
// Each validator gets HTTP and WS ports spaced by 10 to avoid conflicts

crates/node/src/node.rs

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ use std::sync::Arc;
5858
use std::time::Duration;
5959
use tokio::sync::mpsc;
6060
use tokio_util::sync::CancellationToken;
61-
use tracing::{debug, error, info, warn};
61+
use tracing::{debug, error, info, trace, warn};
6262

6363
use cipherbft_metrics;
6464

@@ -703,7 +703,9 @@ impl Node {
703703
// to ensure batches are flushed before Cars are created. Without this, there's a race
704704
// condition where Primary creates empty Cars before Worker flushes pending batches.
705705
let worker_config = WorkerConfig::new(self.validator_id, worker_id)
706-
.with_flush_interval(std::time::Duration::from_millis(50));
706+
.with_flush_interval(std::time::Duration::from_millis(50))
707+
.with_max_batch_txs(self.config.max_batch_txs)
708+
.with_max_batch_bytes(self.config.max_batch_bytes);
707709
let mut worker_handle = Worker::spawn_with_storage(
708710
worker_config,
709711
Box::new(worker_network),
@@ -754,12 +756,12 @@ impl Node {
754756
}
755757
} => {
756758
if let Some(tx_bytes) = tx {
757-
info!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
759+
trace!("Worker {} received transaction from RPC mempool ({} bytes)", worker_id, tx_bytes.len());
758760
if worker_handle.submit_transaction(tx_bytes).await.is_err() {
759761
warn!("Worker {} submit_transaction failed", worker_id);
760762
// Don't break - continue processing other messages
761763
} else {
762-
info!("Worker {} forwarded transaction to batch maker", worker_id);
764+
trace!("Worker {} forwarded transaction to batch maker", worker_id);
763765
}
764766
}
765767
}
@@ -784,7 +786,7 @@ impl Node {
784786
msg = worker_handle.recv_from_worker() => {
785787
match msg {
786788
Some(m) => {
787-
info!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
789+
debug!("Worker {} bridge forwarding {:?} to Primary", worker_id, m);
788790
if primary_worker_sender.send(m).await.is_err() {
789791
warn!("Worker {} send to primary failed", worker_id);
790792
break;

crates/rpc/src/adapters.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use std::collections::HashMap;
3939
use std::sync::atomic::{AtomicU64, Ordering};
4040
use std::sync::Arc;
4141
use tokio::sync::mpsc;
42-
use tracing::{debug, info, trace, warn};
42+
use tracing::{debug, trace, warn};
4343

4444
use cipherbft_execution::precompiles::{CipherBftPrecompileProvider, StakingPrecompile};
4545
use cipherbft_execution::AccountProof;
@@ -1573,7 +1573,7 @@ impl ChannelMempoolApi {
15731573
#[async_trait]
15741574
impl MempoolApi for ChannelMempoolApi {
15751575
async fn submit_transaction(&self, tx_bytes: Bytes) -> RpcResult<B256> {
1576-
info!(
1576+
trace!(
15771577
"ChannelMempoolApi::submit_transaction received {} bytes (chain_id={})",
15781578
tx_bytes.len(),
15791579
self.chain_id
@@ -1616,7 +1616,7 @@ impl MempoolApi for ChannelMempoolApi {
16161616
}
16171617

16181618
// Forward to worker via channel
1619-
info!(
1619+
trace!(
16201620
"Sending transaction {} to worker channel (capacity: {})",
16211621
tx_hash,
16221622
self.tx_sender.capacity()
@@ -1626,7 +1626,7 @@ impl MempoolApi for ChannelMempoolApi {
16261626
RpcError::Execution("Transaction submission failed: worker channel closed".to_string())
16271627
})?;
16281628

1629-
info!(
1629+
trace!(
16301630
"Transaction {} sent to worker channel ({} bytes)",
16311631
tx_hash,
16321632
tx_bytes.len()

0 commit comments

Comments (0)