-
Notifications
You must be signed in to change notification settings - Fork 8
openclaw: dedup skillify spawn per-session + stale-lock recovery (#100 + #110) #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,8 @@ import { homedir, tmpdir } from "node:os"; | |
| import { | ||
| existsSync as fsExists, mkdirSync as fsMkdir, openSync as fsOpen, | ||
| closeSync as fsClose, writeFileSync as fsWriteFile, constants as fsConstants, | ||
| readFileSync as fsReadFile, renameSync as fsRename, | ||
| readFileSync as fsReadFile, renameSync as fsRename, unlinkSync as fsUnlink, | ||
| statSync as fsStat, | ||
| } from "node:fs"; | ||
| import { createHash } from "node:crypto"; | ||
| // node:child_process is stubbed in the main openclaw bundle (see esbuild.config.mjs | ||
|
|
@@ -374,6 +375,15 @@ let captureEnabled = true; | |
| const capturedCounts = new Map<string, number>(); | ||
| const fallbackSessionId = crypto.randomUUID(); | ||
|
|
||
| // Per-runtime dedup of skillify worker spawns. Without this, every | ||
| // agent_end after the previous worker exits re-acquires the on-disk | ||
| // lock and spawns a fresh worker, which does one watermark-check SQL | ||
| // round-trip and exits — wasted Node cold-start + DB I/O across a long | ||
| // session. Single-spawn-per-session-per-runtime matches what the | ||
| // non-openclaw agents already do via `tryAcquireWorkerLock` semantics | ||
| // in src/skillify/state.ts. See #100. | ||
| const skillifySpawnedFor = new Set<string>(); | ||
|
|
||
| // --- Skillify worker spawn (mirror of src/skillify/spawn-skillify-worker.ts) --- | ||
| // | ||
| // OpenClaw can't import the shared skillify TS modules — its bundle is | ||
|
|
@@ -425,14 +435,61 @@ function deriveOpenclawProjectKey(channel: string): { key: string; project: stri | |
| return { key, project }; | ||
| } | ||
|
|
||
| // Per-project filesystem lock guarding the skillify worker spawn. | ||
| // Mirrors `tryAcquireWorkerLock` in src/skillify/state.ts: writes a ms | ||
| // timestamp into the lock file when acquired, treats locks older than | ||
| // LOCK_MAX_AGE_MS as stale (abnormal worker death, kernel kill, OOM — | ||
| // the worker's `finally`-release didn't run), unlinks and re-acquires. | ||
| // Without this, a single crashed worker halts mining for that | ||
| // project_key permanently until manual cleanup. See #110. | ||
| // | ||
| // Empty pre-existing locks (from earlier code that wrote no payload) | ||
| // parse as NaN and are treated as immediately stale — clean migration | ||
| // on first patched run. | ||
| const LOCK_MAX_AGE_MS = 10 * 60 * 1000; // 10 min, generous vs typical | ||
| // worker run (<30s + buffer) | ||
|
|
||
| function tryAcquireOpenclawSkillifyLock(projectKey: string): boolean { | ||
| try { | ||
| migrateOpenclawSkillifyLegacyStateDir(); | ||
| fsMkdir(OPENCLAW_SKILLIFY_STATE_DIR, { recursive: true }); | ||
| const lockPath = joinPath(OPENCLAW_SKILLIFY_STATE_DIR, `${projectKey}.worker.lock`); | ||
| const fd = fsOpen(lockPath, fsConstants.O_CREAT | fsConstants.O_EXCL | fsConstants.O_WRONLY); | ||
| fsClose(fd); | ||
| return true; | ||
| const acquire = (): boolean => { | ||
| const fd = fsOpen(lockPath, fsConstants.O_CREAT | fsConstants.O_EXCL | fsConstants.O_WRONLY); | ||
| try { | ||
| fsWriteFile(fd, String(Date.now())); | ||
| } finally { | ||
| fsClose(fd); | ||
| } | ||
| return true; | ||
| }; | ||
| try { | ||
| return acquire(); | ||
| } catch { | ||
| // O_EXCL failed → lock file already exists. Check staleness. | ||
| // There's a brief window between O_CREAT|O_EXCL and the timestamp | ||
| // write where a racing caller can see an empty body. Don't treat | ||
| // empty/NaN as immediately stale (CodeRabbit on #172) — fall back | ||
| // to the file's mtime to decide. If the FILE is fresh, the | ||
| // competitor is mid-write and we should yield; if the file is | ||
| // older than LOCK_MAX_AGE_MS, the previous holder crashed without | ||
| // writing the timestamp (or the disk lost it), and we can recycle. | ||
| try { | ||
| const body = fsReadFile(lockPath, "utf-8"); | ||
| const ts = Number.parseInt(body.trim(), 10); | ||
| const ageByBody = Number.isFinite(ts) ? Date.now() - ts : Number.POSITIVE_INFINITY; | ||
| let ageByMtime = 0; | ||
| try { ageByMtime = Date.now() - fsStat(lockPath).mtimeMs; } catch { ageByMtime = 0; } | ||
| const effectiveAge = Number.isFinite(ts) ? ageByBody : ageByMtime; | ||
| if (effectiveAge > LOCK_MAX_AGE_MS) { | ||
| try { fsUnlink(lockPath); } catch { /* race; recheck below */ } | ||
| try { return acquire(); } catch { return false; } | ||
| } | ||
| return false; // fresh lock held by a live worker — skip spawn | ||
| } catch { | ||
| return false; // couldn't stat/read; safer to skip than double-spawn | ||
| } | ||
| } | ||
| } catch { return false; } | ||
| } | ||
|
|
||
|
|
@@ -489,26 +546,36 @@ function detectOpenclawGateAgent(): GateAgent | null { | |
| return null; | ||
| } | ||
|
|
||
| function spawnOpenclawSkillifyWorker(a: OpenclawSpawnArgs): void { | ||
| /** | ||
| * Returns true when the worker was actually spawned (the caller can | ||
| * record the session in the per-runtime dedup set). Returns false on | ||
| * any "didn't spawn" outcome — missing worker, no delegate gate CLI, | ||
| * lock not acquired, mkdir/config write failure, or spawn() throw — | ||
| * so the caller can let a future agent_end retry. CodeRabbit on #172 | ||
| * caught the previous flow that recorded the session before knowing | ||
| * whether spawn succeeded, suppressing retries forever within the | ||
| * runtime. | ||
| */ | ||
| function spawnOpenclawSkillifyWorker(a: OpenclawSpawnArgs): boolean { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we have the specific function?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @efenocchi yes — openclaw has its own spawn function instead of reusing the shared
Two specific constraints driving the duplication:
It IS code duplication, deliberately. The two functions stay in sync via the (slightly-pun-aware) header comment If we ever drop the |
||
| if (!fsExists(OPENCLAW_SKILLIFY_WORKER_PATH)) { | ||
| a.loggerWarn?.(`skillify worker missing at ${OPENCLAW_SKILLIFY_WORKER_PATH} — reinstall openclaw plugin`); | ||
| return; | ||
| return false; | ||
| } | ||
| const gateAgent = detectOpenclawGateAgent(); | ||
| if (!gateAgent) { | ||
| a.loggerWarn?.(`skillify spawn: no delegate gate CLI found on PATH (need one of: claude, codex, cursor-agent, hermes, pi). Mining skipped.`); | ||
| return; | ||
| return false; | ||
| } | ||
| const { key: projectKey, project } = deriveOpenclawProjectKey(a.channel); | ||
| if (!tryAcquireOpenclawSkillifyLock(projectKey)) { | ||
| // A worker is already running for this project — skip (next agent_end may | ||
| // re-fire after the worker releases the lock, or the worker watermark | ||
| // advance makes the re-fire a no-op). | ||
| return; | ||
| return false; | ||
| } | ||
| const tmpDir = joinPath(tmpdir(), `deeplake-skillify-openclaw-${projectKey}-${Date.now()}`); | ||
| try { fsMkdir(tmpDir, { recursive: true, mode: 0o700 }); } | ||
| catch (e: any) { a.loggerWarn?.(`skillify spawn: mkdir failed: ${e?.message ?? e}`); return; } | ||
| catch (e: any) { a.loggerWarn?.(`skillify spawn: mkdir failed: ${e?.message ?? e}`); return false; } | ||
| const configPath = joinPath(tmpDir, "config.json"); | ||
|
|
||
| // install: "global" — openclaw has no per-project filesystem cwd, so written | ||
|
|
@@ -549,16 +616,18 @@ function spawnOpenclawSkillifyWorker(a: OpenclawSpawnArgs): void { | |
| }, | ||
| }; | ||
| try { fsWriteFile(configPath, JSON.stringify(config), { mode: 0o600 }); } | ||
| catch (e: any) { a.loggerWarn?.(`skillify spawn: config write failed: ${e?.message ?? e}`); return; } | ||
| catch (e: any) { a.loggerWarn?.(`skillify spawn: config write failed: ${e?.message ?? e}`); return false; } | ||
|
|
||
| try { | ||
| realSpawn(process.execPath, [OPENCLAW_SKILLIFY_WORKER_PATH, configPath], { | ||
| detached: true, | ||
| stdio: "ignore", | ||
| env: { ...inheritedEnv.env, HIVEMIND_SKILLIFY_WORKER: "1", HIVEMIND_CAPTURE: "false" }, | ||
| }).unref(); | ||
| return true; | ||
| } catch (e: any) { | ||
| a.loggerWarn?.(`skillify spawn: spawn failed: ${e?.message ?? e}`); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1258,23 +1327,43 @@ export default definePluginEntry({ | |
| // never blocks the agent. Worker reads from the sessions table we | ||
| // just wrote to. Non-fatal: a spawn failure here only loses one | ||
| // mining attempt, never breaks capture. | ||
| try { | ||
| spawnOpenclawSkillifyWorker({ | ||
| apiUrl: cfg.apiUrl, | ||
| token: cfg.token, | ||
| orgId: cfg.orgId, | ||
| workspaceId: cfg.workspaceId, | ||
| userName: cfg.userName, | ||
| channel: ev.channel || "openclaw", | ||
| sessionId: sid, | ||
| loggerWarn: (msg) => logger.error(`Skillify spawn: ${msg}`), | ||
| // Pass the same tuning dispatch the plugin populated at | ||
| // register-time. The worker will repopulate its own | ||
| // globalThis from this. | ||
| tuning: (globalThis as Record<string, unknown>).__hivemind_tuning__ as Record<string, string | undefined> | undefined, | ||
| }); | ||
| } catch (e: any) { | ||
| logger.error(`Skillify spawn threw: ${e?.message ?? e}`); | ||
| // | ||
| // Per-runtime dedup (see #100): on long sessions, agent_end fires | ||
| // many times, and the previous worker has typically finished by | ||
| // the second or third turn — releasing the on-disk lock. Without | ||
| // this guard, every subsequent agent_end re-acquires the lock and | ||
| // spawns a fresh worker that does one watermark-check SQL roundtrip | ||
| // and exits. The on-disk lock is still authoritative across | ||
| // processes (e.g. multiple gateway restarts); this Set only | ||
| // suppresses redundant spawns within the same runtime. | ||
| if (!skillifySpawnedFor.has(sid)) { | ||
| // Only record the session as deduped on SUCCESSFUL spawn. | ||
| // spawnOpenclawSkillifyWorker has multiple non-exception | ||
| // failure paths (no delegate CLI, lock held by a fresh | ||
| // worker, mkdir/config write failure, spawn throw). If we | ||
| // add to the set before knowing the outcome, one transient | ||
| // failure suppresses every retry for the rest of the | ||
| // runtime. CodeRabbit on #172. | ||
| try { | ||
| if (spawnOpenclawSkillifyWorker({ | ||
| apiUrl: cfg.apiUrl, | ||
| token: cfg.token, | ||
| orgId: cfg.orgId, | ||
| workspaceId: cfg.workspaceId, | ||
| userName: cfg.userName, | ||
| channel: ev.channel || "openclaw", | ||
| sessionId: sid, | ||
| loggerWarn: (msg) => logger.error(`Skillify spawn: ${msg}`), | ||
| // Pass the same tuning dispatch the plugin populated at | ||
| // register-time. The worker will repopulate its own | ||
| // globalThis from this. | ||
| tuning: (globalThis as Record<string, unknown>).__hivemind_tuning__ as Record<string, string | undefined> | undefined, | ||
| })) { | ||
| skillifySpawnedFor.add(sid); | ||
| } | ||
| } catch (e: any) { | ||
| logger.error(`Skillify spawn threw: ${e?.message ?? e}`); | ||
| } | ||
|
Comment on lines
+1339
to
+1366
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Record the session only after a worker was actually launched. Lines 1208-1209 mark the session as deduped before Have Suggested fix-function spawnOpenclawSkillifyWorker(a: OpenclawSpawnArgs): void {
+function spawnOpenclawSkillifyWorker(a: OpenclawSpawnArgs): boolean {
if (!fsExists(OPENCLAW_SKILLIFY_WORKER_PATH)) {
a.loggerWarn?.(`skillify worker missing at ${OPENCLAW_SKILLIFY_WORKER_PATH} — reinstall openclaw plugin`);
- return;
+ return false;
}
const gateAgent = detectOpenclawGateAgent();
if (!gateAgent) {
a.loggerWarn?.(`skillify spawn: no delegate gate CLI found on PATH (need one of: claude, codex, cursor-agent, hermes, pi). Mining skipped.`);
- return;
+ return false;
}
const { key: projectKey, project } = deriveOpenclawProjectKey(a.channel);
if (!tryAcquireOpenclawSkillifyLock(projectKey)) {
- return;
+ return false;
}
...
- try { fsWriteFile(configPath, JSON.stringify(config), { mode: 0o600 }); }
- catch (e: any) { a.loggerWarn?.(`skillify spawn: config write failed: ${e?.message ?? e}`); return; }
+ try { fsWriteFile(configPath, JSON.stringify(config), { mode: 0o600 }); }
+ catch (e: any) { a.loggerWarn?.(`skillify spawn: config write failed: ${e?.message ?? e}`); return false; }
try {
realSpawn(process.execPath, [OPENCLAW_SKILLIFY_WORKER_PATH, configPath], {
detached: true,
stdio: "ignore",
env: { ...process.env, HIVEMIND_SKILLIFY_WORKER: "1", HIVEMIND_CAPTURE: "false" },
}).unref();
+ return true;
} catch (e: any) {
a.loggerWarn?.(`skillify spawn: spawn failed: ${e?.message ?? e}`);
+ return false;
}
}
...
if (!skillifySpawnedFor.has(sid)) {
- skillifySpawnedFor.add(sid);
try {
- spawnOpenclawSkillifyWorker({
+ if (spawnOpenclawSkillifyWorker({
apiUrl: cfg.apiUrl,
token: cfg.token,
orgId: cfg.orgId,
workspaceId: cfg.workspaceId,
userName: cfg.userName,
channel: ev.channel || "openclaw",
sessionId: sid,
loggerWarn: (msg) => logger.error(`Skillify spawn: ${msg}`),
- });
+ })) {
+ skillifySpawnedFor.add(sid);
+ }
} catch (e: any) {
logger.error(`Skillify spawn threw: ${e?.message ?? e}`);
}
}🤖 Prompt for AI Agents |
||
| } | ||
| } catch (err) { | ||
| logger.error(`Auto-capture failed: ${err instanceof Error ? err.message : String(err)}`); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid treating a freshly-created empty lock file as stale.
Line 382 creates the lock file before Line 384 writes the timestamp. A racing process that hits the read path during that window will see
"", parseNaN, unlink the file, and acquire the same lock. That reintroduces overlapping workers across processes.A simple fix is to only treat
NaN/ empty contents as stale when the file itself is older than the stale threshold (or at least older than a short grace window), instead of immediately unlinking on parse failure.Suggested direction
📝 Committable suggestion
🤖 Prompt for AI Agents