Skip to content

Commit f6a409b

Browse files
authored
Merge pull request #1897 from LerianStudio/feature/mdz-1880-2
feat: add batch synchronization and aggregation - balance update
2 parents dca1928 + 1b2214d commit f6a409b

File tree

5 files changed

+1225
-1
lines changed

5 files changed

+1225
-1
lines changed

components/transaction/internal/adapters/postgres/balance/balance.postgresql.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type Repository interface {
6868
Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error
6969
DeleteAllByIDs(ctx context.Context, organizationID, ledgerID uuid.UUID, ids []uuid.UUID) error
7070
Sync(ctx context.Context, organizationID, ledgerID uuid.UUID, b mmodel.BalanceRedis) (bool, error)
71+
SyncBatch(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []mmodel.BalanceRedis) (int64, error)
7172
UpdateAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, balance mmodel.UpdateBalance) error
7273
ListByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID) ([]*mmodel.Balance, error)
7374
ListByAccountIDAtTimestamp(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, timestamp time.Time) ([]*mmodel.Balance, error)
@@ -1408,6 +1409,125 @@ func (r *BalancePostgreSQLRepository) Sync(ctx context.Context, organizationID,
14081409
return affected > 0, nil
14091410
}
14101411

1412+
// SyncBatch persists multiple balances from cache to database in a single transaction.
1413+
// This is more efficient than calling Sync in a loop as it:
1414+
// 1. Uses a single database transaction for all updates
1415+
// 2. Reduces round-trips between application and database
1416+
// 3. Provides atomicity - all updates succeed or all fail
1417+
//
1418+
// Uses optimistic locking: only updates balances where version < incoming version.
1419+
// Returns count of actually updated rows.
1420+
func (r *BalancePostgreSQLRepository) SyncBatch(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []mmodel.BalanceRedis) (int64, error) {
1421+
if len(balances) == 0 {
1422+
return 0, nil
1423+
}
1424+
1425+
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)
1426+
1427+
ctx, span := tracer.Start(ctx, "postgres.sync_batch")
1428+
defer span.End()
1429+
1430+
db, err := r.getDB(ctx)
1431+
if err != nil {
1432+
libOpentelemetry.HandleSpanError(&span, "Failed to get database connection", err)
1433+
1434+
logger.Errorf("Failed to get database connection: %v", err)
1435+
1436+
return 0, err
1437+
}
1438+
1439+
tx, err := db.BeginTx(ctx, nil)
1440+
if err != nil {
1441+
libOpentelemetry.HandleSpanError(&span, "Failed to begin transaction", err)
1442+
1443+
logger.Errorf("Failed to begin transaction: %v", err)
1444+
1445+
return 0, err
1446+
}
1447+
1448+
committed := false
1449+
1450+
defer func() {
1451+
if committed {
1452+
return
1453+
}
1454+
1455+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
1456+
logger.Errorf("Failed to rollback transaction: %v", rollbackErr)
1457+
}
1458+
}()
1459+
1460+
var totalUpdated int64
1461+
1462+
now := time.Now()
1463+
1464+
for _, balance := range balances {
1465+
// Check for context cancellation before processing each balance
1466+
if ctx.Err() != nil {
1467+
libOpentelemetry.HandleSpanError(&span, "Context cancelled during batch sync", ctx.Err())
1468+
1469+
logger.Warnf("SyncBatch cancelled: %v", ctx.Err())
1470+
1471+
return 0, ctx.Err()
1472+
}
1473+
1474+
id, parseErr := uuid.Parse(balance.ID)
1475+
if parseErr != nil {
1476+
libOpentelemetry.HandleSpanError(&span, "Invalid balance ID", parseErr)
1477+
1478+
logger.Errorf("Invalid balance ID %s: %v", balance.ID, parseErr)
1479+
1480+
return 0, parseErr
1481+
}
1482+
1483+
result, execErr := tx.ExecContext(ctx, `
1484+
UPDATE balance
1485+
SET available = $1, on_hold = $2, version = $3, updated_at = $4
1486+
WHERE organization_id = $5
1487+
AND ledger_id = $6
1488+
AND id = $7
1489+
AND version < $3
1490+
AND deleted_at IS NULL
1491+
`, balance.Available, balance.OnHold, balance.Version, now, organizationID, ledgerID, id)
1492+
if execErr != nil {
1493+
libOpentelemetry.HandleSpanError(&span, "Failed to update balance", execErr)
1494+
1495+
logger.Errorf("Failed to update balance %s: %v", balance.ID, execErr)
1496+
1497+
return 0, execErr
1498+
}
1499+
1500+
rowsAffected, rowsErr := result.RowsAffected()
1501+
if rowsErr != nil {
1502+
libOpentelemetry.HandleSpanError(&span, "Failed to get rows affected", rowsErr)
1503+
1504+
logger.Errorf("Failed to get rows affected for balance %s: %v", balance.ID, rowsErr)
1505+
1506+
return 0, rowsErr
1507+
}
1508+
1509+
totalUpdated += rowsAffected
1510+
1511+
if rowsAffected == 0 {
1512+
logger.Debugf("Balance %s skipped: version %d not newer than DB", balance.ID, balance.Version)
1513+
}
1514+
}
1515+
1516+
if commitErr := tx.Commit(); commitErr != nil {
1517+
libOpentelemetry.HandleSpanError(&span, "Failed to commit transaction", commitErr)
1518+
1519+
logger.Errorf("Failed to commit batch sync: %v", commitErr)
1520+
1521+
return 0, commitErr
1522+
}
1523+
1524+
committed = true
1525+
1526+
logger.Infof("SyncBatch: updated %d of %d balances", totalUpdated, len(balances))
1527+
1528+
return totalUpdated, nil
1529+
}
1530+
14111531
func (r *BalancePostgreSQLRepository) UpdateAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, balance mmodel.UpdateBalance) error {
14121532
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)
14131533

0 commit comments

Comments
 (0)