@@ -20,15 +20,18 @@ const DEFAULT_BLOCKS_PER_POLL = 10
2020const DEFAULT_TRIGGER_INTERVAL = 1000
2121
2222type Poller struct {
23- rpc rpc.IRPCClient
24- blocksPerPoll int64
25- triggerIntervalMs int64
26- storage storage.IStorage
27- lastPolledBlock * big.Int
28- pollFromBlock * big.Int
29- pollUntilBlock * big.Int
30- parallelPollers int
31- workModeChan chan WorkMode
23+ rpc rpc.IRPCClient
24+ blocksPerPoll int64
25+ triggerIntervalMs int64
26+ storage storage.IStorage
27+ lastPolledBlock * big.Int
28+ lastPolledBlockMutex sync.RWMutex
29+ pollFromBlock * big.Int
30+ pollUntilBlock * big.Int
31+ parallelPollers int
32+ workModeChan chan WorkMode
33+ currentWorkMode WorkMode
34+ workModeMutex sync.RWMutex
3235}
3336
3437type BlockNumberWithError struct {
@@ -117,6 +120,15 @@ func (p *Poller) Start(ctx context.Context) {
117120 if ! ok {
118121 return
119122 }
123+
124+ // Do not poll if not in backfill mode
125+ p .workModeMutex .RLock ()
126+ if p .currentWorkMode != WorkModeBackfill {
127+ p .workModeMutex .RUnlock ()
128+ continue
129+ }
130+ p .workModeMutex .RUnlock ()
131+
120132 blockRangeMutex .Lock ()
121133 blockNumbers , err := p .getNextBlockRange (pollCtx )
122134 blockRangeMutex .Unlock ()
@@ -149,10 +161,26 @@ func (p *Poller) Start(ctx context.Context) {
149161 p .shutdown (cancel , tasks , & wg )
150162 return
151163 case workMode := <- p .workModeChan :
152- if workMode == WorkModeLive {
153- log .Info ().Msg ("Switching to live mode, stopping poller" )
154- p .shutdown (cancel , tasks , & wg )
155- return
164+ p .workModeMutex .RLock ()
165+ currentWorkMode := p .currentWorkMode
166+ p .workModeMutex .RUnlock ()
167+ if workMode != currentWorkMode && workMode != "" {
168+ log .Info ().Msgf ("Poller work mode changing from %s to %s" , currentWorkMode , workMode )
169+ p .workModeMutex .Lock ()
170+ changedToBackfillFromLive := currentWorkMode == WorkModeLive && workMode == WorkModeBackfill
171+ p .currentWorkMode = workMode
172+ p .workModeMutex .Unlock ()
173+ if changedToBackfillFromLive {
174+ lastBlockInMainStorage , err := p .storage .MainStorage .GetMaxBlockNumber (p .rpc .GetChainID ())
175+ if err != nil {
176+ log .Error ().Err (err ).Msg ("Error getting last block in main storage" )
177+ } else {
178+ p .lastPolledBlockMutex .Lock ()
179+ p .lastPolledBlock = lastBlockInMainStorage
180+ p .lastPolledBlockMutex .Unlock ()
181+ log .Debug ().Msgf ("Switching to backfill mode, updating last polled block to %s" , p .lastPolledBlock .String ())
182+ }
183+ }
156184 }
157185 case <- ticker .C :
158186 select {
0 commit comments