Skip to content

Add pg-transaction module #3518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions packages/pg-transaction/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "pg-transaction",
"version": "1.0.0",
"main": "dist/index.js",
"type": "module",
"license": "MIT",
"scripts": {
"build": "tsc",
"pretest": "yarn build",
"test": "node dist/index.test.js"
},
"dependencies": {},
"engines": {
"node": ">=16.0.0"
},
"devDependencies": {
"@types/pg": "^8.10.9",
"@types/node": "^24.0.14",
"pg": "^8.11.3",
"typescript": "^5.8.3"
}
}
135 changes: 135 additions & 0 deletions packages/pg-transaction/src/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { describe, it, before, beforeEach, after } from 'node:test'
import { strict as assert } from 'assert'
import { Client, Pool } from 'pg'
import { transaction } from './index.js'

const withClient = async (cb: (client: Client) => Promise<void>): Promise<void> => {
const client = new Client()
await client.connect()
try {
await cb(client)
} finally {
await client.end()
}
}

describe('Transaction', () => {
before(async () => {
// Ensure the test table is created before running tests
await withClient(async (client) => {
await client.query('CREATE TABLE IF NOT EXISTS test_table (id SERIAL PRIMARY KEY, name TEXT)')
})
})

beforeEach(async () => {
await withClient(async (client) => {
await client.query('TRUNCATE test_table')
})
})

after(async () => {
// Clean up the test table after running tests
await withClient(async (client) => {
await client.query('DROP TABLE IF EXISTS test_table')
})
})

it('should create a client with an empty temp table', async () => {
await withClient(async (client) => {
const { rowCount } = await client.query('SELECT * FROM test_table')
assert.equal(rowCount, 0, 'Temp table should be empty on creation')
})
})

it('should auto-commit at end of callback', async () => {
await withClient(async (client) => {
await transaction(client, async (client) => {
await client.query('INSERT INTO test_table (name) VALUES ($1)', ['AutoCommit'])
// row should be visible within transaction
const { rows } = await client.query('SELECT * FROM test_table')
assert.equal(rows.length, 1, 'Row should be inserted within transaction')

// while inside this transaction, the changes are not visible outside
await withClient(async (innerClient) => {
const { rowCount } = await innerClient.query('SELECT * FROM test_table')
assert.equal(rowCount, 0, 'Temp table should still be empty inside transaction')
})
})
})
})

it('should rollback on error', async () => {
await withClient(async (client) => {
try {
await transaction(client, async (client) => {
await client.query('INSERT INTO test_table (name) VALUES ($1)', ['RollbackTest'])
throw new Error('Intentional Error to trigger rollback')
})
} catch (error) {
// Expected error, do nothing
}

// After rollback, the table should still be empty
const { rowCount } = await client.query('SELECT * FROM test_table')
assert.equal(rowCount, 0, 'Temp table should be empty after rollback')
})
})

it('works with Pool', async () => {
const pool = new Pool()
try {
await transaction(pool, async (client) => {
await client.query('INSERT INTO test_table (name) VALUES ($1)', ['PoolTransaction'])
const { rows } = await client.query('SELECT * FROM test_table')
assert.equal(rows.length, 1, 'Row should be inserted in pool transaction')
})

assert.equal(pool.idleCount, 1, 'Pool should have idle clients after transaction')

// Verify the row is visible outside the transaction
const { rows } = await pool.query('SELECT * FROM test_table')
assert.equal(rows.length, 1, 'Row should be visible after pool transaction')
} finally {
await pool.end()
}
})

it('rollsback errors with pool', async () => {
const pool = new Pool()
try {
try {
await transaction(pool, async (client) => {
await client.query('INSERT INTO test_table (name) VALUES ($1)', ['PoolRollbackTest'])
throw new Error('Intentional Error to trigger rollback')
})
} catch (error) {
// Expected error, do nothing
}

// After rollback, the table should still be empty
const { rowCount } = await pool.query('SELECT * FROM test_table')
assert.equal(rowCount, 0, 'Temp table should be empty after pool rollback')
} finally {
await pool.end()
}
})

it('can be bound to first argument', async () => {
const pool = new Pool()
try {
const txn = transaction.bind(null, pool)

await txn(async (client) => {
await client.query('INSERT INTO test_table (name) VALUES ($1)', ['BoundTransaction'])
const { rows } = await client.query('SELECT * FROM test_table')
assert.equal(rows.length, 1, 'Row should be inserted in bound transaction')
})

// Verify the row is visible outside the transaction
const { rows } = await pool.query('SELECT * FROM test_table')
assert.equal(rows.length, 1, 'Row should be visible after bound transaction')
} finally {
await pool.end()
}
})
})
35 changes: 35 additions & 0 deletions packages/pg-transaction/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import type { Client, Pool, PoolClient } from 'pg'

