Skip to content

Commit 526163a

Browse files
authored
fix(storage): share aloneInBucket flag per bucket via atomic.Bool in Factory (#1254)
The previous subquery-based approach in newScopedSelect executed a COUNT(*) on _system.ledgers for every scoped query. Replace it with a cached *atomic.Bool per bucket, shared across all stores through the DefaultFactory. When a new ledger is created, SetAloneInBucket(false) immediately propagates to every store in that bucket, ensuring the WHERE ledger=? predicate is never incorrectly skipped. Also adds CountLedgersInBucket to the system store interface so CreateLedger and OpenLedger can seed the flag correctly.
1 parent 42bbfb5 commit 526163a

File tree

6 files changed

+89
-22
lines changed

6 files changed

+89
-22
lines changed

internal/storage/driver/driver.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
7373
}
7474
}
7575

76+
count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
77+
if err != nil {
78+
return fmt.Errorf("counting ledgers in bucket: %w", err)
79+
}
80+
7681
ret = d.ledgerStoreFactory.Create(b, *l)
82+
ret.SetAloneInBucket(count == 1)
7783

7884
return nil
7985
})
@@ -86,12 +92,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
8692

8793
func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
8894
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
89-
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
95+
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
96+
// so all stores in the same bucket see updates immediately.
97+
systemStore := d.systemStoreFactory.Create(d.db)
98+
99+
ret, err := systemStore.GetLedger(ctx, name)
90100
if err != nil {
91101
return nil, nil, err
92102
}
93103

104+
count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
105+
if err != nil {
106+
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
107+
}
108+
94109
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
110+
store.SetAloneInBucket(count == 1)
95111

96112
return store, ret, err
97113
}

internal/storage/driver/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type SystemStore interface {
1616
//ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
1717
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
1818
GetDistinctBuckets(ctx context.Context) ([]string, error)
19+
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)
1920

2021
Migrate(ctx context.Context, options ...migrations.Option) error
2122
GetMigrator(options ...migrations.Option) *migrations.Migrator

internal/storage/driver/store_generated_test.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/ledger/factory.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package ledger
22

33
import (
4+
"sync"
5+
"sync/atomic"
6+
47
ledger "github.com/formancehq/ledger/internal"
58
"github.com/formancehq/ledger/internal/storage/bucket"
69
"github.com/uptrace/bun"
@@ -13,15 +16,30 @@ type Factory interface {
1316
type DefaultFactory struct {
1417
db *bun.DB
1518
options []Option
19+
20+
mu sync.Mutex
21+
bucketFlags map[string]*atomic.Bool
1622
}
1723

1824
func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
1925
return &DefaultFactory{
20-
db: db,
21-
options: options,
26+
db: db,
27+
options: options,
28+
bucketFlags: make(map[string]*atomic.Bool),
2229
}
2330
}
2431

2532
func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
26-
return New(d.db, b, l, d.options...)
33+
d.mu.Lock()
34+
flag, ok := d.bucketFlags[l.Bucket]
35+
if !ok {
36+
flag = &atomic.Bool{}
37+
d.bucketFlags[l.Bucket] = flag
38+
}
39+
d.mu.Unlock()
40+
41+
store := New(d.db, b, l, d.options...)
42+
store.aloneInBucket = flag
43+
44+
return store
2745
}

internal/storage/ledger/store.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"sync/atomic"
8+
79
"github.com/formancehq/go-libs/v3/bun/bunpaginate"
810
"github.com/formancehq/go-libs/v3/migrations"
911
"github.com/formancehq/go-libs/v3/platform/postgres"
@@ -24,6 +26,12 @@ type Store struct {
2426
bucket bucket.Bucket
2527
ledger ledger.Ledger
2628

29+
// aloneInBucket is a shared optimization hint (per bucket) indicating whether
30+
// this ledger is the only one in its bucket. The pointer is shared across all
31+
// stores in the same bucket via the Factory, so updating it from any store
32+
// (e.g. when a new ledger is created) immediately affects all stores.
33+
aloneInBucket *atomic.Bool
34+
2735
tracer trace.Tracer
2836
meter metric.Meter
2937
checkBucketSchemaHistogram metric.Int64Histogram
@@ -164,26 +172,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
164172
}
165173

166174
// newScopedSelect creates a new select query scoped to the current ledger.
167-
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
168-
// To avoid that, we use a WHERE OR to separate the two cases:
169-
// 1. Check if the ledger is the only one in the bucket
170-
// 2. Otherwise, filter by ledger name
175+
// When the ledger is alone in its bucket, we skip the WHERE clause to avoid
176+
// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger
177+
// name to use the composite index (ledger, id) efficiently.
178+
//
179+
// This relies on aloneInBucket being up to date (shared across the bucket).
171180
func (store *Store) newScopedSelect() *bun.SelectQuery {
172181
q := store.db.NewSelect()
173-
checkLedgerAlone := store.db.NewSelect().
174-
TableExpr("_system.ledgers").
175-
ColumnExpr("count = 1").
176-
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
177-
store.db.NewSelect().
178-
TableExpr("_system.ledgers").
179-
ColumnExpr("bucket").
180-
ColumnExpr("COUNT(*) AS count").
181-
Group("bucket"),
182-
).
183-
Where("_system.ledgers.name = ?", store.ledger.Name)
184-
185-
return q.
186-
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
182+
if store.aloneInBucket == nil || !store.aloneInBucket.Load() {
183+
q = q.Where("ledger = ?", store.ledger.Name)
184+
}
185+
return q
186+
}
187+
188+
func (store *Store) SetAloneInBucket(alone bool) {
189+
if store.aloneInBucket != nil {
190+
store.aloneInBucket.Store(alone)
191+
}
187192
}
188193

189194
func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {

internal/storage/system/store.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Store interface {
2323
Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload]
2424
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
2525
GetDistinctBuckets(ctx context.Context) ([]string, error)
26+
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)
2627

2728
Migrate(ctx context.Context, options ...migrations.Option) error
2829
GetMigrator(options ...migrations.Option) *migrations.Migrator
@@ -60,6 +61,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error)
6061
return buckets, nil
6162
}
6263

64+
func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
65+
count, err := d.db.NewSelect().
66+
Model(&ledger.Ledger{}).
67+
Where("bucket = ?", bucket).
68+
Count(ctx)
69+
if err != nil {
70+
return 0, postgres.ResolveError(err)
71+
}
72+
return count, nil
73+
}
74+
6375
func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error {
6476

6577
if l.Metadata == nil {

0 commit comments

Comments
 (0)