Skip to content

Commit b417642

Browse files
committed
ethreceipts: receipt listener updates, and new QueryOnChain fetch func option
1 parent 9aca5f2 commit b417642

File tree

8 files changed

+186
-111
lines changed

8 files changed

+186
-111
lines changed

ethmonitor/ethmonitor.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/0xsequence/ethkit/util"
2121
"github.com/goware/breaker"
2222
cachestore "github.com/goware/cachestore2"
23-
"github.com/goware/calc"
2423
"github.com/goware/channel"
2524
"github.com/goware/superr"
2625
"github.com/zeebo/xxh3"
@@ -626,7 +625,7 @@ func (m *Monitor) buildCanonicalChain(ctx context.Context, nextBlock *types.Bloc
626625

627626
// let's always take a pause between any reorg for the polling interval time
628627
// to allow nodes to sync to the correct chain
629-
pause := calc.Max(2*m.options.PollingInterval, 2*time.Second)
628+
pause := max(2*m.options.PollingInterval, 2*time.Second)
630629
time.Sleep(pause)
631630

632631
// Fetch/connect the broken chain backwards by traversing recursively via parent hashes

ethreceipts/ethreceipts.go

Lines changed: 100 additions & 77 deletions
Large diffs are not rendered by default.

ethreceipts/ethreceipts_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
199199
go func(i int, txnHash common.Hash) {
200200
defer wg.Done()
201201

202-
receipt, waitFinality, err := receiptsListener.FetchTransactionReceipt(ctx, txnHash, 7)
202+
receipt, waitFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHash, 7)
203203
require.NoError(t, err)
204204
require.NotNil(t, receipt)
205205
require.True(t, receipt.Status() == types.ReceiptStatusSuccessful)
@@ -226,7 +226,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
226226
require.Equal(t, 1, monitor.NumSubscribers())
227227

228228
// Testing exhausted filter after maxWait period is unable to find non-existant txn hash
229-
receipt, waitFinality, err := receiptsListener.FetchTransactionReceipt(ctx, ethkit.Hash{1, 2, 3, 4}, 5)
229+
receipt, waitFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, ethkit.Hash{1, 2, 3, 4}, 5)
230230
require.Error(t, err)
231231
require.True(t, errors.Is(err, ethreceipts.ErrFilterExhausted))
232232
require.Nil(t, receipt)
@@ -245,7 +245,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
245245
monitor.PurgeHistory()
246246
receiptsListener.PurgeHistory()
247247

