Skip to content

Commit a1f3f65

Browse files
authored
feat: concurrency safety (#99)
1 parent f9d042d commit a1f3f65

File tree

9 files changed

+266
-80
lines changed

9 files changed

+266
-80
lines changed

src/adapter.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import chalk from 'chalk'
12
import { parse } from 'url'
3+
import { Logger } from '.'
24
import { Commit } from './git'
35
import { Migration, Task, TaskType } from './migration'
46

@@ -14,6 +16,16 @@ export class UnsupportedDialectError extends Error {
1416
}
1517
}
1618

19+
export class PendingMigrationTimedOutError extends Error {
20+
/* istanbul ignore next */
21+
public readonly name = 'PendingMigrationTimedOutError'
22+
}
23+
24+
export class PendingMigrationFoundError extends Error {
25+
/* istanbul ignore next */
26+
public readonly name = 'PendingMigrationFoundError'
27+
}
28+
1729
export interface TableRow {
1830
id: number
1931
name: string
@@ -26,9 +38,47 @@ export interface TableRow {
2638
export abstract class DbAdapter {
2739
public abstract init(): Promise<void>
2840
public abstract getLastMigrationTask(): Promise<Task | null>
29-
public abstract logMigrationTask(task: Task): Promise<void>
41+
public abstract beginMigrationTask(task: Task): Promise<void>
42+
public abstract finishMigrationTask(task: Task): Promise<void>
3043
public abstract checkIfTaskCanExecute(task: Task): Promise<void>
3144
public abstract close(): Promise<void>
45+
protected abstract hasPendingMigration(): Promise<boolean>
46+
47+
public async waitForPending(logger: Logger): Promise<boolean> {
48+
let wasPending = false
49+
let shouldRetry = true
50+
await Promise.race([
51+
new Promise<never>((_, reject) =>
52+
setTimeout(() => reject(new PendingMigrationTimedOutError()), 1000 * 60 * 10)
53+
),
54+
(async () => {
55+
// fail after 10 min
56+
let interval: NodeJS.Timer | undefined
57+
while (shouldRetry) {
58+
// if there are rows, a migration is already running
59+
if (!(await this.hasPendingMigration())) {
60+
if (wasPending) {
61+
logger.log('\n\n')
62+
}
63+
break
64+
}
65+
if (!wasPending) {
66+
logger.log(`${chalk.yellow('Waiting for pending migrations')} ...`)
67+
// we had to wait for at least 1 pending migration
68+
wasPending = true
69+
interval = setInterval(() => logger.log('.'), 300)
70+
}
71+
// wait for 1000ms before retrying
72+
await new Promise<void>(resolve => setTimeout(resolve, 1000))
73+
}
74+
if (interval) {
75+
clearInterval(interval)
76+
}
77+
})(),
78+
])
79+
shouldRetry = false
80+
return wasPending
81+
}
3282

3383
protected rowToTask(row: TableRow): Task {
3484
const task = new Task({

src/adapters/postgres.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as pg from 'pg'
22
import { SQL } from 'sql-template-strings'
3-
import { DbAdapter } from '../adapter'
3+
import { DbAdapter, PendingMigrationFoundError } from '../adapter'
44
import { FirstDownMigrationError, MigrationRunTwiceError, Task } from '../migration'
55

66
export class PostgresAdapter extends DbAdapter {
@@ -37,13 +37,15 @@ export class PostgresAdapter extends DbAdapter {
3737
"type" merkel_migration_type,
3838
"commit" TEXT,
3939
"head" TEXT NOT NULL,
40-
"applied_at" TIMESTAMP WITH TIME ZONE NOT NULL
40+
"applied_at" TIMESTAMP WITH TIME ZONE
4141
);
4242
`)
43+
// migrate schema from merkel <= 0.19
44+
await this.client.query(`ALTER TABLE "merkel_meta" ALTER COLUMN "applied_at" DROP NOT NULL`)
4345
}
4446

4547
public close(): Promise<void> {
46-
return new Promise<void>((resolve, reject) => {
48+
return new Promise<void>(resolve => {
4749
this.client.on('end', resolve)
4850
// tslint:disable-next-line:no-floating-promises
4951
this.client.end()
@@ -58,28 +60,59 @@ export class PostgresAdapter extends DbAdapter {
5860
const { rows } = await this.client.query(`
5961
SELECT "id", "name", "applied_at", "type", "commit", "head"
6062
FROM "merkel_meta"
63+
WHERE "applied_at" IS NOT NULL
6164
ORDER BY "id" DESC
6265
LIMIT 1
6366
`)
6467
return rows.length === 0 ? null : this.rowToTask(rows[0])
6568
}
6669

6770
/**
68-
* Logs an executed task to the database. Sets the task ID
71+
* Logs a task to the database. Sets the task ID
6972
*/
70-
public async logMigrationTask(task: Task): Promise<void> {
71-
const { rows } = await this.client.query(SQL`
72-
INSERT INTO merkel_meta ("name", "type", "commit", "head", "applied_at")
73-
VALUES (
74-
${task.migration.name},
75-
${task.type},
76-
${task.commit ? task.commit.sha1 : null},
77-
${task.head ? task.head.sha1 : null},
78-
${task.appliedAt}
79-
)
80-
RETURNING id
73+
public async beginMigrationTask(task: Task): Promise<void> {
74+
/* istanbul ignore if */
75+
if (!task.head) {
76+
throw new Error('Task has no HEAD')
77+
}
78+
await this.client.query(`BEGIN TRANSACTION`)
79+
try {
80+
await this.client.query(`LOCK TABLE "merkel_meta"`)
81+
if (await this.hasPendingMigration()) {
82+
/* istanbul ignore next */
83+
throw new PendingMigrationFoundError()
84+
}
85+
const { rows } = await this.client.query(SQL`
86+
INSERT INTO merkel_meta ("name", "type", "commit", "head")
87+
VALUES (
88+
${task.migration.name},
89+
${task.type},
90+
${task.commit ? task.commit.sha1 : null},
91+
${task.head.sha1}
92+
)
93+
RETURNING id
94+
`)
95+
await this.client.query(`COMMIT`)
96+
task.id = rows[0].id
97+
} finally {
98+
await this.client.query(`ROLLBACK`)
99+
}
100+
}
101+
102+
/**
103+
* Marks the task as finished
104+
*/
105+
public async finishMigrationTask(task: Task): Promise<void> {
106+
const head = task.head ? task.head.sha1 : null
107+
const commit = task.commit ? task.commit.sha1 : null
108+
await this.client.query(SQL`
109+
UPDATE merkel_meta
110+
SET
111+
"applied_at" = ${task.appliedAt},
112+
"head" = ${head},
113+
"commit" = ${commit}
114+
WHERE "id" = ${task.id}
81115
`)
82-
task.id = rows[0].id
83116
}
84117

85118
/**
@@ -91,6 +124,7 @@ export class PostgresAdapter extends DbAdapter {
91124
SELECT "type"
92125
FROM "merkel_meta"
93126
WHERE "name" = ${task.migration.name}
127+
AND "applied_at" IS NOT NULL
94128
ORDER BY "id" DESC
95129
LIMIT 1
96130
`)
@@ -106,4 +140,14 @@ export class PostgresAdapter extends DbAdapter {
106140
}
107141
}
108142
}
143+
144+
protected async hasPendingMigration(): Promise<boolean> {
145+
const { rows } = await this.client.query(SQL`
146+
SELECT "type"
147+
FROM "merkel_meta"
148+
WHERE "applied_at" IS NULL
149+
LIMIT 1
150+
`)
151+
return rows.length !== 0
152+
}
109153
}

src/cli.ts

Lines changed: 81 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import { createAdapterFromUrl } from './adapter'
88
import { getHead } from './git'
99
import { addGitHook, HookAlreadyFoundError } from './git'
1010
import {
11+
CLI_LOGGER,
1112
createConfig,
1213
createMigrationDir,
1314
generate,
1415
getConfigurationForCommit,
1516
getStatus,
1617
isMerkelRepository,
18+
PendingMigrationFoundError,
1719
prepareCommitMsg,
1820
} from './index'
1921
import { Migration, Task, TaskType } from './migration'
@@ -133,12 +135,12 @@ yargs.command(
133135
when: () => !!argv.db,
134136
},
135137
])
136-
if (await createMigrationDir(migrationDir as string)) {
137-
process.stdout.write(`Created ${chalk.cyan(migrationDir as string)}\n`)
138+
if (await createMigrationDir(migrationDir)) {
139+
process.stdout.write(`Created ${chalk.cyan(migrationDir)}\n`)
138140
}
139141
await createConfig({
140-
migrationDir: migrationDir as string,
141-
migrationOutDir: (migrationOutDir as string) || './migrations',
142+
migrationDir,
143+
migrationOutDir: migrationOutDir || './migrations',
142144
})
143145
process.stdout.write(`Created ${chalk.cyan(path.join('.', '.merkelrc.json'))}\n`)
144146
if (initMetaNow) {
@@ -249,6 +251,8 @@ yargs.command(
249251
const adapter = createAdapterFromUrl(argv.db!)
250252
await adapter.init()
251253
const head = await getHead()
254+
// wait for current migration to finish
255+
await adapter.waitForPending(CLI_LOGGER)
252256
const status = await getStatus(adapter, head)
253257
process.stdout.write('\n' + status.toString())
254258
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
@@ -283,33 +287,62 @@ yargs.command(
283287
try {
284288
const adapter = createAdapterFromUrl(argv.db!)
285289
await adapter.init()
286-
const head = await getHead()
287-
const status = await getStatus(adapter, head)
288-
process.stdout.write(status.toString())
289-
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
290-
if (argv.confirm) {
291-
const answer = await inquirer.prompt<{ continue: boolean }>({
292-
type: 'confirm',
293-
name: 'continue',
294-
message: 'Continue?',
295-
})
296-
if (!answer.continue) {
297-
process.exit(0)
290+
while (true) {
291+
const head = await getHead()
292+
const status = await getStatus(adapter, head)
293+
process.stdout.write(status.toString())
294+
const tasks = status.newCommits.reduce<Task[]>((prev, next) => prev.concat(next.tasks), [])
295+
if (tasks.length > 0) {
296+
if (argv.confirm) {
297+
const answer = await inquirer.prompt<{ continue: boolean }>({
298+
type: 'confirm',
299+
name: 'continue',
300+
message: 'Continue?',
301+
})
302+
if (!answer.continue) {
303+
process.exit(0)
304+
}
305+
process.stdout.write('\n')
298306
}
299-
process.stdout.write('\n')
300-
}
301-
process.stdout.write('Starting migration\n\n')
302-
for (const commit of status.newCommits) {
303-
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
304-
for (const task of commit.tasks) {
305-
process.stdout.write(task.toString() + ' ...')
306-
const interval = setInterval(() => process.stdout.write('.'), 100)
307-
await task.execute(argv.migrationOutDir!, adapter, head, commit)
308-
clearInterval(interval)
309-
process.stdout.write(' Success\n')
307+
308+
process.stdout.write('Starting migration\n\n')
309+
310+
const hasChanged = await adapter.waitForPending(CLI_LOGGER)
311+
312+
if (hasChanged) {
313+
process.stdout.write('The migrations have changed, reloading..\n\n')
314+
continue
315+
}
316+
// create pending tasks
317+
for (const task of tasks) {
318+
try {
319+
task.head = head
320+
await adapter.beginMigrationTask(task)
321+
} catch (error) {
322+
if (error instanceof PendingMigrationFoundError) {
323+
continue
324+
} else {
325+
throw error
326+
}
327+
}
310328
}
329+
330+
for (const commit of status.newCommits) {
331+
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
332+
for (const task of commit.tasks) {
333+
process.stdout.write(task.toString() + ' ...')
334+
const interval = setInterval(() => process.stdout.write('.'), 100)
335+
try {
336+
await task.execute(argv.migrationOutDir!, adapter, head, commit)
337+
} finally {
338+
clearInterval(interval)
339+
}
340+
process.stdout.write(' Success\n')
341+
}
342+
}
343+
process.stdout.write(chalk.green('\nAll migrations successful\n'))
311344
}
312-
process.stdout.write(chalk.green('\nAll migrations successful\n'))
345+
break
313346
}
314347
process.exit(0)
315348
} catch (err) {
@@ -328,13 +361,29 @@ const migrationCommand = (type: TaskType) => async (argv: MigrationCommandArgv)
328361
try {
329362
const adapter = createAdapterFromUrl(argv.db!)
330363
await adapter.init()
331-
const head = await getHead()
332-
for (const name of argv.migrations!) {
333-
const task = new Task({ type, migration: new Migration(name) })
364+
const tasks = argv.migrations!.map(name => new Task({ type, migration: new Migration(name) }))
365+
while (true) {
366+
await adapter.waitForPending(CLI_LOGGER)
367+
const head = await getHead()
368+
for (const task of tasks) {
369+
try {
370+
task.head = head
371+
await adapter.beginMigrationTask(task)
372+
} catch (error) {
373+
if (error instanceof PendingMigrationFoundError) {
374+
continue
375+
} else {
376+
throw error
377+
}
378+
}
379+
}
380+
break
381+
}
382+
for (const task of tasks) {
334383
process.stdout.write(`${task.toString()} ...`)
335384
const interval = setInterval(() => process.stdout.write('.'), 100)
336385
try {
337-
await task.execute(argv.migrationOutDir!, adapter, head)
386+
await task.execute(argv.migrationOutDir!, adapter)
338387
} finally {
339388
clearInterval(interval)
340389
}

src/git.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import chalk from 'chalk'
2-
import { ChildProcess, execFile, spawn } from 'mz/child_process'
2+
import { execFile, spawn } from 'mz/child_process'
33
import * as fs from 'mz/fs'
44
import * as path from 'path'
55
import { basename, resolve } from 'path'

0 commit comments

Comments
 (0)