Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
}
generateGitIgnore(cwd, stateDir);
const time = process.hrtime();
let pipelineIid: number;
if (argv.needs || argv.onlyNeeds) {
await state.incrementPipelineIid(cwd, stateDir);
pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
} else {
pipelineIid = await state.getPipelineIid(cwd, stateDir);
}
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
await Commander.runJobs(argv, parser, writeStreams);
Expand All @@ -101,8 +103,7 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
}
generateGitIgnore(cwd, stateDir);
const time = process.hrtime();
await state.incrementPipelineIid(cwd, stateDir);
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
const pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
await Commander.runPipeline(argv, parser, writeStreams);
Expand Down
82 changes: 82 additions & 0 deletions src/pid-file-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import fs from "fs-extra";
import path from "node:path";
import {randomInt} from "node:crypto";

const LOCK_TIMEOUT_MS = 30_000;
const LOCK_RETRY_BASE_MS = 50;
const LOCK_RETRY_MAX_MS = 500;

const acquireLock = (lockPath: string): boolean => {
try {
fs.writeFileSync(lockPath, `${process.pid}`, {flag: "wx"});
return true;
} catch (err: any) {
if (err.code === "EEXIST") {
return false;
}
throw err;
}
};

const tryRemoveStaleLock = (lockPath: string): boolean => {
let content: string;
try {
content = fs.readFileSync(lockPath, "utf8").trim();
} catch {
return true;
}

const pid = Number.parseInt(content, 10);
if (Number.isNaN(pid)) {
try {
fs.unlinkSync(lockPath);
} catch {
// Another process already cleaned it up
}
return true;
}

try {
process.kill(pid, 0);
return false;
} catch (err: any) {
if (err.code !== "ESRCH") {
return false;
}
}

try {
fs.unlinkSync(lockPath);
} catch {
// Another process already cleaned it up
}
return true;
};

export const withFileLock = async <T>(lockPath: string, fn: () => Promise<T>): Promise<T> => {
await fs.ensureDir(path.dirname(lockPath));
const startTime = Date.now();

while (!acquireLock(lockPath)) {
if (Date.now() - startTime > LOCK_TIMEOUT_MS) {
throw new Error(`Timed out waiting for lock: ${lockPath}`);
}

if (tryRemoveStaleLock(lockPath)) {
continue;
}

const jitter = randomInt(LOCK_RETRY_BASE_MS, LOCK_RETRY_MAX_MS);
await new Promise((resolve) => setTimeout(resolve, jitter));
}

try {
return await fn();
} finally {
try {
fs.unlinkSync(lockPath);
} catch {
// Lock already removed
}
}
};
18 changes: 14 additions & 4 deletions src/state.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import fs from "fs-extra";
import * as yaml from "js-yaml";
import {withFileLock} from "./pid-file-lock.js";

const loadStateYML = async (stateFile: string): Promise<any> => {
if (!fs.existsSync(stateFile)) {
Expand All @@ -16,12 +17,21 @@ const getPipelineIid = async (cwd: string, stateDir: string) => {
return ymlData["pipelineIid"] ? ymlData["pipelineIid"] : 0;
};

const incrementPipelineIid = async (cwd: string, stateDir: string) => {
const incrementPipelineIid = async (cwd: string, stateDir: string): Promise<number> => {
const stateFile = `${cwd}/${stateDir}/state.yml`;
const ymlData = await loadStateYML(stateFile);
const lockPath = `${cwd}/${stateDir}/state.lock`;

return withFileLock(lockPath, async () => {
const ymlData = await loadStateYML(stateFile);
const newIid = ymlData["pipelineIid"] == null ? 0 : ymlData["pipelineIid"] + 1;
ymlData["pipelineIid"] = newIid;

const tmpFile = `${stateFile}.tmp.${process.pid}`;
await fs.outputFile(tmpFile, `---\n${yaml.dump(ymlData)}`);
await fs.rename(tmpFile, stateFile);

ymlData["pipelineIid"] = ymlData["pipelineIid"] != null ? ymlData["pipelineIid"] + 1 : 0;
await fs.outputFile(stateFile, `---\n${yaml.dump(ymlData)}`);
return newIid;
});
};

export {getPipelineIid, incrementPipelineIid};
5 changes: 5 additions & 0 deletions tests/test-cases/pipeline-iid/.gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
test-job:
image: busybox
script:
- echo "CI_PIPELINE_IID=$CI_PIPELINE_IID"
67 changes: 67 additions & 0 deletions tests/test-cases/pipeline-iid/integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {WriteStreamsMock} from "../../../src/write-streams.js";
import {handler} from "../../../src/handler.js";
import {initSpawnSpy} from "../../mocks/utils.mock.js";
import {WhenStatics} from "../../mocks/when-statics.js";
import * as state from "../../../src/state.js";
import fs from "fs-extra";
import path from "path";

const cwd = "tests/test-cases/pipeline-iid";
const stateDir = ".gitlab-ci-local";
const stateFile = path.join(cwd, stateDir, "state.yml");
const lockFile = path.join(cwd, stateDir, "state.lock");

beforeAll(() => {
initSpawnSpy(WhenStatics.all);
});

afterEach(() => {
fs.removeSync(stateFile);
fs.removeSync(lockFile);
});

describe("pipeline-iid state locking", () => {
test("sequential increments return 0, 1, 2", async () => {
const first = await state.incrementPipelineIid(cwd, stateDir);
const second = await state.incrementPipelineIid(cwd, stateDir);
const third = await state.incrementPipelineIid(cwd, stateDir);

expect(first).toBe(0);
expect(second).toBe(1);
expect(third).toBe(2);
});

test("concurrent increments produce unique IIDs", async () => {
const count = 20;
const promises = Array.from({length: count}, () =>
state.incrementPipelineIid(cwd, stateDir),
);
const results = await Promise.all(promises);

const sorted = [...results].sort((a, b) => a - b);
expect(sorted).toEqual(Array.from({length: count}, (_, i) => i));
});

test("stale lock from dead PID is auto-cleaned", async () => {
fs.ensureDirSync(path.dirname(lockFile));
fs.writeFileSync(lockFile, "999999999");

const result = await state.incrementPipelineIid(cwd, stateDir);
expect(result).toBe(0);
expect(fs.existsSync(lockFile)).toBe(false);
});
});

test("pipeline-iid handler increments IID across runs", async () => {
const writeStreams = new WriteStreamsMock();
await handler({cwd: cwd}, writeStreams);

const iid = await state.getPipelineIid(cwd, stateDir);
expect(iid).toBe(0);

const writeStreams2 = new WriteStreamsMock();
await handler({cwd: cwd}, writeStreams2);

const iid2 = await state.getPipelineIid(cwd, stateDir);
expect(iid2).toBe(1);
});