Skip to content

Commit 7be5a96

Browse files
authored
fix(storage): share aloneInBucket flag per bucket via atomic.Bool in Factory (#1253)
The previous per-store bool meant that when a second ledger was created in the same bucket, existing stores kept isAloneInBucket=true and skipped the WHERE ledger=? predicate, causing cross-ledger data leaks in queries (e.g. TestAccountsList). Replace the private bool with a *atomic.Bool shared across all stores of the same bucket through the DefaultFactory. Any call to SetAloneInBucket now immediately propagates to every store in that bucket.
1 parent 3f3066c commit 7be5a96

File tree

5 files changed

+87
-22
lines changed

5 files changed

+87
-22
lines changed

internal/storage/driver/driver.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
7676
}
7777
}
7878

79+
count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
80+
if err != nil {
81+
return fmt.Errorf("counting ledgers in bucket: %w", err)
82+
}
83+
7984
ret = d.ledgerStoreFactory.Create(b, *l)
85+
ret.SetAloneInBucket(count == 1)
8086

8187
return nil
8288
})
@@ -89,12 +95,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
8995

9096
func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
9197
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
92-
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
98+
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
99+
// so all stores in the same bucket see updates immediately.
100+
systemStore := d.systemStoreFactory.Create(d.db)
101+
102+
ret, err := systemStore.GetLedger(ctx, name)
93103
if err != nil {
94104
return nil, nil, err
95105
}
96106

107+
count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
108+
if err != nil {
109+
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
110+
}
111+
97112
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
113+
store.SetAloneInBucket(count == 1)
98114

99115
return store, ret, err
100116
}

internal/storage/driver/system_generated_test.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/ledger/factory.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package ledger
22

33
import (
4+
"sync"
5+
"sync/atomic"
6+
47
ledger "github.com/formancehq/ledger/internal"
58
"github.com/formancehq/ledger/internal/storage/bucket"
69
"github.com/uptrace/bun"
@@ -13,15 +16,30 @@ type Factory interface {
1316
type DefaultFactory struct {
1417
db *bun.DB
1518
options []Option
19+
20+
mu sync.Mutex
21+
bucketFlags map[string]*atomic.Bool
1622
}
1723

1824
func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
1925
return &DefaultFactory{
20-
db: db,
21-
options: options,
26+
db: db,
27+
options: options,
28+
bucketFlags: make(map[string]*atomic.Bool),
2229
}
2330
}
2431

2532
func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
26-
return New(d.db, b, l, d.options...)
33+
d.mu.Lock()
34+
flag, ok := d.bucketFlags[l.Bucket]
35+
if !ok {
36+
flag = &atomic.Bool{}
37+
d.bucketFlags[l.Bucket] = flag
38+
}
39+
d.mu.Unlock()
40+
41+
store := New(d.db, b, l, d.options...)
42+
store.aloneInBucket = flag
43+
44+
return store
2745
}

internal/storage/ledger/store.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"sync/atomic"
78

89
"github.com/formancehq/go-libs/v2/bun/bunpaginate"
910
"github.com/formancehq/go-libs/v2/migrations"
@@ -27,6 +28,12 @@ type Store struct {
2728
bucket bucket.Bucket
2829
ledger ledger.Ledger
2930

31+
// aloneInBucket is a shared optimization hint (per bucket) indicating whether
32+
// this ledger is the only one in its bucket. The pointer is shared across all
33+
// stores in the same bucket via the Factory, so updating it from any store
34+
// (e.g. when a new ledger is created) immediately affects all stores.
35+
aloneInBucket *atomic.Bool
36+
3037
tracer trace.Tracer
3138
meter metric.Meter
3239
checkBucketSchemaHistogram metric.Int64Histogram
@@ -189,26 +196,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
189196
}
190197

191198
// newScopedSelect creates a new select query scoped to the current ledger.
192-
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
193-
// To avoid that, we use a WHERE OR to separate the two cases:
194-
// 1. Check if the ledger is the only one in the bucket
195-
// 2. Otherwise, filter by ledger name
199+
// When the ledger is alone in its bucket, we skip the WHERE clause to avoid
200+
// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger
201+
// name to use the composite index (ledger, id) efficiently.
202+
//
203+
// This relies on aloneInBucket being up to date (shared across the bucket).
196204
func (store *Store) newScopedSelect() *bun.SelectQuery {
197205
q := store.db.NewSelect()
198-
checkLedgerAlone := store.db.NewSelect().
199-
TableExpr("_system.ledgers").
200-
ColumnExpr("count = 1").
201-
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
202-
store.db.NewSelect().
203-
TableExpr("_system.ledgers").
204-
ColumnExpr("bucket").
205-
ColumnExpr("COUNT(*) AS count").
206-
Group("bucket"),
207-
).
208-
Where("_system.ledgers.name = ?", store.ledger.Name)
209-
210-
return q.
211-
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
206+
if store.aloneInBucket == nil || !store.aloneInBucket.Load() {
207+
q = q.Where("ledger = ?", store.ledger.Name)
208+
}
209+
return q
210+
}
211+
212+
func (store *Store) SetAloneInBucket(alone bool) {
213+
if store.aloneInBucket != nil {
214+
store.aloneInBucket.Store(alone)
215+
}
212216
}
213217

214218
func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {

internal/storage/system/store.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Store interface {
2424
ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
2525
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
2626
GetDistinctBuckets(ctx context.Context) ([]string, error)
27+
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)
2728

2829
Migrate(ctx context.Context, options ...migrations.Option) error
2930
GetMigrator(options ...migrations.Option) *migrations.Migrator
@@ -57,6 +58,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error)
5758
return buckets, nil
5859
}
5960

61+
func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
62+
count, err := d.db.NewSelect().
63+
Model(&ledger.Ledger{}).
64+
Where("bucket = ?", bucket).
65+
Count(ctx)
66+
if err != nil {
67+
return 0, postgres.ResolveError(err)
68+
}
69+
return count, nil
70+
}
71+
6072
func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error {
6173

6274
if l.Metadata == nil {

0 commit comments

Comments
 (0)