Skip to content

Commit aa29fe7

Browse files
committed
statedb: Add ChangeIterator.Close method
If we use 'synctest' then we can't close channels outside the synctest bubble. The Changes() method creates a `*changeIterator` and registers a finalizer for it that unregisters the delete tracker from the table. As the delete trackers are stored in a 'part.Map` there's a watch channel that gets closed and this triggers a panic if this happens in a synctest. To avoid this issue add a 'Close()' method to the 'ChangeIterator' interface to allow optionally closing the iterator and avoiding the finalizer. Signed-off-by: Jussi Maki <jussi.maki@isovalent.com>
1 parent 145a595 commit aa29fe7

File tree

5 files changed

+16
-0
lines changed

5 files changed

+16
-0
lines changed

derive.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
7474
if err != nil {
7575
return err
7676
}
77+
defer iter.Close()
78+
7779
for {
7880
wtxn := d.DB.WriteTxn(out)
7981
changes, watch := iter.Next(wtxn)

iterator.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package statedb
66
import (
77
"fmt"
88
"iter"
9+
"runtime"
910
"slices"
1011

1112
"github.com/cilium/statedb/index"
@@ -251,6 +252,11 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi
251252
}, watch
252253
}
253254

255+
func (it *changeIterator[Obj]) Close() {
256+
runtime.SetFinalizer(it, nil)
257+
it.close()
258+
}
259+
254260
func (it *changeIterator[Obj]) close() {
255261
it.iter = nil
256262
if it.dt != nil {

observable.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (to *observable[Obj]) Observe(ctx context.Context, next func(Change[Obj]),
3232
complete(err)
3333
return
3434
}
35+
defer iter.Close()
3536
defer complete(nil)
3637

3738
for {

reconciler/reconciler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health)
4343
if err != nil {
4444
return fmt.Errorf("watching for changes failed: %w", err)
4545
}
46+
defer changeIterator.Close()
4647

4748
tableWatchChan := closedWatchChannel
4849

types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ type ChangeIterator[Obj any] interface {
110110
//
111111
// Next will panic if called with a WriteTxn that has locked the target table.
112112
Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{})
113+
114+
// Close the change iterator. Once all change iterators for a given table are closed
115+
// deleted objects for that table are no longer set aside for the change iterators.
116+
//
117+
// Calling this method is optional as each iterator has a finalizer that closes it.
118+
Close()
113119
}
114120

115121
// RWTable provides methods for modifying the table under a write transaction

0 commit comments

Comments
 (0)