Skip to content

Commit 247b3ae

Browse files
committed
chore: remove unneccessary go routine to pass headers into separate channel
1 parent 967a8ab commit 247b3ae

File tree

2 files changed

+32
-80
lines changed

2 files changed

+32
-80
lines changed

cmd/monitor.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,8 @@ func monitorAndVerifyDataAndHeaders(cmd *cobra.Command, args []string) error {
197197
g.Go(func() error {
198198
return verifier.VerifyHeadersAndData(ctx)
199199
})
200-
g.Go(func() error {
201-
return verifier.ProcessHeaders(ctx)
202-
})
203200

204-
if flags.referenceNode == "" || len(flags.fullNodes) == 0 {
201+
if flags.referenceNode != "" && flags.fullNodes != "" {
205202
g.Go(func() error {
206203
fullNodeList := strings.Split(flags.fullNodes, ",")
207204
return drift.Monitor(ctx, m, flags.chainID, flags.referenceNode, fullNodeList, flags.pollingInterval, logger)

cmd/verifier.go

Lines changed: 31 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,6 @@ import (
1111
"time"
1212
)
1313

14-
// daVerificationStatus tracks a block pending verification
15-
type daVerificationStatus struct {
16-
blockHeight uint64
17-
namespace string // "header" or "data"
18-
}
19-
2014
// BlockVerifier handles verification of blocks against Celestia DA
2115
type BlockVerifier struct {
2216
evnodeClient *evnode.Client
@@ -27,9 +21,6 @@ type BlockVerifier struct {
2721
metrics *metrics.Metrics
2822
chainID string
2923
logger zerolog.Logger
30-
31-
// internal channels for retry logic
32-
unverifiedCh chan *daVerificationStatus
3324
}
3425

3526
// NewBlockVerifier creates a new BlockVerifier
@@ -51,94 +42,58 @@ func NewBlockVerifier(
5142
metrics: metrics,
5243
chainID: chainID,
5344
logger: logger,
54-
unverifiedCh: make(chan *daVerificationStatus, 100),
5545
}
5646
}
5747

5848
// VerifyHeadersAndData begins the background retry processor
5949
func (v *BlockVerifier) VerifyHeadersAndData(ctx context.Context) error {
60-
for {
61-
select {
62-
case <-ctx.Done():
63-
return nil
64-
// when a new unverified status is received, spawn a goroutine to handle retries.
65-
case status := <-v.unverifiedCh:
66-
// spawn a goroutine to handle this block's retries
67-
go v.retryBlock(ctx, status)
68-
}
69-
}
70-
}
71-
72-
func (v *BlockVerifier) onVerified(status *daVerificationStatus, verified bool) {
73-
if verified {
74-
v.metrics.RemoveVerifiedBlock(v.chainID, status.namespace, status.blockHeight)
75-
} else {
76-
v.metrics.RecordMissingBlock(v.chainID, status.namespace, status.blockHeight)
77-
}
78-
}
79-
80-
func (v *BlockVerifier) ProcessHeaders(ctx context.Context) error {
8150
headers := make(chan *types.Header, 10)
8251
sub, err := v.evmClient.SubscribeNewHead(ctx, headers)
8352
if err != nil {
8453
return err
8554
}
8655
defer sub.Unsubscribe()
8756

88-
headerCount := 0
8957
for {
9058
select {
9159
case <-ctx.Done():
92-
v.logger.Info().Int("headers_processed", headerCount).Msg("stream completed")
9360
return nil
61+
// when a new unverified status is received, spawn a goroutine to handle retries.
9462
case header := <-headers:
95-
headerCount++
96-
v.enqueueHeader(header)
63+
// spawn a goroutine to handle this block's retries
64+
go v.retryBlock(ctx, header)
9765
}
9866
}
9967
}
10068

101-
// enqueueHeader queues a block for verification.
102-
func (v *BlockVerifier) enqueueHeader(header *types.Header) {
69+
func (v *BlockVerifier) onVerified(namespace string, blockHeight uint64, verified bool) {
70+
if verified {
71+
v.metrics.RemoveVerifiedBlock(v.chainID, namespace, blockHeight)
72+
} else {
73+
v.metrics.RecordMissingBlock(v.chainID, namespace, blockHeight)
74+
}
75+
}
76+
77+
// retryBlock attempts to re-verify a DA height for a given block status.
78+
func (v *BlockVerifier) retryBlock(ctx context.Context, header *types.Header) {
10379
blockHeight := header.Number.Uint64()
10480

10581
// check if block has transactions
10682
hasTransactions := header.TxHash != types.EmptyRootHash
10783

108-
v.logger.Info().
109-
Uint64("block_height", blockHeight).
84+
namespace := "header"
85+
if hasTransactions {
86+
namespace = "data"
87+
}
88+
89+
logger := v.logger.With().Str("namespace", namespace).Uint64("block_height", blockHeight).Logger()
90+
logger.Info().
11091
Str("hash", header.Hash().Hex()).
11192
Time("time", time.Unix(int64(header.Time), 0)).
11293
Uint64("gas_used", header.GasUsed).
11394
Bool("has_transactions", hasTransactions).
11495
Msg("processing block")
11596

116-
// queue header verification
117-
v.unverifiedCh <- &daVerificationStatus{
118-
blockHeight: blockHeight,
119-
namespace: "header",
120-
}
121-
122-
// queue data verification if block has transactions
123-
if hasTransactions {
124-
v.unverifiedCh <- &daVerificationStatus{
125-
blockHeight: blockHeight,
126-
namespace: "data",
127-
}
128-
}
129-
}
130-
131-
// statusLogger returns a logger pre-populated with status fields
132-
func (v *BlockVerifier) statusLogger(status *daVerificationStatus) zerolog.Logger {
133-
return v.logger.With().
134-
Str("namespace", status.namespace).
135-
Uint64("block_height", status.blockHeight).
136-
Logger()
137-
}
138-
139-
// retryBlock attempts to re-verify a DA height for a given block status.
140-
func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationStatus) {
141-
logger := v.statusLogger(status)
14297
startTime := time.Now()
14398

14499
// exponential backoff intervals matching observed DA submission timing
@@ -162,14 +117,14 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
162117
// proceed with retry
163118
}
164119

165-
blockResult, err := v.evnodeClient.GetBlock(ctx, status.blockHeight)
120+
blockResult, err := v.evnodeClient.GetBlock(ctx, blockHeight)
166121
if err != nil {
167122
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
168123
continue
169124
}
170125

171126
daHeight := blockResult.HeaderDaHeight
172-
if status.namespace == "data" {
127+
if namespace == "data" {
173128
daHeight = blockResult.DataDaHeight
174129
}
175130

@@ -178,13 +133,13 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
178133
continue
179134
}
180135

181-
blockResultWithBlobs, err := v.evnodeClient.GetBlockWithBlobs(ctx, status.blockHeight)
136+
blockResultWithBlobs, err := v.evnodeClient.GetBlockWithBlobs(ctx, blockHeight)
182137
if err != nil {
183138
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
184139
continue
185140
}
186141

187-
switch status.namespace {
142+
switch namespace {
188143
case "header":
189144
verified, err := v.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, v.headerNS)
190145

@@ -198,7 +153,7 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
198153
Uint64("da_height", daHeight).
199154
Dur("duration", time.Since(startTime)).
200155
Msg("header blob verified on Celestia")
201-
v.onVerified(status, true)
156+
v.onVerified(namespace, blockHeight, true)
202157
return
203158
}
204159

@@ -208,7 +163,7 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
208163
Uint64("da_height", daHeight).
209164
Dur("duration", time.Since(startTime)).
210165
Msg("max retries reached - header blob not verified")
211-
v.onVerified(status, false)
166+
v.onVerified(namespace, blockHeight, false)
212167
return
213168
}
214169
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
@@ -218,7 +173,7 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
218173
logger.Info().
219174
Dur("duration", time.Since(startTime)).
220175
Msg("empty data block - no verification needed")
221-
v.onVerified(status, true)
176+
v.onVerified(namespace, blockHeight, true)
222177
return
223178
}
224179

@@ -234,7 +189,7 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
234189
Uint64("da_height", daHeight).
235190
Dur("duration", time.Since(startTime)).
236191
Msg("data blob verified on Celestia")
237-
v.onVerified(status, true)
192+
v.onVerified(namespace, blockHeight, true)
238193
return
239194
}
240195

@@ -244,18 +199,18 @@ func (v *BlockVerifier) retryBlock(ctx context.Context, status *daVerificationSt
244199
Uint64("da_height", daHeight).
245200
Dur("duration", time.Since(startTime)).
246201
Msg("max retries reached - data blob not verified")
247-
v.onVerified(status, false)
202+
v.onVerified(namespace, blockHeight, false)
248203
return
249204
}
250205
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
251206

252207
default:
253-
logger.Error().Str("namespace", status.namespace).Msg("unknown namespace type")
208+
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
254209
return
255210
}
256211
}
257212

258213
// if loop completes without success, log final error
259214
logger.Error().Msg("max retries exhausted - ALERT: failed to verify block")
260-
v.onVerified(status, false)
215+
v.onVerified(namespace, blockHeight, false)
261216
}

0 commit comments

Comments
 (0)