Skip to content

Commit 2845643

Browse files
authored
Merge pull request #1904 from LerianStudio/fix/balanceSync-worker-always-active
refactor: balance sync always enabled
2 parents 322de06 + add0e74 commit 2845643

17 files changed

+209
-161
lines changed

components/ledger/.env.example

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ MAX_PAGINATION_LIMIT=100
205205
MAX_PAGINATION_MONTH_DATE_RANGE=3
206206

207207
# BALANCE SYNC WORKER
208-
# When enabled, balances are scheduled for sync to PostgreSQL before Redis TTL expires.
209-
# Default: true (if not set, the worker is enabled)
208+
# DEPRECATED: BALANCE_SYNC_WORKER_ENABLED is ignored - balance sync is always enabled.
209+
# This env var is kept for backwards compatibility but has no effect.
210210
# BALANCE_SYNC_WORKER_ENABLED=true
211211
BALANCE_SYNC_MAX_WORKERS=5

components/transaction/.env.example

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ MAX_PAGINATION_LIMIT=100
153153
MAX_PAGINATION_MONTH_DATE_RANGE=1
154154

155155
# BALANCE SYNC WORKER
156-
# When enabled, balances are scheduled for sync to PostgreSQL before Redis TTL expires.
157-
# Default: true (if not set, the worker is enabled)
156+
# DEPRECATED: BALANCE_SYNC_WORKER_ENABLED is ignored - balance sync is always enabled.
157+
# This env var is kept for backwards compatibility but has no effect.
158158
# BALANCE_SYNC_WORKER_ENABLED=true
159159
BALANCE_SYNC_MAX_WORKERS=5
160160

