Skip to content

Commit 2f16006

Browse files
committed
fix: buffered transactional writes in Spanner cannot be read
We therefore need to read them ourselves, manually, in the new schema writer
1 parent 918008e commit 2f16006

File tree

3 files changed

+120
-2
lines changed

3 files changed

+120
-2
lines changed

internal/datastore/spanner/caveat.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ func (rwt spannerReadWriteTXN) LegacyWriteCaveats(ctx context.Context, caveats [
107107
[]string{colName, colCaveatDefinition, colCaveatTS},
108108
[]any{caveat.Name, serialized, spanner.CommitTimestamp},
109109
))
110+
111+
// Track the buffered caveat write so we can return it from List methods
112+
// without attempting to read from Spanner (which doesn't see buffered writes)
113+
rwt.bufferedCaveats[caveat.Name] = caveat
114+
// Remove from deleted set in case it was previously deleted in this transaction
115+
delete(rwt.deletedCaveats, caveat.Name)
110116
}
111117

112118
if err := rwt.spannerRWT.BufferWrite(mutations); err != nil {
@@ -120,6 +126,9 @@ func (rwt spannerReadWriteTXN) LegacyDeleteCaveats(ctx context.Context, names []
120126
keys := make([]spanner.Key, 0, len(names))
121127
for _, n := range names {
122128
keys = append(keys, spanner.Key{n})
129+
// Remove from buffered caveats and mark as deleted so List methods won't return it
130+
delete(rwt.bufferedCaveats, n)
131+
rwt.deletedCaveats[n] = struct{}{}
123132
}
124133
err := rwt.spannerRWT.BufferWrite([]*spanner.Mutation{
125134
spanner.Delete(tableCaveat, spanner.KeySetFromKeys(keys...)),

internal/datastore/spanner/readwrite.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,104 @@ import (
3131
type spannerReadWriteTXN struct {
3232
spannerReader
3333
spannerRWT *spanner.ReadWriteTransaction
34+
35+
// IMPORTANT: Spanner Read-Write Transaction Limitation
36+
// =====================================================
37+
// In Cloud Spanner, reads within a read-write transaction do NOT see the effects of
38+
// buffered writes (mutations) performed earlier in that same transaction. This is because
39+
// writes are buffered locally at the client and are not sent to the server until commit.
40+
// This is a fundamental Spanner design, not an emulator limitation.
41+
//
42+
// To work around this, we track all schema writes and deletes in memory maps below.
43+
// When List methods are called, we merge buffered writes with committed data read from
44+
// Spanner, ensuring the legacy schema writer can correctly compute diffs without attempting
45+
// to read buffered writes from Spanner.
46+
47+
// bufferedNamespaces tracks namespaces written in this transaction
48+
bufferedNamespaces map[string]*core.NamespaceDefinition
49+
50+
// deletedNamespaces tracks namespaces deleted in this transaction
51+
deletedNamespaces map[string]struct{}
52+
53+
// bufferedCaveats tracks caveats written in this transaction
54+
bufferedCaveats map[string]*core.CaveatDefinition
55+
56+
// deletedCaveats tracks caveats deleted in this transaction
57+
deletedCaveats map[string]struct{}
3458
}
3559

3660
const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits
3761

62+
// LegacyListAllNamespaces reads namespaces from Spanner and merges them with any buffered writes.
63+
// This is necessary because in Spanner, buffered writes in a read-write transaction are not visible
64+
// to reads in the same transaction. The buffered map contains namespaces written in this transaction,
65+
// and the deleted map tracks namespaces deleted in this transaction.
66+
func (rwt *spannerReadWriteTXN) LegacyListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) {
67+
// First, read from Spanner (this will get committed data, not buffered writes)
68+
existing, err := rwt.spannerReader.LegacyListAllNamespaces(ctx)
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
// Build a map of existing namespaces by name, excluding deleted ones
74+
merged := make(map[string]datastore.RevisionedNamespace)
75+
for _, ns := range existing {
76+
if _, deleted := rwt.deletedNamespaces[ns.Definition.Name]; !deleted {
77+
merged[ns.Definition.Name] = ns
78+
}
79+
}
80+
81+
// Overlay buffered writes (these override anything read from Spanner)
82+
for name, def := range rwt.bufferedNamespaces {
83+
merged[name] = datastore.RevisionedNamespace{
84+
Definition: def,
85+
LastWrittenRevision: datastore.NoRevision, // Will be set on commit
86+
}
87+
}
88+
89+
// Convert map back to slice
90+
result := make([]datastore.RevisionedNamespace, 0, len(merged))
91+
for _, ns := range merged {
92+
result = append(result, ns)
93+
}
94+
95+
return result, nil
96+
}
97+
98+
// LegacyListAllCaveats reads caveats from Spanner and merges them with any buffered writes.
99+
// See LegacyListAllNamespaces for the rationale.
100+
func (rwt *spannerReadWriteTXN) LegacyListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) {
101+
// First, read from Spanner (this will get committed data, not buffered writes)
102+
existing, err := rwt.spannerReader.LegacyListAllCaveats(ctx)
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
// Build a map of existing caveats by name, excluding deleted ones
108+
merged := make(map[string]datastore.RevisionedCaveat)
109+
for _, caveat := range existing {
110+
if _, deleted := rwt.deletedCaveats[caveat.Definition.Name]; !deleted {
111+
merged[caveat.Definition.Name] = caveat
112+
}
113+
}
114+
115+
// Overlay buffered writes (these override anything read from Spanner)
116+
for name, def := range rwt.bufferedCaveats {
117+
merged[name] = datastore.RevisionedCaveat{
118+
Definition: def,
119+
LastWrittenRevision: datastore.NoRevision, // Will be set on commit
120+
}
121+
}
122+
123+
// Convert map back to slice
124+
result := make([]datastore.RevisionedCaveat, 0, len(merged))
125+
for _, caveat := range merged {
126+
result = append(result, caveat)
127+
}
128+
129+
return result, nil
130+
}
131+
38132
func (rwt *spannerReadWriteTXN) RegisterCounter(ctx context.Context, name string, filter *core.RelationshipFilter) error {
39133
// Ensure the counter doesn't already exist.
40134
counters, err := rwt.lookupCounters(ctx, name)
@@ -374,6 +468,12 @@ func (rwt *spannerReadWriteTXN) LegacyWriteNamespaces(ctx context.Context, newCo
374468
[]string{colNamespaceName, colNamespaceConfig, colTimestamp},
375469
[]any{newConfig.Name, serialized, spanner.CommitTimestamp},
376470
))
471+
472+
// Track the buffered namespace write so we can return it from List methods
473+
// without attempting to read from Spanner (which doesn't see buffered writes)
474+
rwt.bufferedNamespaces[newConfig.Name] = newConfig
475+
// Remove from deleted set in case it was previously deleted in this transaction
476+
delete(rwt.deletedNamespaces, newConfig.Name)
377477
}
378478

379479
if err := rwt.spannerRWT.BufferWrite(mutations); err != nil {
@@ -416,6 +516,10 @@ func (rwt *spannerReadWriteTXN) LegacyDeleteNamespaces(ctx context.Context, nsNa
416516
if err != nil {
417517
return fmt.Errorf(errUnableToDeleteConfig, err)
418518
}
519+
520+
// Remove from buffered namespaces and mark as deleted so List methods won't return it
521+
delete(rwt.bufferedNamespaces, nsName)
522+
rwt.deletedNamespaces[nsName] = struct{}{}
419523
}
420524

421525
return nil

internal/datastore/spanner/spanner.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/authzed/spicedb/internal/telemetry/otelconv"
3636
"github.com/authzed/spicedb/pkg/datastore"
3737
dsoptions "github.com/authzed/spicedb/pkg/datastore/options"
38+
core "github.com/authzed/spicedb/pkg/proto/core/v1"
3839
"github.com/authzed/spicedb/pkg/tuple"
3940
)
4041

@@ -411,8 +412,12 @@ func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUser
411412

412413
executor := common.QueryRelationshipsExecutor{Executor: queryExecutor(txSource)}
413414
rwt := &spannerReadWriteTXN{
414-
spannerReader{executor, txSource, sd.filterMaximumIDCount, sd.schema, sd.schemaMode, datastore.NoRevision, string(datastore.NoSchemaHashInTransaction), sd.schemaReaderWriter},
415-
spannerRWT,
415+
spannerReader: spannerReader{executor, txSource, sd.filterMaximumIDCount, sd.schema, sd.schemaMode, datastore.NoRevision, string(datastore.NoSchemaHashInTransaction), sd.schemaReaderWriter},
416+
spannerRWT: spannerRWT,
417+
bufferedNamespaces: make(map[string]*core.NamespaceDefinition),
418+
deletedNamespaces: make(map[string]struct{}),
419+
bufferedCaveats: make(map[string]*core.CaveatDefinition),
420+
deletedCaveats: make(map[string]struct{}),
416421
}
417422
err := func() error {
418423
innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc")

0 commit comments

Comments
 (0)