Skip to content

Commit aa97f40

Browse files
authored
Fix state.yml race condition with PID-based file locking (#1783)
1 parent 4977171 commit aa97f40

File tree

5 files changed

+173
-8
lines changed

5 files changed

+173
-8
lines changed

src/handler.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,12 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
7474
}
7575
generateGitIgnore(cwd, stateDir);
7676
const time = process.hrtime();
77+
let pipelineIid: number;
7778
if (argv.needs || argv.onlyNeeds) {
78-
await state.incrementPipelineIid(cwd, stateDir);
79+
pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
80+
} else {
81+
pipelineIid = await state.getPipelineIid(cwd, stateDir);
7982
}
80-
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
8183
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
8284
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
8385
await Commander.runJobs(argv, parser, writeStreams);
@@ -101,8 +103,7 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
101103
}
102104
generateGitIgnore(cwd, stateDir);
103105
const time = process.hrtime();
104-
await state.incrementPipelineIid(cwd, stateDir);
105-
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
106+
const pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
106107
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
107108
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
108109
await Commander.runPipeline(argv, parser, writeStreams);

src/pid-file-lock.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import fs from "fs-extra";
2+
import path from "node:path";
3+
import {randomInt} from "node:crypto";
4+
5+
const LOCK_TIMEOUT_MS = 30_000;
6+
const LOCK_RETRY_BASE_MS = 50;
7+
const LOCK_RETRY_MAX_MS = 500;
8+
9+
/**
 * Tries to take the lock by creating the lock file exclusively and
 * stamping it with this process's PID.
 *
 * @param lockPath - Path of the lock file to create.
 * @returns `true` when this call created the file, `false` when the file
 *          already existed.
 * @throws Any file-system error other than `EEXIST` is rethrown as-is.
 */
const acquireLock = (lockPath: string): boolean => {
    let created = true;
    try {
        // "wx" makes the write fail with EEXIST instead of overwriting an existing lock.
        fs.writeFileSync(lockPath, String(process.pid), {flag: "wx"});
    } catch (cause: any) {
        if (cause.code !== "EEXIST") {
            throw cause;
        }
        created = false;
    }
    return created;
};
20+
21+
// How long a lock file without a usable owner PID is left alone before it is
// treated as stale. The lock file is created before the PID is written into
// it, so a freshly created lock can be observed empty for a brief moment.
const UNREADABLE_LOCK_GRACE_MS = 2_000;

// Largest PID value that can be handed to process.kill as a 32-bit integer.
const MAX_PID = 0x7fffffff;

// Unlinks the lock only if it still holds the content that was judged stale;
// this narrows (but cannot fully close) the window in which another process
// re-acquires the lock between our read and our unlink.
const removeLockIfUnchanged = (lockPath: string, expectedContent: string): void => {
    try {
        if (fs.readFileSync(lockPath, "utf8").trim() === expectedContent) {
            fs.unlinkSync(lockPath);
        }
    } catch {
        // Another process already cleaned it up
    }
};

/**
 * Decides whether the existing lock file is stale and, if so, removes it.
 *
 * A lock is stale when the PID stored in it no longer refers to a running
 * process, or when it holds no usable PID and is older than
 * `UNREADABLE_LOCK_GRACE_MS`.
 *
 * @param lockPath - Path of the lock file to inspect.
 * @returns `true` when the caller should retry acquiring immediately (the
 *          lock was stale, or could not be read at all); `false` when the
 *          lock should be treated as held and the caller should wait.
 */
const tryRemoveStaleLock = (lockPath: string): boolean => {
    let content: string;
    let ageMs: number;
    try {
        content = fs.readFileSync(lockPath, "utf8").trim();
        ageMs = Date.now() - fs.statSync(lockPath).mtimeMs;
    } catch {
        // Lock vanished (or is unreadable): let the caller retry the acquire.
        return true;
    }

    const pid = Number.parseInt(content, 10);
    const pidIsUsable = Number.isInteger(pid) && pid > 0 && pid <= MAX_PID;
    if (!pidIsUsable) {
        // Non-positive PIDs would make process.kill probe a process group, and
        // empty content may simply mean the owner has not written its PID yet.
        // Math.abs tolerates small clock skew between mtime and Date.now().
        if (Math.abs(ageMs) < UNREADABLE_LOCK_GRACE_MS) {
            return false;
        }
        removeLockIfUnchanged(lockPath, content);
        return true;
    }

    try {
        // Signal 0 only tests whether the process exists; nothing is delivered.
        process.kill(pid, 0);
        return false;
    } catch (err: any) {
        // Anything but "no such process" (e.g. EPERM) means the owner may be alive.
        if (err.code !== "ESRCH") {
            return false;
        }
    }

    removeLockIfUnchanged(lockPath, content);
    return true;
};
55+
56+
/**
 * Runs `fn` while holding an exclusive lock file at `lockPath`.
 *
 * The lock's parent directory is created when missing. While the lock cannot
 * be acquired, a stale lock is cleared and the acquire retried immediately;
 * otherwise the call sleeps for a random delay between `LOCK_RETRY_BASE_MS`
 * and `LOCK_RETRY_MAX_MS` before trying again. The lock file is removed once
 * `fn` settles, whether it resolved or rejected.
 *
 * @param lockPath - Path of the lock file to create.
 * @param fn - Async work to perform while the lock is held.
 * @returns The value `fn` resolves to.
 * @throws Error when the lock is still unavailable after `LOCK_TIMEOUT_MS`.
 */
