Skip to content

Commit 453cb2f

Browse files
authored
portal_bridge: Add concurrency to the history content gossip (#2855)
1 parent 107db3a commit 453cb2f

File tree

2 files changed

+93
-114
lines changed

2 files changed

+93
-114
lines changed

fluffy/tools/portal_bridge/portal_bridge_conf.nim

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ type
141141
defaultValueDesc: defaultEra1DataDir(),
142142
name: "era1-dir"
143143
.}: InputDir
144+
145+
gossipConcurrency* {.
146+
desc:
147+
"The number of concurrent gossip workers for gossiping content into the portal network",
148+
defaultValue: 50,
149+
name: "gossip-concurrency"
150+
.}: int
144151
of PortalBridgeCmd.state:
145152
web3UrlState* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}:
146153
JsonRpcUrl

fluffy/tools/portal_bridge/portal_bridge_history.nim

Lines changed: 86 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ from eth/common/eth_types_rlp import rlpHash
3131

3232
const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s
3333

34+
type PortalHistoryBridge = ref object
35+
portalClient: RpcClient
36+
web3Client: RpcClient
37+
gossipQueue: AsyncQueue[(seq[byte], seq[byte])]
38+
3439
## Conversion functions for Block and Receipts
3540

3641
func asEthBlock(blockObject: BlockObject): EthBlock =
@@ -139,63 +144,34 @@ proc getBlockReceipts(
139144
## Portal JSON-RPC API helper calls for pushing block and receipts
140145

141146
proc gossipBlockHeader(
142-
client: RpcClient, id: Hash32 | uint64, headerWithProof: BlockHeaderWithProof
143-
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
144-
let
145-
contentKey = blockHeaderContentKey(id)
146-
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
147-
148-
peers =
149-
try:
150-
await client.portal_historyGossip(
151-
encodedContentKeyHex, SSZ.encode(headerWithProof).toHex()
152-
)
153-
except CatchableError as e:
154-
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
155-
156-
info "Block header gossiped", peers, contentKey = encodedContentKeyHex
157-
ok()
147+
bridge: PortalHistoryBridge,
148+
id: Hash32 | uint64,
149+
headerWithProof: BlockHeaderWithProof,
150+
): Future[void] {.async: (raises: [CancelledError]).} =
151+
let contentKey = blockHeaderContentKey(id)
152+
153+
await bridge.gossipQueue.addLast(
154+
(contentKey.encode.asSeq(), SSZ.encode(headerWithProof))
155+
)
158156

159157
proc gossipBlockBody(
160-
client: RpcClient,
158+
bridge: PortalHistoryBridge,
161159
hash: Hash32,
162160
body: PortalBlockBodyLegacy | PortalBlockBodyShanghai,
163-
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
164-
let
165-
contentKey = blockBodyContentKey(hash)
166-
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
167-
168-
peers =
169-
try:
170-
await client.portal_historyGossip(
171-
encodedContentKeyHex, SSZ.encode(body).toHex()
172-
)
173-
except CatchableError as e:
174-
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
175-
176-
info "Block body gossiped", peers, contentKey = encodedContentKeyHex
177-
ok()
161+
): Future[void] {.async: (raises: [CancelledError]).} =
162+
let contentKey = blockBodyContentKey(hash)
178163

179-
proc gossipReceipts(
180-
client: RpcClient, hash: Hash32, receipts: PortalReceipts
181-
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
182-
let
183-
contentKey = receiptsContentKey(hash)
184-
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
164+
await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(body)))
185165

186-
peers =
187-
try:
188-
await client.portal_historyGossip(
189-
encodedContentKeyHex, SSZ.encode(receipts).toHex()
190-
)
191-
except CatchableError as e:
192-
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
166+
proc gossipReceipts(
167+
bridge: PortalHistoryBridge, hash: Hash32, receipts: PortalReceipts
168+
): Future[void] {.async: (raises: [CancelledError]).} =
169+
let contentKey = receiptsContentKey(hash)
193170

