Skip to content

Commit 052baef

Browse files
committed
reconciler: Add WaitUntilReconciled method
Add `Reconciler.WaitUntilReconciled` method to allow waiting for the reconciler to catch up processing to a given revision. Example usage: ``` wtxn := db.WriteTxn(table) table.Insert(wtxn, &Obj{ID: 1, Status: reconciler.StatusPending()}) table.Insert(wtxn, &Obj{ID: 2, Status: reconciler.StatusPending()}) table.Insert(wtxn, &Obj{ID: 3, Status: reconciler.StatusPending()}) revToWaitFor := table.Revision(wtxn) wtxn.Commit() // Block until reconciler has catched up to [revToWaitFor] or [ctx] // is cancelled. myReconciler.WaitUntilReconciled(ctx, revToWaitFor) ``` Signed-off-by: Jussi Maki <jussi.maki@isovalent.com>
1 parent aa29fe7 commit 052baef

File tree

6 files changed

+291
-8
lines changed

6 files changed

+291
-8
lines changed

reconciler/builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func Register[Obj comparable](
7878
retries: newRetries(cfg.RetryBackoffMinDuration, cfg.RetryBackoffMaxDuration, objectToKey),
7979
externalPruneTrigger: make(chan struct{}, 1),
8080
primaryIndexer: idx,
81+
progress: newProgressTracker(),
8182
}
8283

8384
params.JobGroup.Add(job.OneShot("reconcile", r.reconcileLoop))

reconciler/incremental.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ type opResult struct {
4545
id uint64 // the "pending" identifier
4646
}
4747

48-
func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error {
48+
func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, changed bool) {
4949
// Reconcile new and changed objects using either Operations
5050
// or BatchOperations.
5151
if incr.config.BatchOperations != nil {
52-
incr.batch(ctx, txn, changes)
52+
lastRev, changed = incr.batch(ctx, txn, changes)
5353
} else {
54-
incr.single(ctx, txn, changes)
54+
lastRev, changed = incr.single(ctx, txn, changes)
5555
}
5656

5757
// Process objects that need to be retried that were not cleared.
@@ -63,19 +63,22 @@ func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, chan
6363
// Since all failures are retried, we can return the errors from the retry
6464
// queue which includes both errors occurred in this round and the old
6565
// errors.
66-
errs := incr.retries.errors()
66+
errs = incr.retries.errors()
6767
incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs))
6868

6969
// Prepare for next round
7070
incr.numReconciled = 0
7171
clear(incr.results)
7272

73-
return errs
73+
return errs, lastRev, changed
7474
}
7575

76-
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
76+
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) {
7777
// Iterate in revision order through new and changed objects.
7878
for change, rev := range changes {
79+
lastRev = rev
80+
changed = true
81+
7982
obj := change.Object
8083

8184
status := incr.config.GetObjectStatus(obj)
@@ -95,14 +98,19 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c
9598
break
9699
}
97100
}
101+
102+
return
98103
}
99104

100-
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
105+
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) {
101106
ops := incr.config.BatchOperations
102107
updateBatch := []BatchEntry[Obj]{}
103108
deleteBatch := []BatchEntry[Obj]{}
104109

105110
for change, rev := range changes {
111+
lastRev = rev
112+
changed = true
113+
106114
obj := change.Object
107115

108116
status := incr.config.GetObjectStatus(obj)
@@ -167,6 +175,8 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch
167175
incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original}
168176
}
169177
}
178+
179+
return
170180
}
171181

172182
func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) {

reconciler/progress.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package reconciler
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"github.com/cilium/statedb"
11+
)
12+
13+
// progressTracker tracks the highest revision observed as reconciled and
14+
// allows callers to wait until a target revision is reached.
15+
type progressTracker struct {
16+
mu sync.Mutex
17+
revision statedb.Revision
18+
watch chan struct{}
19+
}
20+
21+
func newProgressTracker() *progressTracker {
22+
return &progressTracker{
23+
watch: make(chan struct{}),
24+
}
25+
}
26+
27+
func (p *progressTracker) update(rev statedb.Revision) {
28+
p.mu.Lock()
29+
if rev > p.revision {
30+
p.revision = rev
31+
close(p.watch)
32+
p.watch = make(chan struct{})
33+
}
34+
p.mu.Unlock()
35+
}
36+
37+
func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, error) {
38+
for {
39+
p.mu.Lock()
40+
current := p.revision
41+
watch := p.watch
42+
p.mu.Unlock()
43+
44+
if current >= rev {
45+
return current, nil
46+
}
47+
select {
48+
case <-ctx.Done():
49+
return current, ctx.Err()
50+
case <-watch:
51+
}
52+
}
53+
}

