Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
}
generateGitIgnore(cwd, stateDir);
const time = process.hrtime();
let pipelineIid: number;
if (argv.needs || argv.onlyNeeds) {
await state.incrementPipelineIid(cwd, stateDir);
pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
} else {
pipelineIid = await state.getPipelineIid(cwd, stateDir);
}
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
await Commander.runJobs(argv, parser, writeStreams);
Expand All @@ -101,8 +103,7 @@ export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[
}
generateGitIgnore(cwd, stateDir);
const time = process.hrtime();
await state.incrementPipelineIid(cwd, stateDir);
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
const pipelineIid = await state.incrementPipelineIid(cwd, stateDir);
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
await Commander.runPipeline(argv, parser, writeStreams);
Expand Down
75 changes: 75 additions & 0 deletions src/pid-file-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import fs from "fs-extra";
import path from "path";
import {randomInt} from "crypto";

const LOCK_TIMEOUT_MS = 30_000;
const LOCK_RETRY_BASE_MS = 50;
const LOCK_RETRY_MAX_MS = 500;

const acquireLock = (lockPath: string): boolean => {
try {
fs.writeFileSync(lockPath, `${process.pid}`, {flag: "wx"});
return true;
} catch (err: any) {
if (err.code === "EEXIST") {
return false;
}
throw err;
}
};

const tryRemoveStaleLock = (lockPath: string): boolean => {
let content: string;
try {
content = fs.readFileSync(lockPath, "utf8").trim();
} catch {
return true;
}

const pid = parseInt(content, 10);
if (isNaN(pid)) {
return false;
}

try {
process.kill(pid, 0);
return false;
} catch {
// Process is dead — remove stale lock
}

try {
fs.unlinkSync(lockPath);
} catch {
// Another process already cleaned it up
}
return true;
};

export const withFileLock = async <T>(lockPath: string, fn: () => Promise<T>): Promise<T> => {
await fs.ensureDir(path.dirname(lockPath));
const startTime = Date.now();

while (!acquireLock(lockPath)) {
if (Date.now() - startTime > LOCK_TIMEOUT_MS) {
throw new Error(`Timed out waiting for lock: ${lockPath}`);
}

if (tryRemoveStaleLock(lockPath)) {
continue;
}

const jitter = randomInt(LOCK_RETRY_BASE_MS, LOCK_RETRY_MAX_MS);
await new Promise((resolve) => setTimeout(resolve, jitter));
}

try {
return await fn();
} finally {
try {
fs.unlinkSync(lockPath);
} catch {
// Lock already removed
}
}
};
18 changes: 14 additions & 4 deletions src/state.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import fs from "fs-extra";
import * as yaml from "js-yaml";
import {withFileLock} from "./pid-file-lock.js";

const loadStateYML = async (stateFile: string): Promise<any> => {
if (!fs.existsSync(stateFile)) {
Expand All @@ -16,12 +17,21 @@ const getPipelineIid = async (cwd: string, stateDir: string) => {
return ymlData["pipelineIid"] ? ymlData["pipelineIid"] : 0;
};

const incrementPipelineIid = async (cwd: string, stateDir: string) => {
const incrementPipelineIid = async (cwd: string, stateDir: string): Promise<number> => {
const stateFile = `${cwd}/${stateDir}/state.yml`;
const ymlData = await loadStateYML(stateFile);
const lockPath = `${cwd}/${stateDir}/state.lock`;

return withFileLock(lockPath, async () => {
const ymlData = await loadStateYML(stateFile);
const newIid = ymlData["pipelineIid"] != null ? ymlData["pipelineIid"] + 1 : 0;
ymlData["pipelineIid"] = newIid;

const tmpFile = `${stateFile}.tmp.${process.pid}`;
await fs.outputFile(tmpFile, `---\n${yaml.dump(ymlData)}`);
await fs.rename(tmpFile, stateFile);

ymlData["pipelineIid"] = ymlData["pipelineIid"] != null ? ymlData["pipelineIid"] + 1 : 0;
await fs.outputFile(stateFile, `---\n${yaml.dump(ymlData)}`);
return newIid;
});
};

export {getPipelineIid, incrementPipelineIid};
5 changes: 5 additions & 0 deletions tests/test-cases/pipeline-iid/.gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
test-job:
image: busybox
script:
- echo "CI_PIPELINE_IID=$CI_PIPELINE_IID"
67 changes: 67 additions & 0 deletions tests/test-cases/pipeline-iid/integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {WriteStreamsMock} from "../../../src/write-streams.js";
import {handler} from "../../../src/handler.js";
import {initSpawnSpy} from "../../mocks/utils.mock.js";
import {WhenStatics} from "../../mocks/when-statics.js";
import * as state from "../../../src/state.js";
import fs from "fs-extra";
import path from "path";

const cwd = "tests/test-cases/pipeline-iid";
const stateDir = ".gitlab-ci-local";
const stateFile = path.join(cwd, stateDir, "state.yml");
const lockFile = path.join(cwd, stateDir, "state.lock");

beforeAll(() => {
initSpawnSpy(WhenStatics.all);
});

afterEach(() => {
fs.removeSync(stateFile);
fs.removeSync(lockFile);
});

describe("pipeline-iid state locking", () => {
test("sequential increments return 0, 1, 2", async () => {
const first = await state.incrementPipelineIid(cwd, stateDir);
const second = await state.incrementPipelineIid(cwd, stateDir);
const third = await state.incrementPipelineIid(cwd, stateDir);

expect(first).toBe(0);
expect(second).toBe(1);
expect(third).toBe(2);
});

test("concurrent increments produce unique IIDs", async () => {
const count = 20;
const promises = Array.from({length: count}, () =>
state.incrementPipelineIid(cwd, stateDir),
);
const results = await Promise.all(promises);

const sorted = [...results].sort((a, b) => a - b);
expect(sorted).toEqual(Array.from({length: count}, (_, i) => i));
});

test("stale lock from dead PID is auto-cleaned", async () => {
fs.ensureDirSync(path.dirname(lockFile));
fs.writeFileSync(lockFile, "999999999");

const result = await state.incrementPipelineIid(cwd, stateDir);
expect(result).toBe(0);
expect(fs.existsSync(lockFile)).toBe(false);
});
});

test("pipeline-iid handler increments IID across runs", async () => {
const writeStreams = new WriteStreamsMock();
await handler({cwd: cwd}, writeStreams);

const iid = await state.getPipelineIid(cwd, stateDir);
expect(iid).toBe(0);

const writeStreams2 = new WriteStreamsMock();
await handler({cwd: cwd}, writeStreams2);

const iid2 = await state.getPipelineIid(cwd, stateDir);
expect(iid2).toBe(1);
});