Skip to content

Commit f3b7983

Browse files
committed
Added NewPayloadCache
1 parent 29e5a1f commit f3b7983

File tree

2 files changed

+159
-38
lines changed

2 files changed

+159
-38
lines changed

beacon_node/execution_layer/src/lib.rs

Lines changed: 158 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3939
use strum::AsRefStr;
4040
use task_executor::TaskExecutor;
4141
use tokio::{
42-
sync::{Mutex, MutexGuard, RwLock},
42+
sync::{Mutex, MutexGuard, RwLock, broadcast},
4343
time::sleep,
4444
};
4545
use tokio_stream::wrappers::WatchStream;
@@ -138,15 +138,15 @@ impl<E: EthSpec> TryFrom<BuilderBid<E>> for ProvenancedPayload<BlockProposalCont
138138
}
139139
}
140140

141-
#[derive(Debug)]
141+
#[derive(Debug, Clone)]
142142
pub enum Error {
143143
NoEngine,
144144
NoPayloadBuilder,
145-
ApiError(ApiError),
146-
Builder(builder_client::Error),
145+
ApiError(Arc<ApiError>),
146+
Builder(Arc<builder_client::Error>),
147147
NoHeaderFromBuilder,
148148
CannotProduceHeader,
149-
EngineError(Box<EngineError>),
149+
EngineError(Arc<EngineError>),
150150
NotSynced,
151151
ShuttingDown,
152152
FeeRecipientUnspecified,
@@ -177,7 +177,7 @@ impl From<BeaconStateError> for Error {
177177

178178
impl From<ApiError> for Error {
179179
fn from(e: ApiError) -> Self {
180-
Error::ApiError(e)
180+
Error::ApiError(Arc::new(e))
181181
}
182182
}
183183

@@ -186,12 +186,18 @@ impl From<EngineError> for Error {
186186
match e {
187187
// This removes an unnecessary layer of indirection.
188188
// TODO (mark): consider refactoring these error enums
189-
EngineError::Api { error } => Error::ApiError(error),
190-
_ => Error::EngineError(Box::new(e)),
189+
EngineError::Api { error } => Error::ApiError(Arc::new(error)),
190+
_ => Error::EngineError(Arc::new(e)),
191191
}
192192
}
193193
}
194194

195+
impl From<builder_client::Error> for Error {
196+
fn from(e: builder_client::Error) -> Self {
197+
Error::Builder(Arc::new(e))
198+
}
199+
}
200+
195201
pub enum BlockProposalContentsType<E: EthSpec> {
196202
Full(BlockProposalContents<E, FullPayload<E>>),
197203
Blinded(BlockProposalContents<E, BlindedPayload<E>>),
@@ -418,6 +424,108 @@ pub enum SubmitBlindedBlockResponse<E: EthSpec> {
418424

419425
type PayloadContentsRefTuple<'a, E> = (ExecutionPayloadRef<'a, E>, Option<&'a BlobsBundle<E>>);
420426

427+
/// Cache for deduplicating new payload requests.
428+
///
429+
/// Handles both in-flight requests and recently completed requests to avoid
430+
/// duplicate network calls to the execution engine.
431+
struct NewPayloadCache {
432+
inner: Mutex<NewPayloadCacheInner>,
433+
}
434+
435+
struct NewPayloadCacheInner {
436+
/// In-flight requests mapped by block hash
437+
in_flight: HashMap<ExecutionBlockHash, broadcast::Sender<Result<PayloadStatus, Error>>>,
438+
/// Recently completed requests with their completion time
439+
completed: LruCache<ExecutionBlockHash, (Instant, Result<PayloadStatus, Error>)>,
440+
}
441+
442+
impl NewPayloadCache {
443+
/// Cache TTL for completed requests (12 seconds)
444+
const COMPLETED_TTL: Duration = Duration::from_secs(12);
445+
/// Maximum number of completed requests to cache
446+
const COMPLETED_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);
447+
448+
fn new() -> Self {
449+
Self {
450+
inner: Mutex::new(NewPayloadCacheInner {
451+
in_flight: HashMap::new(),
452+
completed: LruCache::new(Self::COMPLETED_CACHE_SIZE),
453+
}),
454+
}
455+
}
456+
457+
/// Get cached result or execute the provided function.
458+
///
459+
/// Returns a future that resolves to the payload status. Handles:
460+
/// 1. Returning cached completed results (if not expired)
461+
/// 2. Joining in-flight requests
462+
/// 3. Executing new requests and caching results
463+
async fn get_or_execute<F, Fut>(
464+
&self,
465+
block_hash: ExecutionBlockHash,
466+
execute_fn: F,
467+
) -> Result<PayloadStatus, Error>
468+
where
469+
F: FnOnce() -> Fut,
470+
Fut: Future<Output = Result<PayloadStatus, Error>>,
471+
{
472+
let now = Instant::now();
473+
474+
// Single lock acquisition to handle all cases
475+
let mut cache = self.inner.lock().await;
476+
477+
// 1. Check completed cache first
478+
if let Some((timestamp, result)) = cache.completed.get(&block_hash) {
479+
if now.duration_since(*timestamp) < Self::COMPLETED_TTL {
480+
return result.clone();
481+
} else {
482+
// Entry expired, remove it
483+
cache.completed.pop(&block_hash);
484+
}
485+
}
486+
487+
// 2. Check in-flight requests
488+
if let Some(sender) = cache.in_flight.get(&block_hash) {
489+
let mut receiver = sender.subscribe();
490+
drop(cache); // Release lock early
491+
492+
match receiver.recv().await {
493+
Ok(result) => return result,
494+
Err(_) => {
495+
// Sender was dropped, fall through to execute new request
496+
error!(
497+
"NewPayloadCache: Sender was dropped for block hash {}. This shouldn't happen.",
498+
block_hash
499+
);
500+
// just call the execute_fn again
501+
return execute_fn().await;
502+
}
503+
}
504+
}
505+
506+
// 3. Start new request
507+
let (sender, _receiver) = broadcast::channel(1);
508+
cache.in_flight.insert(block_hash, sender.clone());
509+
drop(cache); // Release lock for execution
510+
511+
// Execute the function
512+
let result = execute_fn().await;
513+
514+
// Update cache with result
515+
let mut cache = self.inner.lock().await;
516+
cache.in_flight.remove(&block_hash);
517+
cache
518+
.completed
519+
.put(block_hash, (Instant::now(), result.clone()));
520+
drop(cache);
521+
522+
// Broadcast result to any waiting receivers
523+
let _ = sender.send(result.clone());
524+
525+
result
526+
}
527+
}
528+
421529
struct Inner<E: EthSpec> {
422530
engine: Arc<Engine>,
423531
builder: ArcSwapOption<BuilderHttpClient>,
@@ -433,6 +541,10 @@ struct Inner<E: EthSpec> {
433541
/// This is used *only* in the informational sync status endpoint, so that a VC using this
434542
/// node can prefer another node with a healthier EL.
435543
last_new_payload_errored: RwLock<bool>,
544+
/// Cache for deduplicating `notify_new_payload` requests.
545+
///
546+
/// Handles both in-flight requests and recently completed requests.
547+
new_payload_cache: NewPayloadCache,
436548
}
437549

438550
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -523,8 +635,8 @@ impl<E: EthSpec> ExecutionLayer<E> {
523635
let engine: Engine = {
524636
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
525637
debug!(endpoint = %execution_url, jwt_path = ?secret_file.as_path(),"Loaded execution endpoint");
526-
let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)
527-
.map_err(Error::ApiError)?;
638+
let api =
639+
HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)?;
528640
Engine::new(api, executor.clone())
529641
};
530642

@@ -539,6 +651,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
539651
executor,
540652
payload_cache: PayloadCache::default(),
541653
last_new_payload_errored: RwLock::new(false),
654+
new_payload_cache: NewPayloadCache::new(),
542655
};
543656

544657
let el = Self {
@@ -582,7 +695,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
582695
builder_header_timeout,
583696
disable_ssz,
584697
)
585-
.map_err(Error::Builder)?;
698+
.map_err(Into::<Error>::into)?;
586699
info!(
587700
?builder_url,
588701
local_user_agent = builder_client.get_user_agent(),
@@ -1349,15 +1462,34 @@ impl<E: EthSpec> ExecutionLayer<E> {
13491462
Ok(GetPayloadResponseType::Full(payload_response))
13501463
})
13511464
.await
1352-
.map_err(Box::new)
1353-
.map_err(Error::EngineError)
1465+
.map_err(Into::into)
13541466
}
13551467

