Skip to content

Commit 942e423

Browse files
authored
Merge pull request #55 from leapfrogtechnology/sequential-run
Support both sequential and parallel execution strategies for running synchronization
2 parents 5500ae9 + e0c893b commit 942e423

File tree

12 files changed

+298
-24
lines changed

12 files changed

+298
-24
lines changed

examples/node-app-mssql/sync-db.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
basePath: src/sql
2+
execution: sequential
23
sql:
34
# Create objects in dbo schema
45
- function/dbo/sum.sql

src/SyncDb.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Command, flags } from '@oclif/command';
22

33
import { log } from './logger';
44
import { handleFlags } from './cli';
5+
import { getElapsedTime } from './util/ts';
56
import SyncResult from './domain/SyncResult';
67
import SyncParams from './domain/SyncParams';
78
import { printError, printLine } from './util/io';
@@ -35,11 +36,11 @@ class SyncDb extends Command {
3536
...userParams,
3637
// Individual success handler
3738
onSuccess: (context: ExecutionContext) =>
38-
printLine(` [✓] ${context.connectionId} - Successful (${context.timeElapsed.toFixed(2)}s)`),
39+
printLine(` [✓] ${context.connectionId} - Successful (${context.timeElapsed}s)`),
3940

4041
// Individual error handler
4142
onFailed: (context: ExecutionContext) =>
42-
printLine(` [✖] ${context.connectionId} - Failed (${context.timeElapsed.toFixed(2)}s)`)
43+
printLine(` [✖] ${context.connectionId} - Failed (${context.timeElapsed}s)`)
4344
};
4445
}
4546

