Skip to content

Commit c9c4763

Browse files
proxy: log filters (#3461)
* txs, receipts and logs * format * add log filters * lint and rebase fixes * resolve filters * remove bugs * format * fixes * trigger CI * review * revert line
1 parent a0277cd commit c9c4763

File tree

7 files changed

+224
-27
lines changed

7 files changed

+224
-27
lines changed

execution_chain/version.nim

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ const
1818

1919
static:
2020
doAssert(nimbusRevision.len == 8, "nimbusRevision must consist of 8 characters")
21-
doAssert(nimbusRevision.allIt(it in HexDigits), "nimbusRevision should contains only hex chars")
21+
doAssert(
22+
nimbusRevision.allIt(it in HexDigits),
23+
"nimbusRevision should contains only hex chars",
24+
)
2225

2326
proc gitFolderExists(path: string): bool {.compileTime.} =
2427
# walk up parent folder to find `.git` folder

nimbus_verified_proxy/rpc/blocks.nim

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@ import
2323

2424
proc resolveBlockTag*(
2525
vp: VerifiedRpcProxy, blockTag: BlockTag
26-
): Result[base.BlockNumber, string] =
26+
): Result[BlockTag, string] =
2727
if blockTag.kind == bidAlias:
2828
let tag = blockTag.alias.toLowerAscii()
2929
case tag
3030
of "latest":
3131
let hLatest = vp.headerStore.latest.valueOr:
3232
return err("Couldn't get the latest block number from header store")
33-
ok(hLatest.number)
33+
ok(BlockTag(kind: bidNumber, number: Quantity(hLatest.number)))
3434
of "finalized":
3535
let hFinalized = vp.headerStore.finalized.valueOr:
3636
return err("Couldn't get the latest block number from header store")
37-
ok(hFinalized.number)
37+
ok(BlockTag(kind: bidNumber, number: Quantity(hFinalized.number)))
3838
of "earliest":
3939
let hEarliest = vp.headerStore.earliest.valueOr:
4040
return err("Couldn't get the latest block number from header store")
41-
ok(hEarliest.number)
41+
ok(BlockTag(kind: bidNumber, number: Quantity(hEarliest.number)))
4242
else:
4343
err("No support for block tag " & $blockTag)
4444
else:
45-
ok(base.BlockNumber(distinctBase(blockTag.number)))
45+
ok(blockTag)
4646

