Skip to content

Commit 3d5a147

Browse files
committed
Added LogPoller metric to track skipped blocks
1 parent 43212c2 commit 3d5a147

File tree

4 files changed

+52
-5
lines changed

4 files changed

+52
-5
lines changed

pkg/solana/logpoller/job_get_block.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func (j *getBlockJob) Done() <-chan struct{} {
5353

5454
func (j *getBlockJob) Abort(ctx context.Context) error {
5555
j.aborted = true
56+
j.metrics.SetBlockSkipped(ctx, int64(j.slotNumber)) // nolint:gosec // G115: integer overflow conversion uint64 -> int64
5657
var abort types.Block
5758
abort.Aborted = true
5859
abort.SlotNumber = j.slotNumber

pkg/solana/logpoller/job_get_block_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ type solLpPromTest struct {
2727
id string
2828
txsTruncated outcomeDependantTestMetric
2929
txsLogParsingError outcomeDependantTestMetric
30+
blockSkipped float64
3031
}
3132

3233
func (p solLpPromTest) assertEqual(t *testing.T) {
3334
assert.InDelta(t, p.txsTruncated.succeeded, testutil.ToFloat64(promSolLp.txsTruncated.succeeded.WithLabelValues(p.id)), 0.0001, "mismatch: truncated succeeded")
3435
assert.InDelta(t, p.txsTruncated.reverted, testutil.ToFloat64(promSolLp.txsTruncated.reverted.WithLabelValues(p.id)), 0.0001, "mismatch: truncated reverted")
3536
assert.InDelta(t, p.txsLogParsingError.succeeded, testutil.ToFloat64(promSolLp.txsLogParsingError.succeeded.WithLabelValues(p.id)), 0.0001, "mismatch: log parsing error succeeded")
3637
assert.InDelta(t, p.txsLogParsingError.reverted, testutil.ToFloat64(promSolLp.txsLogParsingError.reverted.WithLabelValues(p.id)), 0.0001, "mismatch: log parsing error reverted")
38+
assert.InDelta(t, p.blockSkipped, testutil.ToFloat64(promLpBlockSkipped.WithLabelValues(p.id)), 0.0001, "mismatch: block skipped")
3739
}
3840

3941
// resetPromMetricsForLabel clears the prometheus counters for the given label
@@ -43,6 +45,7 @@ func resetPromMetricsForLabel(label string) {
4345
promSolLp.txsTruncated.reverted.DeleteLabelValues(label)
4446
promSolLp.txsLogParsingError.succeeded.DeleteLabelValues(label)
4547
promSolLp.txsLogParsingError.reverted.DeleteLabelValues(label)
48+
promLpBlockSkipped.DeleteLabelValues(label)
4649
}
4750

4851
func TestGetBlockJob(t *testing.T) {
@@ -186,6 +189,32 @@ func TestGetBlockJob(t *testing.T) {
186189
default:
187190
}
188191
})
192+
t.Run("Abort emits block skipped metric", func(t *testing.T) {
193+
resetPromMetricsForLabel(t.Name())
194+
lggr := logger.Sugared(logger.Test(t))
195+
metrics, err := NewSolLpMetrics(t.Name())
196+
require.NoError(t, err)
197+
blocks := make(chan types.Block, 1)
198+
job := newGetBlockJob(nil, nil, blocks, lggr, slotNumber, metrics, nil)
199+
err = job.Abort(t.Context())
200+
require.NoError(t, err)
201+
202+
result := <-blocks
203+
assert.True(t, result.Aborted)
204+
assert.Equal(t, slotNumber, result.SlotNumber)
205+
206+
expectedMetrics := solLpPromTest{
207+
id: t.Name(),
208+
blockSkipped: float64(slotNumber),
209+
}
210+
expectedMetrics.assertEqual(t)
211+
212+
select {
213+
case <-job.Done():
214+
default:
215+
t.Fatal("expected job to be done")
216+
}
217+
})
189218
t.Run("Happy path", func(t *testing.T) {
190219
resetPromMetricsForLabel(t.Name()) // Reset counters to avoid accumulation across test runs
191220
client := mocks.NewRPCClient(t)

pkg/solana/logpoller/metrics.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ var promLpLastProcessedSlot = promauto.NewGaugeVec(prometheus.GaugeOpts{
2929
Help: "Last processed slot by log poller",
3030
}, []string{"chainID"})
3131

32+
var promLpBlockSkipped = promauto.NewGaugeVec(prometheus.GaugeOpts{
33+
Name: "solana_log_poller_block_skipped",
34+
Help: "Slot number of the most recently skipped block due to max retry exhaustion",
35+
}, []string{"chainID"})
36+
3237
type solLpMetrics struct {
3338
metrics.Labeler
3439
chainID string
@@ -37,6 +42,7 @@ type solLpMetrics struct {
3742
txsTruncated outcomeDependantMetric
3843
txsLogParsingError outcomeDependantMetric
3944
lastProcessedSlot metric.Int64Gauge
45+
blockSkipped metric.Int64Gauge
4046
}
4147

4248
func NewSolLpMetrics(chainID string) (*solLpMetrics, error) {
@@ -57,13 +63,19 @@ func NewSolLpMetrics(chainID string) (*solLpMetrics, error) {
5763
return nil, fmt.Errorf("failed to register solana_log_poller_last_processed_slot: %w", err)
5864
}
5965

66+
blockSkipped, err := meter.Int64Gauge("solana_log_poller_block_skipped")
67+
if err != nil {
68+
return nil, fmt.Errorf("failed to register solana_log_poller_block_skipped: %w", err)
69+
}
70+
6071
return &solLpMetrics{
6172
chainID: chainID,
6273
Labeler: metrics.NewLabeler().With("chainID", chainID),
6374

6475
txsTruncated: *truncatedTxs,
6576
txsLogParsingError: *txLogParsingError,
6677
lastProcessedSlot: lastProcessedSlot,
78+
blockSkipped: blockSkipped,
6779
}, nil
6880
}
6981

@@ -84,6 +96,11 @@ func (m *solLpMetrics) SetLatestProcessedSlot(ctx context.Context, slot int64) {
8496
m.lastProcessedSlot.Record(ctx, slot, metric.WithAttributes(m.GetOtelAttributes()...))
8597
}
8698

99+
func (m *solLpMetrics) SetBlockSkipped(ctx context.Context, slotNumber int64) {
100+
promLpBlockSkipped.WithLabelValues(m.chainID).Set(float64(slotNumber))
101+
m.blockSkipped.Record(ctx, slotNumber, metric.WithAttributes(m.GetOtelAttributes()...))
102+
}
103+
87104
func (m *solLpMetrics) incrementForOutcome(ctx context.Context, prom outcomeDependantProm, me outcomeDependantMetric, outcome txOutcome) {
88105
switch outcome {
89106
case txSucceeded:

pkg/solana/logpoller/worker/worker.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,21 +236,21 @@ func (g *Group) runRetryQueue(ctx context.Context) {
236236

237237
// retry count starts at 0 so check if it equals max retry count to determine if it has reached the threshold
238238
if retry.count >= g.maxRetryCount {
239-
g.lggr.Criticalf("job %s exceeded max retry count %d. Resolution most likely requires manual intervention. Errors: %s", failedAttempt.Job, g.maxRetryCount, errors.Join(retry.errs...))
239+
g.lggr.Criticalw("job exceeded max retry count. Resolution most likely requires manual intervention.", "job", failedAttempt.Job, "maxRetryCount", g.maxRetryCount, "errors", errors.Join(retry.errs...))
240240
if err := retry.Abort(ctx); err != nil {
241-
g.lggr.Criticalf("failed to abort retry: %s", err)
241+
g.lggr.Criticalw("failed to abort retry", "err", err)
242242
}
243243
// Continue to avoid adding job back to retry map
244244
continue
245245
}
246246

247-
g.lggr.Errorf("retrying job %s in %s", failedAttempt.Job, wait)
247+
g.lggr.Errorw("retrying job", "job", failedAttempt.Job, "wait", wait)
248248
retry.when = time.Now().Add(wait)
249249
default:
250250
// first retry
251251
wait := calculateExponentialBackoffWithJitter(0)
252252

253-
g.lggr.Errorf("retrying job %s in %s", failedAttempt.Job, wait)
253+
g.lggr.Errorw("retrying job", "job", failedAttempt.Job, "wait", wait)
254254

255255
retry = retryableJob{
256256
name: createRandomString(12),
@@ -264,7 +264,7 @@ func (g *Group) runRetryQueue(ctx context.Context) {
264264
g.retryMap[retry.name] = retry
265265

266266
if len(g.retryMap) >= DefaultNotifyRetryDepth {
267-
g.lggr.Errorf("retry queue depth: %d", len(g.retryMap))
267+
g.lggr.Errorw("retry queue depth", "depth", len(g.retryMap))
268268
}
269269
g.mu.Unlock()
270270
}

0 commit comments

Comments (0)