Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/substreams-sink-mongodb/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var sinkRunCmd = Command(sinkRunE,
RangeArgs(6, 7),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags)
flags.Uint64("cursor-checkpoint-interval", 100, "Number of blocks between cursor checkpoints (0 = only save on shutdown)")
}),
OnCommandErrorLogAndExit(zlog),
)
Expand Down Expand Up @@ -76,7 +77,12 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("unable to setup sinker: %w", err)
}

mongoSinker, err := sinker.New(sink, mongoLoader, tables, zlog, tracer)
cursorCheckpointInterval, err := cmd.Flags().GetUint64("cursor-checkpoint-interval")
if err != nil {
return fmt.Errorf("unable to get cursor-checkpoint-interval flag: %w", err)
}

mongoSinker, err := sinker.New(sink, mongoLoader, tables, zlog, tracer, cursorCheckpointInterval)
if err != nil {
return fmt.Errorf("unable to setup mongo sinker: %w", err)
}
Expand Down
58 changes: 51 additions & 7 deletions mongo/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)

var ErrCursorNotFound = errors.New("cursor not found")
Expand All @@ -25,13 +26,18 @@ func (l *Loader) GetCursor(ctx context.Context, outputModuleHash string) (*sink.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

l.logger.Debug("retrieving cursor from database",
zap.String("module_hash", outputModuleHash))

res := l.database.Collection("_cursors").FindOne(
ctx,
bson.M{"id": outputModuleHash},
)

if res.Err() != nil {
if res.Err() == mongo.ErrNoDocuments {
l.logger.Debug("cursor not found in database",
zap.String("module_hash", outputModuleHash))
return nil, ErrCursorNotFound
}
return nil, fmt.Errorf("getting cursor %q: %w", outputModuleHash, res.Err())
Expand All @@ -42,31 +48,69 @@ func (l *Loader) GetCursor(ctx context.Context, outputModuleHash string) (*sink.
return nil, fmt.Errorf("decoding cursor %q: %w", outputModuleHash, err)
}

return sink.NewCursor(c.Cursor)
l.logger.Debug("cursor document found in database",
zap.String("module_hash", c.Id),
zap.Int("cursor_length", len(c.Cursor)),
zap.Uint64("block_num", c.BlockNum),
zap.String("block_id", c.BlockID))

cursor, err := sink.NewCursor(c.Cursor)
if err != nil {
return nil, fmt.Errorf("creating cursor from string: %w", err)
}

l.logger.Debug("cursor successfully parsed",
zap.Bool("is_blank", cursor.IsBlank()),
zap.Stringer("block", cursor.Block()))

return cursor, nil
}

func (l *Loader) WriteCursor(ctx context.Context, moduleHash string, c *sink.Cursor) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

l.logger.Debug("writing cursor to database",
zap.String("module_hash", moduleHash),
zap.Int("cursor_length", len(c.String())),
zap.Uint64("block_num", c.Block().Num()),
zap.String("block_id", c.Block().ID()),
zap.Bool("is_blank", c.IsBlank()))

doc := cursorDocument{
Id: moduleHash,
Cursor: c.String(),
BlockNum: c.Block().Num(),
BlockID: c.Block().ID(),
}

filter := bson.M{"id": moduleHash}
update := bson.M{"$set": cursorDocument{Id: moduleHash, Cursor: c.String(), BlockNum: c.Block().Num(), BlockID: c.Block().ID()}}
update := bson.M{"$set": doc}

res, err := l.database.Collection("_cursors").UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
if err != nil {
return fmt.Errorf("updating cursor %q: %w", moduleHash, err)
return fmt.Errorf("upserting cursor %q: %w", moduleHash, err)
}

// Verify the operation succeeded
if res.UpsertedCount > 0 || res.ModifiedCount > 0 || res.MatchedCount > 0 {
l.logger.Debug("cursor successfully written",
zap.String("module_hash", moduleHash),
zap.Uint64("block_num", c.Block().Num()),
zap.Bool("was_insert", res.UpsertedCount > 0),
zap.Bool("was_update", res.ModifiedCount > 0))
return nil
}

// else we need to insert it

_, err = l.database.Collection("_cursors").InsertOne(ctx, cursorDocument{Id: moduleHash, Cursor: c.String(), BlockNum: c.Block().Num(), BlockID: c.Block().ID()})
// Fallback: try explicit insert if update reports no operation
_, err = l.database.Collection("_cursors").InsertOne(ctx, doc)
if err != nil {
return fmt.Errorf("inserting cursor %q: %w", moduleHash, err)
return fmt.Errorf("inserting cursor %q: %w", moduleHash, err)
}

l.logger.Debug("cursor successfully inserted (fallback)",
zap.String("module_hash", moduleHash),
zap.Uint64("block_num", c.Block().Num()))

return nil
}
47 changes: 41 additions & 6 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ type MongoSinker struct {
logger *zap.Logger
tracer logging.Tracer

stats *Stats
lastCursor *sink.Cursor
stats *Stats
lastCursor *sink.Cursor
cursorCheckpointInterval uint64
}

