diff --git a/tests/dbhiccup.test.ts b/tests/dbhiccup.test.ts
new file mode 100644
index 000000000..2b2542847
--- /dev/null
+++ b/tests/dbhiccup.test.ts
@@ -0,0 +1,93 @@
+/* eslint-disable @typescript-eslint/require-await */
+import { DBOS } from '../src';
+import { DBOSConfig } from '../src/dbos-executor';
+import { causeChaos, generateDBOSTestConfig, setUpDBOSTestSysDb } from './helpers';
+
+let config: DBOSConfig;
+
+class DisruptiveWFs {
+  @DBOS.workflow()
+  static async dbLossBetweenSteps() {
+    await DBOS.runStep(
+      async () => {
+        return 'A';
+      },
+      { name: 'A' },
+    );
+    await DBOS.runStep(
+      async () => {
+        return 'B';
+      },
+      { name: 'B' },
+    );
+    await causeChaos(config!.systemDatabaseUrl!);
+    await DBOS.runStep(
+      async () => {
+        return 'C';
+      },
+      { name: 'C' },
+    );
+    await DBOS.runStep(
+      async () => {
+        return 'D';
+      },
+      { name: 'D' },
+    );
+    return 'Hehehe';
+  }
+
+  @DBOS.workflow()
+  static async runChildWf() {
+    await causeChaos(config!.systemDatabaseUrl!);
+    const wfh = await DBOS.startWorkflow(DisruptiveWFs).dbLossBetweenSteps();
+    await causeChaos(config!.systemDatabaseUrl!);
+    return await wfh.getResult();
+  }
+
+  @DBOS.workflow()
+  static async wfPart1() {
+    await causeChaos(config!.systemDatabaseUrl!);
+    const r = await DBOS.recv('topic', 5);
+    await causeChaos(config!.systemDatabaseUrl!);
+    await DBOS.setEvent('key', 'v1');
+    await causeChaos(config!.systemDatabaseUrl!);
+    return 'Part1' + r;
+  }
+
+  @DBOS.workflow()
+  static async wfPart2(id1: string) {
+    await causeChaos(config!.systemDatabaseUrl!);
+    await DBOS.send(id1, 'hello1', 'topic');
+    await causeChaos(config!.systemDatabaseUrl!);
+    const v1 = await DBOS.getEvent(id1, 'key', 10);
+    await causeChaos(config!.systemDatabaseUrl!);
+    return 'Part2' + v1;
+  }
+}
+
+describe('sys-db-hiccup', () => {
+  beforeAll(async () => {
+    config = generateDBOSTestConfig();
+    await setUpDBOSTestSysDb(config);
+    DBOS.setConfig(config);
+  });
+
+  beforeEach(async () => {
+    await DBOS.launch();
+  });
+
+  afterEach(async () => {
+    await DBOS.shutdown();
+  });
+
+  test('a-little-chaos', async () => {
+    await expect(DisruptiveWFs.dbLossBetweenSteps()).resolves.toBe('Hehehe');
+    await expect(DisruptiveWFs.runChildWf()).resolves.toBe('Hehehe');
+
+    const h1 = await DBOS.startWorkflow(DisruptiveWFs).wfPart1();
+    const h2 = await DBOS.startWorkflow(DisruptiveWFs).wfPart2(h1.workflowID);
+
+    await expect(h1.getResult()).resolves.toBe('Part1hello1');
+    await expect(h2.getResult()).resolves.toBe('Part2v1');
+  }, 30000);
+});
diff --git a/tests/helpers.ts b/tests/helpers.ts
index 526acc55a..ef4715d48 100644
--- a/tests/helpers.ts
+++ b/tests/helpers.ts
@@ -5,6 +5,7 @@ import { isValidDatabaseName, translateDbosConfig } from '../src/config';
 import { ensureSystemDatabase } from '../src/system_database';
 import { GlobalLogger } from '../src/telemetry/logs';
 import { dropPGDatabase, maskDatabaseUrl } from '../src/datasource';
+import { Client } from 'pg';
 
 /* DB management helpers */
 function getSysDatabaseUrlFromUserDb(userDB: string) {
@@ -136,3 +137,35 @@ export async function dropDatabase(connectionString: string, database?: string)
     throw new Error(`Unable to drop ${maskDatabaseUrl(connectionString)}`);
   }
 }
+
+/**
+ * Chaos helper for resiliency tests: connects to `db` and force-terminates
+ * every other backend attached to that database, simulating a transient
+ * connection loss. Failures are swallowed (best effort) so a test run can
+ * proceed even when the role lacks permission to call pg_terminate_backend.
+ */
+export async function causeChaos(db: string): Promise<void> {
+  const client = new Client({
+    connectionString: db, // connection string of the system database under test
+  });
+
+  try {
+    await client.connect();
+
+    // Kill every other session on the current database; our own backend is excluded.
+    await client.query(`
+      SELECT pg_terminate_backend(pid)
+      FROM pg_stat_activity
+      WHERE pid <> pg_backend_pid()
+        AND datname = current_database();
+    `);
+  } catch {
+    // Best effort: insufficient privileges (or an already-dropped connection) must not fail the test.
+  } finally {
+    try {
+      await client.end();
+    } catch {
+      // The server may have closed the connection out from under us; ignore.
+    }
+  }
+}