reconciler/progress_test.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package reconciler_test
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"iter"
10+
"log/slog"
11+
"sync"
12+
"testing"
13+
"testing/synctest"
14+
"time"
15+
16+
"github.com/cilium/hive"
17+
"github.com/cilium/hive/cell"
18+
"github.com/cilium/hive/hivetest"
19+
"github.com/cilium/hive/job"
20+
"github.com/cilium/statedb"
21+
"github.com/cilium/statedb/index"
22+
"github.com/cilium/statedb/reconciler"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
type waitObject struct {
27+
ID uint64
28+
Status reconciler.Status
29+
}
30+
31+
// TableHeader implements statedb.TableWritable.
32+
func (w waitObject) TableHeader() []string {
33+
return []string{"ID", "Status"}
34+
}
35+
36+
// TableRow implements statedb.TableWritable.
37+
func (w waitObject) TableRow() []string {
38+
return []string{
39+
fmt.Sprintf("%d", w.ID),
40+
w.Status.String(),
41+
}
42+
}
43+
44+
var _ statedb.TableWritable = waitObject{}
45+
46+
func (w *waitObject) Clone() *waitObject {
47+
w2 := *w
48+
return &w2
49+
}
50+
51+
func (w *waitObject) GetStatus() reconciler.Status {
52+
return w.Status
53+
}
54+
55+
func (w *waitObject) SetStatus(status reconciler.Status) *waitObject {
56+
w.Status = status
57+
return w
58+
}
59+
60+
var waitObjectIDIndex = statedb.Index[*waitObject, uint64]{
61+
Name: "id",
62+
FromObject: func(obj *waitObject) index.KeySet {
63+
return index.NewKeySet(index.Uint64(obj.ID))
64+
},
65+
FromKey: index.Uint64,
66+
Unique: true,
67+
}
68+
69+
type waitOps struct {
70+
started chan struct{}
71+
unblock chan struct{}
72+
startedOnce sync.Once
73+
}
74+
75+
func newWaitOps() *waitOps {
76+
return &waitOps{
77+
started: make(chan struct{}),
78+
unblock: make(chan struct{}),
79+
}
80+
}
81+
82+
// Delete implements reconciler.Operations.
83+
func (*waitOps) Delete(context.Context, statedb.ReadTxn, statedb.Revision, *waitObject) error {
84+
return nil
85+
}
86+
87+
// Prune implements reconciler.Operations.
88+
func (*waitOps) Prune(context.Context, statedb.ReadTxn, iter.Seq2[*waitObject, statedb.Revision]) error {
89+
return nil
90+
}
91+
92+
// Update implements reconciler.Operations.
93+
func (w *waitOps) Update(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision, obj *waitObject) error {
94+
w.startedOnce.Do(func() {
95+
close(w.started)
96+
})
97+
select {
98+
case <-w.unblock:
99+
return nil
100+
case <-ctx.Done():
101+
return ctx.Err()
102+
}
103+
}
104+
105+
var _ reconciler.Operations[*waitObject] = &waitOps{}
106+
107+
func TestWaitUntilReconciled(t *testing.T) {
108+
synctest.Test(t, func(t *testing.T) {
109+
var (
110+
table statedb.RWTable[*waitObject]
111+
db *statedb.DB
112+
r reconciler.Reconciler[*waitObject]
113+
)
114+
ops := newWaitOps()
115+
116+
hive := hive.New(
117+
statedb.Cell,
118+
job.Cell,
119+
cell.Provide(
120+
cell.NewSimpleHealth,
121+
reconciler.NewExpVarMetrics,
122+
func(r job.Registry, h cell.Health, lc cell.Lifecycle) job.Group {
123+
return r.NewGroup(h, lc)
124+
},
125+
),
126+
cell.Invoke(func(db_ *statedb.DB) (err error) {
127+
db = db_
128+
table, err = statedb.NewTable(db, "wait-objects", waitObjectIDIndex)
129+
return err
130+
}),
131+
cell.Module("test", "test",
132+
cell.Invoke(func(params reconciler.Params) error {
133+
var err error
134+
r, err = reconciler.Register(
135+
params,
136+
table,
137+
(*waitObject).Clone,
138+
(*waitObject).SetStatus,
139+
(*waitObject).GetStatus,
140+
ops,
141+
nil,
142+
reconciler.WithoutPruning(),
143+
)
144+
return err
145+
}),
146+
),
147+
)
148+
149+
log := hivetest.Logger(t, hivetest.LogLevel(slog.LevelError))
150+
require.NoError(t, hive.Start(log, context.TODO()), "Start")
151+
defer func() {
152+
require.NoError(t, hive.Stop(log, context.TODO()), "Stop")
153+
}()
154+
155+
waitCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
156+
defer cancel()
157+
158+
// Won't block if we query with 0 revision.
159+
_, err := r.WaitUntilReconciled(waitCtx, 0)
160+
require.NoError(t, err)
161+
162+
// Insert an object and wait for it to be reconciled.
163+
wtxn := db.WriteTxn(table)
164+
table.Insert(wtxn, &waitObject{
165+
ID: 1,
166+
Status: reconciler.StatusPending(),
167+
})
168+
revision := table.Revision(wtxn)
169+
wtxn.Commit()
170+
171+
type waitResult struct {
172+
rev statedb.Revision
173+
err error
174+
}
175+
done := make(chan waitResult, 1)
176+
go func() {
177+
rev, err := r.WaitUntilReconciled(waitCtx, revision)
178+
done <- waitResult{rev: rev, err: err}
179+
}()
180+
181+
synctest.Wait()
182+
select {
183+
case <-ops.started:
184+
default:
185+
t.Fatal("expected update to start")
186+
}
187+
188+
select {
189+
case result := <-done:
190+
t.Fatalf("WaitUntilReconciled returned early: %v", result.err)
191+
default:
192+
}
193+
194+
close(ops.unblock)
195+
synctest.Wait()
196+
197+
select {
198+
case result := <-done:
199+
require.NoError(t, result.err)
200+
require.Equal(t, revision, result.rev)
201+
default:
202+
t.Fatal("expected WaitUntilReconciled to complete")
203+
}
204+
})
205+
}

