Skip to content

Commit 5d208e0

Browse files
Aryan Tikaryarvagg
authored andcommitted
feat(eth): check events are indexed within in requested range (#12728)
Initial work, not yet complete.
1 parent a526c48 commit 5d208e0

File tree

3 files changed

+92
-39
lines changed

3 files changed

+92
-39
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing
102102
- Make the ordering of event output for `eth_` APIs and `GetActorEventsRaw` consistent, sorting ascending on: epoch, message index, event index and original event entry order. ([filecoin-project/lotus#12623](https://github.com/filecoin-project/lotus/pull/12623))
103103
- Return a consistent error when encountering null rounds in ETH RPC method calls. ([filecoin-project/lotus#12655](https://github.com/filecoin-project/lotus/pull/12655))
104104
- Correct erroneous sector QAP-calculation upon sector extension in lotus-miner cli. ([filecoin-project/lotus#12720](https://github.com/filecoin-project/lotus/pull/12720))
105+
- Return error if logs or events within range are not indexed. ([filecoin-project/lotus#12728](https://github.com/filecoin-project/lotus/pull/12728))
106+
105107

106108
## 📝 Changelog
107109

chain/index/ddls.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
9191
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
9292
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
9393
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
94-
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
9594
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
9695
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
9796
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",

chain/index/events.go

Lines changed: 90 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ import (
1717
"github.com/filecoin-project/go-address"
1818
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
1919
"github.com/filecoin-project/go-state-types/abi"
20-
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
21-
2220
"github.com/filecoin-project/lotus/chain/types"
21+
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
2322
)
2423

25-
var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
24+
var (
25+
ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
26+
ErrRangeInFuture = fmt.Errorf("range end is in the future")
27+
)
2628

2729
const maxLookBackForWait = 120 // one hour of tipsets
2830

@@ -236,48 +238,98 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
236238
return ems, nil
237239
}
238240

239-
// checkTipsetIndexedStatus verifies if a specific tipset is indexed based on the EventFilter.
240-
// It returns nil if the tipset is indexed, ErrNotFound if it's not indexed or not specified,
241-
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventFilter) error {
242-
var tipsetKeyCid []byte
243-
var err error
244-
245-
// Determine the tipset to check based on the filter
246-
switch {
247-
case f.TipsetCid != cid.Undef:
248-
tipsetKeyCid = f.TipsetCid.Bytes()
249-
case f.MinHeight >= 0 && f.MinHeight == f.MaxHeight:
250-
tipsetKeyCid, err = si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
251-
if err != nil {
252-
if err == ErrNotFound {
253-
// this means that this is a null round and there exist no events for this epoch
254-
return nil
255-
}
241+
// checkRangeIndexedStatus verifies if a range of heights is indexed.
242+
// It checks for the existence of non-null rounds at the range boundaries.
243+
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
244+
minHeight := f.MinHeight
245+
maxHeight := f.MaxHeight
256246

257-
return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
247+
// Find the first non-null round in the range
248+
startCid, err := si.findFirstNonNullRound(ctx, &minHeight, maxHeight)
249+
if err != nil {
250+
return xerrors.Errorf("failed to find first non-null round: %w", err)
251+
}
252+
253+
// If all rounds are null, consider the range valid
254+
if startCid == nil {
255+
return nil
256+
}
257+
258+
// Find the last non-null round in the range
259+
endCid, err := si.findLastNonNullRound(ctx, &maxHeight, minHeight)
260+
if err != nil {
261+
if errors.Is(err, ErrRangeInFuture) {
262+
return xerrors.Errorf("range end is in the future: %w", err)
258263
}
259-
default:
260-
// This function distinguishes between two scenarios:
261-
// 1. Missing events: The requested tipset is not present in the Index (an error condition).
262-
// 2. Valid case: The tipset exists but contains no events (a normal situation).
263-
// Currently, this distinction is only made for the common use case where a user requests events for a single tipset.
264-
// TODO: Implement this functionality for a range of tipsets. This is expensive and not a common use case so it's deferred for now.
264+
return xerrors.Errorf("failed to find last non-null round: %w", err)
265+
}
266+
267+
// If all rounds are null, consider the range valid
268+
if endCid == nil {
265269
return nil
266270
}
267271

268-
// If we couldn't determine a specific tipset, return ErrNotFound
269-
if tipsetKeyCid == nil {
270-
return ErrNotFound
272+
// Check indexing for start and end tipsets
273+
if err := si.checkTipsetByKeyCid(ctx, startCid, minHeight); err != nil {
274+
return err
271275
}
272276

273-
// Check if the determined tipset is indexed
274-
if exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid); err != nil {
275-
return xerrors.Errorf("failed to check if tipset is indexed: %w", err)
276-
} else if exists {
277-
return nil // Tipset is indexed
277+
if err := si.checkTipsetByKeyCid(ctx, endCid, maxHeight); err != nil {
278+
return err
279+
}
280+
281+
return nil
282+
}
283+
284+
// checkTipsetByKeyCid checks if a tipset identified by its key CID is indexed.
285+
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
286+
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
287+
if err != nil {
288+
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
289+
}
290+
291+
if exists {
292+
return nil // null round
293+
}
294+
295+
return ErrNotFound // tipset is not indexed
296+
}
297+
298+
// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight
299+
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight *abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, error) {
300+
for height := *minHeight; height <= maxHeight; height++ {
301+
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
302+
if err == nil {
303+
*minHeight = height // Update the minHeight to the found height
304+
return cid, nil
305+
}
306+
if !errors.Is(err, ErrNotFound) {
307+
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
308+
}
309+
}
310+
311+
return nil, nil
312+
}
313+
314+
// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
315+
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight *abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, error) {
316+
head := si.cs.GetHeaviestTipSet()
317+
if head == nil || *maxHeight > head.Height() {
318+
return nil, ErrRangeInFuture
319+
}
320+
321+
for height := *maxHeight; height >= minHeight; height-- {
322+
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
323+
if err == nil {
324+
*maxHeight = height // Update the maxHeight to the found height
325+
return cid, nil
326+
}
327+
if !errors.Is(err, ErrNotFound) {
328+
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
329+
}
278330
}
279331

280-
return ErrNotFound // Tipset is not indexed
332+
return nil, nil
281333
}
282334

283335
// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
@@ -460,7 +512,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
460512
// if the height is old enough, we'll assume the index is caught up to it and not bother
461513
// waiting for it to be indexed
462514
if height <= maxLookBackHeight {
463-
return nil, si.checkTipsetIndexedStatus(ctx, f)
515+
return nil, si.checkRangeIndexedStatus(ctx, f)
464516
}
465517
}
466518

@@ -474,7 +526,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
474526
}
475527

476528
if len(ces) == 0 {
477-
return nil, si.checkTipsetIndexedStatus(ctx, f)
529+
return nil, si.checkRangeIndexedStatus(ctx, f)
478530
}
479531
}
480532

0 commit comments

Comments
 (0)