Skip to content

Commit 1f07b85

Browse files
authored
fix: catch pg exceptions on queries outside of express (#1348)
* fix: catch db exceptions in token processor * feat: return 503 on API postgres errors * fix: add postgres.js connection errors * test: token queue and handler * fix: block_count on limit=0 for blocks
1 parent fa08a9c commit 1f07b85

File tree

8 files changed

+109
-34
lines changed

8 files changed

+109
-34
lines changed

src/api/init.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import * as expressWinston from 'express-winston';
55
import * as winston from 'winston';
66
import { v4 as uuid } from 'uuid';
77
import * as cors from 'cors';
8-
import * as WebSocket from 'ws';
9-
import * as SocketIO from 'socket.io';
108

119
import { createTxRouter } from './routes/tx';
1210
import { createDebugRouter } from './routes/debug';
@@ -47,6 +45,7 @@ import * as fs from 'fs';
4745
import { PgStore } from '../datastore/pg-store';
4846
import { PgWriteStore } from '../datastore/pg-write-store';
4947
import { WebSocketTransmitter } from './routes/ws/web-socket-transmitter';
48+
import { isPgConnectionError } from '../datastore/helpers';
5049

5150
export interface ApiServer {
5251
expressApp: express.Express;
@@ -300,6 +299,8 @@ export async function startApiServer(opts: {
300299
if (error && !res.headersSent) {
301300
if (error instanceof InvalidRequestError) {
302301
res.status(error.status).json({ error: error.message }).end();
302+
} else if (isPgConnectionError(error)) {
303+
res.status(503).json({ error: `The database service is unavailable` }).end();
303304
} else {
304305
res.status(500);
305306
const errorTag = uuid();

src/datastore/helpers.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,14 @@ export function isPgConnectionError(error: any): string | false {
184184
return 'Postgres connection ENOTFOUND';
185185
} else if (error.code === 'ECONNRESET') {
186186
return 'Postgres connection ECONNRESET';
187+
} else if (error.code === 'CONNECTION_CLOSED') {
188+
return 'Postgres connection CONNECTION_CLOSED';
189+
} else if (error.code === 'CONNECTION_ENDED') {
190+
return 'Postgres connection CONNECTION_ENDED';
191+
} else if (error.code === 'CONNECTION_DESTROYED') {
192+
return 'Postgres connection CONNECTION_DESTROYED';
193+
} else if (error.code === 'CONNECTION_CONNECT_TIMEOUT') {
194+
return 'Postgres connection CONNECTION_CONNECT_TIMEOUT';
187195
} else if (error.message) {
188196
const msg = (error as Error).message.toLowerCase();
189197
if (msg.includes('database system is starting up')) {

src/datastore/pg-store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ export class PgStore {
420420
if (blockHashValues.length === 0) {
421421
return {
422422
results: [],
423-
total: 0,
423+
total: block_count,
424424
};
425425
}
426426

src/tests-tokens/strict-mode-tests.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { stringCV } from '@stacks/transactions/dist/clarity/types/stringCV';
1212
import { getTokenMetadataFetchTimeoutMs } from '../token-metadata/helpers';
1313
import { PgWriteStore } from '../datastore/pg-write-store';
1414
import { cycleMigrations, runMigrations } from '../datastore/migrations';
15+
import { TokensProcessorQueue } from '../token-metadata/tokens-processor-queue';
1516

1617
const NFT_CONTRACT_ABI: ClarityAbi = {
1718
maps: [],
@@ -189,6 +190,33 @@ describe('token metadata strict mode', () => {
189190
expect(entry.result?.processed).toBe(false);
190191
});
191192

193+
test('db errors are handled gracefully in contract handler', async () => {
194+
process.env['STACKS_CORE_RPC_PORT'] = '11111'; // Make node unreachable
195+
process.env['STACKS_API_TOKEN_METADATA_STRICT_MODE'] = '1';
196+
process.env['STACKS_API_TOKEN_METADATA_MAX_RETRIES'] = '0';
197+
const handler = new TokensContractHandler({
198+
contractId: contractId,
199+
smartContractAbi: NFT_CONTRACT_ABI,
200+
datastore: db,
201+
chainId: ChainID.Testnet,
202+
txId: contractTxId,
203+
dbQueueId: 1,
204+
});
205+
await db.close(); // End connection to trigger postgres error
206+
await expect(handler.start()).resolves.not.toThrow();
207+
});
208+
209+
test('db errors are handled gracefully in queue', async () => {
210+
const queue = new TokensProcessorQueue(db, ChainID.Testnet);
211+
await db.close(); // End connection to trigger postgres error
212+
await expect(queue.checkDbQueue()).resolves.not.toThrow();
213+
await expect(queue.drainDbQueue()).resolves.not.toThrow();
214+
await expect(queue.queueNotificationHandler(1)).resolves.not.toThrow();
215+
await expect(
216+
queue.queueHandler({ queueId: 1, txId: '0x11', contractId: 'test' })
217+
).resolves.not.toThrow();
218+
});
219+
192220
test('node runtime errors get retried', async () => {
193221
const mockResponse = {
194222
okay: false,

src/tests-tokens/tokens-metadata-tests.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import {
1414
DbNonFungibleTokenMetadata,
1515
} from '../datastore/common';
1616
import { startApiServer, ApiServer } from '../api/init';
17-
import { PoolClient } from 'pg';
1817
import * as fs from 'fs';
1918
import { EventStreamServer, startEventServer } from '../event-stream/event-server';
2019
import { getStacksTestnetNetwork } from '../rosetta-helpers';

src/tests/other-tests.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,13 @@ describe('other tests', () => {
293293
expect(result.body.status).toBe('ready');
294294
});
295295

296+
test('database unavailable responses', async () => {
297+
// Close connection so we get an error.
298+
await db.close();
299+
const result = await supertest(api.server).get(`/extended/v1/block/`);
300+
expect(result.body.error).toBe('The database service is unavailable');
301+
});
302+
296303
afterEach(async () => {
297304
await api.terminate();
298305
await db?.close();

src/token-metadata/tokens-contract-handler.ts

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -149,30 +149,40 @@ export class TokensContractHandler {
149149
processingFinished = true;
150150
} catch (error) {
151151
if (error instanceof RetryableTokenMetadataError) {
152-
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
153-
if (
154-
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
155-
retries <= getTokenMetadataMaxRetries()
156-
) {
157-
logger.info(
158-
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
159-
);
160-
} else {
161-
logger.warn(
162-
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
163-
);
152+
try {
153+
const retries = await this.db.increaseTokenMetadataQueueEntryRetryCount(this.dbQueueId);
154+
if (
155+
getTokenMetadataProcessingMode() === TokenMetadataProcessingMode.strict ||
156+
retries <= getTokenMetadataMaxRetries()
157+
) {
158+
logger.info(
159+
`[token-metadata] a recoverable error happened while processing ${this.contractId}, trying again later: ${error}`
160+
);
161+
} else {
162+
logger.warn(
163+
`[token-metadata] max retries reached while processing ${this.contractId}, giving up: ${error}`
164+
);
165+
processingFinished = true;
166+
}
167+
} catch (error) {
168+
logger.error(error);
164169
processingFinished = true;
165170
}
166171
} else {
167172
// Something more serious happened, mark this contract as done.
173+
logger.error(error);
168174
processingFinished = true;
169175
}
170176
} finally {
171177
if (processingFinished) {
172-
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
173-
logger.info(
174-
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
175-
);
178+
try {
179+
await this.db.updateProcessedTokenMetadataQueueEntry(this.dbQueueId);
180+
logger.info(
181+
`[token-metadata] finished processing ${this.contractId} in ${sw.getElapsed()} ms`
182+
);
183+
} catch (error) {
184+
logger.error(error);
185+
}
176186
}
177187
}
178188
}

src/token-metadata/tokens-processor-queue.ts

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logError, logger } from '../helpers';
1+
import { FoundOrNot, logError, logger } from '../helpers';
22
import { Evt } from 'evt';
33
import PQueue from 'p-queue';
44
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
@@ -57,15 +57,18 @@ export class TokensProcessorQueue {
5757
return;
5858
}
5959
const queuedEntries = [...this.queuedEntries.keys()];
60-
entries = await this.db.getTokenMetadataQueue(
61-
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
62-
queuedEntries
63-
);
60+
try {
61+
entries = await this.db.getTokenMetadataQueue(
62+
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
63+
queuedEntries
64+
);
65+
} catch (error) {
66+
logger.error(error);
67+
}
6468
for (const entry of entries) {
6569
await this.queueHandler(entry);
6670
}
6771
await this.queue.onEmpty();
68-
// await this.queue.onIdle();
6972
} while (entries.length > 0 || this.queuedEntries.size > 0);
7073
}
7174

@@ -76,18 +79,30 @@ export class TokensProcessorQueue {
7679
const queuedEntries = [...this.queuedEntries.keys()];
7780
const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
7881
if (limit > 0) {
79-
const entries = await this.db.getTokenMetadataQueue(
80-
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
81-
queuedEntries
82-
);
82+
let entries: DbTokenMetadataQueueEntry[];
83+
try {
84+
entries = await this.db.getTokenMetadataQueue(
85+
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
86+
queuedEntries
87+
);
88+
} catch (error) {
89+
logger.error(error);
90+
return;
91+
}
8392
for (const entry of entries) {
8493
await this.queueHandler(entry);
8594
}
8695
}
8796
}
8897

8998
async queueNotificationHandler(queueId: number) {
90-
const queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
99+
let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
100+
try {
101+
queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
102+
} catch (error) {
103+
logger.error(error);
104+
return;
105+
}
91106
if (queueEntry.found) {
92107
await this.queueHandler(queueEntry.result);
93108
}
@@ -105,16 +120,23 @@ export class TokensProcessorQueue {
105120
) {
106121
return;
107122
}
108-
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
109-
if (!contractQuery.found || !contractQuery.result.abi) {
123+
let abi: string;
124+
try {
125+
const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
126+
if (!contractQuery.found || !contractQuery.result.abi) {
127+
return;
128+
}
129+
abi = contractQuery.result.abi;
130+
} catch (error) {
131+
logger.error(error);
110132
return;
111133
}
112134
logger.info(
113135
`[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`
114136
);
115137
this.queuedEntries.set(queueEntry.queueId, queueEntry);
116138

117-
const contractAbi: ClarityAbi = JSON.parse(contractQuery.result.abi);
139+
const contractAbi: ClarityAbi = JSON.parse(abi);
118140

119141
const tokenContractHandler = new TokensContractHandler({
120142
contractId: queueEntry.contractId,

0 commit comments

Comments
 (0)