Skip to content

Commit 13a6357

Browse files
committed
reconciler: Add 'retriesPending' to WaitUntilReconciled
Extend [Reconciler.WaitUntilReconciled] to also indicate whether retries are pending for any object with a revision lower than or equal to [untilRevision]. The committing of results is now split into two steps: one after the normal incremental processing of pending objects and one after processing retries. This way, the entries that failed to reconcile are pushed to the retry queue, and we can check the retry queue's low watermark to produce 'retriesPending'.

Signed-off-by: Jussi Maki <jussi.maki@isovalent.com>
1 parent 39a626f commit 13a6357

File tree

9 files changed

+261
-114
lines changed

9 files changed

+261
-114
lines changed

iterator.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package statedb
66
import (
77
"fmt"
88
"iter"
9-
"runtime"
109
"slices"
1110

1211
"github.com/cilium/statedb/index"
@@ -253,11 +252,6 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi
253252
}
254253

255254
func (it *changeIterator[Obj]) Close() {
256-
runtime.SetFinalizer(it, nil)
257-
it.close()
258-
}
259-
260-
func (it *changeIterator[Obj]) close() {
261255
it.iter = nil
262256
if it.dt != nil {
263257
it.dt.close()

reconciler/incremental.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,42 @@ type opResult struct {
4545
id uint64 // the "pending" identifier
4646
}
4747

48-
func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, changed bool) {
48+
func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, retryLowWatermark statedb.Revision) {
4949
// Reconcile new and changed objects using either Operations
5050
// or BatchOperations.
5151
if incr.config.BatchOperations != nil {
52-
lastRev, changed = incr.batch(ctx, txn, changes)
52+
lastRev = incr.batch(ctx, txn, changes)
5353
} else {
54-
lastRev, changed = incr.single(ctx, txn, changes)
54+
lastRev = incr.single(ctx, txn, changes)
5555
}
5656

57+
// Commit status updates for new and changed objects.
58+
newErrors := incr.commitStatus()
59+
clear(incr.results)
60+
5761
// Process objects that need to be retried that were not cleared.
58-
incr.processRetries(ctx, txn)
62+
retryLowWatermark = incr.processRetries(ctx, txn)
5963

60-
// Finally commit the status updates.
61-
newErrors := incr.commitStatus()
64+
// Finally commit the status updates from retries.
65+
newErrors += incr.commitStatus()
6266

6367
// Since all failures are retried, we can return the errors from the retry
6468
// queue, which includes both the errors that occurred in this round and the old
6569
// errors.
6670
errs = incr.retries.errors()
6771
incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs))
6872

69-
// Prepare for next round
73+
// Prepare for next round.
7074
incr.numReconciled = 0
7175
clear(incr.results)
7276

73-
return errs, lastRev, changed
77+
return errs, lastRev, retryLowWatermark
7478
}
7579

76-
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) {
80+
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) {
7781
// Iterate in revision order through new and changed objects.
7882
for change, rev := range changes {
7983
lastRev = rev
80-
changed = true
8184

8285
obj := change.Object
8386

@@ -102,14 +105,13 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c
102105
return
103106
}
104107

105-
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) {
108+
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) {
106109
ops := incr.config.BatchOperations
107110
updateBatch := []BatchEntry[Obj]{}
108111
deleteBatch := []BatchEntry[Obj]{}
109112

110113
for change, rev := range changes {
111114
lastRev = rev
112-
changed = true
113115

114116
obj := change.Object
115117

@@ -152,7 +154,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch
152154
for _, entry := range deleteBatch {
153155
if entry.Result != nil {
154156
// Delete failed, queue a retry for it.
155-
incr.retries.Add(entry.original, entry.Revision, true, entry.Result)
157+
incr.retries.Add(entry.original, entry.Revision, entry.Revision, true, entry.Result)
156158
}
157159
}
158160
}
@@ -179,7 +181,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch
179181
return
180182
}
181183

182-
func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) {
184+
func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) statedb.Revision {
183185
now := time.Now()
184186
for incr.numReconciled < incr.config.IncrementalRoundSize {
185187
item, ok := incr.retries.Top()
@@ -190,6 +192,7 @@ func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.Re
190192
incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete)
191193
incr.numReconciled++
192194
}
195+
return incr.retries.LowWatermark()
193196
}
194197

