Skip to content

Commit 1acd9c5

Browse files
authored
Merge pull request #1901 from LerianStudio/feature/mdz-1880-4
feat: add balance scheduling tests and async persistence support - balance update
2 parents edd6a32 + 3d55923 commit 1acd9c5

File tree

5 files changed

+109
-327
lines changed

5 files changed

+109
-327
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) 2026 Lerian Studio. All rights reserved.
2+
// Use of this source code is governed by the Elastic License 2.0
3+
// that can be found in the LICENSE file.
4+
5+
package redis
6+
7+
import (
8+
"strings"
9+
"testing"
10+
11+
"github.com/LerianStudio/midaz/v3/pkg/utils"
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
// TestLuaScript_ContainsScheduleLogic is a structural verification test that checks
16+
// if the Lua script contains the expected scheduling tokens (ZADD, scheduleSync, etc.).
17+
//
18+
// IMPORTANT: This test only asserts string presence, not runtime behavior.
19+
// It protects against accidental removal of scheduling logic during refactors.
20+
// Actual scheduling behavior is tested in integration tests (balance.worker_integration_test.go).
21+
func TestLuaScript_ContainsScheduleLogic(t *testing.T) {
22+
// Verify the script has ZADD for scheduling
23+
assert.True(t, strings.Contains(balanceAtomicOperationLua, "ZADD"),
24+
"Lua script should contain ZADD for balance scheduling")
25+
26+
// Verify the script checks scheduleSync flag
27+
assert.True(t, strings.Contains(balanceAtomicOperationLua, "scheduleSync"),
28+
"Lua script should check scheduleSync flag")
29+
30+
// Verify the script uses scheduleKey from KEYS[3]
31+
assert.True(t, strings.Contains(balanceAtomicOperationLua, "KEYS[3]"),
32+
"Lua script should reference schedule key from KEYS[3]")
33+
34+
// Verify the script calculates dueAt for pre-expiry warning
35+
assert.True(t, strings.Contains(balanceAtomicOperationLua, "dueAt"),
36+
"Lua script should calculate dueAt for scheduling")
37+
}
38+
39+
// TestScheduleKeyConstant verifies the schedule key constant matches
40+
// what the BalanceSyncWorker expects.
41+
func TestScheduleKeyConstant(t *testing.T) {
42+
expectedKey := "schedule:{transactions}:balance-sync"
43+
assert.Equal(t, expectedKey, utils.BalanceSyncScheduleKey,
44+
"Schedule key constant should match expected format")
45+
}

components/transaction/internal/adapters/redis/consumer.redis.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,20 @@ func balanceRedisToBalance(b mmodel.BalanceRedis, mapBalances map[string]*mmodel
356356
}
357357
}
358358

359+
// ProcessBalanceAtomicOperation executes the balance_atomic_operation.lua script.
360+
//
361+
// BALANCE SCHEDULING FLOW:
362+
// 1. This method calls the Lua script with scheduleSync=1 (when balanceSyncEnabled is true)
363+
// 2. Lua script updates hot balance in Redis atomically
364+
// 3. Lua script calculates dueAt = now + ttl - 600 (10 min before expiry)
365+
// 4. Lua script calls ZADD to schedule:{transactions}:balance-sync with score=dueAt
366+
// 5. BalanceSyncWorker polls the sorted set for due balances
367+
// 6. Worker calls SyncBalancesBatch to persist to database
368+
//
369+
// This enables decoupled balance persistence:
370+
// - Transaction handler is append-only (no blocking on balance DB write)
371+
// - Balance DB writes are aggregated and batched by the worker
372+
// - Only the latest version per balance is persisted (aggregation)
359373
func (rr *RedisConsumerRepository) ProcessBalanceAtomicOperation(ctx context.Context, organizationID, ledgerID, transactionID uuid.UUID, transactionStatus string, pending bool, balancesOperation []mmodel.BalanceOperation) (*mmodel.BalanceAtomicResult, error) {
360374
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)
361375

components/transaction/internal/services/command/create-balance-transaction-operations-async.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,16 @@ import (
2828
"go.opentelemetry.io/otel/trace"
2929
)
3030

31-
// CreateBalanceTransactionOperationsAsync func that is responsible to create all transactions at the same async.
31+
// CreateBalanceTransactionOperationsAsync processes transaction asynchronously.
32+
// This is an append-only handler for transactions and operations:
33+
// - Hot balance already updated atomically by Lua script during validation
34+
// - Cold balance scheduled for async sync via sorted set (Lua script does ZADD)
35+
// - Transaction and operations persisted to database
36+
// - Events sent asynchronously
37+
//
38+
// Balance persistence is fully async via BalanceSyncWorker.
39+
// The Lua script (balance_atomic_operation.lua) does ZADD to schedule:balance-sync
40+
// when scheduleSync=1, which is the default for all balance-affecting transactions.
3241
func (uc *UseCase) CreateBalanceTransactionOperationsAsync(ctx context.Context, data mmodel.Queue) error {
3342
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)
3443

@@ -45,22 +54,6 @@ func (uc *UseCase) CreateBalanceTransactionOperationsAsync(ctx context.Context,
4554
}
4655
}
4756

48-
if t.Transaction.Status.Code != constant.NOTED {
49-
ctxProcessBalances, spanUpdateBalances := tracer.Start(ctx, "command.create_balance_transaction_operations.update_balances")
50-
defer spanUpdateBalances.End()
51-
52-
logger.Infof("Trying to update balances")
53-
54-
err := uc.UpdateBalances(ctxProcessBalances, data.OrganizationID, data.LedgerID, *t.Validate, t.Balances, t.BalancesAfter)
55-
if err != nil {
56-
libOpentelemetry.HandleSpanBusinessErrorEvent(&spanUpdateBalances, "Failed to update balances", err)
57-
58-
logger.Errorf("Failed to update balances: %v", err.Error())
59-
60-
return err
61-
}
62-
}
63-
6457
ctxProcessTransaction, spanUpdateTransaction := tracer.Start(ctx, "command.create_balance_transaction_operations.create_transaction")
6558
defer spanUpdateTransaction.End()
6659

0 commit comments

Comments
 (0)