Skip to content

Commit 0f157e9

Browse files
authored
Merge pull request #1898 from LerianStudio/feature/mdz-1880-3
feat: implement batch processing for balance synchronization - balance update
2 parents 094cd44 + 3f38dec commit 0f157e9

File tree

4 files changed

+834
-1
lines changed

4 files changed

+834
-1
lines changed

components/transaction/internal/bootstrap/balance.worker.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,23 @@ func (w *BalanceSyncWorker) processBalancesToExpire(ctx context.Context, rds red
265265
return false
266266
}
267267

268+
// Check for shutdown before starting batch processing
269+
if w.shouldShutdown(ctx) {
270+
w.logger.Info("BalanceSyncWorker: shutting down...")
271+
return true
272+
}
273+
274+
// This is guaranteed by the worker's scheduling mechanism which fetches keys
275+
// from a single ZSET scoped per tenant context. In multi-tenant mode,
276+
// processTenantBalances is called per-tenant, ensuring batch homogeneity.
277+
orgID, ledgerID, extractErr := w.extractIDsFromMember(members[0])
278+
if extractErr == nil {
279+
return w.processBalancesToExpireBatch(ctx, orgID, ledgerID, members)
280+
}
281+
282+
w.logger.Warnf("BalanceSyncWorker: failed to extract IDs for batch, falling back to individual processing: %v", extractErr)
283+
284+
// Fallback: individual processing (only when batch mode fails)
268285
workers := w.maxWorkers
269286
if int64(workers) > w.batchSize {
270287
workers = int(w.batchSize)
@@ -315,6 +332,58 @@ func (w *BalanceSyncWorker) processBalancesToExpire(ctx context.Context, rds red
315332
return true
316333
}
317334

335+
// processBalancesToExpireBatch processes all due keys using batch aggregation.
336+
// This is more efficient than individual processing as it:
337+
// 1. Fetches all balance values in single MGET
338+
// 2. Aggregates by composite key, keeping only highest version
339+
// 3. Persists in single database transaction
340+
// 4. Removes all processed keys in batch
341+
//
342+
// Returns true if any balances were processed.
343+
func (w *BalanceSyncWorker) processBalancesToExpireBatch(ctx context.Context, organizationID, ledgerID uuid.UUID, keys []string) bool {
344+
if len(keys) == 0 {
345+
return false
346+
}
347+
348+
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
349+
defer cancel()
350+
351+
_, tracer, _, metricFactory := libCommons.NewTrackingFromContext(ctx)
352+
353+
ctx, span := tracer.Start(ctx, "balance.worker.process_batch")
354+
defer span.End()
355+
356+
result, err := w.useCase.SyncBalancesBatch(ctx, organizationID, ledgerID, keys)
357+
if err != nil {
358+
w.logger.Errorf("BalanceSyncWorker: batch sync failed: %v", err)
359+
360+
// Emit failure metric for monitoring
361+
metricFactory.Counter(utils.BalanceSyncBatchFailures).WithLabels(map[string]string{
362+
"organization_id": organizationID.String(),
363+
"ledger_id": ledgerID.String(),
364+
}).AddOne(ctx)
365+
366+
return false
367+
}
368+
369+
if result.BalancesSynced > 0 {
370+
metricFactory.Counter(utils.BalanceSynced).WithLabels(map[string]string{
371+
"organization_id": organizationID.String(),
372+
"ledger_id": ledgerID.String(),
373+
"mode": "batch",
374+
}).Add(ctx, result.BalancesSynced)
375+
}
376+
377+
// Log aggregation ratio for monitoring deduplication effectiveness
378+
if result.KeysProcessed > 0 {
379+
aggregationRatio := float64(result.BalancesAggregated) / float64(result.KeysProcessed)
380+
w.logger.Infof("BalanceSyncWorker: batch processed=%d, aggregated=%d (ratio=%.2f), synced=%d",
381+
result.KeysProcessed, result.BalancesAggregated, aggregationRatio, result.BalancesSynced)
382+
}
383+
384+
return result.BalancesSynced > 0 || result.BalancesAggregated > 0
385+
}
386+
318387
// waitForNextOrBackoff waits based on the next schedule entry or backs off if none.
319388
// Returns true if it shut down while waiting.
320389
func (w *BalanceSyncWorker) waitForNextOrBackoff(ctx context.Context, rds redis.UniversalClient) bool {
@@ -409,7 +478,7 @@ func (w *BalanceSyncWorker) processBalanceToExpire(ctx context.Context, rds redi
409478

410479
synced, err := w.useCase.SyncBalance(ctx, organizationID, ledgerID, balance)
411480
if err != nil {
412-
w.logger.Errorf("BalanceSyncWorker: SyncBalance error for member %s with content %+v: %v", member, balance, err)
481+
w.logger.Errorf("BalanceSyncWorker: SyncBalance error for member %s, balanceID=%s: %v", member, balance.ID, err)
413482

414483
return
415484
}
@@ -418,6 +487,7 @@ func (w *BalanceSyncWorker) processBalanceToExpire(ctx context.Context, rds redi
418487
metricFactory.Counter(utils.BalanceSynced).WithLabels(map[string]string{
419488
"organization_id": organizationID.String(),
420489
"ledger_id": ledgerID.String(),
490+
"mode": "individual",
421491
}).AddOne(ctx)
422492

423493
w.logger.Infof("BalanceSyncWorker: Synced key %s", member)
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright (c) 2026 Lerian Studio. All rights reserved.
2+
// Use of this source code is governed by the Elastic License 2.0
3+
// that can be found in the LICENSE file.
4+
5+
package command
6+
7+
import (
8+
"context"
9+
10+
libCommons "github.com/LerianStudio/lib-commons/v3/commons"
11+
libOpentelemetry "github.com/LerianStudio/lib-commons/v3/commons/opentelemetry"
12+
redisBalance "github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/redis/balance"
13+
"github.com/LerianStudio/midaz/v3/pkg/mmodel"
14+
"github.com/LerianStudio/midaz/v3/pkg/utils"
15+
"github.com/google/uuid"
16+
)
17+
18+
// SyncBalancesBatchResult holds the result of a batch sync operation.
//
// The counters describe successive stages of the batch pipeline, so
// KeysProcessed >= BalancesAggregated and BalancesAggregated covers the
// set that BalancesSynced and KeysRemoved report on.
type SyncBalancesBatchResult struct {
	// KeysProcessed is the number of Redis keys that were attempted
	KeysProcessed int
	// BalancesAggregated is the number of unique balances after deduplication
	BalancesAggregated int
	// BalancesSynced is the number of balances actually written to database
	BalancesSynced int64
	// KeysRemoved is the number of keys removed from the schedule
	KeysRemoved int64
}
29+
30+
// SyncBalancesBatch performs a batch sync of balances from Redis to PostgreSQL.
31+
//
32+
// Algorithm:
33+
// 1. Fetch balance values for all provided keys using MGET
34+
// 2. Aggregate by composite key, keeping only highest version per key
35+
// 3. Persist aggregated balances to database in single transaction
36+
// 4. Remove synced keys from the schedule
37+
//
38+
// This method is resilient to:
39+
// - Missing keys (already expired): skipped in aggregation
40+
// - Version conflicts: optimistic locking in DB update
41+
// - Partial failures: keys only removed after successful DB write
42+
func (uc *UseCase) SyncBalancesBatch(ctx context.Context, organizationID, ledgerID uuid.UUID, keys []string) (*SyncBalancesBatchResult, error) {
43+
logger, tracer, _, metricFactory := libCommons.NewTrackingFromContext(ctx)
44+
45+
ctx, span := tracer.Start(ctx, "command.sync_balances_batch")
46+
defer span.End()
47+
48+
result := &SyncBalancesBatchResult{
49+
KeysProcessed: len(keys),
50+
}
51+
52+
if len(keys) == 0 {
53+
return result, nil
54+
}
55+
56+
balanceMap, err := uc.RedisRepo.GetBalancesByKeys(ctx, keys)
57+
if err != nil {
58+
libOpentelemetry.HandleSpanError(&span, "Failed to get balances by keys", err)
59+
logger.Errorf("Failed to get balances by keys: %v", err)
60+
61+
return nil, err
62+
}
63+
64+
aggregatedBalances := make([]*redisBalance.AggregatedBalance, 0, len(keys))
65+
66+
for _, key := range keys {
67+
balance := balanceMap[key]
68+
if balance == nil {
69+
logger.Debugf("Balance key %s has no data (expired), skipping", key)
70+
continue
71+
}
72+
73+
compositeKey, parseErr := redisBalance.BalanceCompositeKeyFromRedisKey(key)
74+
if parseErr != nil {
75+
logger.Warnf("Failed to parse composite key from %s: %v", key, parseErr)
76+
continue
77+
}
78+
79+
compositeKey.AssetCode = balance.AssetCode
80+
81+
aggregatedBalances = append(aggregatedBalances, &redisBalance.AggregatedBalance{
82+
RedisKey: key,
83+
Balance: balance,
84+
Key: compositeKey,
85+
})
86+
}
87+
88+
aggregator := redisBalance.NewInMemoryAggregator()
89+
deduplicated := aggregator.Aggregate(ctx, aggregatedBalances)
90+
result.BalancesAggregated = len(deduplicated)
91+
92+
if len(deduplicated) == 0 {
93+
logger.Info("No balances to sync after aggregation")
94+
return result, nil
95+
}
96+
97+
balancesToSync := make([]mmodel.BalanceRedis, 0, len(deduplicated))
98+
keysToRemove := make([]string, 0, len(deduplicated))
99+
100+
for _, ab := range deduplicated {
101+
balancesToSync = append(balancesToSync, *ab.Balance)
102+
keysToRemove = append(keysToRemove, ab.RedisKey)
103+
}
104+
105+
synced, err := uc.BalanceRepo.SyncBatch(ctx, organizationID, ledgerID, balancesToSync)
106+
if err != nil {
107+
libOpentelemetry.HandleSpanError(&span, "Failed to sync batch to database", err)
108+
logger.Errorf("Failed to sync batch to database: %v", err)
109+
110+
return nil, err
111+
}
112+
113+
result.BalancesSynced = synced
114+
115+
removed, err := uc.RedisRepo.RemoveBalanceSyncKeysBatch(ctx, keysToRemove)
116+
if err != nil {
117+
logger.Warnf("Failed to remove synced keys from schedule: %v", err)
118+
119+
metricFactory.Counter(utils.BalanceSyncCleanupFailures).WithLabels(map[string]string{
120+
"organization_id": organizationID.String(),
121+
"ledger_id": ledgerID.String(),
122+
}).AddOne(ctx)
123+
}
124+
125+
result.KeysRemoved = removed
126+
127+
logger.Infof("SyncBalancesBatch: processed=%d, aggregated=%d, synced=%d, removed=%d",
128+
result.KeysProcessed, result.BalancesAggregated, result.BalancesSynced, result.KeysRemoved)
129+
130+
return result, nil
131+
}

0 commit comments

Comments
 (0)