Skip to content

Commit 05f4a50

Browse files
Zekun Lizekun000
authored andcommitted
[pipeline] impl Drop to ensure futures are properly gc'ed
Ends up cleaning the PipelineBlock to make sure it's only shared with Arc instead of cloning, so we can abort the pipeline upon drop.
1 parent f0c7e0e commit 05f4a50

23 files changed

+140
-142
lines changed

config/src/config/consensus_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ impl Default for ConsensusConfig {
226226
min_block_time_ms_to_activate: 100,
227227
min_blocks_to_activate: 4,
228228
metric: ExecutionBackpressureMetric::Percentile(0.5),
229-
target_block_time_ms: 125,
229+
target_block_time_ms: 120,
230230
},
231231
min_calibrated_txns_per_block: 8,
232232
}),
@@ -236,7 +236,7 @@ impl Default for ConsensusConfig {
236236
min_block_time_ms_to_activate: 10,
237237
min_blocks_to_activate: 4,
238238
metric: ExecutionBackpressureMetric::Mean,
239-
target_block_time_ms: 125,
239+
target_block_time_ms: 120,
240240
},
241241
block_execution_overhead_ms: 10,
242242
min_calibrated_block_gas_limit: 2000,

consensus/consensus-types/src/pipelined_block.rs

Lines changed: 65 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -184,37 +184,39 @@ impl OrderedBlockWindow {
184184
/// A representation of a block that has been added to the execution pipeline. It might either be in ordered
185185
/// or in executed state. In the ordered state, the block is waiting to be executed. In the executed state,
186186
/// the block has been executed and the output is available.
187-
#[derive(Derivative, Clone)]
188-
#[derivative(Eq, PartialEq)]
187+
/// This struct is not Cloneable, use Arc to share it.
188+
#[derive(Derivative)]
189189
pub struct PipelinedBlock {
190190
/// Block data that cannot be regenerated.
191191
block: Block,
192192
/// A window of blocks that are needed for execution with the execution pool, EXCLUDING the current block
193-
#[derivative(PartialEq = "ignore")]
194193
block_window: OrderedBlockWindow,
195-
/// Input transactions in the order of execution
194+
/// Input transactions in the order of execution. DEPRECATED stay for serialization compatibility.
196195
input_transactions: Vec<SignedTransaction>,
197196
/// The state_compute_result is calculated for all the pending blocks prior to insertion to
198197
/// the tree. The execution results are not persisted: they're recalculated again for the
199198
/// pending blocks upon restart.
200-
#[derivative(PartialEq = "ignore")]
201-
state_compute_result: StateComputeResult,
199+
state_compute_result: Mutex<StateComputeResult>,
202200
randomness: OnceCell<Randomness>,
203201
pipeline_insertion_time: OnceCell<Instant>,
204-
execution_summary: Arc<OnceCell<ExecutionSummary>>,
205-
#[derivative(PartialEq = "ignore")]
206-
pre_commit_fut: Arc<Mutex<Option<BoxFuture<'static, ExecutorResult<()>>>>>,
207-
// pipeline related fields
208-
#[derivative(PartialEq = "ignore")]
209-
pipeline_futs: Arc<Mutex<Option<PipelineFutures>>>,
210-
#[derivative(PartialEq = "ignore")]
211-
pipeline_tx: Arc<Mutex<Option<PipelineInputTx>>>,
212-
#[derivative(PartialEq = "ignore")]
213-
pipeline_abort_handle: Arc<Mutex<Option<Vec<AbortHandle>>>>,
214-
#[derivative(PartialEq = "ignore")]
215-
block_qc: Arc<Mutex<Option<Arc<QuorumCert>>>>,
202+
execution_summary: OnceCell<ExecutionSummary>,
203+
pre_commit_fut: Mutex<Option<BoxFuture<'static, ExecutorResult<()>>>>,
204+
/// pipeline related fields
205+
pipeline_futs: Mutex<Option<PipelineFutures>>,
206+
pipeline_tx: Mutex<Option<PipelineInputTx>>,
207+
pipeline_abort_handle: Mutex<Option<Vec<AbortHandle>>>,
208+
block_qc: Mutex<Option<Arc<QuorumCert>>>,
216209
}
217210

211+
impl PartialEq for PipelinedBlock {
212+
fn eq(&self, other: &Self) -> bool {
213+
self.block == other.block
214+
&& self.input_transactions == other.input_transactions
215+
&& self.randomness.get() == other.randomness.get()
216+
}
217+
}
218+
impl Eq for PipelinedBlock {}
219+
218220
impl Serialize for PipelinedBlock {
219221
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
220222
where
@@ -265,15 +267,13 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
265267

266268
impl PipelinedBlock {
267269
pub fn set_compute_result(
268-
&mut self,
269-
compute_result: StateComputeResult,
270+
&self,
271+
state_compute_result: StateComputeResult,
270272
execution_time: Duration,
271273
) {
272-
self.state_compute_result = compute_result;
273-
274274
let mut to_commit = 0;
275275
let mut to_retry = 0;
276-
for txn in self.state_compute_result.compute_status_for_input_txns() {
276+
for txn in state_compute_result.compute_status_for_input_txns() {
277277
match txn {
278278
TransactionStatus::Keep(_) => to_commit += 1,
279279
TransactionStatus::Retry => to_retry += 1,
@@ -289,14 +289,14 @@ impl PipelinedBlock {
289289
to_commit,
290290
to_retry,
291291
execution_time,
292-
root_hash: self.state_compute_result.root_hash(),
293-
gas_used: self
294-
.state_compute_result
292+
root_hash: state_compute_result.root_hash(),
293+
gas_used: state_compute_result
295294
.execution_output
296295
.block_end_info
297296
.as_ref()
298297
.map(|info| info.block_effective_gas_units()),
299298
};
299+
*self.state_compute_result.lock() = state_compute_result;
300300

301301
// We might be retrying execution, so it might have already been set.
302302
// Because we use this for statistics, it's ok that we drop the newer value.
@@ -321,23 +321,17 @@ impl PipelinedBlock {
321321
}
322322
}
323323

324-
pub fn set_execution_result(
325-
mut self,
326-
pipeline_execution_result: PipelineExecutionResult,
327-
) -> Self {
324+
pub fn set_execution_result(&self, pipeline_execution_result: PipelineExecutionResult) {
328325
let PipelineExecutionResult {
329-
input_txns,
326+
input_txns: _,
330327
result,
331328
execution_time,
332329
pre_commit_fut,
333330
} = pipeline_execution_result;
334331

335-
self.input_transactions = input_txns;
336-
self.pre_commit_fut = Arc::new(Mutex::new(Some(pre_commit_fut)));
332+
*self.pre_commit_fut.lock() = Some(pre_commit_fut);
337333

338334
self.set_compute_result(result, execution_time);
339-
340-
self
341335
}
342336

343337
#[cfg(any(test, feature = "fuzzing"))]
@@ -380,6 +374,13 @@ impl Display for PipelinedBlock {
380374
}
381375
}
382376

377+
/// Safeguard to ensure that the pipeline is aborted when the block is dropped.
378+
impl Drop for PipelinedBlock {
379+
fn drop(&mut self) {
380+
let _ = self.abort_pipeline();
381+
}
382+
}
383+
383384
impl PipelinedBlock {
384385
pub fn new(
385386
block: Block,
@@ -390,25 +391,28 @@ impl PipelinedBlock {
390391
block,
391392
block_window: OrderedBlockWindow::empty(),
392393
input_transactions,
393-
state_compute_result,
394+
state_compute_result: Mutex::new(state_compute_result),
394395
randomness: OnceCell::new(),
395396
pipeline_insertion_time: OnceCell::new(),
396-
execution_summary: Arc::new(OnceCell::new()),
397-
pre_commit_fut: Arc::new(Mutex::new(None)),
398-
pipeline_futs: Arc::new(Mutex::new(None)),
399-
pipeline_tx: Arc::new(Mutex::new(None)),
400-
pipeline_abort_handle: Arc::new(Mutex::new(None)),
401-
block_qc: Arc::new(Mutex::new(None)),
397+
execution_summary: OnceCell::new(),
398+
pre_commit_fut: Mutex::new(None),
399+
pipeline_futs: Mutex::new(None),
400+
pipeline_tx: Mutex::new(None),
401+
pipeline_abort_handle: Mutex::new(None),
402+
block_qc: Mutex::new(None),
402403
}
403404
}
404405

406+
pub fn with_block_window(self, window: OrderedBlockWindow) -> Self {
407+
let mut block = self;
408+
block.block_window = window;
409+
block
410+
}
411+
405412
pub fn new_ordered(block: Block, window: OrderedBlockWindow) -> Self {
406413
let input_transactions = Vec::new();
407414
let state_compute_result = StateComputeResult::new_dummy();
408-
Self {
409-
block_window: window,
410-
..Self::new(block, input_transactions, state_compute_result)
411-
}
415+
Self::new(block, input_transactions, state_compute_result).with_block_window(window)
412416
}
413417

414418
pub fn block(&self) -> &Block {
@@ -423,10 +427,6 @@ impl PipelinedBlock {
423427
self.block().id()
424428
}
425429

426-
pub fn input_transactions(&self) -> &Vec<SignedTransaction> {
427-
&self.input_transactions
428-
}
429-
430430
pub fn epoch(&self) -> u64 {
431431
self.block.epoch()
432432
}
@@ -455,8 +455,8 @@ impl PipelinedBlock {
455455
self.block().timestamp_usecs()
456456
}
457457

458-
pub fn compute_result(&self) -> &StateComputeResult {
459-
&self.state_compute_result
458+
pub fn compute_result(&self) -> StateComputeResult {
459+
self.state_compute_result.lock().clone()
460460
}
461461

462462
pub fn randomness(&self) -> Option<&Randomness> {
@@ -468,18 +468,20 @@ impl PipelinedBlock {
468468
}
469469

470470
pub fn block_info(&self) -> BlockInfo {
471+
let compute_result = self.compute_result();
471472
self.block().gen_block_info(
472-
self.compute_result().root_hash(),
473-
self.compute_result().last_version_or_0(),
474-
self.compute_result().epoch_state().clone(),
473+
compute_result.root_hash(),
474+
compute_result.last_version_or_0(),
475+
compute_result.epoch_state().clone(),
475476
)
476477
}
477478

478479
pub fn vote_proposal(&self) -> VoteProposal {
480+
let compute_result = self.compute_result();
479481
VoteProposal::new(
480-
self.compute_result().extension_proof(),
482+
compute_result.extension_proof(),
481483
self.block.clone(),
482-
self.compute_result().epoch_state().clone(),
484+
compute_result.epoch_state().clone(),
483485
true,
484486
)
485487
}
@@ -493,15 +495,15 @@ impl PipelinedBlock {
493495
if self.is_reconfiguration_suffix() {
494496
return vec![];
495497
}
496-
self.state_compute_result.subscribable_events().to_vec()
498+
self.compute_result().subscribable_events().to_vec()
497499
}
498500

499501
/// The block is suffix of a reconfiguration block if the state result carries over the epoch state
500502
/// from parent but has no transaction.
501503
pub fn is_reconfiguration_suffix(&self) -> bool {
502-
self.state_compute_result.has_reconfiguration()
503-
&& self
504-
.state_compute_result
504+
let state_compute_result = self.compute_result();
505+
state_compute_result.has_reconfiguration()
506+
&& state_compute_result
505507
.compute_status_for_input_txns()
506508
.is_empty()
507509
}
@@ -543,8 +545,8 @@ impl PipelinedBlock {
543545
*self.pipeline_abort_handle.lock() = Some(abort_handles);
544546
}
545547

546-
pub fn pipeline_tx(&self) -> Arc<Mutex<Option<PipelineInputTx>>> {
547-
self.pipeline_tx.clone()
548+
pub fn pipeline_tx(&self) -> &Mutex<Option<PipelineInputTx>> {
549+
&self.pipeline_tx
548550
}
549551

550552
pub fn abort_pipeline(&self) -> Option<PipelineFutures> {

consensus/src/block_storage/block_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl BlockStore {
341341
// This callback is invoked synchronously with and could be used for multiple batches of blocks.
342342
self.execution_client
343343
.finalize_order(
344-
&blocks_to_commit,
344+
blocks_to_commit,
345345
finality_proof.clone(),
346346
Box::new(
347347
move |committed_blocks: &[Arc<PipelinedBlock>],

consensus/src/block_storage/execution_pool/common_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub const DEFAULT_MAX_PRUNED_BLOCKS_IN_MEM: usize = 10;
2525

2626
/// Helper function to create a `QuorumCert` which can provide a `highest_commit_cert` via
2727
/// `highest_quorum_cert.into_wrapped_ledger_info()`
28+
#[allow(dead_code)]
2829
pub fn generate_qc(round: Round, parent_round: Round) -> QuorumCert {
2930
let num_nodes = 4;
3031
let (signers, validators) = random_validator_verifier(num_nodes, None, false);

consensus/src/consensus_observer/observer/consensus_observer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl ConsensusObserver {
295295
if let Err(error) = self
296296
.execution_client
297297
.finalize_order(
298-
ordered_block.blocks(),
298+
ordered_block.blocks().clone(),
299299
WrappedLedgerInfo::new(VoteData::dummy(), ordered_block.ordered_proof().clone()),
300300
commit_callback,
301301
)

consensus/src/counters.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ pub fn update_counters_for_compute_result(compute_result: &StateComputeResult) {
13211321
pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlock>]) {
13221322
for block in blocks_to_commit {
13231323
update_counters_for_block(block.block());
1324-
update_counters_for_compute_result(block.compute_result());
1324+
update_counters_for_compute_result(&block.compute_result());
13251325
}
13261326
}
13271327

consensus/src/dag/adapter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
182182
};
183183
NUM_ROUNDS_PER_BLOCK.observe((rounds_between + 1) as f64);
184184

185-
let block = PipelinedBlock::new(
185+
let block = Arc::new(PipelinedBlock::new(
186186
Block::new_for_dag(
187187
epoch,
188188
round,
@@ -197,7 +197,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
197197
),
198198
vec![],
199199
StateComputeResult::new_dummy(),
200-
);
200+
));
201201
let block_info = block.block_info();
202202
let ledger_info_provider = self.ledger_info_provider.clone();
203203
let dag = self.dag.clone();

0 commit comments

Comments
 (0)