@@ -4,14 +4,12 @@ import (
44 "context"
55 "fmt"
66 "sync"
7- "sync/atomic"
87
98 libcommon "github.com/ledgerwatch/erigon-lib/common"
109 "github.com/ledgerwatch/erigon/core/types"
1110 "github.com/ledgerwatch/erigon/eth/filters"
1211 kafkaTypes "github.com/ledgerwatch/erigon/zk/realtime/kafka/types"
1312 realtimeTypes "github.com/ledgerwatch/erigon/zk/realtime/types"
14- "github.com/ledgerwatch/log/v3"
1513)
1614
1715const (
@@ -28,15 +26,13 @@ type RealtimeSubMessage struct {
2826}
2927
3028type RealtimeSubscription struct {
31- currHeight atomic.Uint64
3229 rtSubs * SyncMap [SubID , Sub [RealtimeSubMessage ]]
3330 logsSubs * SyncMap [SubID , * LogsFilter ]
3431 newMsgChan chan RealtimeSubMessage
3532}
3633
3734func NewRealtimeSubscription () * RealtimeSubscription {
3835 return & RealtimeSubscription {
39- currHeight : atomic.Uint64 {},
4036 rtSubs : NewSyncMap [SubID , Sub [RealtimeSubMessage ]](),
4137 logsSubs : NewSyncMap [SubID , * LogsFilter ](),
4238 newMsgChan : make (chan RealtimeSubMessage , DefaultChannelSize ),
@@ -70,22 +66,6 @@ func (ff *RealtimeSubscription) Start(ctx context.Context) {
7066}
7167
7268func (ff * RealtimeSubscription ) handleRealtimeMsgs (ctx context.Context , msg RealtimeSubMessage ) {
73- msgHeight := uint64 (0 )
74- if msg .BlockMsg != nil {
75- msgHeight = msg .BlockMsg .Header .Number .Uint64 ()
76- } else if msg .TxMsg != nil {
77- msgHeight = msg .TxMsg .BlockNumber
78- }
79-
80- if msgHeight < ff .currHeight .Load () {
81- // Ignore msg from previous blocks
82- log .Debug (fmt .Sprintf ("[Realtime] Subscription ignoring msg from previous block. msgHeight: %d, currHeight: %d" , msgHeight , ff .currHeight .Load ()))
83- return
84- }
85- if msgHeight > ff .currHeight .Load () {
86- ff .currHeight .Store (msgHeight )
87- }
88-
8969 ff .rtSubs .Range (func (k SubID , v Sub [RealtimeSubMessage ]) error {
9070 select {
9171 case <- ctx .Done ():
0 commit comments