194-
info "Receipts gossiped", peers, contentKey = encodedContentKeyHex
195-
return ok()
171+
await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(receipts)))
196172

197173
proc runLatestLoop(
198-
portalClient: RpcClient, web3Client: RpcClient, validate = false
174+
bridge: PortalHistoryBridge, validate = false
199175
) {.async: (raises: [CancelledError]).} =
200176
## Loop that requests the latest block + receipts and pushes them into the
201177
## Portal network.
@@ -211,14 +187,14 @@ proc runLatestLoop(
211187
var lastBlockNumber = 0'u64
212188
while true:
213189
let t0 = Moment.now()
214-
let blockObject = (await getBlockByNumber(web3Client, blockId)).valueOr:
190+
let blockObject = (await bridge.web3Client.getBlockByNumber(blockId)).valueOr:
215191
error "Failed to get latest block", error
216192
await sleepAsync(1.seconds)
217193
continue
218194

219195
let blockNumber = distinctBase(blockObject.number)
220196
if blockNumber > lastBlockNumber:
221-
let receiptObjects = (await web3Client.getBlockReceipts(blockNumber)).valueOr:
197+
let receiptObjects = (await bridge.web3Client.getBlockReceipts(blockNumber)).valueOr:
222198
error "Failed to get latest receipts", error
223199
await sleepAsync(1.seconds)
224200
continue
@@ -250,24 +226,19 @@ proc runLatestLoop(
250226
continue
251227

252228
# gossip block header by hash
253-
(await portalClient.gossipBlockHeader(hash, headerWithProof)).isOkOr:
254-
error "Failed to gossip block header", error, hash
229+
await bridge.gossipBlockHeader(hash, headerWithProof)
255230
# gossip block header by number
256-
(await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr:
257-
error "Failed to gossip block header", error, hash
231+
await bridge.gossipBlockHeader(blockNumber, headerWithProof)
258232

259233
# For bodies & receipts to get verified, the header needs to be available
260234
# on the network. Wait a little to get the headers propagated through
261235
# the network.
262236
await sleepAsync(2.seconds)
263237

264238
# gossip block body
265-
(await portalClient.gossipBlockBody(hash, body)).isOkOr:
266-
error "Failed to gossip block body", error, hash
267-
239+
await bridge.gossipBlockBody(hash, body)
268240
# gossip receipts
269-
(await portalClient.gossipReceipts(hash, portalReceipts)).isOkOr:
270-
error "Failed to gossip receipts", error, hash
241+
await bridge.gossipReceipts(hash, portalReceipts)
271242

272243
# Making sure here that we poll enough times not to miss a block.
273244
# We could also do some work without awaiting it, e.g. the gossiping or
@@ -281,7 +252,7 @@ proc runLatestLoop(
281252
warn "Block gossip took longer than slot interval"
282253

283254
proc gossipHeadersWithProof(
284-
portalClient: RpcClient,
255+
bridge: PortalHistoryBridge,
285256
era1File: string,
286257
epochRecordFile: Opt[string] = Opt.none(string),
287258
verifyEra = false,
@@ -312,15 +283,15 @@ proc gossipHeadersWithProof(
312283
blockHash = blockHeader.rlpHash()
313284

314285
# gossip block header by hash
315-
?(await portalClient.gossipBlockHeader(blockHash, headerWithProof))
286+
await bridge.gossipBlockHeader(blockHash, headerWithProof)
316287
# gossip block header by number
317-
?(await portalClient.gossipBlockHeader(blockHeader.number, headerWithProof))
288+
await bridge.gossipBlockHeader(blockHeader.number, headerWithProof)
318289

319-
info "Succesfully gossiped headers from era1 file", era1File
290+
info "Succesfully put headers from era1 file in gossip queue", era1File
320291
ok()
321292

322293
proc gossipBlockContent(
323-
portalClient: RpcClient, era1File: string, verifyEra = false
294+
bridge: PortalHistoryBridge, era1File: string, verifyEra = false
324295
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
325296
let f = ?Era1File.open(era1File)
326297

@@ -333,28 +304,15 @@ proc gossipBlockContent(
333304
let blockHash = header.rlpHash()
334305

335306
# gossip block body
336-
?(
337-
await portalClient.gossipBlockBody(
338-
blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
339-
)
340-
)
341-
307+
await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
342308
# gossip receipts
343-
?(
344-
await portalClient.gossipReceipts(
345-
blockHash, PortalReceipts.fromReceipts(receipts)
346-
)
347-
)
309+
await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
348310

349-
info "Succesfully gossiped bodies and receipts from era1 file", era1File
311+
info "Succesfully put bodies and receipts from era1 file in gossip queue", era1File
350312
ok()
351313

352314
proc runBackfillLoop(
353-
portalClient: RpcClient,
354-
web3Client: RpcClient,
355-
era1Dir: string,
356-
startEra: uint64,
357-
endEra: uint64,
315+
bridge: PortalHistoryBridge, era1Dir: string, startEra: uint64, endEra: uint64
358316
) {.async: (raises: [CancelledError]).} =
359317
let accumulator = loadAccumulator()
360318

@@ -381,7 +339,7 @@ proc runBackfillLoop(
381339
info "Gossip headers from era1 file", era1File
382340
let headerRes =
383341
try:
384-
await portalClient.portal_debug_historyGossipHeaders(era1File)
342+
await bridge.portalClient.portal_debug_historyGossipHeaders(era1File)
385343
except CatchableError as e:
386344
error "JSON-RPC portal_debug_historyGossipHeaders failed", error = e.msg
387345
false
@@ -390,7 +348,7 @@ proc runBackfillLoop(
390348
info "Gossip block content from era1 file", era1File
391349
let res =
392350
try:
393-
await portalClient.portal_debug_historyGossipBlockContent(era1File)
351+
await bridge.portalClient.portal_debug_historyGossipBlockContent(era1File)
394352
except CatchableError as e:
395353
error "JSON-RPC portal_debug_historyGossipBlockContent failed",
396354
error = e.msg
@@ -400,16 +358,16 @@ proc runBackfillLoop(
400358
else:
401359
error "Failed to gossip headers from era1 file", era1File
402360
else:
403-
(await portalClient.gossipHeadersWithProof(era1File)).isOkOr:
361+
(await bridge.gossipHeadersWithProof(era1File)).isOkOr:
404362
error "Failed to gossip headers from era1 file", error, era1File
405363
continue
406364

407-
(await portalClient.gossipBlockContent(era1File)).isOkOr:
365+
(await bridge.gossipBlockContent(era1File)).isOkOr:
408366
error "Failed to gossip block content from era1 file", error, era1File
409367
continue
410368

411369
proc runBackfillLoopAuditMode(
412-
portalClient: RpcClient, web3Client: RpcClient, era1Dir: string
370+
bridge: PortalHistoryBridge, era1Dir: string
413371
) {.async: (raises: [CancelledError]).} =
414372
let
415373
rng = newRng()
@@ -436,7 +394,7 @@ proc runBackfillLoopAuditMode(
436394
contentHex =
437395
try:
438396
(
439-
await portalClient.portal_historyGetContent(
397+
await bridge.portalClient.portal_historyGetContent(
440398
contentKey.encode.asSeq().toHex()
441399
)
442400
).content
@@ -468,7 +426,7 @@ proc runBackfillLoopAuditMode(
468426
contentHex =
469427
try:
470428
(
471-
await portalClient.portal_historyGetContent(
429+
await bridge.portalClient.portal_historyGetContent(
472430
contentKey.encode.asSeq().toHex()
473431
)
474432
).content
@@ -496,7 +454,7 @@ proc runBackfillLoopAuditMode(
496454
contentHex =
497455
try:
498456
(
499-
await portalClient.portal_historyGetContent(
457+
await bridge.portalClient.portal_historyGetContent(
500458
contentKey.encode.asSeq().toHex()
501459
)
502460
).content
@@ -526,44 +484,58 @@ proc runBackfillLoopAuditMode(
526484
raiseAssert "Failed to build header with proof: " & error
527485

528486
# gossip block header by hash
529-
(await portalClient.gossipBlockHeader(blockHash, headerWithProof)).isOkOr:
530-
error "Failed to gossip block header", error, blockHash
487+
await bridge.gossipBlockHeader(blockHash, headerWithProof)
531488
# gossip block header by number
532-
(await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr:
533-
error "Failed to gossip block header", error, blockHash
489+
await bridge.gossipBlockHeader(blockNumber, headerWithProof)
534490
if not bodySuccess:
535-
(
536-
await portalClient.gossipBlockBody(
537-
blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
538-
)
539-
).isOkOr:
540-
error "Failed to gossip block body", error, blockHash
491+
await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
541492
if not receiptsSuccess:
542-
(
543-
await portalClient.gossipReceipts(
544-
blockHash, PortalReceipts.fromReceipts(receipts)
545-
)
546-
).isOkOr:
547-
error "Failed to gossip receipts", error, blockHash
493+
await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
548494

549495
await sleepAsync(2.seconds)
550496

551497
proc runHistory*(config: PortalBridgeConf) =
552-
let
553-
portalClient = newRpcClientConnect(config.portalRpcUrl)
554-
web3Client = newRpcClientConnect(config.web3Url)
498+
let bridge = PortalHistoryBridge(
499+
portalClient: newRpcClientConnect(config.portalRpcUrl),
500+
web3Client: newRpcClientConnect(config.web3Url),
501+
gossipQueue: newAsyncQueue[(seq[byte], seq[byte])](config.gossipConcurrency),
502+
)
503+
504+
proc gossipWorker(bridge: PortalHistoryBridge) {.async: (raises: []).} =
505+
try:
506+
while true:
507+
let
508+
(contentKey, contentValue) = await bridge.gossipQueue.popFirst()
509+
contentKeyHex = contentKey.toHex()
510+
contentValueHex = contentValue.toHex()
511+
512+
try:
513+
let peers = await bridge.portalClient.portal_historyGossip(
514+
contentKeyHex, contentValueHex
515+
)
516+
debug "Content gossiped", peers, contentKey = contentKeyHex
517+
except CancelledError as e:
518+
trace "Cancelled gossipWorker"
519+
raise e
520+
except CatchableError as e:
521+
error "JSON-RPC portal_historyGossip failed",
522+
error = $e.msg, contentKey = contentKeyHex
523+
except CancelledError:
524+
trace "gossipWorker canceled"
525+
526+
var workers: seq[Future[void]] = @[]
527+
for i in 0 ..< config.gossipConcurrency:
528+
workers.add bridge.gossipWorker()
555529

556530
if config.latest:
557-
asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify)
531+
asyncSpawn bridge.runLatestLoop(config.blockVerify)
558532

559533
if config.backfill:
560534
if config.audit:
561-
asyncSpawn runBackfillLoopAuditMode(
562-
portalClient, web3Client, config.era1Dir.string
563-
)
535+
asyncSpawn bridge.runBackfillLoopAuditMode(config.era1Dir.string)
564536
else:
565-
asyncSpawn runBackfillLoop(
566-
portalClient, web3Client, config.era1Dir.string, config.startEra, config.endEra
537+
asyncSpawn bridge.runBackfillLoop(
538+
config.era1Dir.string, config.startEra, config.endEra
567539
)
568540

569541
while true:

0 commit comments

Comments
 (0)