4747
func convHeader*(blk: eth_api_types.BlockObject): Header =
4848
let nonce = blk.nonce.valueOr:
@@ -184,10 +184,8 @@ proc getBlock*(
184184
proc getBlock*(
185185
vp: VerifiedRpcProxy, blockTag: BlockTag, fullTransactions: bool
186186
): Future[Result[BlockObject, string]] {.async.} =
187-
let
188-
n = vp.resolveBlockTag(blockTag).valueOr:
189-
return err(error)
190-
numberTag = BlockTag(kind: BlockIdentifierKind.bidNumber, number: Quantity(n))
187+
let numberTag = vp.resolveBlockTag(blockTag).valueOr:
188+
return err(error)
191189

192190
# get the target block
193191
let blk =
@@ -196,7 +194,7 @@ proc getBlock*(
196194
except CatchableError as e:
197195
return err(e.msg)
198196

199-
if n != distinctBase(blk.number):
197+
if numberTag.number != blk.number:
200198
return
201199
err("the downloaded block number doesn't match with the requested block number")
202200

@@ -235,9 +233,9 @@ proc getHeader*(
235233
vp: VerifiedRpcProxy, blockTag: BlockTag
236234
): Future[Result[Header, string]] {.async.} =
237235
let
238-
n = vp.resolveBlockTag(blockTag).valueOr:
236+
numberTag = vp.resolveBlockTag(blockTag).valueOr:
239237
return err(error)
240-
numberTag = BlockTag(kind: BlockIdentifierKind.bidNumber, number: Quantity(n))
238+
n = distinctBase(numberTag.number)
241239
cachedHeader = vp.headerStore.get(n)
242240

243241
if cachedHeader.isNone():

nimbus_verified_proxy/rpc/receipts.nim

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func toReceipt(rec: ReceiptObject): Receipt =
2929
let isHash = not rec.status.isSome()
3030

3131
let status = rec.status.isSome() and rec.status.get() == 1.Quantity
32-
3332
return Receipt(
3433
hash: rec.transactionHash,
3534
isHash: isHash,
@@ -51,7 +50,6 @@ proc getReceipts(
5150
await vp.rpcClient.eth_getBlockReceipts(blockTag)
5251
except CatchableError as e:
5352
return err(e.msg)
54-
5553
if rxs.isSome():
5654
if orderedTrieRoot(toReceipts(rxs.get())) != header.receiptsRoot:
5755
return
@@ -86,15 +84,32 @@ proc getReceipts*(
8684

8785
await vp.getReceipts(header, numberTag)
8886

89-
proc getLogs*(
90-
vp: VerifiedRpcProxy, filterOptions: FilterOptions
91-
): Future[Result[seq[LogObject], string]] {.async.} =
92-
let logObjs =
93-
try:
94-
await vp.rpcClient.eth_getLogs(filterOptions)
95-
except CatchableError as e:
96-
return err(e.msg)
87+
proc resolveFilterTags*(
88+
vp: VerifiedRpcProxy, filter: FilterOptions
89+
): Result[FilterOptions, string] =
90+
if filter.blockHash.isSome():
91+
return ok(filter)
92+
let
93+
fromBlock = filter.fromBlock.get(types.BlockTag(kind: bidAlias, alias: "latest"))
94+
toBlock = filter.toBlock.get(types.BlockTag(kind: bidAlias, alias: "latest"))
95+
fromBlockNumberTag = vp.resolveBlockTag(fromBlock).valueOr:
96+
return err(error)
97+
toBlockNumberTag = vp.resolveBlockTag(toBlock).valueOr:
98+
return err(error)
9799

100+
return ok(
101+
FilterOptions(
102+
fromBlock: Opt.some(fromBlockNumberTag),
103+
toBlock: Opt.some(toBlockNumberTag),
104+
address: filter.address,
105+
topics: filter.topics,
106+
blockHash: filter.blockHash,
107+
)
108+
)
109+
110+
proc verifyLogs*(
111+
vp: VerifiedRpcProxy, filter: FilterOptions, logObjs: seq[LogObject]
112+
): Future[Result[void, string]] {.async.} =
98113
# store block hashes contains the logs so that we can batch receipt requests
99114
var
100115
prevBlockHash: Hash32
@@ -109,7 +124,6 @@ proc getLogs*(
109124
rxs = (await vp.getReceipts(lg.blockHash.get())).valueOr:
110125
return err(error)
111126
prevBlockHash = lg.blockHash.get()
112-
113127
let
114128
txIdx = distinctBase(lg.transactionIndex.get())
115129
logIdx =
@@ -119,7 +133,25 @@ proc getLogs*(
119133

120134
if rxLog.address != lg.address or rxLog.data != lg.data or
121135
rxLog.topics != lg.topics or
122-
(not match(toLog(lg), filterOptions.address, filterOptions.topics)):
136+
lg.blockNumber.get() < filter.fromBlock.get().number or
137+
lg.blockNumber.get() > filter.toBlock.get().number or
138+
(not match(toLog(lg), filter.address, filter.topics)):
123139
return err("one of the returned logs is invalid")
124140

141+
ok()
142+
143+
proc getLogs*(
144+
vp: VerifiedRpcProxy, filter: FilterOptions
145+
): Future[Result[seq[LogObject], string]] {.async.} =
146+
let
147+
resolvedFilter = vp.resolveFilterTags(filter).valueOr:
148+
return err(error)
149+
logObjs =
150+
try:
151+
await vp.rpcClient.eth_getLogs(resolvedFilter)
152+
except CatchableError as e:
153+
return err(e.msg)
154+
155+
?(await vp.verifyLogs(resolvedFilter, logObjs))
156+
125157
return ok(logObjs)

nimbus_verified_proxy/rpc/rpc_eth_api.nim

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import
1111
results,
1212
chronicles,
1313
stew/byteutils,
14+
nimcrypto/sysrand,
1415
json_rpc/[rpcserver, rpcclient, rpcproxy],
1516
eth/common/accounts,
1617
web3/eth_api,
@@ -237,6 +238,7 @@ proc installEthApiHandlers*(vp: VerifiedRpcProxy) =
237238
await vp.rpcClient.eth_getTransactionByHash(txHash)
238239
except CatchableError as e:
239240
raise newException(ValueError, e.msg)
241+
240242
if tx.hash != txHash:
241243
raise newException(
242244
ValueError,
@@ -274,7 +276,86 @@ proc installEthApiHandlers*(vp: VerifiedRpcProxy) =
274276
(await vp.getLogs(filterOptions)).valueOr:
275277
raise newException(ValueError, error)
276278

277-
# TODO:
279+
vp.proxy.rpc("eth_newFilter") do(filterOptions: FilterOptions) -> string:
280+
if vp.filterStore.len >= MAX_FILTERS:
281+
raise newException(ValueError, "FilterStore already full")
282+
283+
var
284+
id: array[8, byte] # 64bits
285+
strId: string
286+
287+
for i in 0 .. (MAX_ID_TRIES + 1):
288+
if randomBytes(id) != len(id):
289+
raise newException(
290+
ValueError, "Couldn't generate a random identifier for the filter"
291+
)
292+
293+
strId = toHex(id)
294+
295+
if not vp.filterStore.contains(strId):
296+
break
297+
298+
if i >= MAX_ID_TRIES:
299+
raise
300+
newException(ValueError, "Couldn't create a unique identifier for the filter")
301+
302+
vp.filterStore[strId] =
303+
FilterStoreItem(filter: filterOptions, blockMarker: Opt.none(Quantity))
304+
305+
return strId
306+
307+
vp.proxy.rpc("eth_uninstallFilter") do(filterId: string) -> bool:
308+
if filterId in vp.filterStore:
309+
vp.filterStore.del(filterId)
310+
return true
311+
312+
return false
313+
314+
vp.proxy.rpc("eth_getFilterLogs") do(filterId: string) -> seq[LogObject]:
315+
if filterId notin vp.filterStore:
316+
raise newException(ValueError, "Filter doesn't exist")
317+
318+
(await vp.getLogs(vp.filterStore[filterId].filter)).valueOr:
319+
raise newException(ValueError, error)
320+
321+
vp.proxy.rpc("eth_getFilterChanges") do(filterId: string) -> seq[LogObject]:
322+
if filterId notin vp.filterStore:
323+
raise newException(ValueError, "Filter doesn't exist")
324+
325+
let
326+
filterItem = vp.filterStore[filterId]
327+
filter = vp.resolveFilterTags(filterItem.filter).valueOr:
328+
raise newException(ValueError, error)
329+
# after resolving toBlock is always some and a number tag
330+
toBlock = filter.toBlock.get().number
331+
332+
if filterItem.blockMarker.isSome() and toBlock <= filterItem.blockMarker.get():
333+
raise newException(ValueError, "No changes for the filter since the last query")
334+
335+
let
336+
fromBlock =
337+
if filterItem.blockMarker.isSome():
338+
Opt.some(
339+
types.BlockTag(kind: bidNumber, number: filterItem.blockMarker.get())
340+
)
341+
else:
342+
filter.fromBlock
343+
344+
changesFilter = FilterOptions(
345+
fromBlock: fromBlock,
346+
toBlock: filter.toBlock,
347+
address: filter.address,
348+
topics: filter.topics,
349+
blockHash: filter.blockHash,
350+
)
351+
logObjs = (await vp.getLogs(changesFilter)).valueOr:
352+
raise newException(ValueError, error)
353+
354+
# all logs verified so we can update blockMarker
355+
vp.filterStore[filterId].blockMarker = Opt.some(toBlock)
356+
357+
return logObjs
358+
278359
# Following methods are forwarded directly to the web3 provider and therefore
279360
# are not validated in any way.
280361
vp.proxy.registerProxyMethod("net_version")

nimbus_verified_proxy/rpc_api_backend.nim

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77

88
{.push raises: [], gcsafe.}
99

10-
import json_rpc/[rpcproxy, rpcclient], web3/[eth_api, eth_api_types], stint, ./types
10+
import
11+
json_rpc/[rpcproxy, rpcclient],
12+
web3/[eth_api, eth_api_types],
13+
stint,
14+
std/json,
15+
./types
1116

1217
proc initNetworkApiBackend*(vp: VerifiedRpcProxy): EthApiBackend =
1318
let

nimbus_verified_proxy/tests/test_receipts.nim

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,73 @@ suite "test receipts verification":
9292
ts.loadLogs(filterOptions, logs)
9393
let verifiedLogs = waitFor vp.proxy.getClient().eth_getLogs(filterOptions)
9494
check verifiedLogs.len == logs.len
95+
96+
test "create filters and uninstall filters":
97+
# filter options without any tags would test resolving default "latest"
98+
let filterOptions = FilterOptions(
99+
topics:
100+
@[
101+
TopicOrList(
102+
kind: SingleOrListKind.slkSingle,
103+
single:
104+
bytes32"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
105+
)
106+
],
107+
blockHash: Opt.none(Hash32),
108+
)
109+
110+
let
111+
# create a filter
112+
newFilter = waitFor vp.proxy.getClient().eth_newFilter(filterOptions)
113+
# deleting will prove if the filter was created
114+
delStatus = waitFor vp.proxy.getClient().eth_uninstallFilter(newFilter)
115+
116+
check delStatus
117+
118+
let
119+
unknownFilterId = "thisisacorrectfilterid"
120+
delStatus2 = waitFor vp.proxy.getClient().eth_uninstallFilter(newFilter)
121+
122+
check not delStatus2
123+
124+
test "get logs using filter changes":
125+
let
126+
blk = getBlockFromJson("nimbus_verified_proxy/tests/data/Paris.json")
127+
rxs = getReceiptsFromJson("nimbus_verified_proxy/tests/data/receipts.json")
128+
logs = getLogsFromJson("nimbus_verified_proxy/tests/data/logs.json")
129+
130+
# update block tags because getLogs (uses)-> getReceipts (uses)-> getHeader
131+
ts.loadBlockReceipts(blk, rxs)
132+
discard vp.headerStore.add(convHeader(blk), blk.hash)
133+
discard vp.headerStore.updateFinalized(convHeader(blk), blk.hash)
134+
135+
# filter options without any tags would test resolving default "latest"
136+
let filterOptions = FilterOptions(
137+
topics:
138+
@[
139+
TopicOrList(
140+
kind: SingleOrListKind.slkSingle,
141+
single:
142+
bytes32"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
143+
)
144+
],
145+
blockHash: Opt.none(Hash32),
146+
)
147+
148+
ts.loadLogs(filterOptions, logs)
149+
150+
let
151+
# create a filter
152+
newFilter = waitFor vp.proxy.getClient().eth_newFilter(filterOptions)
153+
filterLogs = waitFor vp.proxy.getClient().eth_getFilterLogs(newFilter)
154+
filterChanges = waitFor vp.proxy.getClient().eth_getFilterChanges(newFilter)
155+
156+
check filterLogs.len == logs.len
157+
check filterChanges.len == logs.len
158+
159+
try:
160+
let againFilterChanges =
161+
waitFor vp.proxy.getClient().eth_getFilterChanges(newFilter)
162+
check false
163+
except CatchableError as e:
164+
check true

nimbus_verified_proxy/types.nim

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
{.push raises: [], gcsafe.}
99

1010
import
11+
std/tables,
1112
json_rpc/[rpcproxy, rpcclient],
1213
web3/[eth_api, eth_api_types],
1314
stint,
@@ -21,6 +22,8 @@ const
2122
ACCOUNTS_CACHE_SIZE = 128
2223
CODE_CACHE_SIZE = 64
2324
STORAGE_CACHE_SIZE = 256
25+
MAX_ID_TRIES* = 10
26+
MAX_FILTERS* = 256
2427

2528
type
2629
AccountsCacheKey* = (Root, Address)
@@ -63,6 +66,10 @@ type
6366
eth_getTransactionByHash*: GetTransactionByHashProc
6467
eth_getLogs*: GetLogsProc
6568

69+
FilterStoreItem* = object
70+
filter*: FilterOptions
71+
blockMarker*: Opt[Quantity]
72+
6673
VerifiedRpcProxy* = ref object
6774
evm*: AsyncEvm
6875
proxy*: RpcProxy
@@ -74,6 +81,7 @@ type
7481

7582
# TODO: when the list grows big add a config object instead
7683
# config parameters
84+
filterStore*: Table[string, FilterStoreItem]
7785
chainId*: UInt256
7886
maxBlockWalk*: uint64
7987

0 commit comments

Comments
 (0)