Skip to content

Commit b594c3b

Browse files
committed
statedb: Restore ability to use Changes with WriteTxn
The index refactoring in cd27022 removed the ability to use ChangeIterator.Next with a WriteTxn targeting the table being observed. This was done to avoid having to hold onto the old instance of the tableEntry, but it broke a useful pattern for Changes(): the ability for a component A to write into a table and component B to be able to observe and "augment" the objects created by A. Restore this ability to by keeping a pointer to the old root in the write transaction. Before: BenchmarkDB_WriteTxn_CommitOnly_100Tables-8 1428603 838.6 ns/op 1112 B/op 5 allocs/op BenchmarkDB_WriteTxn_CommitOnly_1Table-8 2391542 503.3 ns/op 224 B/op 5 allocs/op BenchmarkDB_NewWriteTxn-8 2607277 458.1 ns/op 200 B/op 4 allocs/op BenchmarkDB_WriteTxnCommit100-8 1455978 823.8 ns/op 1096 B/op 5 allocs/op After: BenchmarkDB_WriteTxn_CommitOnly_100Tables-8 1239177 962.4 ns/op 1112 B/op 5 allocs/op BenchmarkDB_WriteTxn_CommitOnly_1Table-8 2332510 515.2 ns/op 224 B/op 5 allocs/op BenchmarkDB_NewWriteTxn-8 2566347 468.2 ns/op 200 B/op 4 allocs/op BenchmarkDB_WriteTxnCommit100-8 1452818 892.0 ns/op 1096 B/op 5 allocs/op No practical difference since we keep a pool for writeTxnHandle and thus don't really allocate more memory even though the writeTxnHandle is larger now. The impact this may have is to workloads that have a huge table and churn through all objects and now WriteTxn holds onto both the old root and the new root being prepared and thus do not allow GC to collect old objects. This however seems unlikely to be an issue since we do hold onto the old root via [DB] and only way for constructing lots of potentially garbage objects is to have two tables churning and have the two WriteTxn's hold onto "garbage" of the other table that is no longer reachable via [DB.ReadTxn]. Signed-off-by: Jussi Maki <jussi.maki@isovalent.com>
1 parent 591cec8 commit b594c3b

File tree

6 files changed

+57
-3
lines changed

6 files changed

+57
-3
lines changed

