Skip to content

Commit 82da892

Browse files
authored
chore: resubmit header on failed verification (#22)
1 parent 882bcf4 commit 82da892

File tree

1 file changed

+25
-30
lines changed

1 file changed

+25
-30
lines changed

pkg/exporters/verifier/verifier.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,27 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
113113
}
114114

115115
// processBlocks processes blocks from the queue
116-
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) {
116+
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
117117
logger := e.logger.With().Int("worker_id", workerID).Logger()
118118
logger.Debug().Msg("worker started")
119119

120120
for header := range blockQueue {
121-
e.verifyBlock(ctx, m, header)
121+
reEnqueue := e.verifyBlock(ctx, m, header)
122+
if reEnqueue {
123+
// re-queue with delay in background
124+
go func(h *types.Header) {
125+
select {
126+
case <-time.After(5 * time.Minute):
127+
select {
128+
case blockQueue <- h:
129+
logger.Debug().Uint64("block", h.Number.Uint64()).Msg("re-queued block after cooldown")
130+
case <-ctx.Done():
131+
// graceful shutdown, don't send
132+
}
133+
case <-ctx.Done():
134+
}
135+
}(header)
136+
}
122137
}
123138

124139
logger.Debug().Msg("worker stopped")
@@ -135,7 +150,7 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
135150
}
136151

137152
// verifyBlock attempts to verify a DA height for a given block status.
138-
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) {
153+
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool {
139154
blockHeight := header.Number.Uint64()
140155

141156
// check if block has transactions
@@ -175,7 +190,7 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
175190
case <-ctx.Done():
176191
// context cancelled during graceful shutdown, not an error
177192
logger.Debug().Msg("block verification stopped due to shutdown")
178-
return
193+
return false
179194
case <-time.After(interval):
180195
// proceed with retry
181196
}
@@ -226,27 +241,16 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
226241
Dur("duration", time.Since(startTime)).
227242
Msg("header blob verified on Celestia")
228243
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
229-
return
230-
}
231-
232-
// verification failed
233-
if retries >= len(retryIntervals)+1 {
234-
logger.Error().
235-
Uint64("da_height", daHeight).
236-
Dur("duration", time.Since(startTime)).
237-
Msg("max retries reached - header blob not verified")
238-
e.onVerified(m, namespace, blockHeight, daHeight, false, 0)
239-
return
244+
return false
240245
}
241-
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
242246

243247
case "data":
244248
if len(blockResultWithBlobs.DataBlob) == 0 {
245249
logger.Info().
246250
Dur("duration", time.Since(startTime)).
247251
Msg("empty data block - no verification needed")
248252
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
249-
return
253+
return false
250254
}
251255

252256
// perform actual verification between bytes from ev-node and Celestia.
@@ -262,27 +266,18 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
262266
Dur("duration", time.Since(startTime)).
263267
Msg("data blob verified on Celestia")
264268
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
265-
return
266-
}
267-
268-
// verification failed
269-
if retries >= len(retryIntervals)+1 {
270-
logger.Error().
271-
Uint64("da_height", daHeight).
272-
Dur("duration", time.Since(startTime)).
273-
Msg("max retries reached - data blob not verified")
274-
e.onVerified(m, namespace, blockHeight, daHeight, false, 0)
275-
return
269+
return false
276270
}
277271
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
278272

279273
default:
280274
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
281-
return
275+
return false
282276
}
283277
}
284278

285279
// if loop completes without success, log final error
286-
logger.Error().Msg("max retries exhausted - ALERT: failed to verify block")
280+
logger.Error().Msg("max retries exhausted: failed to verify block")
287281
e.onVerified(m, namespace, blockHeight, 0, false, 0)
282+
return true
288283
}

0 commit comments

Comments
 (0)