@@ -15,19 +15,61 @@ import (
1515 roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
1616)
1717
18+ const (
19+ statusStarted = "started"
20+ statusReindexing = "reindexing"
21+ statusIndexing = "indexing"
22+ statusStopped = "stopped"
23+ )
24+
25+ // IndexerStatus is the status of runtime history indexer.
26+ type IndexerStatus struct {
27+ // Status is the concise status of runtime history indexer state.
28+ Status string `json:"status"`
29+
30+ // LastRound is the last runtime round that was indexed.
31+ LastRound uint64 `json:"last_round"`
32+
33+ // ReindexStatus is history reindex status.
34+ //
35+ // It is nil unless during history reindex.
36+ ReindexStatus * ReindexStatus `json:"reindex_status,omitempty"`
37+ }
38+
39+ type ReindexStatus struct {
40+ // BatchSize is the number of blocks to reindex in a single batch.
41+ BatchSize uint16 `json:"batch_size"`
42+ // LastHeight is the last consensus height that was indexed.
43+ LastHeight int64 `json:"last_height"`
44+ // StartHeight is the first height of history reindex interval.
45+ StartHeight int64 `json:"start_height"`
46+ // EndHeight is the last height of history reindex interval.
47+ EndHeight int64 `json:"end_height"`
48+ // ETA is expected time of history reindex completition.
49+ ETA time.Time `json:"eta"`
50+ }
51+
1852const (
1953 maxPendingBlocks = 10
2054)
2155
2256// BlockIndexer is responsible for indexing and committing finalized
2357// runtime blocks from the consensus into the runtime history.
2458type BlockIndexer struct {
59+ mu sync.RWMutex
2560 startOne cmSync.One
2661
2762 consensus consensus.Backend
2863 history History
2964 batchSize uint16
3065
66+ status string
67+ lastHeight int64
68+ startHeight int64
69+ endHeight int64
70+ lastRound uint64
71+ started time.Time
72+
3173 logger * logging.Logger
3274}
3375
@@ -54,9 +96,41 @@ func (bi *BlockIndexer) Stop() {
5496 bi .startOne .TryStop ()
5597}
5698
99+ // Status returns runtime block history indexer status.
100+ func (bi * BlockIndexer ) Status () * IndexerStatus {
101+ bi .mu .RLock ()
102+ defer bi .mu .RUnlock ()
103+
104+ status := & IndexerStatus {
105+ Status : bi .status ,
106+ LastRound : bi .lastRound ,
107+ }
108+
109+ if bi .status != statusReindexing {
110+ return status
111+ }
112+
113+ elapsed := time .Since (bi .started ).Milliseconds ()
114+ remaining := elapsed * (bi .endHeight - bi .lastHeight ) / max ((bi .lastHeight - bi .startHeight + 1 ), 1 )
115+ eta := time .Now ().Add (time .Duration (remaining ) * time .Millisecond )
116+ status .ReindexStatus = & ReindexStatus {
117+ BatchSize : bi .batchSize ,
118+ LastHeight : bi .lastHeight ,
119+ StartHeight : bi .startHeight ,
120+ EndHeight : bi .endHeight ,
121+ ETA : eta ,
122+ }
123+
124+ return status
125+ }
126+
57127func (bi * BlockIndexer ) run (ctx context.Context ) {
58128 bi .logger .Info ("starting" )
59129
130+ bi .mu .Lock ()
131+ bi .status = statusStarted
132+ bi .mu .Unlock ()
133+
60134 // Subscribe to new runtime blocks.
61135 blkCh , blkSub , err := bi .consensus .RootHash ().WatchBlocks (ctx , bi .history .RuntimeID ())
62136 if err != nil {
@@ -86,11 +160,19 @@ func (bi *BlockIndexer) run(ctx context.Context) {
86160 // Index new blocks.
87161 bi .index (ctx , blkCh )
88162 bi .logger .Info ("stopping" )
163+
164+ bi .mu .Lock ()
165+ bi .status = statusStopped
166+ bi .mu .Unlock ()
89167}
90168
91169func (bi * BlockIndexer ) index (ctx context.Context , blkCh <- chan * roothash.AnnotatedBlock ) {
92170 bi .logger .Debug ("indexing" )
93171
172+ bi .mu .Lock ()
173+ bi .status = statusIndexing
174+ bi .mu .Unlock ()
175+
94176 retry := time .Duration (math .MaxInt64 )
95177 boff := cmnBackoff .NewExponentialBackOff ()
96178 boff .Reset ()
@@ -216,23 +298,33 @@ func (bi *BlockIndexer) reindexTo(ctx context.Context, height int64) error {
216298 )
217299 return fmt .Errorf ("failed to get last indexed height: %w" , err )
218300 }
219- lastHeight ++ // +1 since we want the last non-seen height.
301+ startHeight := lastHeight + 1 // +1 since we want the last non-seen height.
220302
221303 lastRetainedHeight , err := bi .consensus .GetLastRetainedHeight (ctx )
222304 if err != nil {
223305 return fmt .Errorf ("failed to get last retained height: %w" , err )
224306 }
225307
226- if lastHeight < lastRetainedHeight {
308+ if startHeight < lastRetainedHeight {
227309 bi .logger .Debug ("skipping pruned heights" ,
228310 "last_retained_height" , lastRetainedHeight ,
229- "last_height " , lastHeight ,
311+ "start_height " , startHeight ,
230312 )
231- lastHeight = lastRetainedHeight
313+ startHeight = lastRetainedHeight
314+ }
315+
316+ bi .mu .Lock ()
317+ bi .status = statusReindexing
318+ bi .endHeight = height
319+ if bi .startHeight == 0 {
320+ bi .lastHeight = lastHeight
321+ bi .startHeight = startHeight
322+ bi .started = time .Now ()
232323 }
324+ bi .mu .Unlock ()
233325
234326 batchSize := int64 (bi .batchSize )
235- for start := lastHeight ; start <= height ; start += batchSize {
327+ for start := startHeight ; start <= height ; start += batchSize {
236328 end := min (start + batchSize - 1 , height )
237329 if err = bi .reindexRange (ctx , start , end ); err != nil {
238330 return fmt .Errorf ("failed to reindex batch: %w" , err )
@@ -291,6 +383,10 @@ func (bi *BlockIndexer) reindexRange(ctx context.Context, start int64, end int64
291383 return err
292384 }
293385
386+ bi .mu .Lock ()
387+ bi .lastHeight = end
388+ bi .mu .Unlock ()
389+
294390 bi .logger .Debug ("block reindex completed" )
295391 return nil
296392}
@@ -312,6 +408,11 @@ func (bi *BlockIndexer) commitBlocks(blocks []*roothash.AnnotatedBlock) error {
312408 return fmt .Errorf ("failed to commit blocks: %w" , err )
313409 }
314410
411+ bi .mu .Lock ()
412+ defer bi .mu .Unlock ()
413+ lastBlk := blocks [len (blocks )- 1 ]
414+ bi .lastRound = lastBlk .Block .Header .Round
415+
315416 return nil
316417}
317418
0 commit comments