Skip to content

Commit b5be6b7

Browse files
bas-vkobscuren
authored andcommitted
eth/filter: add support for pending logs (#3219)
1 parent 318ad3c commit b5be6b7

File tree

7 files changed

+319
-92
lines changed

7 files changed

+319
-92
lines changed

eth/filters/api.go

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
239239
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
240240
}
241241

242-
rpcSub := notifier.CreateSubscription()
242+
var (
243+
rpcSub = notifier.CreateSubscription()
244+
matchedLogs = make(chan []Log)
245+
)
246+
247+
logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
248+
if err != nil {
249+
return nil, err
250+
}
243251

244252
go func() {
245-
matchedLogs := make(chan []Log)
246-
logsSub := api.events.SubscribeLogs(crit, matchedLogs)
247253

248254
for {
249255
select {
@@ -276,18 +282,20 @@ type FilterCriteria struct {
276282
// used to retrieve logs when the state changes. This method cannot be
277283
// used to fetch logs that are already stored in the state.
278284
//
285+
// Default criteria for the from and to block are "latest".
286+
// Using "latest" as block number will return logs for mined blocks.
287+
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
288+
// In case logs are removed (chain reorg) previously returned logs are returned
289+
// again but with the removed property set to true.
290+
//
291+
// In case "fromBlock" > "toBlock" an error is returned.
292+
//
279293
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
280-
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
281-
var (
282-
logs = make(chan []Log)
283-
logsSub = api.events.SubscribeLogs(crit, logs)
284-
)
285-
286-
if crit.FromBlock == nil {
287-
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
288-
}
289-
if crit.ToBlock == nil {
290-
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
294+
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
295+
logs := make(chan []Log)
296+
logsSub, err := api.events.SubscribeLogs(crit, logs)
297+
if err != nil {
298+
return rpc.ID(""), err
291299
}
292300

293301
api.filtersMu.Lock()
@@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
312320
}
313321
}()
314322

315-
return logsSub.ID
323+
return logsSub.ID, nil
316324
}
317325

318326
// GetLogs returns logs matching the given argument that are stored within the state.
@@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log
363371
api.filtersMu.Unlock()
364372

365373
if !found || f.typ != LogsSubscription {
366-
return []Log{}, nil
374+
return nil, fmt.Errorf("filter not found")
367375
}
368376

369377
filter := New(api.backend, api.useMipMap)
370-
filter.SetBeginBlock(f.crit.FromBlock.Int64())
371-
filter.SetEndBlock(f.crit.ToBlock.Int64())
378+
if f.crit.FromBlock != nil {
379+
filter.SetBeginBlock(f.crit.FromBlock.Int64())
380+
} else {
381+
filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
382+
}
383+
if f.crit.ToBlock != nil {
384+
filter.SetEndBlock(f.crit.ToBlock.Int64())
385+
} else {
386+
filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
387+
}
372388
filter.SetAddresses(f.crit.Addresses)
373389
filter.SetTopics(f.crit.Topics)
374390

375-
logs, err := filter.Find(ctx)
376-
return returnLogs(logs), err
391+
logs, err:= filter.Find(ctx)
392+
if err != nil {
393+
return nil, err
394+
}
395+
return returnLogs(logs), nil
377396
}
378397

379398
// GetFilterChanges returns the logs for the filter with the given id since
380399
// last time is was called. This can be used for polling.
381400
//
382401
// For pending transaction and block filters the result is []common.Hash.
383-
// (pending)Log filters return []Log. If the filter could not be found
384-
// []interface{}{} is returned.
402+
// (pending)Log filters return []Log.
385403
//
386404
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
387-
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
405+
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
388406
api.filtersMu.Lock()
389407
defer api.filtersMu.Unlock()
390408

@@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
400418
case PendingTransactionsSubscription, BlocksSubscription:
401419
hashes := f.hashes
402420
f.hashes = nil
403-
return returnHashes(hashes)
404-
case PendingLogsSubscription, LogsSubscription:
421+
return returnHashes(hashes), nil
422+
case LogsSubscription:
405423
logs := f.logs
406424
f.logs = nil
407-
return returnLogs(logs)
425+
return returnLogs(logs), nil
408426
}
409427
}
410428

411-
return []interface{}{}
429+
return []interface{}{}, fmt.Errorf("filter not found")
412430
}
413431

414432
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
@@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
443461
return err
444462
}
445463

446-
if raw.From == nil || raw.From.Int64() < 0 {
447-
args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
448-
} else {
464+
if raw.From != nil {
449465
args.FromBlock = big.NewInt(raw.From.Int64())
450466
}
451467

452-
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
453-
args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
454-
} else {
468+
if raw.ToBlock != nil {
455469
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
456470
}
457471

eth/filters/api_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
4242
if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
4343
t.Fatal(err)
4444
}
45-
if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
46-
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
45+
if test0.FromBlock != nil {
46+
t.Fatalf("expected nil, got %d", test0.FromBlock)
4747
}
48-
if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
49-
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
48+
if test0.ToBlock != nil {
49+
t.Fatalf("expected nil, got %d", test0.ToBlock)
5050
}
5151
if len(test0.Addresses) != 0 {
5252
t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses))

eth/filters/filter.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"math"
2121
"time"
2222

23+
"math/big"
24+
2325
"github.com/ethereum/go-ethereum/common"
2426
"github.com/ethereum/go-ethereum/core"
2527
"github.com/ethereum/go-ethereum/core/types"
@@ -162,7 +164,7 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er
162164
}
163165
unfiltered = append(unfiltered, rl...)
164166
}
165-
logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
167+
logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...)
166168
}
167169
}
168170

@@ -179,12 +181,18 @@ func includes(addresses []common.Address, a common.Address) bool {
179181
return false
180182
}
181183

182-
func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
184+
func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log {
183185
var ret []Log
184-
185186
// Filter the logs for interesting stuff
186187
Logs:
187188
for _, log := range logs {
189+
if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber {
190+
continue
191+
}
192+
if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber {
193+
continue
194+
}
195+
188196
if len(addresses) > 0 && !includes(addresses, log.Address) {
189197
continue
190198
}
@@ -211,7 +219,6 @@ Logs:
211219
continue Logs
212220
}
213221
}
214-
215222
ret = append(ret, log)
216223
}
217224

0 commit comments

Comments
 (0)