diff --git a/Makefile b/Makefile index 8a7accbd..8de3d986 100644 --- a/Makefile +++ b/Makefile @@ -80,10 +80,8 @@ e2e-test: .PHONY: check-tidy check-tidy: - go mod tidy - git diff --exit-code - cd tests - go mod tidy + go mod tidy -v + cd tests; go mod tidy -v git diff --exit-code .PHONY: build diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index f5748c2e..3aa4a0ef 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -57,6 +57,7 @@ type Storages struct { Transactions storage.TransactionIndexer Receipts storage.ReceiptIndexer Traces storage.TraceIndexer + EventsHash *pebble.EventsHash } type Publishers struct { @@ -159,14 +160,37 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { nextCadenceHeight -= 1 } - // create EVM event subscriber - subscriber := ingestion.NewRPCEventSubscriber( - b.logger, - b.client, - chainID, - b.keystore, - nextCadenceHeight, - ) + // create event subscriber + var subscriber ingestion.EventSubscriber + if b.config.ExperimentalSoftFinalityEnabled { + var verifier *ingestion.SealingVerifier + if b.config.ExperimentalSealingVerificationEnabled { + verifier = ingestion.NewSealingVerifier( + b.logger, + b.client, + chainID, + b.storages.EventsHash, + nextCadenceHeight, + ) + } + + subscriber = ingestion.NewRPCBlockTrackingSubscriber( + b.logger, + b.client, + chainID, + b.keystore, + nextCadenceHeight, + verifier, + ) + } else { + subscriber = ingestion.NewRPCEventSubscriber( + b.logger, + b.client, + chainID, + b.keystore, + nextCadenceHeight, + ) + } callTracerCollector, err := replayer.NewCallTracerCollector( b.config.EVMNetworkID, @@ -591,6 +615,7 @@ func setupStorage( blocks := pebble.NewBlocks(store, config.FlowNetworkID) storageAddress := evm.StorageAccountAddress(config.FlowNetworkID) registerStore := pebble.NewRegisterStorage(store, storageAddress) + eventsHash := pebble.NewEventsHash(store) batch := store.NewBatch() defer func() { @@ -605,7 +630,20 @@ func setupStorage( if config.ForceStartCadenceHeight != 0 { logger.Warn().Uint64("height", config.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!") if err := blocks.SetLatestCadenceHeight(config.ForceStartCadenceHeight, batch); err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to set latest cadence height: %w", err) + } + + verifiedHeight, err := eventsHash.ProcessedSealedHeight() + if err != nil && !errors.Is(err, errs.ErrStorageNotInitialized) { + return nil, nil, fmt.Errorf("failed to get latest verified sealed height: %w", err) + } + if verifiedHeight > config.ForceStartCadenceHeight { + if err := eventsHash.BatchSetProcessedSealedHeight(config.ForceStartCadenceHeight, batch); err != nil { + return nil, nil, fmt.Errorf("failed to set latest verified sealed height: %w", err) + } + } + if err := eventsHash.BatchRemoveAboveHeight(config.ForceStartCadenceHeight, batch); err != nil { + return nil, nil, fmt.Errorf("failed to reset events hash above height: %w", err) } } @@ -670,6 +708,7 @@ func setupStorage( Transactions: pebble.NewTransactions(store), Receipts: pebble.NewReceipts(store), Traces: pebble.NewTraces(store), + EventsHash: eventsHash, }, nil } diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 871b3578..e3b22cf1 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -227,6 +227,9 @@ func parseConfigFromFlags() error { return fmt.Errorf("tx-batch-mode should be enabled with tx-state-validation=local-index") } + cfg.ExperimentalSoftFinalityEnabled = experimentalSoftFinalityEnabled + 
cfg.ExperimentalSealingVerificationEnabled = experimentalSealingVerificationEnabled + return nil } @@ -250,6 +253,9 @@ var ( txStateValidation string initHeight, forceStartHeight uint64 + + experimentalSoftFinalityEnabled, + experimentalSealingVerificationEnabled bool ) func init() { @@ -290,6 +296,8 @@ func init() { Cmd.Flags().DurationVar(&cfg.TxRequestLimitDuration, "tx-request-limit-duration", time.Second*3, "Time interval upon which to enforce transaction submission rate limiting.") Cmd.Flags().BoolVar(&cfg.TxBatchMode, "tx-batch-mode", false, "Enable batch transaction submission, to avoid nonce mismatch issues for high-volume EOAs.") Cmd.Flags().DurationVar(&cfg.TxBatchInterval, "tx-batch-interval", time.Millisecond*1200, "Time interval upon which to submit the transaction batches to the Flow network.") + Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "Sets whether the gateway should use the experimental soft finality feature. WARNING: This may result in incorrect results being returned in certain circumstances. Use only if you know what you are doing.") + Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", true, "Sets whether the gateway should use the experimental soft finality sealing verification feature. WARNING: This may result in indexing halts if events do not match. Use only if you know what you are doing.") Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.") Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.") diff --git a/config/config.go b/config/config.go index acd1adcf..10694f0b 100644 --- a/config/config.go +++ b/config/config.go @@ -120,6 +120,15 @@ type Config struct { // TxBatchInterval is the time interval upon which to submit the transaction batches to the // Flow network. TxBatchInterval time.Duration + // ExperimentalSoftFinalityEnabled enables the experimental soft finality feature which syncs + // EVM block and transaction data from the upstream Access node before the block is sealed. + // CAUTION: This feature is experimental and may return incorrect data in certain circumstances. + ExperimentalSoftFinalityEnabled bool + // ExperimentalSealingVerificationEnabled enables the experimental sealing verification feature + // which verifies the hash of the EVM events ingested by the requester engine match the hash + // of the events from the sealed block in the Flow network. + // CAUTION: This feature is experimental and will cause the node to halt if the events don't match. + ExperimentalSealingVerificationEnabled bool // EOAActivityCacheTTL is the time interval used to track EOA activity. Tx send more // frequently than this interval will be batched. // Useful only when batch transaction submission is enabled. diff --git a/models/events.go b/models/events.go index ad6cdb4f..b1ad5084 100644 --- a/models/events.go +++ b/models/events.go @@ -152,6 +152,11 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { return e, nil } +// BlockEvents returns the Flow block events. +func (c *CadenceEvents) BlockEvents() flow.BlockEvents { + return c.events +} + // Block evm block. 
If event doesn't contain EVM block the return value is nil.
 func (c *CadenceEvents) Block() *Block {
 	return c.block
diff --git a/services/ingestion/block_tracking_subscriber.go b/services/ingestion/block_tracking_subscriber.go
new file mode 100644
index 00000000..b5db0420
--- /dev/null
+++ b/services/ingestion/block_tracking_subscriber.go
@@ -0,0 +1,413 @@
+package ingestion
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/onflow/flow-go-sdk"
+	"github.com/onflow/flow-go/fvm/evm/events"
+	flowGo "github.com/onflow/flow-go/model/flow"
+	"github.com/rs/zerolog"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/onflow/flow-evm-gateway/models"
+	errs "github.com/onflow/flow-evm-gateway/models/errors"
+	"github.com/onflow/flow-evm-gateway/services/requester"
+	"github.com/onflow/flow-evm-gateway/services/requester/keystore"
+)
+
+var ErrSystemTransactionFailed = errors.New("system transaction failed")
+var ErrSporkRootBlockHasNoEVMBlocks = errors.New("spork root block has no EVM blocks")
+
+var _ EventSubscriber = &RPCBlockTrackingSubscriber{}
+
+// RPCBlockTrackingSubscriber subscribes to new EVM block events for unsealed finalized blocks.
+// This is accomplished by following finalized blocks from the upstream Access node, and using the
+// polling endpoint to fetch the events for each finalized block.
+//
+// IMPORTANT: Since data is downloaded and processed from unsealed blocks, it's possible for the
+// data that was downloaded to be incorrect. This subscriber provides no handling or detection for
+// cases where the received data differs from the data that was ultimately sealed. The operator must
+// handle this manually.
+// Since it's not reasonable to expect operators to do this manual tracking, this feature should NOT
+// be used outside of a limited Proof of Concept. Use at your own risk.
+//
+// A future version of the RPCEventSubscriber will provide this detection and handling functionality,
+// at which point this subscriber will be removed.
+type RPCBlockTrackingSubscriber struct {
+	*RPCEventSubscriber
+
+	verifier *SealingVerifier
+}
+
+func NewRPCBlockTrackingSubscriber(
+	logger zerolog.Logger,
+	client *requester.CrossSporkClient,
+	chainID flowGo.ChainID,
+	keyLock keystore.KeyLock,
+	startHeight uint64,
+	verifier *SealingVerifier,
+) *RPCBlockTrackingSubscriber {
+	return &RPCBlockTrackingSubscriber{
+		RPCEventSubscriber: NewRPCEventSubscriber(
+			logger.With().Str("component", "subscriber").Logger(),
+			client,
+			chainID,
+			keyLock,
+			startHeight,
+		),
+		verifier: verifier,
+	}
+}
+
+// Subscribe will retrieve all the events from the provided height. If the height is from previous
+// sporks, it will first backfill all the events in all the previous sporks, and then continue
+// to listen for all new events in the current spork.
+//
+// If an error is encountered during backfill, the subscription will end and the response channel will be closed.
+func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
+	// buffered channel so that the decoding of the events can happen in parallel to other operations
+	eventsChan := make(chan models.BlockEvents, 1000)
+
+	go func() {
+		defer func() {
+			close(eventsChan)
+		}()
+
+		// if the height is from the previous spork, backfill all the events from previous sporks first
+		if r.client.IsPastSpork(r.height) {
+			r.logger.Info().
+				Uint64("height", r.height).
+ Msg("height found in previous spork, starting to backfill") + + // backfill all the missed events, handling of context cancellation is done by the producer + for ev := range r.backfill(ctx, r.height) { + eventsChan <- ev + + if ev.Err != nil { + return + } + + if r.verifier != nil { + if err := r.verifier.AddFinalizedBlock(ev.Events.BlockEvents()); err != nil { + r.logger.Fatal().Err(err).Msg("failed to add finalized block to sealing verifier") + return + } + } + + // keep updating height, so after we are done back-filling + // it will be at the first height in the current spork + r.height = ev.Events.CadenceHeight() + } + + // after back-filling is done, increment height by one, + // so we start with the height in the current spork + r.height = r.height + 1 + } + + r.logger.Info(). + Uint64("next-height", r.height). + Msg("backfilling done, subscribe for live data") + + // start the verifier after backfilling since backfilled data is already sealed + if r.verifier != nil { + go func() { + r.verifier.SetStartHeight(r.height) + if err := r.verifier.Run(ctx); err != nil { + r.logger.Fatal().Err(err).Msg("failure running sealing verifier") + return + } + }() + } + + // subscribe in the current spork, handling of context cancellation is done by the producer + for ev := range r.subscribe(ctx, r.height) { + eventsChan <- ev + } + + r.logger.Warn().Msg("ended subscription for events") + }() + + return eventsChan +} + +// subscribe to events by the provided height and handle any errors. +// +// Subscribing to EVM specific events and handle any disconnection errors +// as well as context cancellations. +func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { + eventsChan := make(chan models.BlockEvents) + + var blockHeadersChan <-chan *flow.BlockHeader + var errChan <-chan error + + lastReceivedHeight := height + connect := func(height uint64) error { + var err error + blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight( + ctx, + height, + flow.BlockStatusFinalized, + ) + return err + } + + if err := connect(lastReceivedHeight); err != nil { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to subscribe for finalized block headers on height: %d, with: %w", + height, + err, + ), + ) + close(eventsChan) + return eventsChan + } + + go func() { + defer func() { + close(eventsChan) + }() + + blockHeadersQueue := []flow.BlockHeader{} + + for ctx.Err() == nil { + select { + case <-ctx.Done(): + r.logger.Info().Msg("event ingestion received done signal") + return + + case blockHeader, ok := <-blockHeadersChan: + if !ok { + // typically we receive an error in the errChan before the channels are closes + var err error + err = errs.ErrDisconnected + if ctx.Err() != nil { + err = ctx.Err() + } + eventsChan <- models.NewBlockEventsError(err) + return + } + + blockEvents, err := r.evmEventsForBlock(ctx, blockHeader) + if err != nil && !errors.Is(err, ErrSporkRootBlockHasNoEVMBlocks) { + eventsChan <- models.NewBlockEventsError(err) + return + } + + if r.verifier != nil { + // submit the block events to the verifier for future sealing verification + if err := r.verifier.AddFinalizedBlock(blockEvents); err != nil { + eventsChan <- models.NewBlockEventsError(err) + return + } + } + + // this means that either: + // - the system transaction failed AND there were no EVM transactions + // - this was the spork root block which has no EVM blocks + // In either case, we can skip the block + // Note: put this after the 
verify step, so we can verify that there were no EVM + // blocks in the sealed data as well + if len(blockEvents.Events) == 0 { + continue + } + + evmEvents := models.NewSingleBlockEvents(blockEvents) + // if events contain an error, or we are in a recovery mode + if evmEvents.Err != nil || r.recovery { + evmEvents = r.recover(ctx, blockEvents, evmEvents.Err) + // if we are still in recovery go to the next event + if r.recovery { + continue + } + } + + for _, evt := range blockEvents.Events { + r.keyLock.NotifyTransaction(evt.TransactionID) + } + lastReceivedHeight = blockHeader.Height + + blockHeadersQueue = append(blockHeadersQueue, *blockHeader) + + // The current `blockHeader` has a status of `BlockStatusFinalized`, + // but calling `NotifyBlock` might fail if the AN has not actually + // finished syncing all collections. + // Hence, we keep a small queue of the incoming block headers, so + // that we can call `NotifyBlock` on block N-15, where N is the + // height of the current block header. This will give enough time + // for the block to be sealed. + if len(blockHeadersQueue) > 15 { + earliestBlockHeader := blockHeadersQueue[0] + r.keyLock.NotifyBlock(earliestBlockHeader) + + blockHeadersQueue = blockHeadersQueue[1:] + } + + eventsChan <- evmEvents + + case err, ok := <-errChan: + if !ok { + // typically we receive an error in the errChan before the channels are closes + var err error + err = errs.ErrDisconnected + if ctx.Err() != nil { + err = ctx.Err() + } + eventsChan <- models.NewBlockEventsError(err) + return + } + + switch status.Code(err) { + case codes.NotFound: + // we can get not found when reconnecting after a disconnect/restart before the + // next block is finalized. just wait briefly and try again + time.Sleep(200 * time.Millisecond) + case codes.DeadlineExceeded, codes.Internal: + // these are sometimes returned when the stream is disconnected by a middleware or the server + default: + // skip reconnect on all other errors + eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) + return + } + + if err := connect(lastReceivedHeight + 1); err != nil { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to resubscribe for finalized block headers on height: %d, with: %w", + lastReceivedHeight+1, + err, + ), + ) + return + } + } + } + }() + + return eventsChan +} + +func (r *RPCBlockTrackingSubscriber) evmEventsForBlock( + ctx context.Context, + blockHeader *flow.BlockHeader, +) (flow.BlockEvents, error) { + eventTypes := blocksFilter(r.chain).EventTypes + + // evm Block events + blockEvents, err := r.getEventsByType(ctx, blockHeader, eventTypes[0]) + if err == nil { + payload, err := events.DecodeBlockEventPayload(blockEvents.Events[0].Value) + if err != nil { + return flow.BlockEvents{}, fmt.Errorf("failed to decode block event payload: %w", err) + } + + if payload.TransactionHashRoot == types.EmptyTxsHash { + return blockEvents, nil + } + } else if errors.Is(err, ErrSystemTransactionFailed) { + r.logger.Warn(). + Uint64("cadence_height", blockHeader.Height). + Str("cadence_block_id", blockHeader.ID.String()). 
+ Msg("no EVM block events: system transaction failed") + + // continue to check for EVM Transaction events since there may still be tx executed events + // even if the system transaction failed + blockEvents = flow.BlockEvents{ + BlockID: blockHeader.ID, + Height: blockHeader.Height, + BlockTimestamp: blockHeader.Timestamp, + } + } else { + return flow.BlockEvents{}, fmt.Errorf("failed to get EVM block event for cadence block %d: %w", + blockHeader.Height, err) + } + + // evm TX events + txEvents, err := r.getEventsByType(ctx, blockHeader, eventTypes[1]) + if err != nil { + return flow.BlockEvents{}, fmt.Errorf("failed to get EVM transaction events for cadence block %d: %w", + blockHeader.Height, err) + } + + // combine block and tx events to be processed together + blockEvents.Events = append(blockEvents.Events, txEvents.Events...) + + return blockEvents, nil +} + +func (r *RPCBlockTrackingSubscriber) getEventsByType( + ctx context.Context, + blockHeader *flow.BlockHeader, + eventType string, +) (flow.BlockEvents, error) { + var evts []flow.BlockEvents + var err error + + // retry until we get the block from an execution node that has the events + for { + evts, err = r.client.GetEventsForBlockHeader( + ctx, + eventType, + blockHeader, + ) + if err != nil { + // retry after a short pause + if status.Code(err) == codes.NotFound || status.Code(err) == codes.ResourceExhausted { + time.Sleep(200 * time.Millisecond) + continue + } + + return flow.BlockEvents{}, fmt.Errorf("failed to get events from access node: %w", err) + } + break + } + + if len(evts) != 1 { + // this shouldn't happen and probably indicates a bug on the Access node. + return flow.BlockEvents{}, fmt.Errorf( + "received unexpected number of BlockEvents from access node: got: %d, expected: 1", + len(evts), + ) + } + event := evts[0] + + // The `EVM.BlockExecuted` event should be present for every Flow block. + if strings.Contains(eventType, string(events.EventTypeBlockExecuted)) && len(event.Events) != 1 { + // The spork root block has no transactions, and therefore no EVM blocks. + if r.client.IsSporkRootBlockHeight(blockHeader.Height) { + return flow.BlockEvents{}, ErrSporkRootBlockHasNoEVMBlocks + } + + missingEventsErr := fmt.Errorf( + "received unexpected number of EVM events in block: got: %d, expected: 1", + len(event.Events), + ) + + // EVM Blocks events are emitted from the system transaction. if the system transaction fails, + // there will be no EVM block events. 
+		// Verify that the system transaction did fail, otherwise return an error
+		result, err := r.client.GetSystemTransactionResult(ctx, blockHeader.ID)
+		if err != nil {
+			return flow.BlockEvents{}, errors.Join(
+				missingEventsErr,
+				fmt.Errorf("failed to lookup system transaction result: %w", err),
+			)
+		}
+
+		// system transaction succeeded, return an error since this is an unexpected error case
+		if result.Error == nil {
+			return flow.BlockEvents{}, missingEventsErr
+		}
+
+		// system transaction failed, there will not be any EVM block events
+		return flow.BlockEvents{}, ErrSystemTransactionFailed
+	}
+
+	return event, nil
+}
diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go
index 04d012a0..1442bfdf 100644
--- a/services/ingestion/engine.go
+++ b/services/ingestion/engine.go
@@ -298,12 +298,12 @@ func (e *Engine) indexEvents(events *models.CadenceEvents, batch *pebbleDB.Batch
 		txHash := tx.Hash()
 		traceResult, err := traceCollector.Collect(txHash)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to collect trace for transaction %s: %w", txHash, err)
 		}
 
 		err = e.traces.StoreTransaction(txHash, traceResult, batch)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to store trace for transaction %s: %w", txHash, err)
 		}
 	}
diff --git a/services/ingestion/sealing_verifier.go b/services/ingestion/sealing_verifier.go
new file mode 100644
index 00000000..9e4b21b4
--- /dev/null
+++ b/services/ingestion/sealing_verifier.go
@@ -0,0 +1,477 @@
+package ingestion
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/onflow/flow-go-sdk"
+	"github.com/onflow/flow-go-sdk/access"
+	flowGo "github.com/onflow/flow-go/model/flow"
+	"github.com/rs/zerolog"
+	"go.uber.org/atomic"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/onflow/flow-evm-gateway/models"
+	errs "github.com/onflow/flow-evm-gateway/models/errors"
+	"github.com/onflow/flow-evm-gateway/services/requester"
+	"github.com/onflow/flow-evm-gateway/storage/pebble"
+)
+
+var _ models.Engine = (*SealingVerifier)(nil)
+
+// SealingVerifier verifies that soft finality events received over the Access polling API match the
+// actually sealed results from the event stream.
+type SealingVerifier struct {
+	*models.EngineStatus
+
+	logger zerolog.Logger
+	client *requester.CrossSporkClient
+	chain  flowGo.ChainID
+
+	startHeight uint64
+	eventsHash  *pebble.EventsHash
+
+	// unsealedBlocksToVerify contains the events hash for unsealed blocks processed by the ingestion engine.
+	// Cache the unsealed data until the sealed data is available to verify.
+	unsealedBlocksToVerify map[uint64]flow.Identifier
+
+	// sealedBlocksToVerify contains the events hash for sealed blocks returned by the Access node.
+	// Note: we also track sealed blocks since it's possible for the sealed data stream to get ahead
+	// of the unsealed data ingestion. In this case, we need to cache the sealed data until the unsealed
+	// data is available.
+	sealedBlocksToVerify map[uint64]flow.Identifier
+
+	lastUnsealedHeight *atomic.Uint64
+	lastSealedHeight   *atomic.Uint64
+
+	mu sync.Mutex
+}
+
+// NewSealingVerifier creates a new sealing verifier.
+func NewSealingVerifier( + logger zerolog.Logger, + client *requester.CrossSporkClient, + chain flowGo.ChainID, + eventsHash *pebble.EventsHash, + startHeight uint64, +) *SealingVerifier { + // startHeight is the first height to verify, which is one block after the last processed height + lastProcessedUnsealedHeight := startHeight + if lastProcessedUnsealedHeight > 0 { + lastProcessedUnsealedHeight-- + } + + return &SealingVerifier{ + EngineStatus: models.NewEngineStatus(), + logger: logger.With().Str("component", "sealing_verifier").Logger(), + client: client, + chain: chain, + startHeight: startHeight, + eventsHash: eventsHash, + unsealedBlocksToVerify: make(map[uint64]flow.Identifier), + sealedBlocksToVerify: make(map[uint64]flow.Identifier), + lastUnsealedHeight: atomic.NewUint64(lastProcessedUnsealedHeight), + lastSealedHeight: atomic.NewUint64(0), + } +} + +// Stop the engine. +func (v *SealingVerifier) Stop() { + v.MarkDone() + <-v.Stopped() +} + +// SetStartHeight sets the start height for the sealing verifier. +// This is used to update the height when backfilling to skip verification of already sealed blocks. +func (v *SealingVerifier) SetStartHeight(height uint64) { + v.startHeight = height +} + +// AddFinalizedBlock adds events for an unsealed block to the sealing verifier for verification when +// the sealed data is received. +func (v *SealingVerifier) AddFinalizedBlock(events flow.BlockEvents) error { + return v.onUnsealedEvents(events) +} + +// Run executes the sealing verifier. +// This method will block until the context is canceled or an error occurs. +func (v *SealingVerifier) Run(ctx context.Context) error { + defer v.MarkStopped() + + lastVerifiedHeight, err := v.eventsHash.ProcessedSealedHeight() + if err != nil { + if !errors.Is(err, errs.ErrStorageNotInitialized) { + return fmt.Errorf("failed to get processed sealed height: %w", err) + } + + // lastVerifiedHeight should be the block before the startHeight + // handle the case where startHeight is 0 like when running with the emulator + if lastVerifiedHeight = v.startHeight; lastVerifiedHeight > 0 { + lastVerifiedHeight = v.startHeight - 1 + } + if err := v.eventsHash.SetProcessedSealedHeight(lastVerifiedHeight); err != nil { + return fmt.Errorf("failed to initialize processed sealed height: %w", err) + } + } + v.lastSealedHeight.Store(lastVerifiedHeight) + startHeight := v.lastSealedHeight.Load() + 1 + + if v.client.IsPastSpork(startHeight) { + if err := v.backfill(ctx, startHeight); err != nil { + return fmt.Errorf("failed to backfill: %w", err) + } + startHeight = v.lastSealedHeight.Load() + 1 + } + + var eventsChan <-chan flow.BlockEvents + var errChan <-chan error + + subscriptionCtx, cancel := context.WithCancel(ctx) + defer cancel() + + reconnectHeight := startHeight + connect := func(height uint64) error { + var err error + for { + eventsChan, errChan, err = v.client.SubscribeEventsByBlockHeight( + subscriptionCtx, + height, + blocksFilter(v.chain), + access.WithHeartbeatInterval(1), + ) + + if err != nil { + // access node has not sealed the next height yet, wait and try again + // this typically happens when the AN reboots and the stream is reconnected before + // it has sealed the next block + if strings.Contains(err.Error(), "could not get start height") && + strings.Contains(err.Error(), "higher than highest indexed height") { + v.logger.Info().Err(err).Uint64("height", height).Msg("waiting for start block to be sealed") + time.Sleep(time.Second) + continue + } + return err + } + + return nil + } + } + 
+	v.logger.Info().
+		Uint64("start_sealed_height", startHeight).
+		Uint64("start_unsealed_height", v.lastUnsealedHeight.Load()+1).
+		Msg("starting verifier")
+
+	if err := connect(reconnectHeight); err != nil {
+		return fmt.Errorf("failed to subscribe for finalized block events on height: %d, with: %w", reconnectHeight, err)
+	}
+
+	v.MarkReady()
+	for {
+		select {
+		case <-ctx.Done():
+			v.logger.Info().Msg("received done signal")
+			return nil
+
+		case <-v.Done():
+			v.logger.Info().Msg("received stop signal")
+			cancel()
+			return nil
+
+		case sealedEvents, ok := <-eventsChan:
+			if !ok {
+				if ctx.Err() != nil {
+					return nil
+				}
+				return fmt.Errorf("failed to receive block events: %w", errs.ErrDisconnected)
+			}
+
+			if err := v.onSealedEvents(sealedEvents); err != nil {
+				return fmt.Errorf("failed to process sealed events: %w", err)
+			}
+
+			reconnectHeight = sealedEvents.Height + 1
+
+		case err, ok := <-errChan:
+			if !ok {
+				if ctx.Err() != nil {
+					return nil
+				}
+				return fmt.Errorf("failed to receive block events: %w", errs.ErrDisconnected)
+			}
+
+			switch status.Code(err) {
+			case codes.NotFound:
+				// we can get not found when reconnecting after a disconnect/restart before the
+				// next block is finalized. just wait briefly and try again
+				time.Sleep(200 * time.Millisecond)
+			case codes.DeadlineExceeded, codes.Internal:
+				// these are sometimes returned when the stream is disconnected by a middleware or the server
+			default:
+				// skip reconnect on all other errors
+				return fmt.Errorf("%w: %w", errs.ErrDisconnected, err)
+			}
+
+			if err := connect(reconnectHeight); err != nil {
+				return fmt.Errorf("failed to resubscribe for finalized block headers on height: %d, with: %w", reconnectHeight, err)
+			}
+		}
+	}
+}
+
+// backfill fetches EVM events for blocks before the current spork's root block, and adds them to the
+// verifier's sealed events cache.
+func (v *SealingVerifier) backfill(ctx context.Context, height uint64) error {
+	eventTypes := blocksFilter(v.chain).EventTypes
+
+	sporkRootHeight := v.client.CurrentSporkRootHeight()
+
+	v.logger.Info().
+		Uint64("start_height", height).
+		Uint64("end_height", sporkRootHeight-1).
+		Msg("backfilling verifier")
+
+	startHeight := height
+	for {
+		endHeight := startHeight + maxRangeForGetEvents
+		if endHeight >= sporkRootHeight {
+			endHeight = sporkRootHeight - 1
+		}
+
+		// this will fail if the start and end heights are not in the same spork. The only way that
+		// could happen is if we skipped a spork. That's unlikely to happen, so not handling the case
+		// for now.
+		blockEvents, err := v.client.GetEventsForHeightRange(ctx, eventTypes[0], startHeight, endHeight)
+		if err != nil {
+			return fmt.Errorf("failed to get events for height range %d-%d: %w", startHeight, endHeight, err)
+		}
+
+		txEvents, err := v.client.GetEventsForHeightRange(ctx, eventTypes[1], startHeight, endHeight)
+		if err != nil {
+			return fmt.Errorf("failed to get events for height range %d-%d: %w", startHeight, endHeight, err)
+		}
+
+		// it's technically possible that the end height is modified by the Access node to match the
+		// sealed height. Since this is backfill mode, the AN is a historic AN so it is not sealing
+		// any new blocks. Therefore, both requests must have the same number of events.
+		if len(blockEvents) != len(txEvents) {
+			return fmt.Errorf("received unexpected number of events for height range %d-%d: %d != %d", startHeight, endHeight, len(blockEvents), len(txEvents))
+		}
+
+		var currentBlockTxEvents []flow.Event
+		for i, blockEvent := range blockEvents {
+			currentBlockTxEvents = append(currentBlockTxEvents, txEvents[i].Events...)
+
+			// if the system transaction failed, there won't be an EVM block event, but there may
+			// be EVM transactions. Group all transactions into the next block.
+			if len(blockEvent.Events) != 1 {
+				v.logger.Warn().
+					Uint64("height", blockEvent.Height).
+					Str("block_id", blockEvent.BlockID.String()).
+					Msg("missing evm block event. will accumulate transactions into the next block")
+				continue
+			}
+
+			blockEvent.Events = append(blockEvent.Events, currentBlockTxEvents...)
+			if err := v.onSealedEvents(blockEvent); err != nil {
+				return fmt.Errorf("failed to verify block events for height %d: %w", blockEvent.Height, err)
+			}
+
+			// transactions successfully grouped with a block. reset the list
+			currentBlockTxEvents = nil
+		}
+
+		startHeight = endHeight + 1
+		if startHeight >= sporkRootHeight {
+			return nil
+		}
+	}
+}
+
+// onSealedEvents processes sealed events.
+// if unsealed events are found for the same height, the events are verified.
+// otherwise, the sealed events are cached for future verification.
+func (v *SealingVerifier) onSealedEvents(sealedEvents flow.BlockEvents) error {
+	// Note: there should be an unsealed event entry, even for blocks with no transactions
+
+	// ensure we have received sealed data for all blocks
+	if sealedEvents.Height > 0 && !v.lastSealedHeight.CompareAndSwap(sealedEvents.Height-1, sealedEvents.Height) {
+		// note: this conditional skips updating the lastSealedHeight if the height is 0. this is
+		// desired since it will be the last height when we process block 1.
+		return fmt.Errorf("received sealed events out of order: expected %d, got %d", v.lastSealedHeight.Load()+1, sealedEvents.Height)
+	}
+
+	sealedHash, err := CalculateHash(sealedEvents)
+	if err != nil {
+		return fmt.Errorf("failed to calculate hash for sealed events for height %d: %w", sealedEvents.Height, err)
+	}
+
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	unsealedHash, err := v.getUnsealedEventsHash(sealedEvents.Height)
+
+	// cache the sealed hash if
+	// 1. we haven't processed the unsealed data for this block yet
+	// 2. we have the data, but the state was rolled back to a previous height. In this case, wait
+	//    until we've reprocessed data for the height.
+	if errors.Is(err, errs.ErrEntityNotFound) || sealedEvents.Height > v.lastUnsealedHeight.Load() {
+		// we haven't processed the unsealed data for this block yet, cache the sealed hash
+		v.sealedBlocksToVerify[sealedEvents.Height] = sealedHash
+		return nil
+	}
+	if err != nil {
+		return fmt.Errorf("failed to get unsealed events hash for height %d: %w", sealedEvents.Height, err)
+	}
+
+	if err := v.verifyBlock(sealedEvents.Height, sealedHash, unsealedHash); err != nil {
+		v.logger.Fatal().Err(err).
+			Uint64("height", sealedEvents.Height).
+			Str("block_id", sealedEvents.BlockID.String()).
+			Msg("failed to verify block events")
+		return fmt.Errorf("failed to verify block events for %d: %w", sealedEvents.Height, err)
+	}
+
+	v.logger.Info().
+		Uint64("height", sealedEvents.Height).
+		Int("num_events", len(sealedEvents.Events)).
+		Msg("verified height from sealed events")
+
+	return nil
+}
+
+// onUnsealedEvents processes unsealed events.
+// if sealed events are found for the same height, the events are verified. +// otherwise, the unsealed events are cached for future verification. +func (v *SealingVerifier) onUnsealedEvents(unsealedEvents flow.BlockEvents) error { + unsealedHash, err := CalculateHash(unsealedEvents) + if err != nil { + return fmt.Errorf("failed to calculate hash for block %d: %w", unsealedEvents.Height, err) + } + + v.mu.Lock() + defer v.mu.Unlock() + + // note: do this inside the lock to avoid a race with onSealedEvents + if err := v.eventsHash.Store(unsealedEvents.Height, unsealedHash); err != nil { + return fmt.Errorf("failed to store events hash for block %d: %w", unsealedEvents.Height, err) + } + + // update the last unsealed height after successfully storing the hash + if unsealedEvents.Height > 0 && !v.lastUnsealedHeight.CompareAndSwap(unsealedEvents.Height-1, unsealedEvents.Height) { + // note: this conditional skips updating the lastUnsealedHeight if the height is 0. this is + // desired since it will be the last height when we process block 1. + return fmt.Errorf("received unsealed events out of order: expected %d, got %d", v.lastUnsealedHeight.Load()+1, unsealedEvents.Height) + } + + sealedHash, ok := v.sealedBlocksToVerify[unsealedEvents.Height] + if !ok { + v.unsealedBlocksToVerify[unsealedEvents.Height] = unsealedHash + return nil + } + + if err := v.verifyBlock(unsealedEvents.Height, sealedHash, unsealedHash); err != nil { + v.logger.Fatal().Err(err). + Uint64("height", unsealedEvents.Height). + Str("block_id", unsealedEvents.BlockID.String()). + Msg("failed to verify block events") + return fmt.Errorf("failed to verify block events for %d: %w", unsealedEvents.Height, err) + } + + v.logger.Info(). + Uint64("height", unsealedEvents.Height). + Int("num_events", len(unsealedEvents.Events)). + Msg("verified height from unsealed events") + + return nil +} + +// getUnsealedEventsHash returns the events hash for the given height without taking a lock +func (v *SealingVerifier) getUnsealedEventsHash(height uint64) (flow.Identifier, error) { + if hash, ok := v.unsealedBlocksToVerify[height]; ok { + return hash, nil + } + + hash, err := v.eventsHash.GetByHeight(height) + if err != nil { + return flow.Identifier{}, fmt.Errorf("failed to get events hash for block %d: %w", height, err) + } + + // note: don't cache it here since we will usually not revisit this height + + return hash, nil +} + +// verifyBlock verifies that the hash of the sealed events matches the hash of unsealed events stored +// for the same height. +func (v *SealingVerifier) verifyBlock(height uint64, sealedHash, unsealedHash flow.Identifier) error { + // always delete since we will crash on error anyway + defer delete(v.unsealedBlocksToVerify, height) + defer delete(v.sealedBlocksToVerify, height) + + if sealedHash != unsealedHash { + return fmt.Errorf("event hash mismatch: expected %s, got %s", sealedHash, unsealedHash) + } + + if err := v.eventsHash.SetProcessedSealedHeight(height); err != nil { + return fmt.Errorf("failed to store processed sealed height: %w", err) + } + + return nil +} + +// CalculateHash calculates the hash of the given block events object. 
+func CalculateHash(events flow.BlockEvents) (flow.Identifier, error) { + // convert to strip cadence payload objects + converted, err := convertFlowBlockEvents(events) + if err != nil { + return flow.Identifier{}, err + } + + hash := flowGo.MakeID(converted) + return flow.BytesToID(hash[:]), nil +} + +// convertFlowBlockEvents converts a flow.BlockEvents (flow-go-sdk) to a flowGo.BlockEvents (flow-go). +func convertFlowBlockEvents(events flow.BlockEvents) (flowGo.BlockEvents, error) { + blockID, err := flowGo.ByteSliceToId(events.BlockID.Bytes()) + if err != nil { + return flowGo.BlockEvents{}, fmt.Errorf("failed to convert block ID: %w", err) + } + + flowEvents := make([]flowGo.Event, len(events.Events)) + for i, e := range events.Events { + txID, err := flowGo.ByteSliceToId(e.TransactionID.Bytes()) + if err != nil { + return flowGo.BlockEvents{}, fmt.Errorf("failed to convert transaction ID %s: %w", e.TransactionID.Hex(), err) + } + flowEvents[i] = flowGo.Event{ + Type: flowGo.EventType(e.Type), + TransactionID: txID, + TransactionIndex: uint32(e.TransactionIndex), + EventIndex: uint32(e.EventIndex), + Payload: e.Payload, + } + } + + // need canonical order before hashing + sort.Slice(flowEvents, func(i, j int) bool { + if flowEvents[i].TransactionIndex != flowEvents[j].TransactionIndex { + return flowEvents[i].TransactionIndex < flowEvents[j].TransactionIndex + } + return flowEvents[i].EventIndex < flowEvents[j].EventIndex + }) + + return flowGo.BlockEvents{ + BlockID: blockID, + BlockHeight: events.Height, + BlockTimestamp: events.BlockTimestamp, + Events: flowEvents, + }, nil +} diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 60053663..0e245dd0 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -3,16 +3,18 @@ package requester import ( "context" "fmt" - "slices" "github.com/hashicorp/go-multierror" "github.com/onflow/cadence" - errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" + "github.com/onflow/flow-go-sdk/access/grpc" flowGo "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" "go.uber.org/ratelimit" + "golang.org/x/exp/slices" + + errs "github.com/onflow/flow-evm-gateway/models/errors" ) type sporkClient struct { @@ -59,6 +61,7 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error { Msg("adding spork client") *s = append(*s, &sporkClient{ + // must use NodeRootBlockHeight here, since this is the first height available on the node. firstHeight: info.NodeRootBlockHeight, lastHeight: header.Height, client: client, @@ -106,9 +109,9 @@ func (s *sporkClients) continuous() bool { // Any API that supports cross-spork access must have a defined function // that shadows the original access Client function. 
 type CrossSporkClient struct {
-	logger                  zerolog.Logger
-	sporkClients            sporkClients
-	currentSporkFirstHeight uint64
+	logger                 zerolog.Logger
+	sporkClients           sporkClients
+	currentSporkRootHeight uint64
 
 	access.Client
 }
 
@@ -120,6 +123,7 @@ func NewCrossSporkClient(
 	logger zerolog.Logger,
 	chainID flowGo.ChainID,
 ) (*CrossSporkClient, error) {
+	sporkRootBlockHeight := uint64(0)
 	nodeRootBlockHeight := uint64(0)
 
 	// Temp fix due to the fact that Emulator does not support the
@@ -129,6 +133,19 @@ func NewCrossSporkClient(
 		if err != nil {
 			return nil, fmt.Errorf("failed to get node version info: %w", err)
 		}
+
+		if info.SporkRootBlockHeight != info.NodeRootBlockHeight {
+			logger.Warn().
+				Uint64("spork-root-block-height", info.SporkRootBlockHeight).
+				Uint64("node-root-block-height", info.NodeRootBlockHeight).
+				Msg("spork root block height is not equal to node root block height. syncing may fail due to missing blocks.")
+		}
+
+		// It's possible that the node's SporkRootBlockHeight != NodeRootBlockHeight if the node was
+		// bootstrapped with a block after the spork root block. In this case, using SporkRootBlockHeight
+		// for the checks in IsPastSpork and IsSporkRootBlockHeight is correct. However, there may be
+		// missing blocks in the gap between the roots.
+		sporkRootBlockHeight = info.SporkRootBlockHeight
 		nodeRootBlockHeight = info.NodeRootBlockHeight
 	}
 
@@ -143,17 +160,36 @@
 		return nil, fmt.Errorf("provided past-spork clients don't create a continuous range of heights")
 	}
 
+	// at this point, we've verified that the past spork clients form a continuous range of heights.
+	// next, make sure the last spork client forms a continuous range with the current spork
+	// client's node root block height. Note: this must be the NodeRootBlockHeight, not the
+	// SporkRootBlockHeight, since this is the first height available on the node.
+	if len(clients) > 0 && clients[len(clients)-1].lastHeight+1 != nodeRootBlockHeight {
+		return nil, fmt.Errorf("provided past-spork clients don't end at the spork root block height (%d != %d-1)",
+			clients[len(clients)-1].lastHeight, nodeRootBlockHeight)
+	}
+
 	return &CrossSporkClient{
-		logger:                  logger,
-		currentSporkFirstHeight: nodeRootBlockHeight,
-		sporkClients:            clients,
-		Client:                  currentSpork,
+		logger:                 logger,
+		currentSporkRootHeight: sporkRootBlockHeight,
+		sporkClients:           clients,
+		Client:                 currentSpork,
 	}, nil
 }
 
 // IsPastSpork will check if the provided height is contained in the previous sporks.
 func (c *CrossSporkClient) IsPastSpork(height uint64) bool {
-	return height < c.currentSporkFirstHeight
+	return height < c.currentSporkRootHeight
+}
+
+// IsSporkRootBlockHeight will check if the provided height is the spork root block height.
+func (c *CrossSporkClient) IsSporkRootBlockHeight(height uint64) bool {
+	return height == c.currentSporkRootHeight
+}
+
+// CurrentSporkRootHeight returns the spork root block height of the current spork.
+func (c *CrossSporkClient) CurrentSporkRootHeight() uint64 {
+	return c.currentSporkRootHeight
 }
 
 // getClientForHeight returns the client for the given height that contains the height range.
@@ -249,6 +285,35 @@ func (c *CrossSporkClient) GetEventsForHeightRange( return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } +func (c *CrossSporkClient) GetEventsForBlockHeader( + ctx context.Context, + eventType string, + blockHeader *flow.BlockHeader, +) ([]flow.BlockEvents, error) { + client, err := c.getClientForHeight(blockHeader.Height) + if err != nil { + return nil, err + } + return client.GetEventsForBlockIDs(ctx, eventType, []flow.Identifier{blockHeader.ID}) +} + +func (c *CrossSporkClient) SubscribeBlockHeadersFromStartHeight( + ctx context.Context, + startHeight uint64, + blockStatus flow.BlockStatus, +) (<-chan *flow.BlockHeader, <-chan error, error) { + client, err := c.getClientForHeight(startHeight) + if err != nil { + return nil, nil, err + } + grpcClient, ok := (client).(*grpc.Client) + if !ok { + return nil, nil, fmt.Errorf("unable to convert to Flow grpc.Client") + } + + return grpcClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus) +} + func (c *CrossSporkClient) Close() error { var merr *multierror.Error diff --git a/services/requester/cross-spork_client_test.go b/services/requester/cross-spork_client_test.go index 32c22930..800202e4 100644 --- a/services/requester/cross-spork_client_test.go +++ b/services/requester/cross-spork_client_test.go @@ -73,14 +73,15 @@ func Test_CrossSporkClients(t *testing.T) { } func Test_CrossSpork(t *testing.T) { - t.Run("client", func(t *testing.T) { - past1Last := uint64(300) - past2Last := uint64(500) - currentLast := uint64(1000) - current := testutils.SetupClientForRange(501, currentLast) - past1 := testutils.SetupClientForRange(100, past1Last) - past2 := testutils.SetupClientForRange(301, past2Last) - + past1Last := uint64(300) + past2Last := uint64(500) + currentSporkRootHeight := uint64(501) + currentLast := uint64(1000) + past1 := testutils.SetupClientForRange(100, past1Last) + past2 := testutils.SetupClientForRange(301, past2Last) + current := testutils.SetupClientForRange(currentSporkRootHeight, currentLast) + + t.Run("clients form continuous range of heights", func(t *testing.T) { client, err := NewCrossSporkClient( current, []access.Client{past2, past1}, @@ -136,4 +137,37 @@ func Test_CrossSpork(t *testing.T) { require.ErrorContains(t, err, "invalid height not in available range: 10") }) + + t.Run("gap between current's spork root and node root heights", func(t *testing.T) { + current.GetNodeVersionInfoFunc = func(ctx context.Context) (*flow.NodeVersionInfo, error) { + return &flow.NodeVersionInfo{ + NodeRootBlockHeight: currentSporkRootHeight + 10, + SporkRootBlockHeight: currentSporkRootHeight, + }, nil + } + + client, err := NewCrossSporkClient( + current, + []access.Client{past2, past1}, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.Error(t, err) + require.ErrorContains(t, err, "provided past-spork clients don't end at the spork root block height") + require.Nil(t, client) + }) + + t.Run("gap between past spork end height, and current spork root height", func(t *testing.T) { + currentWithGap := testutils.SetupClientForRange(currentSporkRootHeight+1, currentLast) + + client, err := NewCrossSporkClient( + currentWithGap, + []access.Client{past2, past1}, + zerolog.Nop(), + flowGo.Previewnet, + ) + require.Error(t, err) + require.ErrorContains(t, err, "provided past-spork clients don't end at the spork root block height") + require.Nil(t, client) + }) } diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index 2165b765..56519c78 100644 
--- a/services/testutils/mock_client.go
+++ b/services/testutils/mock_client.go
@@ -97,7 +97,8 @@ func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.B
 		},
 		GetNodeVersionInfoFunc: func(ctx context.Context) (*flow.NodeVersionInfo, error) {
 			return &flow.NodeVersionInfo{
-				NodeRootBlockHeight: startHeight,
+				NodeRootBlockHeight:  startHeight,
+				SporkRootBlockHeight: startHeight,
 			}, nil
 		},
 		SubscribeEventsByBlockHeightFunc: func(
diff --git a/storage/pebble/db.go b/storage/pebble/db.go
index 5e3c0f89..54707a25 100644
--- a/storage/pebble/db.go
+++ b/storage/pebble/db.go
@@ -5,6 +5,7 @@ import (
 	"github.com/cockroachdb/pebble"
 	"github.com/cockroachdb/pebble/bloom"
+	"github.com/rs/zerolog/log"
 )
 
 // OpenDB opens a new pebble database at the provided directory.
@@ -60,3 +61,24 @@ func OpenDB(dir string) (*pebble.DB, error) {
 	}
 	return db, nil
 }
+
+func WithBatch(store *Storage, f func(batch *pebble.Batch) error) error {
+	batch := store.NewBatch()
+	defer func(batch *pebble.Batch) {
+		err := batch.Close()
+		if err != nil {
+			log.Fatal().Err(err).Msg("failed to close batch")
+		}
+	}(batch)
+
+	err := f(batch)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Commit(pebble.Sync); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
+
+	return nil
+}
diff --git a/storage/pebble/events_hash.go b/storage/pebble/events_hash.go
new file mode 100644
index 00000000..b6e53997
--- /dev/null
+++ b/storage/pebble/events_hash.go
@@ -0,0 +1,87 @@
+package pebble
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+
+	pebbleDB "github.com/cockroachdb/pebble"
+	"github.com/onflow/flow-go-sdk"
+
+	errs "github.com/onflow/flow-evm-gateway/models/errors"
+)
+
+type EventsHash struct {
+	store *Storage
+}
+
+func NewEventsHash(store *Storage) *EventsHash {
+	return &EventsHash{
+		store: store,
+	}
+}
+
+func (e *EventsHash) Store(height uint64, hash flow.Identifier) error {
+	return WithBatch(e.store, func(batch *pebbleDB.Batch) error {
+		if err := e.store.set(eventsHashKey, uint64Bytes(height), hash.Bytes(), batch); err != nil {
+			return fmt.Errorf(
+				"failed to store events hash for block %d, with: %w",
+				height,
+				err,
+			)
+		}
+		return nil
+	})
+}
+
+func (e *EventsHash) GetByHeight(height uint64) (flow.Identifier, error) {
+	hash, err := e.store.get(eventsHashKey, uint64Bytes(height))
+	if err != nil {
+		return flow.Identifier{}, fmt.Errorf("failed to get events hash for block %d, with: %w", height, err)
+	}
+
+	return flow.BytesToID(hash), nil
+}
+
+// BatchRemoveAboveHeight removes all stored event hashes above the given height (exclusive).
+func (e *EventsHash) BatchRemoveAboveHeight(height uint64, batch *pebbleDB.Batch) error { + for { + height++ // skip the current height + if _, err := e.GetByHeight(height); err != nil { + if errors.Is(err, errs.ErrEntityNotFound) { + // event hashes are inserted in order with no gaps, so we can stop at the first + // missing hash + return nil + } + return err + } + if err := e.store.delete(eventsHashKey, uint64Bytes(height), batch); err != nil { + return err + } + } +} + +func (e *EventsHash) ProcessedSealedHeight() (uint64, error) { + val, err := e.store.get(sealedEventsHeightKey) + if err != nil { + if errors.Is(err, errs.ErrEntityNotFound) { + return 0, errs.ErrStorageNotInitialized + } + return 0, fmt.Errorf("failed to get latest processed sealed height: %w", err) + } + + return binary.BigEndian.Uint64(val), nil +} + +func (e *EventsHash) SetProcessedSealedHeight(height uint64) error { + return WithBatch(e.store, func(batch *pebbleDB.Batch) error { + return e.BatchSetProcessedSealedHeight(height, batch) + }) +} + +func (e *EventsHash) BatchSetProcessedSealedHeight(height uint64, batch *pebbleDB.Batch) error { + if err := e.store.set(sealedEventsHeightKey, nil, uint64Bytes(height), batch); err != nil { + return fmt.Errorf("failed to store latest processed sealed height: %d, with: %w", height, err) + } + return nil +} diff --git a/storage/pebble/keys.go b/storage/pebble/keys.go index aa46b61a..36056de6 100644 --- a/storage/pebble/keys.go +++ b/storage/pebble/keys.go @@ -30,6 +30,9 @@ const ( // special keys latestEVMHeightKey = byte(100) latestCadenceHeightKey = byte(102) + + eventsHashKey = byte(150) + sealedEventsHeightKey = byte(151) ) // makePrefix makes a key used internally to store the values diff --git a/storage/pebble/storage.go b/storage/pebble/storage.go index 94c420cf..d6e01965 100644 --- a/storage/pebble/storage.go +++ b/storage/pebble/storage.go @@ -58,6 +58,12 @@ func (s *Storage) get(keyCode byte, key ...[]byte) ([]byte, error) { return cp, nil } +func (s *Storage) delete(keyCode byte, key []byte, batch *pebble.Batch) error { + prefixedKey := makePrefix(keyCode, key) + + return batch.Delete(prefixedKey, nil) +} + func (s *Storage) NewBatch() *pebble.Batch { return s.db.NewBatch() }
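
The sealing verification hinges on `CalculateHash` converting the SDK events to flow-go events and sorting them by `(TransactionIndex, EventIndex)` before hashing, so the hash should be insensitive to the order in which the polling and streaming APIs happen to return events for a block. A minimal sketch of that property as a test is below; the test file, event type strings, and payloads are illustrative only and are not part of this patch.

```go
package ingestion_test

import (
	"testing"

	"github.com/onflow/flow-go-sdk"
	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-evm-gateway/services/ingestion"
)

// The same set of events, presented in different orders, should hash to the same
// identifier because CalculateHash sorts by (TransactionIndex, EventIndex) first.
func TestCalculateHash_OrderIndependent(t *testing.T) {
	// dummy events; only the ordering fields and payloads matter for this check
	evts := []flow.Event{
		{Type: "A.1234.EVM.TransactionExecuted", TransactionIndex: 0, EventIndex: 0, Payload: []byte{0x01}},
		{Type: "A.1234.EVM.BlockExecuted", TransactionIndex: 1, EventIndex: 0, Payload: []byte{0x02}},
	}

	unsealed := flow.BlockEvents{Height: 42, Events: []flow.Event{evts[0], evts[1]}}
	sealed := flow.BlockEvents{Height: 42, Events: []flow.Event{evts[1], evts[0]}} // reversed

	h1, err := ingestion.CalculateHash(unsealed)
	require.NoError(t, err)

	h2, err := ingestion.CalculateHash(sealed)
	require.NoError(t, err)

	require.Equal(t, h1, h2)
}
```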
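The new `EventsHash` store exposes both self-committing helpers (`Store`, `SetProcessedSealedHeight`) and batch variants (`BatchSetProcessedSealedHeight`, `BatchRemoveAboveHeight`) so callers such as the force-start rollback path in bootstrap can group their writes into a single atomic commit. A minimal sketch of that pattern using the new `WithBatch` helper follows; the `rollbackEventsHash` function and package name are illustrative, not code from this patch.

```go
package example

import (
	"errors"

	pebbleDB "github.com/cockroachdb/pebble"

	errs "github.com/onflow/flow-evm-gateway/models/errors"
	"github.com/onflow/flow-evm-gateway/storage/pebble"
)

// rollbackEventsHash resets the verifier's bookkeeping when force-starting from an
// earlier Cadence height, committing all writes in one batch via WithBatch.
func rollbackEventsHash(store *pebble.Storage, height uint64) error {
	eventsHash := pebble.NewEventsHash(store)

	return pebble.WithBatch(store, func(batch *pebbleDB.Batch) error {
		verified, err := eventsHash.ProcessedSealedHeight()
		if err != nil && !errors.Is(err, errs.ErrStorageNotInitialized) {
			return err
		}

		// move the processed-sealed marker back if it is ahead of the rollback height
		if verified > height {
			if err := eventsHash.BatchSetProcessedSealedHeight(height, batch); err != nil {
				return err
			}
		}

		// drop any stored event hashes above the rollback height
		return eventsHash.BatchRemoveAboveHeight(height, batch)
	})
}
```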
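For context on how the pieces fit together: the block tracking subscriber feeds unsealed (finalized) block events into the verifier via `AddFinalizedBlock`, while `Run` consumes the sealed event stream and compares hashes, halting indexing on any mismatch. The sketch below shows that lifecycle in one place, assuming a channel stand-in for the subscriber and a hypothetical `runVerifier` helper; it is illustrative only and not part of this patch.

```go
package example

import (
	"context"

	"github.com/onflow/flow-go-sdk"
	flowGo "github.com/onflow/flow-go/model/flow"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-evm-gateway/services/ingestion"
	"github.com/onflow/flow-evm-gateway/services/requester"
	"github.com/onflow/flow-evm-gateway/storage/pebble"
)

// runVerifier wires a SealingVerifier the same way the subscriber does: unsealed
// finalized events go in through AddFinalizedBlock, while Run blocks and verifies
// them against the sealed event stream.
func runVerifier(
	ctx context.Context,
	logger zerolog.Logger,
	client *requester.CrossSporkClient,
	eventsHash *pebble.EventsHash,
	startHeight uint64,
	finalized <-chan flow.BlockEvents, // stand-in for the block tracking subscriber
) error {
	verifier := ingestion.NewSealingVerifier(logger, client, flowGo.Testnet, eventsHash, startHeight)

	// after any backfill, bump the start height so already-sealed blocks are not re-verified
	verifier.SetStartHeight(startHeight)

	go func() {
		for events := range finalized {
			// hashes of unsealed events are stored and checked once the sealed stream catches up
			if err := verifier.AddFinalizedBlock(events); err != nil {
				logger.Error().Err(err).Msg("failed to add finalized block to verifier")
				return
			}
		}
	}()

	// blocks until ctx is canceled, Stop is called, or verification fails
	return verifier.Run(ctx)
}
```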