Skip to content

Commit a3d4a3e

Browse files
authored
BN: Fix el_manager timeouts issue in block processing. (#6665)
* Fix el_manager + block_processor NEWPAYLOAD_TIMEOUT timeout issue. Use a predefined array of exponential timeouts when all the requests to the EL have failed. * Increase timeout value to (next_slot.start_time - 1.second). * Address review comments. * Do not repeat requests when the node is optimistically synced.
1 parent ead72de commit a3d4a3e

File tree

2 files changed

+127
-33
lines changed

2 files changed

+127
-33
lines changed

beacon_chain/el/el_manager.nim

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ export
3434
logScope:
3535
topics = "elman"
3636

37+
const
38+
SleepDurations =
39+
[100.milliseconds, 200.milliseconds, 500.milliseconds, 1.seconds]
40+
3741
type
3842
FixedBytes[N: static int] = web3.FixedBytes[N]
3943
PubKeyBytes = DynamicBytes[48, 48]
@@ -43,6 +47,11 @@ type
4347
WithoutTimeout* = distinct int
4448
Address = web3.Address
4549

50+
DeadlineObject* = object
51+
# TODO (cheatfate): This object declaration could be removed when
52+
# `Raising()` macro starts to support procedure arguments.
53+
future*: Future[void].Raising([CancelledError])
54+
4655
SomeEnginePayloadWithValue =
4756
BellatrixExecutionPayloadWithValue |
4857
GetPayloadV2Response |
@@ -233,6 +242,22 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent,
233242
"Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
234243
labels = ["url"]
235244

245+
proc init*(t: typedesc[DeadlineObject], d: Duration): DeadlineObject =
  ## Builds a `DeadlineObject` whose `future` field is the timer future
  ## returned by `sleepAsync(d)`.
  let timer = sleepAsync(d)
  DeadlineObject(future: timer)
247+
248+
proc variedSleep*(
    counter: var int,
    durations: openArray[Duration]
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
  ## Returns a sleep future for the `counter`-th entry of `durations` and
  ## increments `counter` before returning.
  ##
  ## When `counter` is negative or past the last index, the last entry of
  ## `durations` is used instead. `durations` must not be empty.
  doAssert(len(durations) > 0, "Empty durations array!")
  let
    lastIndex = high(durations)
    # Anything outside of the valid index range falls back to the last entry.
    pos = if counter in 0 .. lastIndex: counter else: lastIndex
  inc(counter)
  sleepAsync(durations[pos])
260+
236261
proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
237262
if connection.web3.isSome:
238263
try:
@@ -942,14 +967,20 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
942967

943968
proc sendNewPayload*(
944969
m: ELManager,
945-
blck: SomeForkyBeaconBlock
970+
blck: SomeForkyBeaconBlock,
971+
deadlineObj: DeadlineObject,
972+
maxRetriesCount: int
946973
): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} =
974+
doAssert maxRetriesCount > 0
975+
947976
let
948977
startTime = Moment.now()
949-
deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
978+
deadline = deadlineObj.future
950979
payload = blck.body.asEngineExecutionPayload
951980
var
952981
responseProcessor = ELConsensusViolationDetector.init()
982+
sleepCounter = 0
983+
retriesCount = 0
953984

954985
while true:
955986
block mainLoop:
@@ -1033,18 +1064,23 @@ proc sendNewPayload*(
10331064
return PayloadExecutionStatus.syncing
10341065

10351066
if len(pendingRequests) == 0:
1036-
# All requests failed, we will continue our attempts until deadline
1037-
# is not finished.
1067+
# All requests failed.
1068+
inc(retriesCount)
1069+
if retriesCount == maxRetriesCount:
1070+
return PayloadExecutionStatus.syncing
10381071

10391072
# To avoid continuous spam of requests when EL node is offline we
1040-
# going to sleep until next attempt for
1041-
# (NEWPAYLOAD_TIMEOUT / 4) time (2.seconds).
1042-
let timeout =
1043-
chronos.nanoseconds(NEWPAYLOAD_TIMEOUT.nanoseconds div 4)
1044-
await sleepAsync(timeout)
1045-
1073+
# going to sleep until next attempt.
1074+
await variedSleep(sleepCounter, SleepDurations)
10461075
break mainLoop
10471076

1077+
proc sendNewPayload*(
    m: ELManager,
    blck: SomeForkyBeaconBlock
): Future[PayloadExecutionStatus] {.
    async: (raises: [CancelledError], raw: true).} =
  ## Convenience overload of `sendNewPayload` that starts a fresh
  ## `NEWPAYLOAD_TIMEOUT` deadline and passes `high(int)` as the maximum
  ## number of retries.
  let deadlineObj = DeadlineObject.init(NEWPAYLOAD_TIMEOUT)
  m.sendNewPayload(
    blck, deadlineObj = deadlineObj, maxRetriesCount = high(int))
1083+
10481084
proc forkchoiceUpdatedForSingleEL(
10491085
connection: ELConnection,
10501086
state: ref ForkchoiceStateV1,
@@ -1072,11 +1108,14 @@ proc forkchoiceUpdated*(
10721108
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
10731109
payloadAttributes: Opt[PayloadAttributesV1] |
10741110
Opt[PayloadAttributesV2] |
1075-
Opt[PayloadAttributesV3]
1111+
Opt[PayloadAttributesV3],
1112+
deadlineObj: DeadlineObject,
1113+
maxRetriesCount: int
10761114
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
10771115
async: (raises: [CancelledError]).} =
10781116

10791117
doAssert not headBlockHash.isZero
1118+
doAssert maxRetriesCount > 0
10801119

10811120
# Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
10821121
#
@@ -1132,9 +1171,12 @@ proc forkchoiceUpdated*(
11321171
safeBlockHash: safeBlockHash.asBlockHash,
11331172
finalizedBlockHash: finalizedBlockHash.asBlockHash)
11341173
startTime = Moment.now
1135-
deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
1174+
deadline = deadlineObj.future
1175+
11361176
var
11371177
responseProcessor = ELConsensusViolationDetector.init()
1178+
sleepCounter = 0
1179+
retriesCount = 0
11381180

11391181
while true:
11401182
block mainLoop:
@@ -1216,16 +1258,28 @@ proc forkchoiceUpdated*(
12161258
if len(pendingRequests) == 0:
12171259
# All requests failed, we will continue our attempts until the deadline
12181260
# expires or the retry limit is reached.
1261+
inc(retriesCount)
1262+
if retriesCount == maxRetriesCount:
1263+
return (PayloadExecutionStatus.syncing, Opt.none BlockHash)
12191264

12201265
# To avoid continuous spam of requests when EL node is offline we
1221-
# going to sleep until next attempt for
1222-
# (FORKCHOICEUPDATED_TIMEOUT / 4) time (2.seconds).
1223-
let timeout =
1224-
chronos.nanoseconds(FORKCHOICEUPDATED_TIMEOUT.nanoseconds div 4)
1225-
await sleepAsync(timeout)
1226-
1266+
# going to sleep until next attempt.
1267+
await variedSleep(sleepCounter, SleepDurations)
12271268
break mainLoop
12281269

1270+
proc forkchoiceUpdated*(
    m: ELManager,
    headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
    payloadAttributes: Opt[PayloadAttributesV1] |
                       Opt[PayloadAttributesV2] |
                       Opt[PayloadAttributesV3]
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
    async: (raises: [CancelledError], raw: true).} =
  ## Convenience overload of `forkchoiceUpdated` that starts a fresh
  ## `FORKCHOICEUPDATED_TIMEOUT` deadline and passes `high(int)` as the
  ## maximum number of retries.
  let deadlineObj = DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT)
  m.forkchoiceUpdated(
    headBlockHash, safeBlockHash, finalizedBlockHash, payloadAttributes,
    deadlineObj = deadlineObj, maxRetriesCount = high(int))
1282+
12291283
# TODO can't be defined within exchangeConfigWithSingleEL
12301284
func `==`(x, y: Quantity): bool {.borrow.}
12311285

beacon_chain/gossip_processing/block_processor.nim

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ from ../consensus_object_pools/consensus_manager import
2020
updateHeadWithExecution
2121
from ../consensus_object_pools/blockchain_dag import
2222
getBlockRef, getForkedBlock, getProposer, forkAtEpoch, loadExecutionBlockHash,
23-
markBlockVerified, validatorKey
23+
markBlockVerified, validatorKey, is_optimistic
2424
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
2525
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
2626
from ../consensus_object_pools/block_pools_types import
@@ -230,19 +230,24 @@ from web3/engine_api_types import
230230
PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3,
231231
PayloadExecutionStatus, PayloadStatusV1
232232
from ../el/el_manager import
233-
ELManager, forkchoiceUpdated, hasConnection, hasProperlyConfiguredConnection,
234-
sendNewPayload
233+
ELManager, DeadlineObject, forkchoiceUpdated, hasConnection,
234+
hasProperlyConfiguredConnection, sendNewPayload, init
235235

236236
proc expectValidForkchoiceUpdated(
237237
elManager: ELManager, headBlockPayloadAttributesType: typedesc,
238238
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
239-
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} =
239+
receivedBlock: ForkySignedBeaconBlock,
240+
deadlineObj: DeadlineObject,
241+
maxRetriesCount: int
242+
): Future[void] {.async: (raises: [CancelledError]).} =
240243
let
241244
(payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
242245
headBlockHash = headBlockHash,
243246
safeBlockHash = safeBlockHash,
244247
finalizedBlockHash = finalizedBlockHash,
245-
payloadAttributes = Opt.none headBlockPayloadAttributesType)
248+
payloadAttributes = Opt.none headBlockPayloadAttributesType,
249+
deadlineObj = deadlineObj,
250+
maxRetriesCount = maxRetriesCount)
246251
receivedExecutionBlockHash =
247252
when typeof(receivedBlock).kind >= ConsensusFork.Bellatrix:
248253
receivedBlock.message.body.execution_payload.block_hash
@@ -277,8 +282,11 @@ from ../consensus_object_pools/attestation_pool import
277282
from ../consensus_object_pools/spec_cache import get_attesting_indices
278283

279284
proc newExecutionPayload*(
280-
elManager: ELManager, blck: SomeForkyBeaconBlock):
281-
Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
285+
elManager: ELManager,
286+
blck: SomeForkyBeaconBlock,
287+
deadlineObj: DeadlineObject,
288+
maxRetriesCount: int
289+
): Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
282290