195198
func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) {
@@ -204,7 +207,7 @@ func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.Rea
204207
err = incr.config.Operations.Delete(ctx, txn, rev, obj)
205208
if err != nil {
206209
// Deletion failed. Retry again later.
207-
incr.retries.Add(obj, rev, true, err)
210+
incr.retries.Add(obj, rev, rev, true, err)
208211
}
209212
} else {
210213
// Clone the object so it can be mutated by Update()
@@ -267,7 +270,7 @@ func (incr *incremental[Obj]) commitStatus() (numErrors int) {
267270
// Reconciliation of the object had failed and the status was updated
268271
// successfully (object had not changed). Queue the retry for the object.
269272
newRevision := incr.table.Revision(wtxn)
270-
incr.retries.Add(result.original.(Obj), newRevision, false, result.err)
273+
incr.retries.Add(result.original.(Obj), newRevision, result.rev, false, result.err)
271274
}
272275
}
273276
return

reconciler/progress.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ import (
1313
// progressTracker tracks the highest revision observed as reconciled and
1414
// allows callers to wait until a target revision is reached.
1515
type progressTracker struct {
16-
mu sync.Mutex
17-
revision statedb.Revision
18-
watch chan struct{}
16+
mu sync.Mutex
17+
revision statedb.Revision
18+
retryLowWatermark statedb.Revision
19+
watch chan struct{}
1920
}
2021

2122
func newProgressTracker() *progressTracker {
@@ -24,29 +25,38 @@ func newProgressTracker() *progressTracker {
2425
}
2526
}
2627

27-
func (p *progressTracker) update(rev statedb.Revision) {
28+
func (p *progressTracker) update(rev statedb.Revision, retryLowWatermark statedb.Revision) {
2829
p.mu.Lock()
30+
updated := false
2931
if rev > p.revision {
3032
p.revision = rev
33+
updated = true
34+
}
35+
if retryLowWatermark != p.retryLowWatermark {
36+
p.retryLowWatermark = retryLowWatermark
37+
updated = true
38+
}
39+
if updated {
3140
close(p.watch)
3241
p.watch = make(chan struct{})
3342
}
3443
p.mu.Unlock()
3544
}
3645

37-
func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, error) {
46+
func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, statedb.Revision, error) {
3847
for {
3948
p.mu.Lock()
4049
current := p.revision
50+
retryLowWatermark := p.retryLowWatermark
4151
watch := p.watch
4252
p.mu.Unlock()
4353

4454
if current >= rev {
45-
return current, nil
55+
return current, retryLowWatermark, nil
4656
}
4757
select {
4858
case <-ctx.Done():
49-
return current, ctx.Err()
59+
return current, retryLowWatermark, ctx.Err()
5060
case <-watch:
5161
}
5262
}

reconciler/progress_test.go

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package reconciler_test
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"iter"
1011
"log/slog"
1112
"sync"
13+
"sync/atomic"
1214
"testing"
1315
"testing/synctest"
1416
"time"
@@ -25,24 +27,25 @@ import (
2527

2628
type waitObject struct {
2729
ID uint64
30+
Fail *atomic.Bool
2831
Status reconciler.Status
2932
}
3033

3134
// TableHeader implements statedb.TableWritable.
32-
func (w waitObject) TableHeader() []string {
33-
return []string{"ID", "Status"}
35+
func (w *waitObject) TableHeader() []string {
36+
return []string{"ID", "Fail", "Status"}
3437
}
3538

3639
// TableRow implements statedb.TableWritable.
37-
func (w waitObject) TableRow() []string {
40+
func (w *waitObject) TableRow() []string {
41+
fail := w.Fail.Load()
3842
return []string{
3943
fmt.Sprintf("%d", w.ID),
44+
fmt.Sprintf("%t", fail),
4045
w.Status.String(),
4146
}
4247
}
4348

44-
var _ statedb.TableWritable = waitObject{}
45-
4649
func (w *waitObject) Clone() *waitObject {
4750
w2 := *w
4851
return &w2
@@ -69,14 +72,19 @@ var waitObjectIDIndex = statedb.Index[*waitObject, uint64]{
6972
type waitOps struct {
7073
started chan struct{}
7174
unblock chan struct{}
72-
startedOnce sync.Once
75+
markStarted func()
7376
}
7477

7578
func newWaitOps() *waitOps {
76-
return &waitOps{
79+
w := &waitOps{
7780
started: make(chan struct{}),
7881
unblock: make(chan struct{}),
7982
}
83+
84+
w.markStarted = sync.OnceFunc(func() {
85+
close(w.started)
86+
})
87+
return w
8088
}
8189

8290
// Delete implements reconciler.Operations.
@@ -91,9 +99,10 @@ func (*waitOps) Prune(context.Context, statedb.ReadTxn, iter.Seq2[*waitObject, s
9199

92100
// Update implements reconciler.Operations.
93101
func (w *waitOps) Update(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision, obj *waitObject) error {
94-
w.startedOnce.Do(func() {
95-
close(w.started)
96-
})
102+
w.markStarted()
103+
if obj.Fail.Load() {
104+
return errors.New("fail")
105+
}
97106
select {
98107
case <-w.unblock:
99108
return nil
@@ -140,6 +149,7 @@ func TestWaitUntilReconciled(t *testing.T) {
140149
ops,
141150
nil,
142151
reconciler.WithoutPruning(),
152+
reconciler.WithRetry(10*time.Millisecond, 10*time.Millisecond),
143153
)
144154
return err
145155
}),
@@ -156,32 +166,42 @@ func TestWaitUntilReconciled(t *testing.T) {
156166
defer cancel()
157167

158168
// Won't block if we query with 0 revision.
159-
_, err := r.WaitUntilReconciled(waitCtx, 0)
169+
_, retryRevision, err := r.WaitUntilReconciled(waitCtx, 0)
160170
require.NoError(t, err)
171+
require.Zero(t, retryRevision)
161172

162173
// Insert an object and wait for it to be reconciled.
163174
wtxn := db.WriteTxn(table)
164175
table.Insert(wtxn, &waitObject{
165176
ID: 1,
177+
Fail: new(atomic.Bool),
166178
Status: reconciler.StatusPending(),
167179
})
168180
revision := table.Revision(wtxn)
169181
wtxn.Commit()
170182

171183
type waitResult struct {
172-
rev statedb.Revision
173-
err error
184+
rev statedb.Revision
185+
retryRevision statedb.Revision
186+
err error
174187
}
175188
done := make(chan waitResult, 1)
176189
go func() {
177-
rev, err := r.WaitUntilReconciled(waitCtx, revision)
178-
done <- waitResult{rev: rev, err: err}
190+
rev, retryRevision, err := r.WaitUntilReconciled(waitCtx, revision)
191+
done <- waitResult{rev: rev, err: err, retryRevision: retryRevision}
179192
}()
180193

181-
synctest.Wait()
182-
select {
183-
case <-ops.started:
184-
default:
194+
started := false
195+
for !started {
196+
// Advance the fake time
197+
time.Sleep(50 * time.Millisecond)
198+
select {
199+
case <-ops.started:
200+
started = true
201+
default:
202+
}
203+
}
204+
if !started {
185205
t.Fatal("expected update to start")
186206
}
187207

@@ -192,14 +212,48 @@ func TestWaitUntilReconciled(t *testing.T) {
192212
}
193213

194214
close(ops.unblock)
215+
195216
synctest.Wait()
196217

197218
select {
198219
case result := <-done:
199220
require.NoError(t, result.err)
200-
require.Equal(t, revision, result.rev)
221+
require.Zero(t, result.retryRevision)
201222
default:
202223
t.Fatal("expected WaitUntilReconciled to complete")
203224
}
225+
226+
wtxn = db.WriteTxn(table)
227+
obj := &waitObject{
228+
ID: 2,
229+
Fail: new(atomic.Bool),
230+
Status: reconciler.StatusPending(),
231+
}
232+
obj.Fail.Store(true)
233+
table.Insert(wtxn, obj)
234+
235+
retryRevision = table.Revision(wtxn)
236+
wtxn.Commit()
237+
238+
synctest.Wait()
239+
240+
origRetryRevision := retryRevision
241+
rev, returnedRetryRevision, err := r.WaitUntilReconciled(waitCtx, origRetryRevision)
242+
require.NoError(t, err)
243+
require.Equal(t, origRetryRevision, rev)
244+
require.Equal(t, origRetryRevision, returnedRetryRevision)
245+
246+
obj.Fail.Store(false)
247+
// Advance the fake time enough that a retry has happened.
248+
time.Sleep(time.Second)
249+
250+
obj, newRev, ok := table.Get(db.ReadTxn(), waitObjectIDIndex.Query(2))
251+
require.True(t, ok && obj.Status.Kind == reconciler.StatusKindDone)
252+
253+
rev, returnedRetryRevision, err = r.WaitUntilReconciled(waitCtx, origRetryRevision)
254+
require.NoError(t, err)
255+
require.Equal(t, newRev, rev)
256+
require.Zero(t, returnedRetryRevision)
257+
204258
})
205259
}

0 commit comments

Comments
 (0)