-
Notifications
You must be signed in to change notification settings - Fork 11
fix(embeddings): pi spawn-on-miss + openclaw embedding producer (#178) #183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c9478ec
17f9435
8d7df3d
bb9df97
f04f00a
76ecd0b
6cc2d9c
e4c8449
cf68f97
379883b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; | ||
| import { | ||
| readFileSync, existsSync, appendFileSync, mkdirSync, writeFileSync, | ||
| openSync, closeSync, renameSync, readdirSync, statSync, unlinkSync, | ||
| openSync, closeSync, writeSync, renameSync, readdirSync, statSync, unlinkSync, | ||
| constants as fsConstants, | ||
| } from "node:fs"; | ||
| import { homedir, tmpdir } from "node:os"; | ||
|
|
@@ -164,51 +164,204 @@ async function dlQuery(creds: Creds, sql: string): Promise<unknown[]> { | |
| // Pi avoids importing EmbedClient (which is bundled into other agents but | ||
| // here would break the "raw .ts, zero deps" promise of pi extensions). | ||
| // Instead we open a Unix socket directly to the daemon at the same well-known | ||
| // path EmbedClient uses. If the socket isn't there yet, we spawn the | ||
| // canonical daemon at ~/.hivemind/embed-deps/embed-daemon.js (deposited by | ||
| // `hivemind embeddings install`) and wait for it to listen, mirroring the | ||
| // auto-spawn-on-miss logic in src/embeddings/client.ts. Subsequent agents | ||
| // (codex, CC, cursor, hermes, …) connect to the SAME daemon — pi pays the | ||
| // cold-start cost only when it's the first user on the box. | ||
| // path EmbedClient uses. If the socket isn't there yet AND the canonical | ||
| // daemon binary exists at ~/.hivemind/embed-deps/embed-daemon.js (deposited | ||
| // by `hivemind embeddings install`), we spawn it under an O_EXCL pidfile | ||
| // lock and wait for it to listen. Subsequent agents (codex, CC, cursor, | ||
| // hermes, …) connect to the SAME daemon — pi pays the cold-start cost only | ||
| // when it's the first user on the box. This logic matches the source-tree | ||
| // helper at src/embeddings/standalone-embed-client.ts (kept in lockstep: | ||
| // the unit tests there cover the 11 edge cases mirrored here). | ||
| // | ||
| // Graceful fallback: any failure → return null → caller writes NULL into | ||
| // message_embedding. Embedding is never on the critical path. | ||
| // message_embedding. Embedding is NEVER on the critical path; pi must keep | ||
| // working when the daemon is unreachable. | ||
|
|
||
| const EMBED_DAEMON_ENTRY = join(homedir(), ".hivemind", "embed-deps", "embed-daemon.js"); | ||
| const EMBED_SOCKET_PATH = (() => { | ||
| const uid = typeof process.getuid === "function" ? String(process.getuid()) : (process.env.USER ?? "default"); | ||
| return `/tmp/hivemind-embed-${uid}.sock`; | ||
| })(); | ||
| // `process.env.USER` removed as a fallback: even though pi doesn't go | ||
| // through ClawHub static-scan, we keep the source in lockstep with | ||
| // src/embeddings/standalone-embed-client.ts (which DOES) so the two | ||
| // implementations stay byte-identical. On Linux/macOS `process.getuid` | ||
| // is always present; "default" is a fine sentinel elsewhere. | ||
| const EMBED_UID = typeof process.getuid === "function" ? String(process.getuid()) : "default"; | ||
| const EMBED_SOCKET_PATH = `/tmp/hivemind-embed-${EMBED_UID}.sock`; | ||
| const EMBED_PID_PATH = `/tmp/hivemind-embed-${EMBED_UID}.pid`; | ||
|
|
||
| function isPidAlive(pid: number): boolean { | ||
| if (!Number.isFinite(pid) || pid <= 0) return false; | ||
| try { process.kill(pid, 0); return true; } catch { return false; } | ||
| } | ||
|
|
||
| // Three-state read: "empty" means the file exists but hasn't been | ||
| // written yet — another caller is mid-spawn between openSync(wx) and | ||
| // writeSync(pid). Treating that as stale lets two racing callers each | ||
| // spawn a daemon, the second crashing on bind(). Mirrors | ||
| // src/embeddings/standalone-embed-client.ts:readPidFile. | ||
| function readPidFileInline(path: string): number | "empty" | null { | ||
| let raw: string; | ||
| try { raw = readFileSync(path, "utf-8").trim(); } catch { return null; } | ||
| if (raw === "") return "empty"; | ||
| const pid = Number(raw); | ||
| if (!pid || Number.isNaN(pid)) return null; | ||
| return pid; | ||
| } | ||
|
|
||
| function tryEmbedOverSocket(text: string, kind: "document" | "query"): Promise<number[] | null> { | ||
| function connectDaemonOnce(timeoutMs: number): Promise<ReturnType<typeof connect> | null> { | ||
| return new Promise((resolve) => { | ||
| let resolved = false; | ||
| const settle = (v: number[] | null) => { if (!resolved) { resolved = true; resolve(v); } }; | ||
| const sock = connect(EMBED_SOCKET_PATH); | ||
| let buf = ""; | ||
| const timer = setTimeout(() => { sock.destroy(); settle(null); }, 5000); | ||
| sock.on("connect", () => { | ||
| // Protocol shape comes from src/embeddings/protocol.ts: {op, id, kind, text}. | ||
| // id is a string ("1"), not a number, and the verb field is "op" not "type". | ||
| sock.write(JSON.stringify({ op: "embed", id: "1", kind, text }) + "\n"); | ||
| const to = setTimeout(() => { try { sock.destroy(); } catch { /* */ } resolve(null); }, timeoutMs); | ||
| sock.once("connect", () => { clearTimeout(to); resolve(sock); }); | ||
| sock.once("error", () => { clearTimeout(to); resolve(null); }); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Spawn the canonical daemon under an O_EXCL pidfile lock. Returns true | ||
| * if THIS pi turn owns the spawn. Mirrors the helper in | ||
| * src/embeddings/standalone-embed-client.ts: | ||
| * - live pidfile owner (case 6/7) → don't SIGTERM (PID-reuse risk from PR #168), let caller wait | ||
| * - dead/garbage pidfile (case 5) → cleanup + spawn | ||
| * - spawn() throws (case 8) → roll pidfile back so the next turn can retry | ||
| */ | ||
| function trySpawnDaemonInline(): boolean { | ||
| let fd: number; | ||
| try { | ||
| fd = openSync(EMBED_PID_PATH, "wx", 0o600); | ||
| // Write the placeholder PID through the open fd. The previous version | ||
| // used writeFileSync(path, ...) which races with concurrent unlink + | ||
| // re-open elsewhere — it could overwrite another caller's pidfile | ||
| // entirely. writeSync(fd, ...) writes to OUR fd only. | ||
| writeSync(fd, String(process.pid)); | ||
| } catch { | ||
| const existing = readPidFileInline(EMBED_PID_PATH); | ||
| // Empty file: another caller won openSync(wx) but hasn't written its | ||
| // PID yet. We MUST NOT unlink + respawn — that lets us race past | ||
| // the legitimate writer and spawn a duplicate daemon. Wait instead. | ||
| if (existing === "empty") return false; | ||
| if (existing !== null && isPidAlive(existing)) { | ||
| // Live owner: another agent / pi turn is bringing the daemon up. Wait. | ||
| return false; | ||
| } | ||
| try { unlinkSync(EMBED_PID_PATH); } catch { /* */ } | ||
| try { | ||
| fd = openSync(EMBED_PID_PATH, "wx", 0o600); | ||
| writeSync(fd, String(process.pid)); | ||
| } catch { | ||
| return false; // sub-ms race: another caller claimed it between our unlink and reopen | ||
| } | ||
| } | ||
| try { | ||
| // No explicit `env: process.env` — it's the spawn default, and a | ||
| // literal `process.env` reference in source kept in lockstep with | ||
| // src/embeddings/standalone-embed-client.ts (which DOES go through | ||
| // ClawHub static-scan from the openclaw bundle). | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @coderabbitai why there is clawhub static scan here in pi? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Here's the breakdown:
So the ClawHub references are essentially cross-implementation traceability notes — they explain why a certain pattern was chosen in the shared module, and why pi's copy preserves the same pattern even though it has no direct obligation to do so. If the two implementations ever diverge, the comment tells a future reader which file is the canonical source of truth for the constraint.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kaghni fair callout — agreed the source shouldn't name the AI review tool that flagged the invariant. Six months from now nobody knows what "codex P1" means. Cleaned up in
Kept the ClawHub references where they appear, because that one IS a recurring CI gate (the static scan on the openclaw bundle blocks any literal There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| const child = spawn(process.execPath, [EMBED_DAEMON_ENTRY], { | ||
| detached: true, | ||
| stdio: "ignore", | ||
| }); | ||
| child.unref(); | ||
| logHm(`embed: spawned daemon pid=${child.pid}`); | ||
| return true; | ||
| } catch (e: any) { | ||
| logHm(`embed: spawn failed: ${e?.message ?? e}`); | ||
| try { unlinkSync(EMBED_PID_PATH); } catch { /* */ } | ||
| return false; | ||
| } finally { | ||
| try { closeSync(fd); } catch { /* */ } | ||
| } | ||
| } | ||
|
|
||
| // After a spawnWaitMs timeout with daemon never opening socket, the | ||
| // pidfile still holds OUR placeholder PID. Every subsequent pi turn | ||
| // would see "live owner" (we're still running) and wait forever instead | ||
| // of retrying the spawn. Clean up the placeholder, but only if it's | ||
| // still ours — the daemon may have already overwritten it. | ||
| // | ||
| // Also clears an empty pidfile: if a prior pi turn was SIGKILL'd | ||
| // between openSync(wx) and writeSync(pid), the empty file would persist | ||
| // and every later turn would wait forever. By the time we hit this | ||
| // cleanup we've waited 5s — orders of magnitude longer than the | ||
| // legitimate openSync→writeSync gap. | ||
| function maybeCleanupOwnPlaceholderInline(): void { | ||
| const existing = readPidFileInline(EMBED_PID_PATH); | ||
| if (existing === process.pid || existing === "empty") { | ||
| try { unlinkSync(EMBED_PID_PATH); } catch { /* already gone */ } | ||
| } | ||
| } | ||
|
|
||
| async function sendEmbedRequest(sock: ReturnType<typeof connect>, text: string, kind: "document" | "query", timeoutMs: number): Promise<number[] | null> { | ||
| return new Promise((resolve) => { | ||
| let resolved = false; | ||
| const settle = (v: number[] | null) => { if (!resolved) { resolved = true; resolve(v); try { sock.destroy(); } catch { /* */ } } }; | ||
| let buf = ""; | ||
| const timer = setTimeout(() => settle(null), timeoutMs); | ||
| sock.on("data", (chunk: Buffer) => { | ||
| buf += chunk.toString("utf-8"); | ||
| const nl = buf.indexOf("\n"); | ||
| if (nl !== -1) { | ||
| clearTimeout(timer); | ||
| try { | ||
| const resp = JSON.parse(buf.slice(0, nl)); | ||
| settle(Array.isArray(resp.embedding) ? resp.embedding : null); | ||
| } catch { settle(null); } | ||
| sock.destroy(); | ||
| } | ||
| if (nl === -1) return; | ||
| clearTimeout(timer); | ||
| try { | ||
| const resp = JSON.parse(buf.slice(0, nl)); | ||
| // Daemon may return `{ error: "unknown op" }` from an older protocol — graceful NULL. | ||
| if (!Array.isArray(resp.embedding)) return settle(null); | ||
| // JSON-over-socket is untrusted at runtime. Reject any non-finite | ||
| // element (string, null, NaN, Infinity, object). Without this, a | ||
| // misbehaving daemon could ship bad values that flow into the | ||
| // ARRAY[...]::FLOAT4[] SQL literal. | ||
| for (const v of resp.embedding) { | ||
| if (typeof v !== "number" || !Number.isFinite(v)) return settle(null); | ||
| } | ||
| settle(resp.embedding); | ||
| } catch { settle(null); } | ||
| }); | ||
| sock.on("error", () => { clearTimeout(timer); settle(null); }); | ||
| sock.on("close", () => { clearTimeout(timer); settle(null); }); | ||
| // Protocol shape comes from src/embeddings/protocol.ts: { op, id, kind, text }. | ||
| // id is a string ("1"), not a number, and the verb field is "op" not "type". | ||
| sock.write(JSON.stringify({ op: "embed", id: "1", kind, text }) + "\n"); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Full spawn-on-miss embedding flow. Returns null on any failure; never | ||
| * throws. 11 edge cases mirror the unit tests in | ||
| * tests/shared/standalone-embed-client.test.ts. | ||
| */ | ||
| async function tryEmbedOverSocket(text: string, kind: "document" | "query"): Promise<number[] | null> { | ||
| // Case 3 — happy path: socket alive, daemon ready. | ||
| let sock = await connectDaemonOnce(1000); | ||
| if (!sock) { | ||
| // Case 1 — binary missing: never spawn. | ||
| if (!existsSync(EMBED_DAEMON_ENTRY)) { | ||
| logHm(`embed: no daemon at ${EMBED_DAEMON_ENTRY} — run 'hivemind embeddings install'`); | ||
| return null; | ||
| } | ||
| // Cases 2 / 4 / 5 / 7 / 8 — trySpawn handles them; loser waits. | ||
| trySpawnDaemonInline(); | ||
| // Case 9 — poll for socket up to 5s. | ||
| const deadline = Date.now() + 5000; | ||
| let delay = 30; | ||
| while (Date.now() < deadline) { | ||
| await new Promise(r => setTimeout(r, delay)); | ||
| delay = Math.min(delay * 1.5, 300); | ||
| if (!existsSync(EMBED_SOCKET_PATH)) continue; | ||
| sock = await connectDaemonOnce(1000); | ||
| if (sock) break; | ||
| } | ||
| if (!sock) { | ||
| // Clean up our placeholder PID so the next pi turn can retry the | ||
| // spawn instead of waiting on us forever. | ||
| maybeCleanupOwnPlaceholderInline(); | ||
| logHm(`embed: daemon never opened socket within 5s`); | ||
| return null; | ||
| } | ||
| } | ||
| // Cases 10 / 11 — request timeout / daemon error → null. | ||
| const v = await sendEmbedRequest(sock, text, kind, 5000); | ||
| if (v === null) logHm(`embed: daemon returned null (timeout or error)`); | ||
| return v; | ||
| } | ||
|
|
||
| // ---------- summary state + wiki-worker spawn --------------------------------- | ||
| // | ||
| // Mirror of src/hooks/summary-state.ts (same dir, same JSON shape, shared | ||
|
|
@@ -569,39 +722,12 @@ async function embed(text: string): Promise<number[] | null> { | |
| logHm(`embed: skipped (empty text)`); | ||
| return null; | ||
| } | ||
| // 1) socket already up (another agent or us in a previous turn) → fast path | ||
| let v = await tryEmbedOverSocket(text, "document"); | ||
| if (v !== null) { | ||
| logHm(`embed: ok via existing socket (dims=${v.length})`); | ||
| return v; | ||
| } | ||
| // 2) no daemon binary deposited → fallback NULL | ||
| if (!existsSync(EMBED_DAEMON_ENTRY)) { | ||
| logHm(`embed: no daemon at ${EMBED_DAEMON_ENTRY} — run 'hivemind embeddings install'`); | ||
| return null; | ||
| } | ||
| // 3) spawn the canonical daemon detached; daemon's own pidfile lock guards | ||
| // against double-spawn if multiple pi turns race. | ||
| logHm(`embed: spawning daemon at ${EMBED_DAEMON_ENTRY}`); | ||
| try { | ||
| spawn(process.execPath, [EMBED_DAEMON_ENTRY], { detached: true, stdio: "ignore" }).unref(); | ||
| } catch (e: any) { | ||
| logHm(`embed: spawn failed: ${e?.message ?? e}`); | ||
| return null; | ||
| } | ||
| // 4) poll for the socket up to ~5s, then retry the embed once | ||
| for (let i = 0; i < 25; i++) { | ||
| await new Promise(r => setTimeout(r, 200)); | ||
| if (existsSync(EMBED_SOCKET_PATH)) { | ||
| v = await tryEmbedOverSocket(text, "document"); | ||
| if (v !== null) { | ||
| logHm(`embed: ok after spawn (dims=${v.length}, polls=${i + 1})`); | ||
| return v; | ||
| } | ||
| } | ||
| } | ||
| logHm(`embed: timed out after spawn (5s)`); | ||
| return null; | ||
| // Single round-trip: tryEmbedOverSocket spawns the daemon on miss | ||
| // (O_EXCL race-safe, mirrors src/embeddings/standalone-embed-client.ts) | ||
| // and embeds in one call. Returns null on any failure. | ||
| const v = await tryEmbedOverSocket(text, "document"); | ||
| if (v !== null) logHm(`embed: ok (dims=${v.length})`); | ||
| return v; | ||
| } | ||
|
|
||
| function embedSqlLiteral(emb: number[] | null): string { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Embed the message body, not the JSON envelope.
Line 1288 embeds
line(UUID/timestamp/role + content JSON), which adds noisy, per-row tokens and weakens semantic similarity quality. Embedtext(or another stable content-only string) instead.Suggested fix
🤖 Prompt for AI Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed and declined for this PR — the suggestion is correct in vacuum but would break cross-agent semantic comparability.
All four sibling agents that write into the same
sessions.message_embeddingcolumn embedline(the JSON envelope), nottext:src/hooks/capture.ts:146(claude-code)src/hooks/codex/capture.ts:131+src/hooks/codex/stop.ts:126src/hooks/cursor/capture.ts:158src/hooks/hermes/capture.ts:138Changing only openclaw to embed
textwould mix two different embedding distributions in the same column: asearch_query: "user wants X"query would compare againstembed({id, type, session_id, content: "user wants X", timestamp})for the other agents andembed("user wants X")for openclaw — different magnitudes, different score ranges, ranking gets unbalanced.The only place
textis embedded today is the wiki-worker path (src/hooks/wiki-worker.ts:197and its per-agent mirrors), which writes to a different column (memory.summary_embedding) — there the input IS just the summary text, no envelope, sotextis correct.If the team wants to switch the entire sessions capture path to content-only embeddings (which IS likely the right long-term call — less token noise, better recall), that's a separate PR that updates all five agents and re-embeds existing rows. Out of scope here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@efenocchi, that's a clear and well-reasoned explanation — thank you. The cross-agent consistency constraint onsessions.message_embeddingis exactly the kind of context a static review misses. Accepted.I'll record this so future reviews don't re-raise the same point on any of the five capture hooks.
Would you like me to open a follow-up issue to track the content-only embedding migration (updating all five agents + re-embedding existing rows)?
✏️ Learnings added