248-
receipt, waitFinality, err = receiptsListener.FetchTransactionReceipt(ctx, txnHashes[0])
248+
receipt, waitFinality, err = receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHashes[0])
249249
require.NoError(t, err)
250250
require.NotNil(t, receipt)
251251
finalReceipt, err = waitFinality(context.Background())
@@ -256,7 +256,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) {
256256
// wait enough time, so that the fetched receipt will come as finalized right away
257257
time.Sleep(5 * time.Second)
258258

259-
receipt, waitFinality, err = receiptsListener.FetchTransactionReceipt(ctx, txnHashes[1])
259+
receipt, waitFinality, err = receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHashes[1])
260260
require.NoError(t, err)
261261
require.NotNil(t, receipt)
262262
require.True(t, receipt.Final)
@@ -349,7 +349,7 @@ func TestFetchTransactionReceiptBlast(t *testing.T) {
349349
go func(i int, txnHash common.Hash) {
350350
defer wg.Done()
351351

352-
receipt, receiptFinality, err := receiptsListener.FetchTransactionReceipt(ctx, txnHash)
352+
receipt, receiptFinality, err := receiptsListener.FetchTransactionReceiptWithFinality(ctx, txnHash)
353353
assert.NoError(t, err)
354354
assert.NotNil(t, receipt)
355355
assert.True(t, receipt.Status() == types.ReceiptStatusSuccessful)

ethreceipts/filterer.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ func FilterTxnHash(txnHash ethkit.Hash) FilterQuery {
1919
// default options for TxnHash filter. Note, other filter conds
2020
// have a different set of defaults.
2121
options: FilterOptions{
22-
Finalize: true,
23-
LimitOne: true,
24-
SearchCache: true,
25-
SearchOnChain: true,
22+
Finalize: true,
23+
LimitOne: true,
24+
SearchCache: true,
25+
QueryOnChainTxnHash: true,
2626

2727
// wait up to NumBlocksToFinality*2 number of blocks between
2828
// filter matches before unsubcribing if no matches occured
@@ -116,8 +116,13 @@ type FilterQuery interface {
116116
Finalize(bool) FilterQuery
117117
LimitOne(bool) FilterQuery
118118
SearchCache(bool) FilterQuery
119-
SearchOnChain(bool) FilterQuery
119+
QueryOnChainTxnHash(bool) FilterQuery
120+
QueryOnChain(func(context.Context) (*types.Receipt, error)) FilterQuery
120121
MaxWait(int) FilterQuery
122+
123+
// DEPRECATED: please use QueryOnChainTxnHash instead, which is the same thing, renamed to be more clear
124+
// in addition, see new QueryChain(fn) as additional feature.
125+
SearchOnChain(bool) FilterQuery
121126
}
122127

123128
type FilterOptions struct {
@@ -133,9 +138,14 @@ type FilterOptions struct {
133138
// ..
134139
SearchCache bool
135140

136-
// SearchOnChain will search for txn hash on-chain. This is only useful
137-
// when used in combination with TxnHash filter cond.
138-
SearchOnChain bool
141+
// QueryOnChainTxnHash will query the chain for the txn hash at the start
142+
// of a filter subscription. This is only useful when used in combination with
143+
// TxnHash filter cond, to search for past transactions which may have been
144+
// mined before the filter was created.
145+
QueryOnChainTxnHash bool
146+
147+
// ..
148+
QueryOnChain func(context.Context) (*types.Receipt, error)
139149

140150
// MaxWait filter option waits some number of blocks without a filter match after
141151
// which point will auto-unsubscribe the filter. This is useful to help automatically
@@ -196,8 +206,19 @@ func (f *filter) SearchCache(searchCache bool) FilterQuery {
196206
return f
197207
}
198208

209+
// DEPRECATED: please use QueryChainForTxnHash instead, which is the same thing, renamed to be more clear
199210
func (f *filter) SearchOnChain(searchOnChain bool) FilterQuery {
200-
f.options.SearchOnChain = searchOnChain
211+
f.options.QueryOnChainTxnHash = searchOnChain
212+
return f
213+
}
214+
215+
func (f *filter) QueryOnChainTxnHash(queryOnChainTxnHash bool) FilterQuery {
216+
f.options.QueryOnChainTxnHash = queryOnChainTxnHash
217+
return f
218+
}
219+
220+
func (f *filter) QueryOnChain(fn func(context.Context) (*types.Receipt, error)) FilterQuery {
221+
f.options.QueryOnChain = fn
201222
return f
202223
}
203224

ethreceipts/receipt.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package ethreceipts
22

33
import (
4+
"fmt"
45
"math/big"
6+
"sync/atomic"
57

68
"github.com/0xsequence/ethkit"
9+
"github.com/0xsequence/ethkit/ethtxn"
710
"github.com/0xsequence/ethkit/go-ethereum/common"
811
"github.com/0xsequence/ethkit/go-ethereum/core"
912
"github.com/0xsequence/ethkit/go-ethereum/core/types"
@@ -14,10 +17,14 @@ type Receipt struct {
1417
Final bool // flags that this receipt is finalized
1518
Reorged bool // chain reorged / removed the txn
1619

20+
chainID *big.Int
1721
transaction *types.Transaction
18-
message *core.Message // TODOXXX: this intermediate type is lame.. with new ethrpc we can remove
1922
receipt *types.Receipt
2023
logs []*types.Log
24+
25+
// TODOXXX: this intermediate type is lame.. with new ethrpc we can remove
26+
// NOTE: we only use this for From/To address resolution currently
27+
message atomic.Value
2128
}
2229

2330
func (r *Receipt) Receipt() *types.Receipt {
@@ -143,24 +150,52 @@ func (r *Receipt) Logs() []*types.Log {
143150
func (r *Receipt) From() common.Address {
144151
if r.receipt != nil {
145152
return r.receipt.From
146-
} else if r.message != nil {
147-
return r.message.From
148153
} else {
149-
return common.Address{}
154+
if msg, _ := r.AsMessage(); msg != nil {
155+
return msg.From
156+
}
150157
}
158+
return common.Address{}
151159
}
152160

153161
func (r *Receipt) To() common.Address {
154162
if r.receipt != nil {
155163
return r.receipt.To
156-
} else if r.message != nil {
157-
to := r.message.To
158-
if to == nil {
159-
return common.Address{}
160-
} else {
161-
return *to
162-
}
163164
} else {
164-
return common.Address{}
165+
if msg, _ := r.AsMessage(); msg != nil {
166+
to := msg.To
167+
if to == nil {
168+
return common.Address{}
169+
} else {
170+
return *to
171+
}
172+
}
173+
}
174+
return common.Address{}
175+
}
176+
177+
func (r *Receipt) AsMessage() (*core.Message, error) {
178+
msg, ok := r.message.Load().(*core.Message)
179+
if !ok {
180+
return nil, fmt.Errorf("ethreceipts: Receipt.message type-assertion fail, unexpected")
181+
}
182+
if msg != nil {
183+
return msg, nil
184+
}
185+
186+
// TODOXXX: avoid using AsMessage as its fairly expensive operation, especially
187+
// to do it for every txn for every filter.
188+
// TODO: in order to do this, we'll have to update ethrpc with a different
189+
// implementation to just use raw types, aka, ethrpc/types.go with Block/Transaction/Receipt/Log ..
190+
txnMsg, err := ethtxn.AsMessage(r.transaction, r.chainID)
191+
if err != nil {
192+
// NOTE: this should never happen, but lets log in case it does. In the
193+
// future, we should just not use go-ethereum for these types.
194+
// l.log.Warn(fmt.Sprintf("unexpected failure of txn (%s index %d) on block %d (total txns=%d) AsMessage(..): %s",
195+
// txn.Hash(), i, block.NumberU64(), len(block.Transactions()), err,
196+
// ))
197+
return nil, err
165198
}
199+
r.message.Store(txnMsg)
200+
return txnMsg, nil
166201
}

ethreceipts/subscription.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) {
103103
}
104104

105105
s.mu.Lock()
106-
107106
if len(s.filters)+len(filters) > maxFiltersPerListener {
108107
// too many filters, ignore the extra filter. not ideal, but better than
109108
// deadlocking
@@ -112,7 +111,6 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) {
112111
s.mu.Unlock()
113112
return
114113
}
115-
116114
s.filters = append(s.filters, filters...)
117115
s.mu.Unlock()
118116

@@ -143,7 +141,9 @@ func (s *subscriber) ClearFilters() {
143141
s.filters = s.filters[:0]
144142
}
145143

146-
func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, receipts []Receipt) ([]bool, error) {
144+
// matchFiltersAndPublish matches the given receipts against the provided filterers,
145+
// fetches any missing receipt data as needed, and notifies the subscriber of matches.
146+
func (s *subscriber) matchFiltersAndPublish(ctx context.Context, filterers []Filterer, receipts []Receipt) ([]bool, error) {
147147
oks := make([]bool, len(filterers))
148148

149149
// Collect matches that need receipt fetching

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ require (
2828
github.com/goware/cachestore-mem v0.2.2
2929
github.com/goware/cachestore-redis v0.2.1
3030
github.com/goware/cachestore2 v0.12.3
31-
github.com/goware/calc v0.2.0
3231
github.com/goware/channel v0.5.0
3332
github.com/goware/pp v0.0.3
3433
github.com/goware/superr v0.0.2

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ github.com/goware/cachestore-redis v0.2.1 h1:bMdkzGuy6Dsybq2chv3kRwu8UGoJz3aQBO9
104104
github.com/goware/cachestore-redis v0.2.1/go.mod h1:+8rOAfL1qNLNiXHe8+WACPk+I9kaLOAfArj2Z7FDiWg=
105105
github.com/goware/cachestore2 v0.12.3 h1:V4VODChSAV29p8htHj8Lb36Hvv28CLrJsw49gx0h+ks=
106106
github.com/goware/cachestore2 v0.12.3/go.mod h1:PR+lXK8UXa/wjKB7mpIj6HtRhC7vbcRXx4b5F1Av/ik=
107-
github.com/goware/calc v0.2.0 h1:3B9qjXYpE0kgS4LhyklbM6X/0cOvZLdUZG7sdAuVCb4=
108-
github.com/goware/calc v0.2.0/go.mod h1:BSQUbfS6ICW9RvSV9SikDY+t6/HQKI+CUxIpjE3VD28=
109107
github.com/goware/channel v0.5.0 h1:cOllKceCH5Xhibs0v8jtPJ81ez3L7WpYri/OU+9IBfg=
110108
github.com/goware/channel v0.5.0/go.mod h1:Eai0KCjphDZ44M/qT7G1ZE6lZfywiTFvwV3Xc6cDPdo=
111109
github.com/goware/pp v0.0.3 h1:2Yv0IFGOpVjCDayPYzrqskCe9qmGoKBIyu6Uy//LVUU=

0 commit comments

Comments
 (0)