Skip to content

Commit 5e84e49

Browse files
authored
feat: auto-recover from pruned node errors during extraction (#80)
1 parent 52e2bc7 commit 5e84e49

File tree

14 files changed

+396
-32
lines changed

14 files changed

+396
-32
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ on:
55
pull_request:
66

77
env:
8-
GO_VERSION: '1.25.4'
8+
GO_VERSION: '1.25.5'
99
GORELEASER_VERSION: 'v1.26.2'
1010

1111
concurrency:

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
packages: write # if you also publish containers/packages via GoReleaser
2121

2222
env:
23-
GO_VERSION: '1.25.4'
23+
GO_VERSION: '1.25.5'
2424
GORELEASER_VERSION: 'v1.26.2'
2525

2626
steps:

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Off-chain indexing of block & transaction data.
1616

1717
## Requirements
1818

19-
- Go 1.25.4
19+
- Go 1.25.5
2020
- Docker & Docker Compose (optional)
2121
- CosmosSDK >= 0.50 (chain to index)
2222

cmd/yaci/extract.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88
"os/signal"
99
"syscall"
1010

11-
"github.com/manifest-network/yaci/internal/client"
12-
"github.com/manifest-network/yaci/internal/config"
1311
"github.com/spf13/cobra"
1412
"github.com/spf13/viper"
13+
14+
"github.com/manifest-network/yaci/internal/client"
15+
"github.com/manifest-network/yaci/internal/config"
1516
)
1617