func New(sink *sink.Sinker, loader *mongo.Loader, tables mongo.Tables, logger *zap.Logger, tracer logging.Tracer) (*MongoSinker, error) {
func New(sink *sink.Sinker, loader *mongo.Loader, tables mongo.Tables, logger *zap.Logger, tracer logging.Tracer, cursorCheckpointInterval uint64) (*MongoSinker, error) {
s := &MongoSinker{
Shutter: shutter.New(),
Sinker: sink,
Expand All @@ -42,7 +43,8 @@ func New(sink *sink.Sinker, loader *mongo.Loader, tables mongo.Tables, logger *z
logger: logger,
tracer: tracer,

stats: NewStats(logger),
stats: NewStats(logger),
cursorCheckpointInterval: cursorCheckpointInterval,
}

s.OnTerminating(func(err error) {
Expand All @@ -55,16 +57,33 @@ func New(sink *sink.Sinker, loader *mongo.Loader, tables mongo.Tables, logger *z
}

func (s *MongoSinker) writeLastCursor(ctx context.Context, err error) {
if s.lastCursor == nil || err != nil {
if s.lastCursor == nil {
return
}

_ = s.loader.WriteCursor(ctx, s.OutputModuleHash(), s.lastCursor)
// Log whether we're saving on error or graceful shutdown
if err != nil {
s.logger.Info("saving cursor on error shutdown",
zap.Stringer("cursor_block", s.lastCursor.Block()),
zap.Error(err))
}

if writeErr := s.loader.WriteCursor(ctx, s.OutputModuleHash(), s.lastCursor); writeErr != nil {
s.logger.Error("failed to write cursor on shutdown",
zap.Stringer("cursor_block", s.lastCursor.Block()),
zap.Error(writeErr))
} else {
s.logger.Info("cursor saved on shutdown",
zap.Stringer("cursor_block", s.lastCursor.Block()))
}
}

func (s *MongoSinker) Run(ctx context.Context) {
cursor, err := s.loader.GetCursor(ctx, s.OutputModuleHash())
if err != nil && !errors.Is(err, mongo.ErrCursorNotFound) {
s.logger.Error("unable to retrieve cursor",
zap.String("module_hash", s.OutputModuleHash()),
zap.Error(err))
s.Shutdown(fmt.Errorf("unable to retrieve cursor: %w", err))
return
}
Expand Down Expand Up @@ -110,6 +129,22 @@ func (s *MongoSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstre

s.lastCursor = cursor

// Save cursor periodically if interval is configured
if s.cursorCheckpointInterval > 0 && data.Clock.Number%s.cursorCheckpointInterval == 0 {
// Use background context since we don't want to block block processing
saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := s.loader.WriteCursor(saveCtx, s.OutputModuleHash(), cursor)
cancel()
if err != nil {
s.logger.Warn("failed to save cursor periodically",
zap.Uint64("block_num", data.Clock.Number),
zap.Error(err))
} else {
s.logger.Debug("cursor saved periodically",
zap.Uint64("block_num", data.Clock.Number))
}
}

return nil
}

Expand Down