13561468
/// Maps to the `engine_newPayload` JSON-RPC call.
1469+
///
1470+
/// Deduplicates concurrent requests with the same block hash - if multiple threads
1471+
/// call this function with the same block hash simultaneously, only one request
1472+
/// is sent to the execution engine, but all threads receive the same response.
1473+
/// Also caches recent results for a short time to avoid duplicate requests.
13571474
/// TODO(EIP-7732) figure out how and why Mark relaxed new_payload_request param's typ to NewPayloadRequest<E>
13581475
pub async fn notify_new_payload(
13591476
&self,
13601477
new_payload_request: NewPayloadRequest<'_, E>,
1478+
) -> Result<PayloadStatus, Error> {
1479+
let block_hash = new_payload_request.block_hash();
1480+
1481+
self.inner
1482+
.new_payload_cache
1483+
.get_or_execute(block_hash, || {
1484+
self.notify_new_payload_impl(new_payload_request)
1485+
})
1486+
.await
1487+
}
1488+
1489+
/// Internal implementation of notify_new_payload without deduplication logic.
1490+
async fn notify_new_payload_impl(
1491+
&self,
1492+
new_payload_request: NewPayloadRequest<'_, E>,
13611493
) -> Result<PayloadStatus, Error> {
13621494
let _timer = metrics::start_timer_vec(
13631495
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
@@ -1391,9 +1523,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
13911523
}
13921524
*self.inner.last_new_payload_errored.write().await = result.is_err();
13931525

1394-
process_payload_status(block_hash, result)
1395-
.map_err(Box::new)
1396-
.map_err(Error::EngineError)
1526+
process_payload_status(block_hash, result).map_err(Into::into)
13971527
}
13981528

