@@ -31,13 +31,14 @@ const getById = (blobId) => ({ maybeOne }) =>
3131 maybeOne ( sql `select * from blobs where id=${ blobId } ` )
3232 . then ( map ( construct ( Blob ) ) ) ;
3333
34- const s3CountByStatus = ( status ) => ( { oneFirst, run } ) => {
35- // Note: This takes out rowlocks on all rows in `blobs`, so
36- // if this is run in a transaction, then those locks aren't
37- // released until the end of the transaction.
38- // Hence the savepoints and rollbacks.
39- // But that brings another potential issue: trying to use a
40- // savepoint outside a transaction is an error.
34+ const s3CountByStatus = ( status ) => ( db ) => {
35+ // Note:
36+ // To count locked rows, this counts how many rows are left to be locked,
37+ // and subtracts that from a total rowcount. Locking rows needs to happen
38+ // in a transaction (or one simulated through savepoints) so that the rows
39+ // we locked here for just the counting purpose will be released as soon
40+ // as we're done counting, rather than at the end of a potential enveloping
41+ // transaction.
4142 const lockCTEs = sql `
4243 notlockeds AS (
4344 SELECT 1 FROM blobs FOR NO KEY UPDATE SKIP LOCKED
@@ -51,31 +52,25 @@ const s3CountByStatus = (status) => ({ oneFirst, run }) => {
5152 )` ;
5253 // in_progress is an implicit status
5354 if ( status === 'in_progress' ) {
54- return oneFirst ( sql `
55- SAVEPOINT peeklocked;
56- WITH ${ lockCTEs }
57- SELECT count FROM locked
58- ` ) . then ( cnt =>
59- run (
60- sql `ROLLBACK TO SAVEPOINT peeklocked`
61- ) . then ( ( ) => cnt )
55+ return db . transaction ( tx =>
56+ tx . oneFirst ( sql `
57+ WITH ${ lockCTEs }
58+ SELECT count FROM locked
59+ ` )
6260 ) ;
6361 } else if ( status === 'pending' ) {
64- return oneFirst ( sql `
65- SAVEPOINT peeklocked;
66- WITH
67- allpending AS (
68- SELECT COUNT(*) FROM blobs WHERE s3_status='pending'
69- ),
70- ${ lockCTEs }
71- SELECT allpending.count-locked.count FROM allpending, locked
72- ` ) . then ( cnt =>
73- run (
74- sql `ROLLBACK TO SAVEPOINT peeklocked`
75- ) . then ( ( ) => cnt )
62+ return db . transaction ( tx =>
63+ tx . oneFirst ( sql `
64+ WITH
65+ allpending AS (
66+ SELECT COUNT(*) FROM blobs WHERE s3_status='pending'
67+ ),
68+ ${ lockCTEs }
69+ SELECT allpending.count-locked.count FROM allpending, locked
70+ ` )
7671 ) ;
7772 } else {
78- return oneFirst ( sql `SELECT COUNT(*) FROM blobs WHERE s3_status=${ status } ` ) ;
73+ return db . oneFirst ( sql `SELECT COUNT(*) FROM blobs WHERE s3_status=${ status } ` ) ;
7974 }
8075} ;
8176
0 commit comments