Skip to content

Commit fbe559e

Browse files
Add blockListener-scoped throttle controlled query for all receipts in a block
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 7958448 commit fbe559e

File tree

6 files changed

+317
-37
lines changed

6 files changed

+317
-37
lines changed

internal/ethereum/config.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ const (
4343

4444
RetryEnabled = "retry.enabled"
4545

46-
MaxConcurrentRequests = "maxConcurrentRequests"
47-
TxCacheSize = "txCacheSize"
48-
HederaCompatibilityMode = "hederaCompatibilityMode"
49-
TraceTXForRevertReason = "traceTXForRevertReason"
50-
WebSocketsEnabled = "ws.enabled"
46+
MaxConcurrentRequests = "maxConcurrentRequests"
47+
TxCacheSize = "txCacheSize"
48+
HederaCompatibilityMode = "hederaCompatibilityMode"
49+
TraceTXForRevertReason = "traceTXForRevertReason"
50+
WebSocketsEnabled = "ws.enabled"
51+
MaxAsyncBlockFetchConcurrency = "maxAsyncBlockFetchConcurrency"
52+
UseGetBlockReceipts = "useGetBlockReceipts"
5153
)
5254

5355
const (
@@ -88,6 +90,8 @@ func InitConfig(conf config.Section) {
8890
conf.AddKnownKey(TxCacheSize, 250)
8991
conf.AddKnownKey(HederaCompatibilityMode, false)
9092
conf.AddKnownKey(TraceTXForRevertReason, false)
93+
conf.AddKnownKey(MaxAsyncBlockFetchConcurrency, 25)
94+
conf.AddKnownKey(UseGetBlockReceipts, true)
9195

9296
// FireFly Common default for retry enabled is false,
9397
// but we want to enable it by default

internal/ethereum/ethereum.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,12 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
153153
})
154154

155155
if c.blockListener, err = ethblocklistener.NewBlockListenerSupplyBackend(ctx, c.retry.Retry, &ethblocklistener.BlockListenerConfig{
156-
BlockPollingInterval: conf.GetDuration(BlockPollingInterval),
157-
MonitoredHeadLength: int(c.checkpointBlockGap),
158-
HederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode),
159-
BlockCacheSize: conf.GetInt(BlockCacheSize),
156+
BlockPollingInterval: conf.GetDuration(BlockPollingInterval),
157+
MonitoredHeadLength: int(c.checkpointBlockGap),
158+
HederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode),
159+
BlockCacheSize: conf.GetInt(BlockCacheSize),
160+
MaxAsyncBlockFetchConcurrency: conf.GetInt(MaxAsyncBlockFetchConcurrency),
161+
UseGetBlockReceipts: conf.GetBool(UseGetBlockReceipts),
160162
}, c.backend, c.wsBackend); err != nil {
161163
return nil, err
162164
}