@@ -61,11 +62,20 @@ class SyncDb extends Command {
6162
const config = await loadConfig();
6263
const connections = await resolveConnections();
6364
const { synchronize } = await import('./api');
65+
const timeStart = process.hrtime();
6466

6567
await printLine('Synchronizing...\n');
6668

6769
const results = await synchronize(config, connections, params);
68-
const { totalCount, failedCount } = await this.processResults(results);
70+
const { totalCount, failedCount, successfulCount } = await this.processResults(results);
71+
72+
if (successfulCount > 0) {
73+
// Display output.
74+
await printLine(
75+
`Synchronization complete for ${successfulCount} / ${totalCount} connection(s). ` +
76+
`(${getElapsedTime(timeStart)}s)`
77+
);
78+
}
6979

7080
// If all completed successfully, exit gracefully.
7181
if (failedCount === 0) {
@@ -101,11 +111,6 @@ class SyncDb extends Command {
101111

102112
await printLine();
103113

104-
if (successfulCount > 0) {
105-
// Display output.
106-
await printLine(`Synchronization successful for ${successfulCount} / ${totalCount} connection(s).`);
107-
}
108-
109114
// If there are errors, display all of them.
110115
if (!allComplete) {
111116
await printLine(`Synchronization failed for ${failedCount} connection(s):\n`);

src/api.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import SyncParams from './domain/SyncParams';
77
import SyncConfig from './domain/SyncConfig';
88
import SyncResult from './domain/SyncResult';
99
import { DEFAULT_SYNC_PARAMS } from './constants';
10-
import { synchronizeDatabase } from './services/sync';
1110
import ConnectionConfig from './domain/ConnectionConfig';
1211
import ConnectionReference from './domain/ConnectionReference';
1312
import { isKnexInstance, getConfig, createInstance } from './util/db';
1413

14+
// Services
15+
import { synchronizeDatabase } from './services/sync';
16+
import { executeProcesses } from './services/execution';
17+
1518
/**
1619
* Synchronize all the configured database connections.
1720
*
@@ -31,7 +34,7 @@ export async function synchronize(
3134
const connections = mapToConnectionReferences(connectionList);
3235
const params = mergeDeepRight(DEFAULT_SYNC_PARAMS, options || {});
3336
const isCLI = process.env.SYNC_DB_CLI === 'true';
34-
const promises = connections.map(({ connection, id: connectionId }) =>
37+
const processes = connections.map(({ connection, id: connectionId }) => () =>
3538
synchronizeDatabase(connection, {
3639
isCLI,
3740
config,
@@ -40,7 +43,9 @@ export async function synchronize(
4043
})
4144
);
4245

43-
const results = await Promise.all(promises);
46+
// Explicitly suppressing the `| Error` type since
47+
// all errors are already caught inside synchronizeDatabase().
48+
const results = (await executeProcesses(processes, config)) as SyncResult[];
4449

4550
log('Synchronization completed.');
4651

src/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export const CONNECTIONS_FILENAME = 'connections.sync-db.json';
1616
export const INJECTED_CONFIG_TABLE = '__sync_db_injected_config';
1717
export const DEFAULT_CONFIG: SyncConfig = {
1818
basePath: path.resolve(process.cwd(), 'src/sql'),
19+
execution: 'parallel',
1920
sql: [],
2021
hooks: {
2122
pre_sync: [],

src/domain/SyncConfig.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import Mapping from './Mapping';
55
*/
66
interface SyncConfig {
77
basePath: string;
8+
execution: 'parallel' | 'sequential';
89
sql: string[];
910
hooks: {
1011
pre_sync: string[];

src/services/execution.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { log } from '../logger';
2+
import SyncConfig from '../domain/SyncConfig';
3+
import { Promiser, runSequentially } from '../util/promise';
4+
5+
/**
6+
* Execute a list of processes according to the configuration.
7+
*
8+
* @param {Promiser<T>[]} processes
9+
* @param {SyncConfig} config
10+
* @returns {Promise<T[]>}
11+
*/
12+
export function executeProcesses<T>(processes: Promiser<T>[], config: SyncConfig): Promise<(T | Error)[]> {
13+
log(`Execution Strategy: ${config.execution}`);
14+
15+
switch (config.execution) {
16+
case 'sequential':
17+
return runSequentially(processes);
18+
19+
case 'parallel':
20+
return Promise.all(processes.map(fn => fn()));
21+
22+
default:
23+
throw new Error(`Execution strategy should be "sequential" or "parallel" found: "${config.execution}".`);
24+
}
25+
}

src/services/sqlRunner.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,14 @@ export function getDropStatement(type: string, fqon: string): string {
109109
* @param {Knex} trx
110110
* @param {SqlCode[]} files
111111
* @param {string} connectionId
112-
* @returns {Promise<void>}
112+
* @returns {Promise<any[]>}
113113
*/
114-
export function runSequentially(trx: Knex, files: SqlCode[], connectionId: string): Promise<void> {
114+
export function runSequentially(trx: Knex, files: SqlCode[], connectionId: string): Promise<any[]> {
115115
const log = dbLogger(connectionId);
116116
const promises = files.map(file => {
117117
log(`Running ${file.name}`);
118118

119-
return trx.raw(file.sql);
119+
return () => trx.raw(file.sql);
120120
});
121121

122122
return promise.runSequentially(promises);

src/services/sync.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as Knex from 'knex';
22

33
import { dbLogger } from '../logger';
44
import * as sqlRunner from './sqlRunner';
5-
import { NS_PER_SEC } from '../constants';
5+
import { getElapsedTime } from '../util/ts';
66
import SyncResult from '../domain/SyncResult';
77
import SyncContext from '../domain/SyncContext';
88
import * as configInjection from './configInjection';
@@ -109,10 +109,9 @@ export async function synchronizeDatabase(connection: Knex, context: SyncContext
109109
result.error = e;
110110
}
111111

112-
const timeDiff = process.hrtime(timeStart);
113-
const timeElapsed = Number(timeDiff[0]) + Number(timeDiff[1] / NS_PER_SEC);
112+
const timeElapsed = getElapsedTime(timeStart);
114113

115-
log(`Execution completed in ${timeDiff[0]} s, ${timeDiff[1]} ns`);
114+
log(`Execution completed in ${timeElapsed} s`);
116115

117116
// If it's a CLI environment, invoke the handler.
118117
if (context.isCLI) {

src/util/promise.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,45 @@
1+
import { promisify } from 'util';
2+
13
/**
2-
* Run each of the promise sequentially.
4+
* Promiser - A function that returns a promise.
5+
*/
6+
export type Promiser<T> = () => PromiseLike<T>;
7+
8+
/**
9+
* Resolve a promise after a timeout.
10+
*/
11+
export const timeout = promisify(setTimeout);
12+
13+
/**
14+
* Run each of the promise sequentially and return their results in the same order.
315
*
4-
* @param {PromiseLike<T>[]} promises
5-
* @returns {Promise<void>}
16+
* @param {Promiser<T>[]} promisers
17+
* @param {boolean} [failCascade=true]
18+
* @returns {(Promise<(T | Error)[]>)}
619
*/
7-
export async function runSequentially<T>(promises: PromiseLike<T>[]): Promise<void> {
8-
for (const promise of promises) {
9-
await promise;
20+
export async function runSequentially<T>(
21+
promisers: Promiser<T>[],
22+
failCascade: boolean = true
23+
): Promise<(T | Error)[]> {
24+
const result: (T | Error)[] = [];
25+
26+
for (const promiser of promisers) {
27+
try {
28+
const value = await promiser();
29+
30+
result.push(value);
31+
} catch (err) {
32+
// If failCascade = true,
33+
// any error (promise rejection) will be cascaded thus halting the process.
34+
if (failCascade) {
35+
throw err;
36+
}
37+
38+
// If failCascade = false,
39+
// the failed promise will be resolved with the rejected error as a value.
40+
result.push(err instanceof Error ? err : new Error(err));
41+
}
1042
}
43+
44+
return result;
1145
}

src/util/ts.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { NS_PER_SEC } from '../constants';
2+
3+
/**
4+
* Calculate elapsed time from the given start time (process.hrtime()).
5+
*
6+
* @param {[number, number]} timeStart
7+
* @param {(false | number)} fixed
8+
* @returns {(number | string)}
9+
*/
10+
export function getElapsedTime(timeStart: [number, number], fixed: false | number = 2): number {
11+
const timeDiff = process.hrtime(timeStart);
12+
const timeElapsed = Number(timeDiff[0]) + Number(timeDiff[1] / NS_PER_SEC);
13+
14+
if (fixed === false) {
15+
return timeElapsed;
16+
}
17+
18+
return Number(timeElapsed.toFixed(fixed));
19+
}

0 commit comments

Comments
 (0)