reconciler/reconciler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type reconciler[Obj comparable] struct {
1919
retries *retries
2020
externalPruneTrigger chan struct{}
2121
primaryIndexer statedb.Indexer[Obj]
22+
progress *progressTracker
2223
}
2324

2425
func (r *reconciler[Obj]) Prune() {
@@ -28,6 +29,10 @@ func (r *reconciler[Obj]) Prune() {
2829
}
2930
}
3031

32+
func (r *reconciler[Obj]) WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error) {
33+
return r.progress.wait(ctx, untilRevision)
34+
}
35+
3136
func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) error {
3237
var pruneTickerChan <-chan time.Time
3338
if r.config.PruneInterval > 0 {
@@ -101,7 +106,10 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health)
101106

102107
// Perform incremental reconciliation and retries of previously failed
103108
// objects.
104-
errs := incremental.run(ctx, txn, changes)
109+
errs, lastRevision, changed := incremental.run(ctx, txn, changes)
110+
if changed {
111+
r.progress.update(lastRevision)
112+
}
105113

106114
if tableInitialized && (prune || externalPrune) {
107115
if err := r.prune(ctx, txn); err != nil {

reconciler/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ type Reconciler[Obj any] interface {
3434
// that something has gone wrong in the reconciliation target and full
3535
// reconciliation is needed to recover.
3636
Prune()
37+
38+
// WaitUntilReconciled blocks until the reconciler has processed all
39+
// table changes up to untilRevision. Returns the latest processed
40+
// revision and ctx.Err() if the context is cancelled.
41+
// Note: errors from Update/Delete are treated as reconciled.
42+
WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error)
3743
}
3844

3945
// Params are the reconciler dependencies that are independent of the

0 commit comments

Comments
 (0)