function isPoolClient(clientOrPool: Client | PoolClient): clientOrPool is PoolClient {
return 'release' in clientOrPool
}

function isPool(clientOrPool: Client | Pool): clientOrPool is Pool {
return 'idleCount' in clientOrPool
}

async function transaction<T>(clientOrPool: Client | Pool, cb: (client: Client) => Promise<T>): Promise<T> {
let client: Client | PoolClient
if (isPool(clientOrPool)) {
// It's a Pool
client = await clientOrPool.connect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn’t do the error listener dance, so connection errors can crash a Node process here. It might be surprising that the behavior is different from pool.query. But maybe pg should add a default handler for the error event in a minor version update anyway?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn’t do the error listener dance, so connection errors can crash a Node process here

Sorry I'm not following! What do you mean by error listener dance? I thought await pool.connect() would reject on connection errors? not have to result to listeners?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pool.connect() itself does, but if there’s a connection-level error after connecting (e.g. during the BEGIN query), the default behavior is to crash the process (because it’s an error event). (I think a lot of people probably use pg without doing this part strictly right…)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of people probably use pg without doing this part strictly right…)

yeah, honestly myself included....for yeras, in multiple apps. 😬

the whole async/evented error stuff is so so tricky to get "really just right" - would love to see an example if you have one. I'm totally cool working on some kind of "buffer the error(s) if no default .on('error') listener is added and throw it on the next call to the library" feature instead of what it is now which is effectively "if its not an in-flight query, then you're probably just gonna crash, which doesn't happen much, but async race conditions are their own kind of personal hell." I do think that's a minor version bump fix too because it could probably be backwards-comp where if there is an error listener things proceede as normal but if there isn't we don't just explode the universe w/ an unhandledError and instead buffer it.

Honestly it could potentially be a lot to take on and needs some thinking about where the right place to handle these errors is (because legacy reasons we do pool -> client -> connection -> pg-protocol) callstacks, but its probably worth spending more time there myself.

Copy link
Collaborator

@charmander charmander Jul 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already did the error replay part in #1503; it’d just be a matter of adding this.on('error', () => {}) to Client’s constructor (and Pool’s too) *actually, I don’t remember if the pool automatically drops broken clients that are released to it without an explicit error, and there might be a question of whether .end() should start rejecting with the connection error… maybe only if there are no error listeners… it might not be that simple, and a new major version could be the safest option.

} else {
// It's a Client
client = clientOrPool as Client
}
await client.query('BEGIN')
try {
const result = await cb(client as Client)
await client.query('COMMIT')
return result
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
if (isPoolClient(client)) {
client.release()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the rollback failed, client could be returned to the pool while still in a transaction or in a broken-connection state. I don’t know if the former is actually possible in practice (ROLLBACK failing without breaking the connection), but the latter definitely is (when error doesn’t crash the process, e.g. because the user is attaching error listeners to all clients using the pool’s connect event). Might want to add another catch for the ROLLBACK and .release(rollbackError).

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that's a good point. I'll take stab at this...I think I can repro by pg_terminate_backend the active in transaction client backed from within the transaction itself. That should disconnect it and make it fail to be able to send rollback - anyways....I'll test a few more failure modes for both this comment and the other one about the error listener dance and see if I can make it throw an uncaught anywhere else.

}
}
}

export { transaction }
15 changes: 15 additions & 0 deletions packages/pg-transaction/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"compilerOptions": {
"target": "esnext",
"moduleResolution": "node",
"outDir": "./dist",
"rootDir": "./src",
"declaration": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": false
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "test"]
}
Loading