283291
template executionPayload: untyped = blck.body.execution_payload
284292

@@ -295,7 +303,8 @@ proc newExecutionPayload*(
295303
executionPayload = shortLog(executionPayload)
296304

297305
try:
298-
let payloadStatus = await elManager.sendNewPayload(blck)
306+
let payloadStatus =
307+
await elManager.sendNewPayload(blck, deadlineObj, maxRetriesCount)
299308

300309
debug "newPayload: succeeded",
301310
parentHash = executionPayload.parent_hash,
@@ -312,22 +321,34 @@ proc newExecutionPayload*(
312321
blockNumber = executionPayload.block_number
313322
return Opt.none PayloadExecutionStatus
314323

324+
proc newExecutionPayload*(
    elManager: ELManager,
    blck: SomeForkyBeaconBlock
): Future[Opt[PayloadExecutionStatus]] {.
    async: (raises: [CancelledError], raw: true).} =
  ## Convenience overload of `newExecutionPayload` that starts a fresh
  ## deadline and passes `high(int)` as the maximum number of retries.
  ##
  ## The deadline is built from `NEWPAYLOAD_TIMEOUT` because this call ends
  ## up in `sendNewPayload`; the forkchoiceUpdated timeout does not apply
  ## to this request.
  newExecutionPayload(
    elManager, blck, DeadlineObject.init(NEWPAYLOAD_TIMEOUT),
    high(int))
332+
315333
proc getExecutionValidity(
316334
elManager: ELManager,
317335
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
318-
deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
319-
Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
336+
deneb.SignedBeaconBlock | electra.SignedBeaconBlock,
337+
deadlineObj: DeadlineObject,
338+
maxRetriesCount: int
339+
): Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
320340
if not blck.message.is_execution_block:
321341
return NewPayloadStatus.valid # vacuously
322342

323343
try:
324344
let executionPayloadStatus = await elManager.newExecutionPayload(
325-
blck.message)
345+
blck.message, deadlineObj, maxRetriesCount)
326346
if executionPayloadStatus.isNone:
327347
return NewPayloadStatus.noResponse
328348

329349
case executionPayloadStatus.get
330-
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
350+
of PayloadExecutionStatus.invalid,
351+
PayloadExecutionStatus.invalid_block_hash:
331352
# Blocks come either from gossip or request manager requests. In the
332353
# former case, they've passed libp2p gossip validation which implies
333354
# correct signature for correct proposer, which makes spam expensive,
@@ -413,6 +434,20 @@ proc storeBlock(
413434
vm = self.validatorMonitor
414435
dag = self.consensusManager.dag
415436
wallSlot = wallTime.slotOrZero
437+
deadlineTime =
438+
block:
439+
let slotTime = (wallSlot + 1).start_beacon_time() - 1.seconds
440+
if slotTime <= wallTime:
441+
0.seconds
442+
else:
443+
chronos.nanoseconds((slotTime - wallTime).nanoseconds)
444+
deadlineObj = DeadlineObject.init(deadlineTime)
445+
446+
func getRetriesCount(): int =
  ## Maximum number of EL request attempts: a single attempt while the
  ## current DAG head is optimistic, `high(int)` otherwise.
  let headIsOptimistic = dag.is_optimistic(dag.head.bid)
  if headIsOptimistic: 1 else: high(int)
416451

417452
# If the block is missing its parent, it will be re-orphaned below
418453
self.consensusManager.quarantine[].removeOrphan(signedBlock)
@@ -518,7 +553,8 @@ proc storeBlock(
518553
NewPayloadStatus.noResponse
519554
else:
520555
when typeof(signedBlock).kind >= ConsensusFork.Bellatrix:
521-
await self.consensusManager.elManager.getExecutionValidity(signedBlock)
556+
await self.consensusManager.elManager.getExecutionValidity(
557+
signedBlock, deadlineObj, getRetriesCount())
522558
else:
523559
NewPayloadStatus.valid # vacuously
524560
payloadValid = payloadStatus == NewPayloadStatus.valid
@@ -685,7 +721,9 @@ proc storeBlock(
685721
self.consensusManager[].optimisticExecutionBlockHash,
686722
safeBlockHash = newHead.get.safeExecutionBlockHash,
687723
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
688-
payloadAttributes = Opt.none attributes)
724+
payloadAttributes = Opt.none attributes,
725+
deadlineObj = deadlineObj,
726+
maxRetriesCount = getRetriesCount())
689727

690728
let consensusFork = self.consensusManager.dag.cfg.consensusForkAtEpoch(
691729
newHead.get.blck.bid.slot.epoch)
@@ -712,7 +750,9 @@ proc storeBlock(
712750
headBlockHash = headExecutionBlockHash,
713751
safeBlockHash = newHead.get.safeExecutionBlockHash,
714752
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
715-
receivedBlock = signedBlock)
753+
receivedBlock = signedBlock,
754+
deadlineObj = deadlineObj,
755+
maxRetriesCount = getRetriesCount())
716756

717757
template callForkChoiceUpdated: auto =
718758
case self.consensusManager.dag.cfg.consensusForkAtEpoch(

0 commit comments

Comments
 (0)