Skip to content

Commit 6341f5f

Browse files
authored
Fluffy: Update the state bridge to send each content offer to the closest connected portal client (#3278)
* Refactor the state bridge so that each piece of content can be sent to any of the connected portal clients, with the clients sorted by their distance from the content key.
1 parent 4c4f6d9 commit 6341f5f

File tree

1 file changed

+152
-126
lines changed

1 file changed

+152
-126
lines changed

fluffy/tools/portal_bridge/portal_bridge_state.nim

Lines changed: 152 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -37,26 +37,25 @@ type
3737
stateRoot: Hash32
3838
stateDiffs: seq[TransactionDiff]
3939

40-
BlockOffersRef = ref object
40+
BlockOffers = ref object
4141
blockNumber: uint64
4242
accountTrieOffers: seq[AccountTrieOfferWithKey]
4343
contractTrieOffers: seq[ContractTrieOfferWithKey]
4444
contractCodeOffers: seq[ContractCodeOfferWithKey]
4545

4646
PortalStateGossipWorker = ref object
4747
id: int
48-
portalClient: RpcClient
49-
portalUrl: JsonRpcUrl
50-
nodeId: NodeId
51-
blockOffersQueue: AsyncQueue[BlockOffersRef]
48+
portalClients: OrderedTable[NodeId, RpcClient]
49+
portalEndpoints: seq[(JsonRpcUrl, NodeId)]
50+
blockOffersQueue: AsyncQueue[BlockOffers]
5251
gossipBlockOffersLoop: Future[void]
5352

5453
PortalStateBridge* = ref object
5554
web3Client: RpcClient
5655
web3Url: JsonRpcUrl
5756
db: DatabaseRef
5857
blockDataQueue: AsyncQueue[BlockData]
59-
blockOffersQueue: AsyncQueue[BlockOffersRef]
58+
blockOffersQueue: AsyncQueue[BlockOffers]
6059
gossipWorkers: seq[PortalStateGossipWorker]
6160
collectBlockDataLoop: Future[void]
6261
buildBlockOffersLoop: Future[void]
@@ -92,27 +91,6 @@ proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline.
9291
if blockNumber > db.getLastPersistedBlockNumber().valueOr(0):
9392
db.put(rlp.encode("lastPersistedBlockNumber"), rlp.encode(blockNumber))
9493

95-
proc collectOffer(
96-
offersMap: OrderedTableRef[seq[byte], seq[byte]],
97-
offerWithKey:
98-
AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey,
99-
) {.inline.} =
100-
let keyBytes = offerWithKey.key.toContentKey().encode().asSeq()
101-
offersMap[keyBytes] = offerWithKey.offer.encode()
102-
103-
proc recursiveCollectOffer(
104-
offersMap: OrderedTableRef[seq[byte], seq[byte]],
105-
offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey,
106-
) =
107-
offersMap.collectOffer(offerWithKey)
108-
109-
# root node, recursive collect is finished
110-
if offerWithKey.key.path.unpackNibbles().len() == 0:
111-
return
112-
113-
# continue the recursive collect
114-
offersMap.recursiveCollectOffer(offerWithKey.getParent())
115-
11694
proc runCollectBlockDataLoop(
11795
bridge: PortalStateBridge, startBlockNumber: uint64
11896
) {.async: (raises: []).} =
@@ -237,7 +215,7 @@ proc runBuildBlockOffersLoop(
237215
builder.buildBlockOffers()
238216

239217
await bridge.blockOffersQueue.addLast(
240-
BlockOffersRef(
218+
BlockOffers(
241219
blockNumber: 0.uint64,
242220
accountTrieOffers: builder.getAccountTrieOffers(),
243221
contractTrieOffers: builder.getContractTrieOffers(),
@@ -284,7 +262,7 @@ proc runBuildBlockOffersLoop(
284262
builder.buildBlockOffers()
285263

286264
await bridge.blockOffersQueue.addLast(
287-
BlockOffersRef(
265+
BlockOffers(
288266
blockNumber: blockData.blockNumber,
289267
accountTrieOffers: builder.getAccountTrieOffers(),
290268
contractTrieOffers: builder.getContractTrieOffers(),
@@ -299,7 +277,107 @@ proc runBuildBlockOffersLoop(
299277
except CancelledError:
300278
trace "buildBlockOffersLoop canceled"
301279

302-
proc runGossipBlockOffersLoop(
280+
proc collectOffer(
    offersMap: OrderedTableRef[seq[byte], seq[byte]],
    offerWithKey:
      AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey,
) {.inline.} =
  ## Stores a single offer in `offersMap`, keyed by its encoded content key.
  ##
  ## The table is keyed by the encoded content key, so collecting an offer
  ## whose key is already present replaces the stored value rather than
  ## adding a second entry.
  let
    encodedKey = offerWithKey.key.toContentKey().encode().asSeq()
    encodedOffer = offerWithKey.offer.encode()

  offersMap[encodedKey] = encodedOffer
287+
288+
proc recursiveCollectOffer(
    offersMap: OrderedTableRef[seq[byte], seq[byte]],
    offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey,
) =
  ## Collects `offerWithKey` into `offersMap` and then does the same for each
  ## of its parents (via `getParent()`), stopping once the offer for the root
  ## node has been collected.
  offersMap.collectOffer(offerWithKey)

  # An empty nibble path identifies the root node; only offers below the
  # root have a parent that still needs to be collected.
  let isRootNode = offerWithKey.key.path.unpackNibbles().len() == 0
  if not isRootNode:
    offersMap.recursiveCollectOffer(offerWithKey.getParent())
300+
301+
func buildOffersMap(blockOffers: BlockOffers): auto =
  ## Builds an ordered table of encoded content key -> encoded offer from all
  ## of the offers of a block.
  ##
  ## Account trie and contract trie offers are collected recursively (the
  ## offer plus its parents up to the root node), contract code offers are
  ## collected as-is. Keying by content key filters out duplicates, so the
  ## same offer does not appear in the table more than once.
  let collected = newOrderedTable[seq[byte], seq[byte]]()

  for accountTrieOffer in blockOffers.accountTrieOffers:
    collected.recursiveCollectOffer(accountTrieOffer)

  for contractTrieOffer in blockOffers.contractTrieOffers:
    collected.recursiveCollectOffer(contractTrieOffer)

  for contractCodeOffer in blockOffers.contractCodeOffers:
    collected.collectOffer(contractCodeOffer)

  collected
312+
313+
proc orderPortalClientsByDistanceFromContent(
    worker: PortalStateGossipWorker, contentKey: seq[byte]
) =
  ## Re-orders `worker.portalClients` in place so that the client whose node
  ## id is closest (by XOR distance) to the content id derived from
  ## `contentKey` comes first.
  let targetId = ContentKeyByteList.init(contentKey).toContentId()

  # A closure is needed here because the comparison depends on the content id
  # of the content that is about to be gossiped.
  proc byDistanceToContent(x, y: (NodeId, RpcClient)): int =
    let
      distanceOfX = targetId xor x[0]
      distanceOfY = targetId xor y[0]

    if distanceOfX > distanceOfY:
      1
    elif distanceOfX == distanceOfY:
      0
    else:
      -1

  # After sorting, iterating over the portal clients visits them from the
  # closest to the furthest from the content.
  worker.portalClients.sort(byDistanceToContent)
335+
336+
proc contentFoundInNetwork(
    worker: PortalStateGossipWorker, contentKey: seq[byte]
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Asks each of the worker's portal clients, in their current table order,
  ## for the content with the given key.
  ##
  ## Returns `true` as soon as one client returns non-empty content. A failed
  ## lookup on one client is logged and the next client is tried. Returns
  ## `false` when no client returned the content. Cancellation is propagated
  ## to the caller.
  let keyHex = contentKey.to0xHex()

  for nodeId, client in worker.portalClients:
    try:
      let lookupResult = await client.portal_stateGetContent(keyHex)
      if lookupResult.content.len() > 0:
        trace "Found existing content in network",
          contentKey = keyHex, nodeId, workerId = worker.id
        return true
    except CancelledError as cancelled:
      raise cancelled
    except CatchableError as e:
      # Not fatal: fall through and try the next portal client.
      debug "Unable to find existing content in network",
        contentKey = keyHex, nodeId, error = e.msg, workerId = worker.id

  return false
352+
353+
proc gossipContentIntoNetwork(
    worker: PortalStateGossipWorker,
    minGossipPeers: int,
    contentKey: seq[byte],
    contentOffer: seq[byte],
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Puts the content into the network through the worker's portal clients,
  ## trying them one at a time in their current table order.
  ##
  ## Returns `true` as soon as one client reports that the offer reached at
  ## least `minGossipPeers` peers. If a client reaches too few peers or the
  ## call fails, this is logged and the next client is tried. Returns `false`
  ## when no client reached enough peers. Cancellation is propagated to the
  ## caller.
  let
    keyHex = contentKey.to0xHex()
    offerHex = contentOffer.to0xHex()

  for nodeId, client in worker.portalClients:
    try:
      let numPeers = (await client.portal_statePutContent(keyHex, offerHex)).peerCount

      if numPeers < minGossipPeers:
        # Too few peers received the offer; fall through to the next client.
        warn "Offer not gossiped to enough peers",
          contentKey = keyHex, nodeId, numPeers, workerId = worker.id
      else:
        trace "Offer successfully gossipped to peers",
          contentKey = keyHex, nodeId, numPeers, workerId = worker.id
        return true
    except CancelledError as cancelled:
      raise cancelled
    except CatchableError as e:
      error "Failed to gossip offer to peers",
        contentKey = keyHex, nodeId, error = e.msg, workerId = worker.id

  return false
379+
380+
proc runGossipLoop(
303381
worker: PortalStateGossipWorker,
304382
verifyGossip: bool,
305383
skipGossipForExisting: bool,
@@ -308,106 +386,52 @@ proc runGossipBlockOffersLoop(
308386
debug "Starting gossip block offers loop", workerId = worker.id
309387

310388
try:
311-
# Create one client per worker in order to improve performance.
389+
# Create separate clients in each worker in order to improve performance.
312390
# WebSocket connections don't perform well when shared by many
313391
# concurrent workers.
314-
worker.portalClient = newRpcClientConnect(worker.portalUrl)
392+
for (rpcUrl, nodeId) in worker.portalEndpoints:
393+
worker.portalClients[nodeId] = newRpcClientConnect(rpcUrl)
315394

316-
var blockOffers = await worker.blockOffersQueue.popFirst()
317-
318-
while true:
395+
var
396+
blockOffers = await worker.blockOffersQueue.popFirst()
319397
# A table of offer key, value pairs is used to filter out duplicates so
320398
# that we don't gossip the same offer multiple times.
321-
let offersMap = newOrderedTable[seq[byte], seq[byte]]()
322-
323-
for offerWithKey in blockOffers.accountTrieOffers:
324-
offersMap.recursiveCollectOffer(offerWithKey)
325-
for offerWithKey in blockOffers.contractTrieOffers:
326-
offersMap.recursiveCollectOffer(offerWithKey)
327-
for offerWithKey in blockOffers.contractCodeOffers:
328-
offersMap.collectOffer(offerWithKey)
329-
330-
# We need to use a closure here because nodeId is required to calculate the
331-
# distance of each content id from the node
332-
proc offersMapCmp(x, y: (seq[byte], seq[byte])): int =
333-
let
334-
xId = ContentKeyByteList.init(x[0]).toContentId()
335-
yId = ContentKeyByteList.init(y[0]).toContentId()
336-
xDistance = worker.nodeId xor xId
337-
yDistance = worker.nodeId xor yId
338-
339-
if xDistance == yDistance:
340-
0
341-
elif xDistance > yDistance:
342-
1
343-
else:
344-
-1
345-
346-
# Sort the offers based on the distance from the node so that we will gossip
347-
# content that is closest to the node first
348-
offersMap.sort(offersMapCmp)
399+
offersMap = buildOffersMap(blockOffers)
349400

401+
while true:
350402
var retryGossip = false
351-
for k, v in offersMap:
403+
404+
for contentKey, contentOffer in offersMap:
405+
worker.orderPortalClientsByDistanceFromContent(contentKey)
406+
352407
# Check if we need to gossip the content
353-
var gossipContent = true
354-
355-
if skipGossipForExisting:
356-
try:
357-
let contentInfo =
358-
await worker.portalClient.portal_stateGetContent(k.to0xHex())
359-
if contentInfo.content.len() > 0:
360-
gossipContent = false
361-
except CancelledError as e:
362-
raise e
363-
except CatchableError as e:
364-
debug "Unable to find existing content. Will attempt to gossip content: ",
365-
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
408+
if skipGossipForExisting and (await worker.contentFoundInNetwork(contentKey)):
409+
continue # move on to the next content key
366410

367411
# Gossip the content into the network
368-
if gossipContent:
369-
try:
370-
let
371-
putContentResult = await worker.portalClient.portal_statePutContent(
372-
k.to0xHex(), v.to0xHex()
373-
)
374-
numPeers = putContentResult.peerCount
375-
if numPeers >= minGossipPeers:
376-
debug "Offer successfully gossipped to peers",
377-
numPeers, workerId = worker.id
378-
else:
379-
warn "Offer not gossiped to enough peers", numPeers, workerId = worker.id
380-
retryGossip = true
381-
break
382-
except CancelledError as e:
383-
raise e
384-
except CatchableError as e:
385-
error "Failed to gossip offer to peers", error = e.msg, workerId = worker.id
386-
retryGossip = true
387-
break
412+
let gossipCompleted = await worker.gossipContentIntoNetwork(
413+
minGossipPeers, contentKey, contentOffer
414+
)
415+
if not gossipCompleted:
416+
# Retry gossip of this block
417+
retryGossip = true
418+
break
388419

389420
# Check if the content can be found in the network
390421
var foundContentKeys = newSeq[seq[byte]]()
391422
if verifyGossip and not retryGossip:
392-
# wait for the peers to be updated
423+
# Wait for the peers to be updated.
424+
# Wait time is proportional to the number of offers
393425
let waitTimeMs = 200 + (offersMap.len() * 20)
394426
await sleepAsync(waitTimeMs.milliseconds)
395-
# wait time is proportional to the number of offers
396-
397-
for k, _ in offersMap:
398-
try:
399-
let contentInfo =
400-
await worker.portalClient.portal_stateGetContent(k.to0xHex())
401-
if contentInfo.content.len() == 0:
402-
error "Found empty contentValue", workerId = worker.id
403-
retryGossip = true
404-
break
405-
foundContentKeys.add(k)
406-
except CancelledError as e:
407-
raise e
408-
except CatchableError as e:
409-
warn "Unable to find content with key. Will retry gossipping content:",
410-
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
427+
428+
for contentKey, _ in offersMap:
429+
worker.orderPortalClientsByDistanceFromContent(contentKey)
430+
431+
if await worker.contentFoundInNetwork(contentKey):
432+
foundContentKeys.add(contentKey)
433+
else:
434+
# Retry gossip of this block
411435
retryGossip = true
412436
break
413437

@@ -418,13 +442,17 @@ proc runGossipBlockOffersLoop(
418442
# Don't retry gossip for content that was found in the network
419443
for key in foundContentKeys:
420444
offersMap.del(key)
445+
421446
warn "Retrying state gossip for block: ",
422447
blockNumber = blockOffers.blockNumber,
423448
remainingOffers = offersMap.len(),
424449
workerId = worker.id
425450

426-
# We might need to reconnect if using a WebSocket client
427-
await worker.portalClient.tryReconnect(worker.portalUrl)
451+
# We might need to reconnect if using WebSocket clients
452+
for (rpcUrl, nodeId) in worker.portalEndpoints:
453+
await worker.portalClients.getOrDefault(nodeId).tryReconnect(rpcUrl)
454+
455+
# Jump back to the top of while loop to retry processing the current block
428456
continue
429457

430458
if blockOffers.blockNumber mod 1000 == 0:
@@ -439,6 +467,7 @@ proc runGossipBlockOffersLoop(
439467
workerId = worker.id
440468

441469
blockOffers = await worker.blockOffersQueue.popFirst()
470+
offersMap = buildOffersMap(blockOffers)
442471
except CancelledError:
443472
trace "gossipBlockOffersLoop canceled"
444473

@@ -528,7 +557,7 @@ proc start*(bridge: PortalStateBridge, config: PortalBridgeConf) =
528557
info "Starting concurrent gossip workers", workerCount = bridge.gossipWorkers.len()
529558

530559
for worker in bridge.gossipWorkers:
531-
worker.gossipBlockOffersLoop = worker.runGossipBlockOffersLoop(
560+
worker.gossipBlockOffersLoop = worker.runGossipLoop(
532561
config.verifyGossip, config.skipGossipForExisting, config.minGossipPeers.int
533562
)
534563

@@ -580,19 +609,16 @@ proc runState*(
580609
web3Url: config.web3RpcUrl,
581610
db: db,
582611
blockDataQueue: newAsyncQueue[BlockData](queueSize),
583-
blockOffersQueue: newAsyncQueue[BlockOffersRef](queueSize),
612+
blockOffersQueue: newAsyncQueue[BlockOffers](queueSize),
584613
gossipWorkers: newSeq[PortalStateGossipWorker](),
585614
)
586615
587616
for i in 0 ..< config.gossipWorkers.int:
588-
let
589-
(rpcUrl, nodeId) = portalEndpoints[i mod config.portalRpcEndpoints.int]
590-
worker = PortalStateGossipWorker(
591-
id: i + 1,
592-
portalUrl: rpcUrl,
593-
nodeId: nodeId,
594-
blockOffersQueue: bridge.blockOffersQueue,
595-
)
617+
let worker = PortalStateGossipWorker(
618+
id: i + 1,
619+
portalEndpoints: portalEndpoints,
620+
blockOffersQueue: bridge.blockOffersQueue,
621+
)
596622
bridge.gossipWorkers.add(worker)
597623
598624
bridge.start(config)

0 commit comments

Comments
 (0)