Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,31 @@ func (db *DB) Delete(
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
}

// ExecTx executes the provided function within a database transaction.
//
// Starts a new transaction, executes the provided function, and commits the transaction
// if the function succeeds. If the function returns an error, the transaction is rolled back.
//
// Returns an error if starting the transaction, executing the function, or committing the transaction fails.
func (db *DB) ExecTx(ctx context.Context, fn func(context.Context, *sqlx.Tx) error) error {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrap(err, "can't start transaction")
}
// We don't expect meaningful errors from rolling back the tx other than the sql.ErrTxDone, so just ignore it.
defer func() { _ = tx.Rollback() }()

if err := fn(ctx, tx); err != nil {
return errors.WithStack(err)
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "can't commit transaction")
}

return nil
}

func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
db.tableSemaphoresMu.Lock()
defer db.tableSemaphoresMu.Unlock()
Expand Down