Skip to content

Commit a21917b

Browse files
authored
fix: pruner retry only failed partition ranges instead of restarting all workers (#623)
1 parent 58153f0 commit a21917b

File tree

2 files changed

+427
-71
lines changed

2 files changed

+427
-71
lines changed
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
package pruner
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sort"
7+
"sync"
8+
"testing"
9+
10+
"github.com/bsv-blockchain/teranode/errors"
11+
"github.com/bsv-blockchain/teranode/ulogger"
12+
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
// ensurePrometheusMetrics initializes the package-level prometheus metrics
18+
// so tests that bypass NewService() don't hit nil pointer dereferences.
19+
func ensurePrometheusMetrics() {
20+
prometheusMetricsInitOnce.Do(func() {
21+
prometheusUtxoCleanupBatch = prometheus.NewHistogram(prometheus.HistogramOpts{
22+
Name: "test_utxo_cleanup_batch_duration_seconds",
23+
})
24+
prometheusUtxoRecordErrors = prometheus.NewCounter(prometheus.CounterOpts{
25+
Name: "test_utxo_pruner_record_errors_total",
26+
})
27+
prometheusUtxoBatchQueryError = prometheus.NewCounter(prometheus.CounterOpts{
28+
Name: "test_utxo_pruner_batch_query_errors_total",
29+
})
30+
prometheusUtxoRecordsDeleted = prometheus.NewCounter(prometheus.CounterOpts{
31+
Name: "test_utxo_pruner_records_deleted_total",
32+
})
33+
prometheusUtxoRecordsDeletedSkipped = prometheus.NewCounter(prometheus.CounterOpts{
34+
Name: "test_utxo_pruner_records_deleted_skipped_total",
35+
})
36+
prometheusUtxoParentsUpdated = prometheus.NewCounter(prometheus.CounterOpts{
37+
Name: "test_utxo_pruner_parents_updated_total",
38+
})
39+
prometheusUtxoParentsUpdatedSkipped = prometheus.NewCounter(prometheus.CounterOpts{
40+
Name: "test_utxo_pruner_parents_updated_skipped_total",
41+
})
42+
prometheusUtxoExternalFilesDeleted = prometheus.NewCounter(prometheus.CounterOpts{
43+
Name: "test_utxo_pruner_external_files_deleted_total",
44+
})
45+
prometheusUtxoExternalFilesDeletedSkipped = prometheus.NewCounter(prometheus.CounterOpts{
46+
Name: "test_utxo_pruner_external_files_deleted_skipped_total",
47+
})
48+
prometheusUtxoRetryAttempts = prometheus.NewCounter(prometheus.CounterOpts{
49+
Name: "test_utxo_pruner_retry_attempts_total",
50+
})
51+
prometheusUtxoTimeoutEvents = prometheus.NewCounter(prometheus.CounterOpts{
52+
Name: "test_utxo_pruner_timeout_events_total",
53+
})
54+
})
55+
}
56+
57+
// mockPartitionWorkerResult holds one configured outcome for a partition
// range: the processed/skipped counts the worker reports and an optional
// error returned alongside them (partial progress plus an error is valid).
type mockPartitionWorkerResult struct {
	processed int64
	skipped   int64
	err       error
}

// mockPartitionWorkerCall records a single invocation of the mock worker.
type mockPartitionWorkerCall struct {
	partitionStart int
	partitionCount int
}

// mockPartitionWorker tracks worker invocations and plays back configured
// results per partition range. Results for a given partitionStart are
// consumed FIFO: the first call pops the first configured result, and so on.
// All state is guarded by mu so workers may run concurrently.
type mockPartitionWorker struct {
	mu      sync.Mutex
	calls   []mockPartitionWorkerCall
	results map[int][]mockPartitionWorkerResult // keyed by partitionStart, popped from the front
}

// newMockPartitionWorker returns a mock with an empty result table.
func newMockPartitionWorker() *mockPartitionWorker {
	return &mockPartitionWorker{
		results: make(map[int][]mockPartitionWorkerResult),
	}
}

// addResult queues the next result to return for the given partitionStart.
func (m *mockPartitionWorker) addResult(partitionStart int, r mockPartitionWorkerResult) {
	// Fix: take the lock. worker() reads and pops m.results under mu, so an
	// unlocked write here would be a data race if a test ever configured
	// results while workers were running (and -race would flag it).
	m.mu.Lock()
	defer m.mu.Unlock()

	m.results[partitionStart] = append(m.results[partitionStart], r)
}

// worker is the partition-worker function under test's control. It records
// the call, then pops and returns the next configured result for the range,
// panicking when the test did not configure enough results (a test-authoring
// error, so failing loudly is preferable to a silent default).
func (m *mockPartitionWorker) worker(_ context.Context, _ uint32, start, count int) (int64, int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.calls = append(m.calls, mockPartitionWorkerCall{partitionStart: start, partitionCount: count})

	results, ok := m.results[start]
	if !ok || len(results) == 0 {
		panic(fmt.Sprintf("no mock result configured for partition start %d", start))
	}

	r := results[0]
	m.results[start] = results[1:]

	return r.processed, r.skipped, r.err
}

// callCountForPartition reports how many times the worker was invoked for
// the given partitionStart.
func (m *mockPartitionWorker) callCountForPartition(partitionStart int) int {
	m.mu.Lock()
	defer m.mu.Unlock()

	count := 0
	for _, c := range m.calls {
		if c.partitionStart == partitionStart {
			count++
		}
	}

	return count
}

// totalCalls reports the total number of worker invocations across all
// partition ranges.
func (m *mockPartitionWorker) totalCalls() int {
	m.mu.Lock()
	defer m.mu.Unlock()

	return len(m.calls)
}

// calledPartitions returns the sorted, de-duplicated set of partitionStart
// values the worker was invoked with.
func (m *mockPartitionWorker) calledPartitions() []int {
	m.mu.Lock()
	defer m.mu.Unlock()

	seen := map[int]bool{}
	for _, c := range m.calls {
		seen[c.partitionStart] = true
	}

	result := make([]int, 0, len(seen))
	for p := range seen {
		result = append(result, p)
	}
	sort.Ints(result)

	return result
}
138+
139+
func createTestServiceWithMockWorker(t *testing.T, mock *mockPartitionWorker) *Service {
140+
t.Helper()
141+
ensurePrometheusMetrics()
142+
svc := &Service{
143+
logger: ulogger.NewVerboseTestLogger(t),
144+
notifier: NewPrunerEventNotifier(),
145+
partitionWorkerFn: mock.worker,
146+
}
147+
return svc
148+
}
149+
150+
func TestPruneWithPartitions_AllSucceed(t *testing.T) {
151+
mock := newMockPartitionWorker()
152+
153+
// 4 workers, each succeeds with 100 processed
154+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
155+
mock.addResult(1024, mockPartitionWorkerResult{processed: 100})
156+
mock.addResult(2048, mockPartitionWorkerResult{processed: 100})
157+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
158+
159+
svc := createTestServiceWithMockWorker(t, mock)
160+
total, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
161+
162+
require.NoError(t, err)
163+
assert.Equal(t, int64(400), total)
164+
assert.Equal(t, 4, mock.totalCalls())
165+
}
166+
167+
func TestPruneWithPartitions_OneWorkerTimeout_RetryOnlyFailed(t *testing.T) {
168+
mock := newMockPartitionWorker()
169+
170+
// First attempt: 3 succeed, 1 times out with partial progress
171+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
172+
mock.addResult(1024, mockPartitionWorkerResult{processed: 100})
173+
mock.addResult(2048, mockPartitionWorkerResult{processed: 50, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
174+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
175+
176+
// Second attempt: only the failed range retries and succeeds
177+
mock.addResult(2048, mockPartitionWorkerResult{processed: 80})
178+
179+
svc := createTestServiceWithMockWorker(t, mock)
180+
total, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
181+
182+
require.NoError(t, err)
183+
// 100 + 100 + 50 (partial from timeout) + 100 + 80 (retry success)
184+
assert.Equal(t, int64(430), total)
185+
// Workers at 0, 1024, 3072 called once; worker at 2048 called twice
186+
assert.Equal(t, 1, mock.callCountForPartition(0))
187+
assert.Equal(t, 1, mock.callCountForPartition(1024))
188+
assert.Equal(t, 2, mock.callCountForPartition(2048))
189+
assert.Equal(t, 1, mock.callCountForPartition(3072))
190+
assert.Equal(t, 5, mock.totalCalls())
191+
}
192+
193+
func TestPruneWithPartitions_MultipleWorkersTimeout(t *testing.T) {
194+
mock := newMockPartitionWorker()
195+
196+
// First attempt: 2 succeed, 2 timeout
197+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
198+
mock.addResult(1024, mockPartitionWorkerResult{processed: 30, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
199+
mock.addResult(2048, mockPartitionWorkerResult{processed: 100})
200+
mock.addResult(3072, mockPartitionWorkerResult{processed: 20, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
201+
202+
// Second attempt: both failed ranges succeed
203+
mock.addResult(1024, mockPartitionWorkerResult{processed: 70})
204+
mock.addResult(3072, mockPartitionWorkerResult{processed: 80})
205+
206+
svc := createTestServiceWithMockWorker(t, mock)
207+
total, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
208+
209+
require.NoError(t, err)
210+
assert.Equal(t, int64(400), total) // 100+30+100+20+70+80
211+
assert.Equal(t, 1, mock.callCountForPartition(0))
212+
assert.Equal(t, 2, mock.callCountForPartition(1024))
213+
assert.Equal(t, 1, mock.callCountForPartition(2048))
214+
assert.Equal(t, 2, mock.callCountForPartition(3072))
215+
}
216+
217+
func TestPruneWithPartitions_NonTimeoutError_ImmediateReturn(t *testing.T) {
218+
mock := newMockPartitionWorker()
219+
220+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
221+
mock.addResult(1024, mockPartitionWorkerResult{processed: 50, err: errors.NewProcessingError("permanent failure")})
222+
mock.addResult(2048, mockPartitionWorkerResult{processed: 100})
223+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
224+
225+
svc := createTestServiceWithMockWorker(t, mock)
226+
_, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
227+
228+
require.Error(t, err)
229+
assert.Contains(t, err.Error(), "permanent failure")
230+
// No retries — only 4 total calls
231+
assert.Equal(t, 4, mock.totalCalls())
232+
}
233+
234+
func TestPruneWithPartitions_MixedTimeoutAndNonTimeout(t *testing.T) {
235+
mock := newMockPartitionWorker()
236+
237+
// One worker times out, another has a permanent error
238+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
239+
mock.addResult(1024, mockPartitionWorkerResult{err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
240+
mock.addResult(2048, mockPartitionWorkerResult{err: errors.NewProcessingError("disk failure")})
241+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
242+
243+
svc := createTestServiceWithMockWorker(t, mock)
244+
_, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
245+
246+
require.Error(t, err)
247+
assert.Contains(t, err.Error(), "disk failure")
248+
// Non-timeout error means no retry
249+
assert.Equal(t, 4, mock.totalCalls())
250+
}
251+
252+
func TestPruneWithPartitions_MaxRetriesExceeded(t *testing.T) {
253+
mock := newMockPartitionWorker()
254+
255+
// Worker at partition 0 always times out
256+
for i := 0; i < 10; i++ {
257+
mock.addResult(0, mockPartitionWorkerResult{processed: 10, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
258+
}
259+
// Other workers succeed on first attempt
260+
mock.addResult(1024, mockPartitionWorkerResult{processed: 100})
261+
mock.addResult(2048, mockPartitionWorkerResult{processed: 100})
262+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
263+
264+
svc := createTestServiceWithMockWorker(t, mock)
265+
_, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
266+
267+
require.Error(t, err)
268+
assert.Contains(t, err.Error(), "max retries")
269+
// Partition 0 called 10 times, others called once
270+
assert.Equal(t, 10, mock.callCountForPartition(0))
271+
assert.Equal(t, 1, mock.callCountForPartition(1024))
272+
assert.Equal(t, 1, mock.callCountForPartition(2048))
273+
assert.Equal(t, 1, mock.callCountForPartition(3072))
274+
}
275+
276+
func TestPruneWithPartitions_ProgressiveRecovery(t *testing.T) {
277+
mock := newMockPartitionWorker()
278+
279+
// Attempt 1: 1 succeeds, 3 timeout
280+
mock.addResult(0, mockPartitionWorkerResult{processed: 100})
281+
mock.addResult(1024, mockPartitionWorkerResult{processed: 10, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
282+
mock.addResult(2048, mockPartitionWorkerResult{processed: 20, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
283+
mock.addResult(3072, mockPartitionWorkerResult{processed: 30, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
284+
285+
// Attempt 2: 2 of 3 succeed, 1 still times out
286+
mock.addResult(1024, mockPartitionWorkerResult{processed: 90})
287+
mock.addResult(2048, mockPartitionWorkerResult{processed: 80})
288+
mock.addResult(3072, mockPartitionWorkerResult{processed: 15, err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
289+
290+
// Attempt 3: last worker succeeds
291+
mock.addResult(3072, mockPartitionWorkerResult{processed: 55})
292+
293+
svc := createTestServiceWithMockWorker(t, mock)
294+
total, err := svc.PruneWithPartitions(context.Background(), 1000, "abc123", 4)
295+
296+
require.NoError(t, err)
297+
// 100 + 10 + 20 + 30 + 90 + 80 + 15 + 55 = 400
298+
assert.Equal(t, int64(400), total)
299+
assert.Equal(t, 1, mock.callCountForPartition(0))
300+
assert.Equal(t, 2, mock.callCountForPartition(1024))
301+
assert.Equal(t, 2, mock.callCountForPartition(2048))
302+
assert.Equal(t, 3, mock.callCountForPartition(3072))
303+
}
304+
305+
func TestPruneWithPartitions_ContextCancellation(t *testing.T) {
306+
ctx, cancel := context.WithCancel(context.Background())
307+
308+
mock := newMockPartitionWorker()
309+
310+
// First worker cancels the context, simulating external cancellation
311+
mock.addResult(0, mockPartitionWorkerResult{err: &TimeoutError{cause: errors.NewProcessingError("timeout")}})
312+
mock.addResult(1024, mockPartitionWorkerResult{processed: 100})
313+
mock.addResult(2048, mockPartitionWorkerResult{processed: 100})
314+
mock.addResult(3072, mockPartitionWorkerResult{processed: 100})
315+
316+
// On retry, the context is cancelled so the worker returns ctx.Err()
317+
cancel()
318+
mock.addResult(0, mockPartitionWorkerResult{err: context.Canceled})
319+
320+
svc := createTestServiceWithMockWorker(t, mock)
321+
_, err := svc.PruneWithPartitions(ctx, 1000, "abc123", 4)
322+
323+
require.Error(t, err)
324+
// The context.Canceled error is not a TimeoutError, so it returns immediately
325+
assert.Equal(t, 5, mock.totalCalls()) // 4 initial + 1 retry that fails with non-timeout
326+
}

0 commit comments

Comments
 (0)