Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, c
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`,
task.ChainID, task.TaskID, task.Script, task.Kind,
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Txid,
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid,
task.IgnoreError)
if err != nil {
pge.l.WithError(err).Error("Failed to log chain element execution status")
Expand Down
9 changes: 6 additions & 3 deletions internal/pgengine/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)

// StartTransaction returns transaction object, transaction id and error
func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, txid int64, err error) {
// StartTransaction returns transaction object, virtual transaction id and error
func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, vxid int64, err error) {
if tx, err = pge.ConfigDb.Begin(ctx); err != nil {
return
}
err = tx.QueryRow(ctx, "SELECT txid_current()").Scan(&txid)
err = tx.QueryRow(ctx, `SELECT
(split_part(virtualxid, '/', 1)::int8 << 32) | split_part(virtualxid, '/', 2)::int8
FROM pg_locks
WHERE pid = pg_backend_pid() AND virtualxid IS NOT NULL`).Scan(&vxid)
return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestStartTransaction(t *testing.T) {
assert.Error(t, err)

mockPool.ExpectBegin()
mockPool.ExpectQuery("SELECT txid_current()").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42)))
mockPool.ExpectQuery("SELECT").WillReturnRows(pgxmock.NewRows([]string{"txid"}).AddRow(int64(42)))
tx, txid, err := pge.StartTransaction(ctx)
assert.NotNil(t, tx)
assert.EqualValues(t, 42, txid)
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ChainTask struct {
Timeout int `db:"timeout"` // in milliseconds
StartedAt time.Time `db:"-"`
Duration int64 `db:"-"` // in microseconds
Txid int64 `db:"-"`
Vxid int64 `db:"-"`
}

func (task *ChainTask) IsRemote() bool {
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,20 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
var ChainTasks []pgengine.ChainTask
var bctx context.Context
var cancel context.CancelFunc
var txid int64
var vxid int64

chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
if cancel != nil {
defer cancel()
}

chainL := sch.l.WithField("chain", chain.ChainID)
tx, txid, err := sch.pgengine.StartTransaction(chainCtx)
tx, vxid, err := sch.pgengine.StartTransaction(chainCtx)
if err != nil {
chainL.WithError(err).Error("Cannot start transaction")
return
}
chainL = chainL.WithField("txid", txid)
chainL = chainL.WithField("vxid", vxid)

err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID)
if err != nil {
Expand All @@ -217,7 +217,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
/* now we can loop through every element of the task chain */
for _, task := range ChainTasks {
task.ChainID = chain.ChainID
task.Txid = txid
task.Vxid = vxid
l := chainL.WithField("task", task.TaskID)
l.Info("Starting task")
taskCtx := log.WithLogger(chainCtx, l)
Expand Down
Loading