-
Notifications
You must be signed in to change notification settings - Fork 202
Expand file tree
/
Copy pathstate.ts
More file actions
110 lines (92 loc) · 3.21 KB
/
state.ts
File metadata and controls
110 lines (92 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import fs from "fs-extra";
import * as yaml from "js-yaml";
import path from "path";
import {randomInt} from "crypto";
const LOCK_TIMEOUT_MS = 30_000;
const LOCK_RETRY_BASE_MS = 50;
const LOCK_RETRY_MAX_MS = 500;
const isPidAlive = (pid: number): boolean => {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
};
const withFileLock = async <T>(lockPath: string, fn: () => Promise<T>): Promise<T> => {
const startTime = Date.now();
while (true) {
let fd: number | undefined;
try {
await fs.ensureDir(path.dirname(lockPath));
fd = fs.openSync(lockPath, "wx");
fs.writeFileSync(fd, `${process.pid}`);
fs.closeSync(fd);
fd = undefined;
break;
} catch (err: any) {
if (fd !== undefined) {
fs.closeSync(fd);
}
if (err.code !== "EEXIST") {
throw err;
}
if (Date.now() - startTime > LOCK_TIMEOUT_MS) {
throw new Error(`Timed out waiting for lock: ${lockPath}`, {cause: err});
}
let stalePid: number | undefined;
try {
const content = fs.readFileSync(lockPath, "utf8").trim();
stalePid = parseInt(content, 10);
} catch {
// Lock file disappeared between open attempt and read — retry immediately
continue;
}
if (!isNaN(stalePid!) && !isPidAlive(stalePid!)) {
try {
fs.unlinkSync(lockPath);
} catch {
// Another process already cleaned it up
}
continue;
}
const jitter = randomInt(LOCK_RETRY_BASE_MS, LOCK_RETRY_MAX_MS);
await new Promise((resolve) => setTimeout(resolve, jitter));
}
}
try {
return await fn();
} finally {
try {
fs.unlinkSync(lockPath);
} catch {
// Lock already removed
}
}
};
const loadStateYML = async (stateFile: string): Promise<any> => {
if (!fs.existsSync(stateFile)) {
return {};
}
const stateFileContent = await fs.readFile(stateFile, "utf8");
return yaml.load(stateFileContent) || {};
};
const getPipelineIid = async (cwd: string, stateDir: string) => {
const stateFile = `${cwd}/${stateDir}/state.yml`;
const ymlData = await loadStateYML(stateFile);
return ymlData["pipelineIid"] ? ymlData["pipelineIid"] : 0;
};
const incrementPipelineIid = async (cwd: string, stateDir: string): Promise<number> => {
const stateFile = `${cwd}/${stateDir}/state.yml`;
const lockPath = `${cwd}/${stateDir}/state.lock`;
return withFileLock(lockPath, async () => {
const ymlData = await loadStateYML(stateFile);
const newIid = ymlData["pipelineIid"] != null ? ymlData["pipelineIid"] + 1 : 0;
ymlData["pipelineIid"] = newIid;
const tmpFile = `${stateFile}.tmp.${process.pid}`;
await fs.outputFile(tmpFile, `---\n${yaml.dump(ymlData)}`);
await fs.rename(tmpFile, stateFile);
return newIid;
});
};
export {getPipelineIid, incrementPipelineIid};