1718
var (

cmd/yaci/postgres.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66

77
"github.com/jackc/pgx/v5/pgxpool"
88
"github.com/jackc/pgx/v5/stdlib"
9+
"github.com/spf13/cobra"
10+
"github.com/spf13/viper"
11+
912
"github.com/manifest-network/yaci/internal/metrics"
1013
"github.com/manifest-network/yaci/internal/output/postgresql"
1114
"github.com/manifest-network/yaci/internal/utils"
12-
"github.com/spf13/cobra"
13-
"github.com/spf13/viper"
1415

1516
"github.com/manifest-network/yaci/internal/config"
1617
"github.com/manifest-network/yaci/internal/extractor"

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/manifest-network/yaci
22

3-
go 1.25.4
3+
go 1.25.5
44

55
require (
66
github.com/DATA-DOG/go-sqlmock v1.5.2

internal/client/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import (
77
"os"
88
"time"
99

10-
"github.com/manifest-network/yaci/internal/reflection"
1110
"google.golang.org/grpc"
1211
"google.golang.org/grpc/credentials"
1312
"google.golang.org/grpc/keepalive"
13+
14+
"github.com/manifest-network/yaci/internal/reflection"
1415
)
1516

1617
var keepaliveParams = keepalive.ClientParameters{

internal/extractor/block.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"fmt"
88
"log/slog"
99

10+
"github.com/schollz/progressbar/v3"
11+
"golang.org/x/sync/errgroup"
12+
1013
"github.com/manifest-network/yaci/internal/client"
1114
"github.com/manifest-network/yaci/internal/config"
1215
"github.com/manifest-network/yaci/internal/models"
1316
"github.com/manifest-network/yaci/internal/output"
1417
"github.com/manifest-network/yaci/internal/utils"
15-
"github.com/schollz/progressbar/v3"
16-
"golang.org/x/sync/errgroup"
1718
)
1819

1920
// extractBlocksAndTransactions extracts blocks and transactions from the gRPC server.
@@ -24,6 +25,7 @@ func extractBlocksAndTransactions(gRPCClient *client.GRPCClient, start, stop uin
2425
} else {
2526
slog.Info("Extracting blocks and transactions", "height", start)
2627
}
28+
2729
var bar *progressbar.ProgressBar
2830
if displayProgress {
2931
bar = progressbar.NewOptions64(
@@ -101,15 +103,18 @@ func processBlocks(gRPCClient *client.GRPCClient, start, stop uint64, outputHand
101103

102104
err := processSingleBlockWithRetry(clientWithCtx, blockHeight, outputHandler, maxRetries)
103105
if err != nil {
104-
if !errors.Is(err, context.Canceled) {
105-
slog.Error("Block processing error",
106+
if errors.Is(err, context.Canceled) {
107+
slog.Debug("Block processing cancelled",
106108
"height", blockHeight,
107-
"error", err,
108-
"errorType", fmt.Sprintf("%T", err))
109-
return err
109+
"error", err)
110+
return fmt.Errorf("failed to process block %d: %w", blockHeight, err)
110111
}
111-
slog.Error("Failed to process block", "height", blockHeight, "error", err, "retries", maxRetries)
112-
return fmt.Errorf("failed to process block %d: %w", blockHeight, err)
112+
113+
slog.Error("Block processing error",
114+
"height", blockHeight,
115+
"error", err,
116+
"errorType", fmt.Sprintf("%T", err))
117+
return err
113118
}
114119

115120
if bar != nil {

internal/extractor/extractor.go

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ import (
1313
const (
1414
blockMethodFullName = "cosmos.tx.v1beta1.Service.GetBlockWithTxs"
1515
txMethodFullName = "cosmos.tx.v1beta1.Service.GetTx"
16+
17+
// maxPrunedNodeRecoveryAttempts limits how many times we adjust the start height
18+
// due to pruned node errors before giving up. This prevents infinite loops if
19+
// the node's pruning boundary keeps advancing during extraction.
20+
maxPrunedNodeRecoveryAttempts = 10
1621
)
1722

1823
// Extract extracts blocks and transactions from a gRPC server.
@@ -30,6 +35,24 @@ func Extract(gRPCClient *client.GRPCClient, outputHandler output.OutputHandler,
3035
}
3136
}
3237

38+
// Warm-up: validate start height against potential load balancer inconsistencies
39+
validatedStart, err := warmUpStartHeight(gRPCClient, config.BlockStart, outputHandler, config.MaxConcurrency, config.MaxRetries)
40+
if err != nil {
41+
return err
42+
}
43+
if validatedStart != config.BlockStart {
44+
slog.Info("Start height adjusted after warm-up",
45+
"original", config.BlockStart,
46+
"validated", validatedStart)
47+
config.BlockStart = validatedStart
48+
}
49+
50+
// In batch mode, verify the adjusted start doesn't exceed the stop block
51+
if !config.LiveMonitoring && config.BlockStart > config.BlockStop {
52+
return fmt.Errorf("pruned node boundary (%d) exceeds requested stop block (%d): requested range is unavailable",
53+
config.BlockStart, config.BlockStop)
54+
}
55+
3356
if config.LiveMonitoring {
3457
slog.Info("Starting live extraction", "block_time", config.BlockTime)
3558
err := extractLiveBlocksAndTransactions(gRPCClient, config.BlockStart, outputHandler, config.BlockTime, config.MaxConcurrency, config.MaxRetries)
@@ -47,36 +70,46 @@ func Extract(gRPCClient *client.GRPCClient, outputHandler output.OutputHandler,
4770
return nil
4871
}
4972

50-
// setBlockRange sets correct the block range based on the configuration.
51-
// If the start block is not set, it will be set to the latest block in the database.
52-
// If the stop block is not set, it will be set to the latest block in the gRPC server.
53-
// If the start block is greater than the stop block, an error will be returned.
73+
// setBlockRange sets the block range based on the configuration.
74+
// If the start block is not set, it will be set to the latest block in the database + 1.
75+
// If the database is empty, it queries the node for the earliest available block.
76+
// If the stop block is not set, it will be set to the latest block on the node.
77+
// Returns an error if the start block is greater than the stop block.
5478
func setBlockRange(gRPCClient *client.GRPCClient, outputHandler output.OutputHandler, cfg *config.ExtractConfig) error {
5579
if cfg.ReIndex {
5680
slog.Info("Reindexing entire database...")
57-
// TODO: Get the earliest block from the gRPC server
58-
// See https://github.com/manifest-network/yaci/issues/28
59-
cfg.BlockStart = 1
6081
earliestLocalBlock, err := outputHandler.GetEarliestBlock(gRPCClient.Ctx)
6182
if err != nil {
6283
return fmt.Errorf("failed to get the earliest local block: %w", err)
6384
}
6485
if earliestLocalBlock != nil {
6586
cfg.BlockStart = earliestLocalBlock.ID
87+
} else {
88+
// Fresh DB with reindex - probe for earliest available
89+
earliestAvailable, err := utils.GetEarliestBlockHeight(gRPCClient, cfg.MaxRetries)
90+
if err != nil {
91+
return fmt.Errorf("failed to determine earliest available block: %w", err)
92+
}
93+
cfg.BlockStart = earliestAvailable
6694
}
6795
cfg.BlockStop = 0
6896
}
6997

7098
if cfg.BlockStart == 0 {
71-
// TODO: Get the earliest block from the gRPC server
72-
// See https://github.com/manifest-network/yaci/issues/28
73-
cfg.BlockStart = 1
7499
latestLocalBlock, err := outputHandler.GetLatestBlock(gRPCClient.Ctx)
75100
if err != nil {
76101
return fmt.Errorf("failed to get the latest block: %w", err)
77102
}
78103
if latestLocalBlock != nil {
104+
// Resume from existing DB - no probe needed
79105
cfg.BlockStart = latestLocalBlock.ID + 1
106+
} else {
107+
// Fresh DB - probe to find earliest available block on node
108+
earliestAvailable, err := utils.GetEarliestBlockHeight(gRPCClient, cfg.MaxRetries)
109+
if err != nil {
110+
return fmt.Errorf("failed to determine earliest available block: %w", err)
111+
}
112+
cfg.BlockStart = earliestAvailable
80113
}
81114
}
82115

@@ -99,3 +132,42 @@ func setBlockRange(gRPCClient *client.GRPCClient, outputHandler output.OutputHan
99132
func shouldSkipMissingBlockCheck(cfg config.ExtractConfig) bool {
	// A reindex request always skips the check.
	if cfg.ReIndex {
		return true
	}

	// Otherwise skip only when both ends of the range were given explicitly
	// (i.e. neither is the zero value).
	explicitRange := cfg.BlockStart != 0 && cfg.BlockStop != 0
	return explicitRange
}
135+
136+
// warmUpStartHeight validates that the start height is available by attempting to
137+
// fetch a single block. If the node returns a pruned error with a higher boundary,
138+
// it adjusts the start height and retries. This handles load balancer scenarios
139+
// where different nodes may have different pruning boundaries.
140+
func warmUpStartHeight(
141+
gRPCClient *client.GRPCClient,
142+
start uint64,
143+
outputHandler output.OutputHandler,
144+
maxConcurrency, maxRetries uint,
145+
) (uint64, error) {
146+
currentStart := start
147+
148+
for attempt := 0; attempt <= maxPrunedNodeRecoveryAttempts; attempt++ {
149+
// Try to fetch just the start block
150+
err := extractBlocksAndTransactions(gRPCClient, currentStart, currentStart, outputHandler, maxConcurrency, maxRetries)
151+
if err == nil {
152+
return currentStart, nil
153+
}
154+
155+
// Check if error is due to pruned node with higher boundary
156+
newStart := utils.ParseLowestHeightFromError(err.Error())
157+
if newStart > currentStart {
158+
slog.Warn("Warm-up: adjusting start height due to pruned node",
159+
"original_start", currentStart,
160+
"new_start", newStart,
161+
"skipped_blocks", newStart-currentStart,
162+
"attempt", attempt+1)
163+
currentStart = newStart
164+
continue
165+
}
166+
167+
// Non-recoverable error
168+
return 0, fmt.Errorf("warm-up failed: %w", err)
169+
}
170+
171+
return 0, fmt.Errorf("warm-up exceeded maximum attempts (%d): pruning boundary keeps changing",
172+
maxPrunedNodeRecoveryAttempts)
173+
}

internal/metrics/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import (
77
"net"
88
"net/http"
99

10-
"github.com/manifest-network/yaci/internal/metrics/collectors"
1110
"github.com/prometheus/client_golang/prometheus"
1211
"github.com/prometheus/client_golang/prometheus/promhttp"
1312

13+
"github.com/manifest-network/yaci/internal/metrics/collectors"
14+
1415
_ "github.com/manifest-network/yaci/internal/metrics/collectors" // Import all collectors
1516
)
1617

0 commit comments

Comments
 (0)