db.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ func (db *DB) WriteTxn(tables ...TableMeta) WriteTxn {
205205
txn.smus.Lock()
206206
acquiredAt := time.Now()
207207

208-
txn.tableEntries = slices.Clone(*db.root.Load())
208+
txn.oldRoot = db.root.Load()
209+
210+
// Clone the root. This new allocation will become the new root when
211+
// we commit.
212+
txn.tableEntries = slices.Clone(*txn.oldRoot)
213+
209214
txn.handle = db.handleName
210215
txn.acquiredAt = acquiredAt
211216

db_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,29 @@ func TestDB_Changes(t *testing.T) {
592592

593593
assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
594594
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")
595+
596+
// Create another iterator and test observing changes using a WriteTxn
597+
// that is mutating the table. This will observe the changes up to the
598+
// point WriteTxn() was called, but not changes made in the WriteTxn.
599+
wtxn = db.WriteTxn(table)
600+
iter3, err := table.Changes(wtxn)
601+
require.NoError(t, err, "failed to create ChangeIterator")
602+
_, _, err = table.Insert(wtxn, &testObject{ID: 1})
603+
require.NoError(t, err, "Insert failed")
604+
wtxn.Commit()
605+
606+
wtxn = db.WriteTxn(table)
607+
_, _, err = table.Insert(wtxn, &testObject{ID: 2})
608+
require.NoError(t, err, "Insert failed")
609+
changes, _ = iter3.Next(wtxn)
610+
// We don't observe the insert of ID 2
611+
count = 0
612+
for change := range changes {
613+
require.EqualValues(t, 1, change.Object.ID)
614+
count++
615+
}
616+
require.Equal(t, 1, count)
617+
wtxn.Abort()
595618
}
596619

597620
func TestDB_Observable(t *testing.T) {

iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ type changeIterator[Obj any] struct {
166166
}
167167

168168
func (it *changeIterator[Obj]) refresh(txn ReadTxn) {
169-
tableEntry := txn.root()[it.table.tablePos()]
169+
tableEntry := txn.committedRoot()[it.table.tablePos()]
170170
if it.iter != nil && tableEntry.locked {
171171
var obj Obj
172172
panic(fmt.Sprintf("Table[%T].Changes().Next() called with the target table locked. This is not supported.", obj))

read_txn.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ func (r *readTxn) root() dbRoot {
3636
return dbRoot(*r)
3737
}
3838

39+
// committedRoot implements ReadTxn.
40+
func (r *readTxn) committedRoot() dbRoot {
41+
return dbRoot(*r)
42+
}
43+
3944
// WriteJSON marshals out the database as JSON into the given writer.
4045
// If tables are given then only these tables are written.
4146
func (r *readTxn) WriteJSON(w io.Writer, tables ...string) error {

types.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ type Table[Obj any] interface {
7575
//
7676
// If an object is created and deleted before the observer has iterated
7777
// over the creation then only the deletion is seen.
78+
//
79+
// If [ChangeIterator.Next] is called with a [WriteTxn] targeting the
80+
// table being observed then only the changes prior to that [WriteTxn]
81+
// are observed.
7882
Changes(WriteTxn) (ChangeIterator[Obj], error)
7983
}
8084

@@ -108,7 +112,9 @@ type ChangeIterator[Obj any] interface {
108112
// The returned sequence is a single-use sequence and subsequent calls will return
109113
// an empty sequence.
110114
//
111-
// Next will panic if called with a WriteTxn that has locked the target table.
115+
// If Next is called with a [WriteTxn] targeting the table being observed then only
116+
// the changes made prior to that [WriteTxn] are observed, e.g. we can only observe
117+
// committed changes.
112118
Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{})
113119

114120
// Close the change iterator. Once all change iterators for a given table are closed
@@ -254,8 +260,17 @@ type ReadTxn interface {
254260
indexReadTxn(meta TableMeta, indexPos int) (tableIndexReader, error)
255261
mustIndexReadTxn(meta TableMeta, indexPos int) tableIndexReader
256262
getTableEntry(meta TableMeta) *tableEntry
263+
264+
// root returns the database root. If this is a WriteTxn it returns
265+
// the current modified root.
257266
root() dbRoot
258267

268+
// committedRoot returns the committed database root. If this is a
269+
// WriteTxn it returns the root snapshotted at the time the WriteTxn
270+
// was constructed and thus does not reflect any changes made in the
271+
// transaction.
272+
committedRoot() dbRoot
273+
259274
// WriteJSON writes the contents of the database as JSON.
260275
WriteJSON(w io.Writer, tables ...string) error
261276
}

write_txn.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type writeTxnState struct {
3232
acquiredAt time.Time // the time at which the transaction acquired the locks
3333
duration atomic.Uint64 // the transaction duration after it finished
3434

35+
oldRoot *dbRoot // snapshot of the root at the time WriteTxn was called
3536
tableEntries []*tableEntry // table entries being modified
3637
numTxns int // number of index transactions opened
3738
smus internal.SortableMutexes // the (sorted) table locks
@@ -44,6 +45,10 @@ func (txn *writeTxnState) unwrap() *writeTxnState {
4445
return txn
4546
}
4647

48+
func (txn *writeTxnState) committedRoot() dbRoot {
49+
return *txn.oldRoot
50+
}
51+
4752
func (txn *writeTxnState) root() dbRoot {
4853
return txn.tableEntries
4954
}
@@ -286,6 +291,7 @@ func (txn *writeTxnState) delete(meta TableMeta, guardRevision Revision, data an
286291
// and returns it to the pool.
287292
func (handle *writeTxnHandle) returnToPool() {
288293
txn := handle.writeTxnState
294+
txn.oldRoot = nil
289295
txn.tableEntries = nil
290296
txn.numTxns = 0
291297
clear(txn.smus)

0 commit comments

Comments
 (0)