internal/msgs/en_error_messages.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,6 @@ var (
8484
MsgFromBlockInvalid = ffe("FF23064", "From block invalid. Must be 'earliest', 'latest' or a decimal: %s", http.StatusBadRequest)
8585
MsgInvalidJSONFormatOptions = ffe("FF23065", "The JSON formatting options must be a valid set of key=value pairs in URL query string format '%s'")
8686
MsgUnknownJSONFormatOptions = ffe("FF23066", "JSON formatting option unknown %s=%s")
87+
MsgObservedPanic = ffe("FF23067", "Observed panic: %v")
88+
MsgReturnedBlockHashMismatch = ffe("FF23068", "Returned block %d hash %s does not match requested hash %s")
8789
)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright © 2026 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package ethblocklistener
18+
19+
import (
20+
"runtime/debug"
21+
22+
"github.com/hyperledger/firefly-common/pkg/i18n"
23+
"github.com/hyperledger/firefly-common/pkg/log"
24+
"github.com/hyperledger/firefly-evmconnect/internal/msgs"
25+
"github.com/hyperledger/firefly-evmconnect/pkg/ethrpc"
26+
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
27+
)
28+
29+
type blockReceiptRequest struct {
30+
bl *blockListener
31+
blockNumber *ethtypes.HexInteger
32+
blockHash ethtypes.HexBytes0xPrefix
33+
cb func([]*ethrpc.TxReceiptJSONRPC, error)
34+
}
35+
36+
// Initiates a background request to get all the receipts in a block.
37+
// Blocks if throttled
38+
func (bl *blockListener) FetchBlockReceiptsAsync(blockNumber *ethtypes.HexInteger, blockHash ethtypes.HexBytes0xPrefix, cb func([]*ethrpc.TxReceiptJSONRPC, error)) {
39+
brr := &blockReceiptRequest{
40+
bl: bl,
41+
blockNumber: blockNumber,
42+
blockHash: blockHash,
43+
cb: cb,
44+
}
45+
// We have a throttle here that's global to the whole blockListener, to protect us from flooding the RPC gateway / node
46+
brr.bl.blockFetchConcurrencyThrottle <- brr
47+
go brr.run()
48+
}
49+
50+
func (brr *blockReceiptRequest) run() {
51+
var err error
52+
var receipts []*ethrpc.TxReceiptJSONRPC
53+
earlyExit := true
54+
defer func() {
55+
<-brr.bl.blockFetchConcurrencyThrottle // return our slot
56+
if earlyExit {
57+
panicDetail := recover()
58+
log.L(brr.bl.ctx).Errorf("Observed panic: %v\n%s", panicDetail, debug.Stack())
59+
err = i18n.NewError(brr.bl.ctx, msgs.MsgObservedPanic, panicDetail)
60+
}
61+
brr.cb(receipts, err)
62+
}()
63+
rpc := brr.bl.backend
64+
65+
if brr.bl.UseGetBlockReceipts {
66+
// just need to make a single call to get all the receipts
67+
rpcErr := rpc.CallRPC(brr.bl.ctx, &receipts, "eth_getBlockReceipts", brr.blockNumber)
68+
if rpcErr != nil {
69+
err = rpcErr.Error()
70+
} else {
71+
// check the hash in all the receipts
72+
for _, r := range receipts {
73+
if brr.blockHash != nil && !r.BlockHash.Equals(brr.blockHash) {
74+
err = i18n.NewError(brr.bl.ctx, msgs.MsgReturnedBlockHashMismatch, brr.blockNumber.Uint64(), r.BlockHash, brr.blockHash)
75+
break
76+
}
77+
}
78+
}
79+
} else {
80+
// we don't currently optimize this branch, as all modern clients support eth_getBlockReceipts
81+
// and it seems well established that using that RPC is more efficient than attempting
82+
// parallelization or batching of eth_getTransactionReceipt calls.
83+
84+
// Get the block by hash first
85+
blockInfo, err := brr.bl.GetBlockInfoByHash(brr.bl.ctx, brr.blockHash.String())
86+
if err == nil {
87+
// Then get each receipt
88+
receipts = make([]*ethrpc.TxReceiptJSONRPC, len(blockInfo.Transactions))
89+
for i := 0; i < len(receipts) && err == nil; i++ {
90+
receipts[i], err = brr.bl.GetTransactionReceipt(brr.bl.ctx, blockInfo.Transactions[i].String())
91+
}
92+
}
93+
94+
}
95+
96+
// No early return in this function - return must happen by reaching here
97+
earlyExit = false
98+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright © 2026 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package ethblocklistener
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/hyperledger/firefly-common/pkg/fftypes"
24+
"github.com/hyperledger/firefly-common/pkg/i18n"
25+
"github.com/hyperledger/firefly-evmconnect/mocks/rpcbackendmocks"
26+
"github.com/hyperledger/firefly-evmconnect/pkg/ethrpc"
27+
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
28+
"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
29+
"github.com/stretchr/testify/assert"
30+
"github.com/stretchr/testify/mock"
31+
)
32+
33+
func TestFetchBlockReceiptsAsyncOptimizedOk(t *testing.T) {
34+
_, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) {
35+
conf.UseGetBlockReceipts = true
36+
})
37+
defer done()
38+
39+
blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
40+
blockNumber := ethtypes.NewHexIntegerU64(12346)
41+
42+
receipt := &ethrpc.TxReceiptJSONRPC{
43+
TransactionHash: ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()),
44+
BlockHash: blockHash,
45+
}
46+
47+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockReceipts", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
48+
assert.Equal(t, blockNumber, args[3])
49+
res := args[1].(*[]*ethrpc.TxReceiptJSONRPC)
50+
*res = []*ethrpc.TxReceiptJSONRPC{receipt}
51+
})
52+
53+
fetched := make(chan struct{})
54+
bl.FetchBlockReceiptsAsync(blockNumber, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) {
55+
defer close(fetched)
56+
assert.NoError(t, err)
57+
assert.Equal(t, []*ethrpc.TxReceiptJSONRPC{receipt}, receipts)
58+
})
59+
<-fetched
60+
}
61+
62+
func TestFetchBlockReceiptsAsyncOptimizedBlockMismatch(t *testing.T) {
63+
_, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) {
64+
conf.UseGetBlockReceipts = true
65+
})
66+
defer done()
67+
68+
blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
69+
blockNumber := ethtypes.NewHexIntegerU64(12346)
70+
71+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockReceipts", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
72+
assert.Equal(t, blockNumber, args[3])
73+
res := args[1].(*[]*ethrpc.TxReceiptJSONRPC)
74+
*res = []*ethrpc.TxReceiptJSONRPC{
75+
{
76+
TransactionHash: ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()),
77+
BlockHash: ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()),
78+
},
79+
}
80+
})
81+
82+
fetched := make(chan struct{})
83+
bl.FetchBlockReceiptsAsync(blockNumber, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) {
84+
defer close(fetched)
85+
assert.Regexp(t, "FF23068.*"+blockHash.String(), err)
86+
})
87+
<-fetched
88+
}
89+
90+
func TestFetchBlockReceiptsAsyncOptimizedBlockHandleError(t *testing.T) {
91+
ctx, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) {
92+
conf.UseGetBlockReceipts = true
93+
})
94+
defer done()
95+
96+
blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
97+
blockNumber := ethtypes.NewHexIntegerU64(12346)
98+
99+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockReceipts", mock.Anything).
100+
Return(rpcbackend.NewRPCError(ctx, rpcbackend.RPCCodeInternalError, i18n.Msg404NotFound))
101+
102+
fetched := make(chan struct{})
103+
bl.FetchBlockReceiptsAsync(blockNumber, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) {
104+
defer close(fetched)
105+
assert.Regexp(t, "FF00167", err)
106+
})
107+
<-fetched
108+
}
109+
110+
func TestFetchBlockReceiptsAsyncOptimizedBlockHandlePanic(t *testing.T) {
111+
_, bl, mRPC, done := newTestBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) {
112+
conf.UseGetBlockReceipts = true
113+
})
114+
defer done()
115+
116+
blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
117+
blockNumber := ethtypes.NewHexIntegerU64(12346)
118+
119+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockReceipts", mock.Anything).Panic("pop")
120+
121+
fetched := make(chan struct{})
122+
bl.FetchBlockReceiptsAsync(blockNumber, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) {
123+
defer close(fetched)
124+
assert.Regexp(t, "FF23067.*pop", err)
125+
})
126+
<-fetched
127+
}
128+
129+
func TestFetchBlockReceiptsAsyncNonOptimizedOk(t *testing.T) {
130+
_, bl, mRPC, done := newTestBlockListener(t)
131+
defer done()
132+
133+
blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
134+
blockNumber := ethtypes.NewHexIntegerU64(12346)
135+
txHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String())
136+
137+
block := &ethrpc.BlockInfoJSONRPC{
138+
Number: blockNumber,
139+
Hash: blockHash,
140+
Transactions: []ethtypes.HexBytes0xPrefix{txHash},
141+
}
142+
143+
receipt := &ethrpc.TxReceiptJSONRPC{
144+
TransactionHash: txHash,
145+
BlockHash: blockHash,
146+
}
147+
148+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.Anything, false).Return(nil).Run(func(args mock.Arguments) {
149+
assert.Equal(t, blockHash.String(), args[3])
150+
res := args[1].(**ethrpc.BlockInfoJSONRPC)
151+
*res = block
152+
})
153+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getTransactionReceipt", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
154+
assert.Equal(t, txHash.String(), args[3])
155+
res := args[1].(**ethrpc.TxReceiptJSONRPC)
156+
*res = receipt
157+
})
158+
159+
fetched := make(chan struct{})
160+
bl.FetchBlockReceiptsAsync(blockNumber, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) {
161+
defer close(fetched)
162+
assert.NoError(t, err)
163+
assert.Equal(t, []*ethrpc.TxReceiptJSONRPC{receipt}, receipts)
164+
})
165+
<-fetched
166+
}

0 commit comments

Comments
 (0)