Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions tests/dbhiccup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/* eslint-disable @typescript-eslint/require-await */
import { DBOS } from '../src';
import { DBOSConfig } from '../src/dbos-executor';
import { causeChaos, generateDBOSTestConfig, setUpDBOSTestSysDb } from './helpers';

let config: DBOSConfig;

class DisruptiveWFs {
@DBOS.workflow()
static async dbLossBetweenSteps() {
await DBOS.runStep(
async () => {
return 'A';
},
{ name: 'A' },
);
await DBOS.runStep(
async () => {
return 'B';
},
{ name: 'B' },
);
await causeChaos(config!.systemDatabaseUrl!);
await DBOS.runStep(
async () => {
return 'C';
},
{ name: 'C' },
);
await DBOS.runStep(
async () => {
return 'D';
},
{ name: 'D' },
);
return 'Hehehe';
}

@DBOS.workflow()
static async runChildWf() {
await causeChaos(config!.systemDatabaseUrl!);
const wfh = await DBOS.startWorkflow(DisruptiveWFs).dbLossBetweenSteps();
await causeChaos(config!.systemDatabaseUrl!);
return await wfh.getResult();
}

@DBOS.workflow()
static async wfPart1() {
await causeChaos(config!.systemDatabaseUrl!);
const r = await DBOS.recv<string>('topic', 5);
await causeChaos(config!.systemDatabaseUrl!);
await DBOS.setEvent('key', 'v1');
await causeChaos(config!.systemDatabaseUrl!);
return 'Part1' + r;
}

@DBOS.workflow()
static async wfPart2(id1: string) {
await causeChaos(config!.systemDatabaseUrl!);
await DBOS.send(id1, 'hello1', 'topic');
await causeChaos(config!.systemDatabaseUrl!);
const v1 = await DBOS.getEvent<string>(id1, 'key', 10);
await causeChaos(config!.systemDatabaseUrl!);
return 'Part2' + v1;
}
}

describe('sys-db-hiccup', () => {
beforeAll(async () => {
config = generateDBOSTestConfig();
await setUpDBOSTestSysDb(config);
DBOS.setConfig(config);
});

beforeEach(async () => {
await DBOS.launch();
});

afterEach(async () => {
await DBOS.shutdown();
});

test('a-little-chaos', async () => {
await expect(DisruptiveWFs.dbLossBetweenSteps()).resolves.toBe('Hehehe');
await expect(DisruptiveWFs.runChildWf()).resolves.toBe('Hehehe');

const h1 = await DBOS.startWorkflow(DisruptiveWFs).wfPart1();
const h2 = await DBOS.startWorkflow(DisruptiveWFs).wfPart2(h1.workflowID);

await expect(h1.getResult()).resolves.toBe('Part1hello1');
await expect(h2.getResult()).resolves.toBe('Part2v1');
}, 30000);
});
24 changes: 24 additions & 0 deletions tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { isValidDatabaseName, translateDbosConfig } from '../src/config';
import { ensureSystemDatabase } from '../src/system_database';
import { GlobalLogger } from '../src/telemetry/logs';
import { dropPGDatabase, maskDatabaseUrl } from '../src/datasource';
import { Client } from 'pg';

/* DB management helpers */
function getSysDatabaseUrlFromUserDb(userDB: string) {
Expand Down Expand Up @@ -136,3 +137,26 @@ export async function dropDatabase(connectionString: string, database?: string)
throw new Error(`Unable to drop ${maskDatabaseUrl(connectionString)}`);
}
}

export async function causeChaos(db: string): Promise<void> {
const client = new Client({
connectionString: db, // or your config object
});

try {
await client.connect();

await client.query(`
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = current_database();
`);
} catch (err) {
//throw new Error(`Could not cause chaos, credentials insufficient? ${err as Error}`);
} finally {
try {
await client.end();
} catch (err) {}
}
}