export const withFileLock = async <T>(lockPath: string, fn: () => Promise<T>): Promise<T> => {
    await fs.ensureDir(path.dirname(lockPath));
    const deadline = Date.now() + LOCK_TIMEOUT_MS;

    for (;;) {
        if (acquireLock(lockPath)) {
            break;
        }
        if (Date.now() > deadline) {
            throw new Error(`Timed out waiting for lock: ${lockPath}`);
        }

        // When the lock was reported stale/gone, loop straight back to the acquire.
        const cleared = tryRemoveStaleLock(lockPath);
        if (!cleared) {
            const delayMs = randomInt(LOCK_RETRY_BASE_MS, LOCK_RETRY_MAX_MS);
            await new Promise((wake) => setTimeout(wake, delayMs));
        }
    }

    try {
        return await fn();
    } finally {
        // Best-effort release; a failure here must not mask the outcome of fn.
        try {
            fs.unlinkSync(lockPath);
        } catch {
            // Lock already removed
        }
    }
};

src/state.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from "fs-extra";
22
import * as yaml from "js-yaml";
3+
import {withFileLock} from "./pid-file-lock.js";
34

45
const loadStateYML = async (stateFile: string): Promise<any> => {
56
if (!fs.existsSync(stateFile)) {
@@ -16,12 +17,21 @@ const getPipelineIid = async (cwd: string, stateDir: string) => {
1617
return ymlData["pipelineIid"] ? ymlData["pipelineIid"] : 0;
1718
};
1819

19-
const incrementPipelineIid = async (cwd: string, stateDir: string) => {
20+
const incrementPipelineIid = async (cwd: string, stateDir: string): Promise<number> => {
2021
const stateFile = `${cwd}/${stateDir}/state.yml`;
21-
const ymlData = await loadStateYML(stateFile);
22+
const lockPath = `${cwd}/${stateDir}/state.lock`;
23+
24+
return withFileLock(lockPath, async () => {
25+
const ymlData = await loadStateYML(stateFile);
26+
const newIid = ymlData["pipelineIid"] == null ? 0 : ymlData["pipelineIid"] + 1;
27+
ymlData["pipelineIid"] = newIid;
28+
29+
const tmpFile = `${stateFile}.tmp.${process.pid}`;
30+
await fs.outputFile(tmpFile, `---\n${yaml.dump(ymlData)}`);
31+
await fs.rename(tmpFile, stateFile);
2232

23-
ymlData["pipelineIid"] = ymlData["pipelineIid"] != null ? ymlData["pipelineIid"] + 1 : 0;
24-
await fs.outputFile(stateFile, `---\n${yaml.dump(ymlData)}`);
33+
return newIid;
34+
});
2535
};
2636

2737
export {getPipelineIid, incrementPipelineIid};
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
test-job:
3+
image: busybox
4+
script:
5+
- echo "CI_PIPELINE_IID=$CI_PIPELINE_IID"
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import {WriteStreamsMock} from "../../../src/write-streams.js";
2+
import {handler} from "../../../src/handler.js";
3+
import {initSpawnSpy} from "../../mocks/utils.mock.js";
4+
import {WhenStatics} from "../../mocks/when-statics.js";
5+
import * as state from "../../../src/state.js";
6+
import fs from "fs-extra";
7+
import path from "path";
8+
9+
// Fixture project and the state artifacts this suite creates inside it.
const cwd = "tests/test-cases/pipeline-iid";
const stateDir = ".gitlab-ci-local";
const stateFile = path.join(cwd, stateDir, "state.yml");
const lockFile = path.join(cwd, stateDir, "state.lock");

// Bumps the persisted pipeline IID of the fixture project once.
const bumpIid = (): Promise<number> => state.incrementPipelineIid(cwd, stateDir);

beforeAll(() => {
    initSpawnSpy(WhenStatics.all);
});

// Each test starts from scratch: no persisted state and no leftover lock.
afterEach(() => {
    for (const leftover of [stateFile, lockFile]) {
        fs.removeSync(leftover);
    }
});

describe("pipeline-iid state locking", () => {
    test("sequential increments return 0, 1, 2", async () => {
        const observed: number[] = [];
        for (let run = 0; run < 3; run++) {
            observed.push(await bumpIid());
        }

        expect(observed[0]).toBe(0);
        expect(observed[1]).toBe(1);
        expect(observed[2]).toBe(2);
    });

    test("concurrent increments produce unique IIDs", async () => {
        const count = 20;

        // Start every increment before awaiting any of them.
        const pending: Promise<number>[] = [];
        for (let i = 0; i < count; i++) {
            pending.push(bumpIid());
        }
        const results = await Promise.all(pending);

        // Completion order is not asserted, only that 0..count-1 each appear once.
        const ascending = results.slice().sort((left, right) => left - right);
        const expected = Array.from(Array(count).keys());
        expect(ascending).toEqual(expected);
    });

    test("stale lock from dead PID is auto-cleaned", async () => {
        // Plant a lock file that names a PID the test expects not to be running.
        fs.ensureDirSync(path.dirname(lockFile));
        fs.writeFileSync(lockFile, "999999999");

        const result = await bumpIid();

        expect(result).toBe(0);
        expect(fs.existsSync(lockFile)).toBe(false);
    });
});

test("pipeline-iid handler increments IID across runs", async () => {
    // Two full handler runs, each with its own stream mock; the persisted IID
    // is checked after each run.
    for (const expectedIid of [0, 1]) {
        await handler({cwd: cwd}, new WriteStreamsMock());

        const persistedIid = await state.getPipelineIid(cwd, stateDir);
        expect(persistedIid).toBe(expectedIid);
    }
});

0 commit comments

Comments
 (0)