components/transaction/internal/adapters/http/in/transaction_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func setupTestInfra(t *testing.T) *testInfra {
103103
operationRepo := operation.NewOperationPostgreSQLRepository(infra.pgConn)
104104
balanceRepo := balance.NewBalancePostgreSQLRepository(infra.pgConn)
105105
metadataRepo := mongodb.NewMetadataMongoDBRepository(mongoConn)
106-
redisRepo, err := redis.NewConsumerRedis(redisConn, false)
106+
redisRepo, err := redis.NewConsumerRedis(redisConn)
107107
require.NoError(t, err, "failed to create Redis repository")
108108

109109
// Store repositories for test assertions
@@ -607,7 +607,7 @@ func setupAsyncTestInfra(t *testing.T) *testAsyncInfra {
607607
operationRepo := operation.NewOperationPostgreSQLRepository(infra.pgConn)
608608
balanceRepo := balance.NewBalancePostgreSQLRepository(infra.pgConn)
609609
metadataRepo := mongodb.NewMetadataMongoDBRepository(mongoConn)
610-
redisRepo, err := redis.NewConsumerRedis(redisConn, false)
610+
redisRepo, err := redis.NewConsumerRedis(redisConn)
611611
require.NoError(t, err, "failed to create Redis repository")
612612

613613
// Store Redis repository for test assertions

components/transaction/internal/adapters/redis/consumer.redis.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,14 @@ type RedisRepository interface {
8787

8888
// RedisConsumerRepository is a Redis implementation of the Redis consumer.
8989
type RedisConsumerRepository struct {
90-
conn *libRedis.RedisConnection
91-
balanceSyncEnabled bool
90+
conn *libRedis.RedisConnection
9291
}
9392

9493
// NewConsumerRedis returns a new instance of RedisRepository using the given Redis connection.
95-
// The balanceSyncEnabled parameter controls whether balance keys are scheduled for sync.
96-
// When false, the ZADD to the balance sync schedule is skipped in the Lua script.
97-
func NewConsumerRedis(rc *libRedis.RedisConnection, balanceSyncEnabled bool) (*RedisConsumerRepository, error) {
94+
// Balance sync is always enabled - balances are scheduled for sync to PostgreSQL.
95+
func NewConsumerRedis(rc *libRedis.RedisConnection) (*RedisConsumerRepository, error) {
9896
r := &RedisConsumerRepository{
99-
conn: rc,
100-
balanceSyncEnabled: balanceSyncEnabled,
97+
conn: rc,
10198
}
10299
if _, err := r.conn.GetClient(context.Background()); err != nil {
103100
return nil, fmt.Errorf("failed to connect on redis: %w", err)
@@ -359,7 +356,7 @@ func balanceRedisToBalance(b mmodel.BalanceRedis, mapBalances map[string]*mmodel
359356
// ProcessBalanceAtomicOperation executes the balance_atomic_operation.lua script.
360357
//
361358
// BALANCE SCHEDULING FLOW:
362-
// 1. This method calls the Lua script with scheduleSync=1 (when balanceSyncEnabled is true)
359+
// 1. This method calls the Lua script with scheduleSync=1 (balance sync is always enabled)
363360
// 2. Lua script updates hot balance in Redis atomically
364361
// 3. Lua script calculates dueAt = now + ttl - 600 (10 min before expiry)
365362
// 4. Lua script calls ZADD to schedule:{transactions}:balance-sync with score=dueAt
@@ -435,11 +432,8 @@ func (rr *RedisConsumerRepository) ProcessBalanceAtomicOperation(ctx context.Con
435432

436433
transactionKey := utils.TransactionInternalKey(organizationID, ledgerID, transactionID.String())
437434

438-
// Prepend balanceSyncEnabled flag (1 = enabled, 0 = disabled) to args
439-
scheduleSync := 0
440-
if rr.balanceSyncEnabled {
441-
scheduleSync = 1
442-
}
435+
// Balance sync is always enabled - prepend flag (1 = enabled) to args
436+
scheduleSync := 1
443437

444438
finalArgs := append([]any{scheduleSync}, args...)
445439

@@ -791,7 +785,7 @@ func (rr *RedisConsumerRepository) RemoveBalanceSyncKey(ctx context.Context, mem
791785
// This preserves the earliest scheduled sync time for each balance key.
792786
// Large inputs are processed in chunks of maxRedisBatchSize to prevent oversized payloads.
793787
func (rr *RedisConsumerRepository) ScheduleBalanceSyncBatch(ctx context.Context, members []redis.Z) error {
794-
if len(members) == 0 || !rr.balanceSyncEnabled {
788+
if len(members) == 0 {
795789
return nil
796790
}
797791

components/transaction/internal/adapters/redis/consumer.redis_chaos_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ func setupRedisChaosNetworkInfra(t *testing.T) *chaosNetworkTestInfra {
9090
}
9191

9292
proxyRepo := &RedisConsumerRepository{
93-
conn: proxyConn,
94-
balanceSyncEnabled: false,
93+
conn: proxyConn,
9594
}
9695

9796
return &chaosNetworkTestInfra{

components/transaction/internal/adapters/redis/consumer.redis_fuzz_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func FuzzKeyNamespacing_MGet(f *testing.F) {
171171

172172
// Use a fresh recording client — no panic must occur.
173173
conn, recorder := newRecordingConnection(t)
174-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
174+
repo := &RedisConsumerRepository{conn: conn}
175175

176176
if keyCount == 0 {
177177
// Empty slice path — MGet returns early with an empty map.
@@ -311,7 +311,7 @@ func FuzzKeyNamespacing_QueueKey(f *testing.F) {
311311
}
312312

313313
conn, recorder := newRecordingConnection(t)
314-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
314+
repo := &RedisConsumerRepository{conn: conn}
315315

316316
// Exercise AddMessageToQueue — must not panic.
317317
err := repo.AddMessageToQueue(ctx, msgKey, []byte("fuzz-payload"))

components/transaction/internal/adapters/redis/consumer.redis_get_balances_test.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func newMockMGetConnection(client *mockMGetClient) *libRedis.RedisConnection {
4141
func TestGetBalancesByKeys_EmptyInput(t *testing.T) {
4242
// Create a repository with nil connection to test early return
4343
repo := &RedisConsumerRepository{
44-
conn: nil,
45-
balanceSyncEnabled: true,
44+
conn: nil,
4645
}
4746

4847
// Empty input should return empty map without any Redis call
@@ -64,8 +63,7 @@ func TestGetBalancesByKeys_EmptyInput_NoRedisCall(t *testing.T) {
6463
}
6564

6665
repo := &RedisConsumerRepository{
67-
conn: newMockMGetConnection(mockClient),
68-
balanceSyncEnabled: true,
66+
conn: newMockMGetConnection(mockClient),
6967
}
7068

7169
result, err := repo.GetBalancesByKeys(context.Background(), []string{})
@@ -87,8 +85,7 @@ func TestGetBalancesByKeys_SingleKey_Found(t *testing.T) {
8785
}
8886

8987
repo := &RedisConsumerRepository{
90-
conn: newMockMGetConnection(mockClient),
91-
balanceSyncEnabled: true,
88+
conn: newMockMGetConnection(mockClient),
9289
}
9390

9491
result, err := repo.GetBalancesByKeys(context.Background(), []string{"balance:key1"})
@@ -113,8 +110,7 @@ func TestGetBalancesByKeys_SingleKey_NotFound(t *testing.T) {
113110
}
114111

115112
repo := &RedisConsumerRepository{
116-
conn: newMockMGetConnection(mockClient),
117-
balanceSyncEnabled: true,
113+
conn: newMockMGetConnection(mockClient),
118114
}
119115

120116
result, err := repo.GetBalancesByKeys(context.Background(), []string{"balance:key1"})
@@ -139,8 +135,7 @@ func TestGetBalancesByKeys_MultipleKeys_MixedResults(t *testing.T) {
139135
}
140136

141137
repo := &RedisConsumerRepository{
142-
conn: newMockMGetConnection(mockClient),
143-
balanceSyncEnabled: true,
138+
conn: newMockMGetConnection(mockClient),
144139
}
145140

146141
result, err := repo.GetBalancesByKeys(context.Background(), []string{"key1", "key2", "key3"})
@@ -175,8 +170,7 @@ func TestGetBalancesByKeys_MalformedJSON(t *testing.T) {
175170
}
176171

177172
repo := &RedisConsumerRepository{
178-
conn: newMockMGetConnection(mockClient),
179-
balanceSyncEnabled: true,
173+
conn: newMockMGetConnection(mockClient),
180174
}
181175

182176
result, err := repo.GetBalancesByKeys(context.Background(), []string{"key1", "key2", "key3"})
@@ -209,8 +203,7 @@ func TestGetBalancesByKeys_ByteSliceValue(t *testing.T) {
209203
}
210204

211205
repo := &RedisConsumerRepository{
212-
conn: newMockMGetConnection(mockClient),
213-
balanceSyncEnabled: true,
206+
conn: newMockMGetConnection(mockClient),
214207
}
215208

216209
result, err := repo.GetBalancesByKeys(context.Background(), []string{"balance:key1"})
@@ -234,8 +227,7 @@ func TestGetBalancesByKeys_UnexpectedValueType(t *testing.T) {
234227
}
235228

236229
repo := &RedisConsumerRepository{
237-
conn: newMockMGetConnection(mockClient),
238-
balanceSyncEnabled: true,
230+
conn: newMockMGetConnection(mockClient),
239231
}
240232

241233
result, err := repo.GetBalancesByKeys(context.Background(), []string{"balance:key1"})
@@ -259,8 +251,7 @@ func TestGetBalancesByKeys_RedisError(t *testing.T) {
259251
}
260252

261253
repo := &RedisConsumerRepository{
262-
conn: newMockMGetConnection(mockClient),
263-
balanceSyncEnabled: true,
254+
conn: newMockMGetConnection(mockClient),
264255
}
265256

266257
result, err := repo.GetBalancesByKeys(context.Background(), []string{"balance:key1"})
@@ -281,8 +272,7 @@ func TestGetBalancesByKeys_AllKeysNotFound(t *testing.T) {
281272
}
282273

283274
repo := &RedisConsumerRepository{
284-
conn: newMockMGetConnection(mockClient),
285-
balanceSyncEnabled: true,
275+
conn: newMockMGetConnection(mockClient),
286276
}
287277

288278
result, err := repo.GetBalancesByKeys(context.Background(), []string{"key1", "key2", "key3"})

components/transaction/internal/adapters/redis/consumer.redis_integration_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,9 @@ func setupRedisIntegrationInfra(t *testing.T) *integrationTestInfra {
7878
// Create lib-commons Redis connection
7979
conn := redistestutil.CreateConnection(t, redisContainer.Addr)
8080

81-
// Create repository with balance sync enabled
81+
// Create repository
8282
repo := &RedisConsumerRepository{
83-
conn: conn,
84-
balanceSyncEnabled: true,
83+
conn: conn,
8584
}
8685

8786
return &integrationTestInfra{
@@ -100,10 +99,9 @@ func setupRedisChaosInfra(t *testing.T) *chaosTestInfra {
10099
// Create lib-commons Redis connection
101100
conn := redistestutil.CreateConnection(t, redisContainer.Addr)
102101

103-
// Create repository with balance sync enabled
102+
// Create repository
104103
repo := &RedisConsumerRepository{
105-
conn: conn,
106-
balanceSyncEnabled: true,
104+
conn: conn,
107105
}
108106

109107
// Create chaos orchestrator
@@ -151,8 +149,7 @@ func setupRedisNetworkChaosInfra(t *testing.T) *networkChaosTestInfra {
151149
}
152150

153151
proxyRepo := &RedisConsumerRepository{
154-
conn: proxyConn,
155-
balanceSyncEnabled: true,
152+
conn: proxyConn,
156153
}
157154

158155
return &networkChaosTestInfra{

components/transaction/internal/adapters/redis/consumer.redis_namespace_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func TestKeyNamespacing_SimpleKeyMethods(t *testing.T) {
315315
t.Parallel()
316316

317317
conn, recorder := newRecordingConnection(t)
318-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
318+
repo := &RedisConsumerRepository{conn: conn}
319319

320320
ctx := context.Background()
321321
if tc.tenantID != "" {
@@ -410,7 +410,7 @@ func TestKeyNamespacing_MGet(t *testing.T) {
410410
t.Parallel()
411411

412412
conn, recorder := newRecordingConnection(t)
413-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
413+
repo := &RedisConsumerRepository{conn: conn}
414414

415415
ctx := context.Background()
416416
if tc.tenantID != "" {
@@ -477,7 +477,7 @@ func TestKeyNamespacing_QueueOperations(t *testing.T) {
477477
t.Parallel()
478478

479479
conn, recorder := newRecordingConnection(t)
480-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
480+
repo := &RedisConsumerRepository{conn: conn}
481481

482482
ctx := context.Background()
483483
if tc.tenantID != "" {
@@ -571,7 +571,7 @@ func TestKeyNamespacing_ListBalanceByKey(t *testing.T) {
571571
// Configure the Get stub to return valid BalanceRedis JSON.
572572
recorder.getReturnVal = string(balanceRedisJSON)
573573

574-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
574+
repo := &RedisConsumerRepository{conn: conn}
575575

576576
ctx := context.Background()
577577
if tc.tenantID != "" {
@@ -636,7 +636,7 @@ func TestKeyNamespacing_ProcessBalanceAtomicOperation(t *testing.T) {
636636
t.Parallel()
637637

638638
conn, scripter := newScriptCapturingConnection(t)
639-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: true}
639+
repo := &RedisConsumerRepository{conn: conn}
640640

641641
ctx := context.Background()
642642
if tc.tenantID != "" {
@@ -736,7 +736,7 @@ func TestKeyNamespacing_GetBalanceSyncKeys(t *testing.T) {
736736
t.Parallel()
737737

738738
conn, scripter := newScriptCapturingConnection(t)
739-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: true}
739+
repo := &RedisConsumerRepository{conn: conn}
740740

741741
ctx := context.Background()
742742
if tc.tenantID != "" {
@@ -794,7 +794,7 @@ func TestKeyNamespacing_RemoveBalanceSyncKey(t *testing.T) {
794794
t.Parallel()
795795

796796
conn, scripter := newScriptCapturingConnection(t)
797-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: true}
797+
repo := &RedisConsumerRepository{conn: conn}
798798

799799
ctx := context.Background()
800800
if tc.tenantID != "" {
@@ -835,7 +835,7 @@ func TestKeyNamespacing_BackwardsCompatible_NoTenantInContext(t *testing.T) {
835835
t.Parallel()
836836

837837
conn, recorder := newRecordingConnection(t)
838-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
838+
repo := &RedisConsumerRepository{conn: conn}
839839

840840
originalKey := "my:original:key"
841841

@@ -861,7 +861,7 @@ func TestKeyNamespacing_BackwardsCompatible_NoTenantInContext(t *testing.T) {
861861
t.Parallel()
862862

863863
conn, recorder := newRecordingConnection(t)
864-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
864+
repo := &RedisConsumerRepository{conn: conn}
865865

866866
originalKeys := []string{"key:a", "key:b"}
867867
_, _ = repo.MGet(ctx, originalKeys)
@@ -875,7 +875,7 @@ func TestKeyNamespacing_BackwardsCompatible_NoTenantInContext(t *testing.T) {
875875
t.Parallel()
876876

877877
conn, recorder := newRecordingConnection(t)
878-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
878+
repo := &RedisConsumerRepository{conn: conn}
879879

880880
msgKey := "tx:orig-key"
881881
_ = repo.AddMessageToQueue(ctx, msgKey, []byte("data"))

components/transaction/internal/adapters/redis/consumer.redis_property_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func TestProperty_MGet_OutputKeysMatchOriginal(t *testing.T) {
164164
originalKeys := []string{keyA, keyB}
165165

166166
conn, _ := newRecordingConnection(t)
167-
repo := &RedisConsumerRepository{conn: conn, balanceSyncEnabled: false}
167+
repo := &RedisConsumerRepository{conn: conn}
168168

169169
ctx := context.Background()
170170
if tenantID != "" {

0 commit comments

Comments
 (0)