From f6df394336801452d25b74cbe2ec645fa02873a4 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Tue, 4 Feb 2025 20:10:34 +0100 Subject: [PATCH] [+] switch to `VirtualXID` internally from `TransactionXID`, solves #674 It turned out that issuing `txid_current()` at the beginning of every chain transaction creates a session that sits idle in transaction for the duration of the entire chain. In the current case, with a single task `{kind == PROGRAM}`, that idle transaction doesn't do anything aside from pinning the `xmin` horizon and blocking vacuum for 6+ hours. The same issue occurs for SQL tasks that are `Remote` or `Autonomous`. --- internal/pgengine/access.go | 2 +- internal/pgengine/transaction.go | 9 ++++++--- internal/pgengine/transaction_test.go | 2 +- internal/pgengine/types.go | 2 +- internal/scheduler/chain.go | 8 ++++---- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/pgengine/access.go b/internal/pgengine/access.go index b26b8080..15373ad6 100644 --- a/internal/pgengine/access.go +++ b/internal/pgengine/access.go @@ -34,7 +34,7 @@ chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, c VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`, task.ChainID, task.TaskID, task.Script, task.Kind, fmt.Sprintf("%f seconds", float64(task.Duration)/1000000), - retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Txid, + retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid, task.IgnoreError) if err != nil { pge.l.WithError(err).Error("Failed to log chain element execution status") diff --git a/internal/pgengine/transaction.go b/internal/pgengine/transaction.go index 246f9094..5f7a0ff0 100644 --- a/internal/pgengine/transaction.go +++ b/internal/pgengine/transaction.go @@ -14,12 +14,15 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -// StartTransaction returns transaction object, transaction id and error -func (pge *PgEngine) 
StartTransaction(ctx context.Context) (tx pgx.Tx, txid int64, err error) { +// StartTransaction returns transaction object, virtual transaction id and error +func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, vxid int64, err error) { if tx, err = pge.ConfigDb.Begin(ctx); err != nil { return } - err = tx.QueryRow(ctx, "SELECT txid_current()").Scan(&txid) + err = tx.QueryRow(ctx, `SELECT +(split_part(virtualxid, '/', 1)::int8 << 32) | split_part(virtualxid, '/', 2)::int8 +FROM pg_locks +WHERE pid = pg_backend_pid() AND virtualxid IS NOT NULL`).Scan(&vxid) return } diff --git a/internal/pgengine/transaction_test.go b/internal/pgengine/transaction_test.go index 2c750239..e12f81b3 100644 --- a/internal/pgengine/transaction_test.go +++ b/internal/pgengine/transaction_test.go @@ -45,7 +45,7 @@ func TestStartTransaction(t *testing.T) { assert.Error(t, err) mockPool.ExpectBegin() - mockPool.ExpectQuery("SELECT txid_current()").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42))) + mockPool.ExpectQuery("SELECT").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42))) tx, txid, err := pge.StartTransaction(ctx) assert.NotNil(t, tx) assert.EqualValues(t, 42, txid) diff --git a/internal/pgengine/types.go b/internal/pgengine/types.go index 8b86dc56..a7da2d70 100644 --- a/internal/pgengine/types.go +++ b/internal/pgengine/types.go @@ -53,7 +53,7 @@ type ChainTask struct { Timeout int `db:"timeout"` // in milliseconds StartedAt time.Time `db:"-"` Duration int64 `db:"-"` // in microseconds - Txid int64 `db:"-"` + Vxid int64 `db:"-"` } func (task *ChainTask) IsRemote() bool { diff --git a/internal/scheduler/chain.go b/internal/scheduler/chain.go index 964ee5a2..2faa092d 100644 --- a/internal/scheduler/chain.go +++ b/internal/scheduler/chain.go @@ -192,7 +192,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { var ChainTasks []pgengine.ChainTask var bctx context.Context var cancel context.CancelFunc - var txid 
int64 + var vxid int64 chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout) if cancel != nil { @@ -200,12 +200,12 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { } chainL := sch.l.WithField("chain", chain.ChainID) - tx, txid, err := sch.pgengine.StartTransaction(chainCtx) + tx, vxid, err := sch.pgengine.StartTransaction(chainCtx) if err != nil { chainL.WithError(err).Error("Cannot start transaction") return } - chainL = chainL.WithField("txid", txid) + chainL = chainL.WithField("vxid", vxid) err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID) if err != nil { @@ -217,7 +217,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { /* now we can loop through every element of the task chain */ for _, task := range ChainTasks { task.ChainID = chain.ChainID - task.Txid = txid + task.Vxid = vxid l := chainL.WithField("task", task.TaskID) l.Info("Starting task") taskCtx := log.WithLogger(chainCtx, l)