Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,10 @@ CREATE DATABASE jubilant_test WITH OWNER=jubilant ENCODING=UTF8;
\c jubilant_test;
CREATE EXTENSION IF NOT EXISTS CITEXT;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS pgrowlocks;
CREATE DATABASE jubilant WITH OWNER=jubilant ENCODING=UTF8;
\c jubilant;
CREATE EXTENSION IF NOT EXISTS CITEXT;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS pgrowlocks;
```

If you are using Docker, you may find it easiest to run the database in Docker by running `make run-docker-postgres`.
Expand Down
1 change: 0 additions & 1 deletion lib/bin/create-docker-databases.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const { log } = program.opts();
const dbj = connect(database);
await dbj.raw('create extension citext;');
await dbj.raw('create extension pg_trgm;');
await dbj.raw('create extension pgrowlocks;');
dbj.destroy();
}));

Expand Down
14 changes: 14 additions & 0 deletions lib/model/migrations/20251026-01-uninstall-pgrowlocks-extension.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2025 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const down = async (db) => db.raw('CREATE EXTENSION IF NOT EXISTS pgrowlocks');

const up = (db) => db.raw('DROP EXTENSION IF EXISTS pgrowlocks');

module.exports = { up, down };
63 changes: 51 additions & 12 deletions lib/model/query/blobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,62 @@ const getById = (blobId) => ({ maybeOne }) =>
maybeOne(sql`select * from blobs where id=${blobId}`)
.then(map(construct(Blob)));

const s3CountByStatus = (status) => ({ oneFirst }) => {
const s3CountByStatus = (status) => (db) => {
// Note:
// To count locked rows, this counts how many rows are left to be locked,
// and subtracts that from a total rowcount. Locking rows needs to happen
// in a transaction (or one simulated through savepoints) so that the rows
// we locked here for just the counting purpose will be released as soon
// as we're done counting, rather than at the end of a potential enveloping
// transaction.
const lockCTEs = sql`
notlockeds AS (
SELECT 1 FROM blobs FOR NO KEY UPDATE SKIP LOCKED
),
locked AS (
SELECT (
(SELECT count(id) FROM blobs)
-
(SELECT count(*) FROM notlockeds)
) as count
)`;
// in_progress is an implicit status
if (status === 'in_progress') {
return oneFirst(sql`SELECT COUNT(*) FROM PGROWLOCKS('blobs')`);
return db.transaction(tx =>
tx.query(
sql`SAVEPOINT peeklocked
`).then(() =>
tx.oneFirst(sql`
WITH ${lockCTEs}
SELECT count FROM locked
`).then(cnt =>
tx.query(sql`
ROLLBACK TO SAVEPOINT peeklocked
`).then(() => cnt)
)
)
);
} else if (status === 'pending') {
return oneFirst(sql`
WITH
allpending AS (
SELECT COUNT(*) FROM blobs WHERE s3_status='pending'
),
locked AS (
SELECT COUNT(*) FROM PGROWLOCKS('blobs')
return db.transaction(tx =>
tx.query(
sql`SAVEPOINT peeklocked
`).then(() =>
tx.oneFirst(sql`
WITH
allpending AS (
SELECT COUNT(*) FROM blobs WHERE s3_status='pending'
),
${lockCTEs}
SELECT allpending.count-locked.count FROM allpending, locked
`).then(cnt =>
tx.query(sql`
ROLLBACK TO SAVEPOINT peeklocked
`).then(() => cnt)
)
SELECT allpending.count-locked.count FROM allpending, locked
`);
)
);
} else {
return oneFirst(sql`SELECT COUNT(*) FROM blobs WHERE s3_status=${status}`);
return db.oneFirst(sql`SELECT COUNT(*) FROM blobs WHERE s3_status=${status}`);
}
};

Expand Down
33 changes: 5 additions & 28 deletions lib/task/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,16 @@ const assertEnabled = s3 => {
}
};

const isMissingRowlocks = err => err.code === '42883' && err.message === 'function pgrowlocks(unknown) does not exist';

const getUploadCount = async (Blobs, limit) => {
try {
const pendingCount = await Blobs.s3CountByStatus('pending');
return limit ? Math.min(pendingCount, limit) : pendingCount;
} catch (err) {
if (isMissingRowlocks(err)) return limit;
else throw err;
}
const pendingCount = await Blobs.s3CountByStatus('pending');
return limit ? Math.min(pendingCount, limit) : pendingCount;
};

const getCount = withContainer(({ s3, Blobs }) => async status => {
assertEnabled(s3);
try {
const count = await Blobs.s3CountByStatus(status);
console.log(count);
return count; // just for testing
} catch (err) {
if (isMissingRowlocks(err)) {
console.error(`

Error: cannot count blobs by status due to missing PostgreSQL extension: PGROWLOCKS.

To install this extension, execute the following query in your PostgreSQL instance:

CREATE EXTENSION IF NOT EXISTS pgrowlocks;
`);
process.exit(1);
} else {
throw err;
}
}
const count = await Blobs.s3CountByStatus(status);
console.log(count);
return count; // just for testing
});

const setFailedToPending = withContainer(({ s3, Blobs }) => async () => {
Expand Down
1 change: 1 addition & 0 deletions lib/util/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ const page = (options) => {
// it a try and a pull request.
const queryFuncs = (db, obj) => {
/* eslint-disable no-param-reassign */
obj.transaction = db.transaction;
obj.run = (s) => db.query(s).then(always(true));
obj.run.map = () => () => true;

Expand Down
Loading