Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ethConnector struct {
blockListener *blockListener
eventFilterPollingInterval time.Duration
traceTXForRevertReason bool
chainID string

mux sync.Mutex
eventStreams map[fftypes.UUID]*eventStream
Expand Down
8 changes: 8 additions & 0 deletions internal/ethereum/event_enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ func (ee *eventEnricher) filterEnrichEthLog(ctx context.Context, f *eventFilter,
log.L(ctx).Infof("detected event '%s'", protoID)
data, decoded := ee.decodeLogData(ctx, f.Event, ethLog.Topics, ethLog.Data)

if len(ee.connector.chainID) == 0 {
resp, _, err := ee.connector.IsReady(ctx)
if !resp.Ready || err != nil {
return nil, matched, decoded, err
}
}

info := eventInfo{
logJSONRPC: *ethLog,
ChainID: ee.connector.chainID,
}

var timestamp *fftypes.FFTime
Expand Down
25 changes: 25 additions & 0 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ func TestListenerCatchupErrorsThenDeliveryExit(t *testing.T) {
l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand All @@ -194,6 +197,9 @@ func TestListenerCatchupScalesBackOnExpectedError(t *testing.T) {
l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -221,6 +227,9 @@ func TestListenerCatchupScalesBackNTimesOnExpectedError(t *testing.T) {
l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -248,6 +257,9 @@ func TestListenerCatchupScalesBackToOne(t *testing.T) {
l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -275,6 +287,9 @@ func TestListenerNoCatchupScaleBackOnErrorMismatch(t *testing.T) {
l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -306,6 +321,9 @@ func TestListenerCatchupScalesBackCustomRegex(t *testing.T) {

assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -344,6 +362,10 @@ func TestListenerCatchupNoScaleBackEmptyRegex(t *testing.T) {
Number: ethtypes.NewHexInteger64(1001),
}
})

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "ACME JSON/RPC endpoint error - eth_getLogs response size is too large"}).Times(5)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
Expand Down Expand Up @@ -453,6 +475,9 @@ func TestFilterEnrichEthLogMethodInputsOk(t *testing.T) {
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
l.ee.connector.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type eventInfo struct {
InputMethod string `json:"inputMethod,omitempty"` // the method invoked, if it matched one of the signatures in the listener definition
InputArgs *fftypes.JSONAny `json:"inputArgs,omitempty"` // the method parameters, if the method matched one of the signatures in the listener definition
InputSigner *ethtypes.Address0xHex `json:"inputSigner,omitempty"` // the signing `from` address of the transaction
ChainID string `json:"chainId,omitempty"` // an identifier for the chain this event relates to
}

// eventStream is the state we hold in memory for each eventStream
Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu
es := c.eventStreams[*esID]
es.c.eventFilterPollingInterval = 1 * time.Millisecond
es.c.retry.MaximumDelay = 1 * time.Microsecond
c.chainID = "12345"
assert.NotNil(t, es)

es.preStartProcessing()
Expand Down
3 changes: 3 additions & 0 deletions internal/ethereum/get_receipt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ func TestGetReceiptEventDecodeOK(t *testing.T) {
ctx, c, mRPC, done := newTestConnector(t)
defer done()

mRPC.On("CallRPC", mock.Anything, mock.Anything, "net_version", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
c.chainID = "12345"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getTransactionReceipt",
"0x7d48ae971faf089878b57e3c28e3035540d34f38af395958d2c73c36c57c83a2").
Return(nil).
Expand Down
5 changes: 2 additions & 3 deletions internal/ethereum/statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ func (c *ethConnector) IsLive(_ context.Context) (*ffcapi.LiveResponse, ffcapi.E
}

func (c *ethConnector) IsReady(ctx context.Context) (*ffcapi.ReadyResponse, ffcapi.ErrorReason, error) {
var chainID string
err := c.backend.CallRPC(ctx, &chainID, "net_version")
err := c.backend.CallRPC(ctx, &c.chainID, "net_version")
if err != nil {
return &ffcapi.ReadyResponse{
Ready: false,
}, mapError(netVersionRPCMethods, err.Error()), err.Error()
}

details := &fftypes.JSONObject{
"chainID": chainID,
"chainID": c.chainID,
}

return &ffcapi.ReadyResponse{
Expand Down