13991529
/// Update engine sync status.
@@ -1529,8 +1659,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
15291659
head_block_hash,
15301660
result.map(|response| response.payload_status),
15311661
)
1532-
.map_err(Box::new)
1533-
.map_err(Error::EngineError)
1662+
.map_err(Into::into)
15341663
}
15351664

15361665
/// Returns the execution engine capabilities resulting from a call to
@@ -1622,9 +1751,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
16221751
}
16231752
Ok(block.map(|b| b.block_hash))
16241753
})
1625-
.await
1626-
.map_err(Box::new)
1627-
.map_err(Error::EngineError)?;
1754+
.await?;
16281755

16291756
if let Some(hash) = &hash_opt {
16301757
info!(
@@ -1734,8 +1861,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
17341861
Ok(None)
17351862
})
17361863
.await
1737-
.map_err(Box::new)
1738-
.map_err(Error::EngineError)
1864+
.map_err(Into::into)
17391865
}
17401866

17411867
/// This function should remain internal.
@@ -1786,8 +1912,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
17861912
engine.api.get_payload_bodies_by_hash_v1(hashes).await
17871913
})
17881914
.await
1789-
.map_err(Box::new)
1790-
.map_err(Error::EngineError)
1915+
.map_err(Into::into)
17911916
}
17921917

17931918
pub async fn get_payload_bodies_by_range(
@@ -1804,8 +1929,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
18041929
.await
18051930
})
18061931
.await
1807-
.map_err(Box::new)
1808-
.map_err(Error::EngineError)
1932+
.map_err(Into::into)
18091933
}
18101934

18111935
/// Fetch a full payload from the execution node.
@@ -1867,8 +1991,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
18671991
self.engine()
18681992
.request(|engine| async move { engine.api.get_blobs_v1(query).await })
18691993
.await
1870-
.map_err(Box::new)
1871-
.map_err(Error::EngineError)
1994+
.map_err(Into::into)
18721995
} else {
18731996
Err(Error::GetBlobsNotSupported)
18741997
}
@@ -1884,8 +2007,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
18842007
self.engine()
18852008
.request(|engine| async move { engine.api.get_blobs_v2(query).await })
18862009
.await
1887-
.map_err(Box::new)
1888-
.map_err(Error::EngineError)
2010+
.map_err(Into::into)
18892011
} else {
18902012
Err(Error::GetBlobsNotSupported)
18912013
}
@@ -1898,8 +2020,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
18982020
self.engine()
18992021
.request(|engine| async move { engine.api.get_block_by_number(query).await })
19002022
.await
1901-
.map_err(Box::new)
1902-
.map_err(Error::EngineError)
2023+
.map_err(Into::into)
19032024
}
19042025

19052026
pub async fn propose_blinded_beacon_block(
@@ -1948,12 +2069,12 @@ impl<E: EthSpec> ExecutionLayer<E> {
19482069
builder
19492070
.post_builder_blinded_blocks_v1_ssz(block)
19502071
.await
1951-
.map_err(Error::Builder)
2072+
.map_err(Into::into)
19522073
} else {
19532074
builder
19542075
.post_builder_blinded_blocks_v1(block)
19552076
.await
1956-
.map_err(Error::Builder)
2077+
.map_err(Into::into)
19572078
.map(|d| d.data)
19582079
}
19592080
})
@@ -2018,12 +2139,12 @@ impl<E: EthSpec> ExecutionLayer<E> {
20182139
builder
20192140
.post_builder_blinded_blocks_v2_ssz(block)
20202141
.await
2021-
.map_err(Error::Builder)
2142+
.map_err(Into::into)
20222143
} else {
20232144
builder
20242145
.post_builder_blinded_blocks_v2(block)
20252146
.await
2026-
.map_err(Error::Builder)
2147+
.map_err(Into::into)
20272148
}
20282149
})
20292150
.await;

beacon_node/execution_layer/src/versioned_hashes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use alloy_consensus::TxEnvelope;
22
use alloy_rlp::Decodable;
33
use types::{EthSpec, ExecutionPayloadRef, Hash256, Unsigned, VersionedHash};
44

5-
#[derive(Debug)]
5+
#[derive(Debug, Clone)]
66
pub enum Error {
77
DecodingTransaction(String),
88
LengthMismatch { expected: usize, found: usize },

0 commit comments

Comments
 (0)