Skip to content

Commit 1a164e0

Browse files
committed
Isolate PDF conversion failures with worker process
1 parent 6115600 commit 1a164e0

File tree

4 files changed

+293
-30
lines changed

4 files changed

+293
-30
lines changed

src/createEmbeddingText.ts

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { spawn as nodeSpawn } from "child_process";
2+
import { existsSync as nodeExistsSync } from "fs";
3+
import {
4+
EmbeddingText,
5+
CreateEmbeddingText,
6+
} from "@magda/semantic-indexer-sdk";
7+
8+
export const PDF_TO_MD_TIMEOUT_MS = 5 * 60 * 1000;
9+
10+
type SpawnFn = typeof nodeSpawn;
11+
type ExistsSyncFn = typeof nodeExistsSync;
12+
13+
type CreateEmbeddingTextDeps = {
14+
spawn?: SpawnFn;
15+
existsSync?: ExistsSyncFn;
16+
timeoutMs?: number;
17+
workerScriptPath?: string;
18+
processExecPath?: string;
19+
};
20+
21+
export function convertPdfToMarkdownInChildProcess(
22+
filePath: string,
23+
deps: CreateEmbeddingTextDeps = {}
24+
): Promise<string> {
25+
const spawn = deps.spawn ?? nodeSpawn;
26+
const timeoutMs = deps.timeoutMs ?? PDF_TO_MD_TIMEOUT_MS;
27+
const workerScriptPath =
28+
deps.workerScriptPath ?? new URL("./pdf2mdWorker.js", import.meta.url).pathname;
29+
const processExecPath = deps.processExecPath ?? process.execPath;
30+
31+
return new Promise((resolve, reject) => {
32+
const worker = spawn(processExecPath, [workerScriptPath, filePath], {
33+
stdio: ["ignore", "pipe", "pipe"],
34+
});
35+
36+
let output = "";
37+
let errorOutput = "";
38+
let settled = false;
39+
40+
const settle = (fn: () => void) => {
41+
if (settled) {
42+
return;
43+
}
44+
settled = true;
45+
clearTimeout(timer);
46+
fn();
47+
};
48+
49+
const timer = setTimeout(() => {
50+
worker.kill("SIGKILL");
51+
settle(() =>
52+
reject(new Error(`pdf2md worker timed out after ${timeoutMs}ms`))
53+
);
54+
}, timeoutMs);
55+
56+
worker.stdout.on("data", (chunk: Buffer | string) => {
57+
output += chunk.toString();
58+
});
59+
60+
worker.stderr.on("data", (chunk: Buffer | string) => {
61+
errorOutput += chunk.toString();
62+
});
63+
64+
worker.on("error", (err) => {
65+
settle(() =>
66+
reject(new Error(`Failed to start pdf2md worker: ${err.message}`))
67+
);
68+
});
69+
70+
worker.on("close", (code) => {
71+
if (code === 0) {
72+
settle(() => resolve(output));
73+
} else {
74+
const details = errorOutput.trim() || "No stderr output";
75+
settle(() =>
76+
reject(new Error(`pdf2md worker exited with code ${code}: ${details}`))
77+
);
78+
}
79+
});
80+
});
81+
}
82+
83+
export function makeCreateEmbeddingText(
84+
deps: CreateEmbeddingTextDeps = {}
85+
): CreateEmbeddingText {
86+
const existsSync = deps.existsSync ?? nodeExistsSync;
87+
88+
return async ({ format, filePath }) => {
89+
if (format !== "PDF" || !filePath) {
90+
throw new Error("Unexpected format or file path");
91+
}
92+
93+
if (!existsSync(filePath)) {
94+
throw new Error(`File not found: ${filePath}`);
95+
}
96+
97+
const result = await convertPdfToMarkdownInChildProcess(filePath, deps).catch(
98+
(err) => {
99+
throw new Error(`Failed to convert PDF to Markdown: ${err.message}`);
100+
}
101+
);
102+
103+
if (!result) {
104+
throw new Error("Empty conversion result");
105+
}
106+
107+
return { text: result } as EmbeddingText;
108+
};
109+
}
110+
111+
export const createEmbeddingText = makeCreateEmbeddingText();

src/index.ts

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,14 @@
11
import semanticIndexer, {
2-
EmbeddingText,
32
SemanticIndexerOptions,
43
commonYargs,
5-
CreateEmbeddingText,
64
ChunkStrategyType,
75
} from "@magda/semantic-indexer-sdk";
8-
import pdf2md from "@opendocsg/pdf2md";
9-
import { existsSync, readFileSync } from "fs";
106
import { pdfSemanticIndexerArgs } from "./pdfSemanticIndexerArgs.js";
117
import { MarkdownChunker } from "./markdownChunker.js";
8+
import { createEmbeddingText } from "./createEmbeddingText.js";
129

1310
const port = pdfSemanticIndexerArgs.port;
1411
const args = commonYargs(port, `http://localhost:${port}`);
15-
16-
export const createEmbeddingText: CreateEmbeddingText = async ({
17-
record,
18-
format,
19-
filePath,
20-
url,
21-
}) => {
22-
if (format === "PDF" && filePath) {
23-
if (!existsSync(filePath)) {
24-
throw new Error(`File not found: ${filePath}`);
25-
}
26-
27-
const pdfBuffer = readFileSync(filePath);
28-
const result = await pdf2md(pdfBuffer).catch((err) => {
29-
throw new Error(`Failed to convert PDF to Markdown: ${err}`);
30-
});
31-
32-
if (!result) {
33-
throw new Error(`Empty conversion result`);
34-
}
35-
36-
return { text: result } as EmbeddingText;
37-
}
38-
throw new Error("Unexpected format or file path");
39-
};
40-
4112
const markdownChunker = new MarkdownChunker(
4213
pdfSemanticIndexerArgs.chunkSizeLimit,
4314
pdfSemanticIndexerArgs.overlap

src/pdf2mdWorker.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import pdf2md from "@opendocsg/pdf2md";
2+
import { existsSync, readFileSync } from "fs";
3+
4+
function fail(message: string, err?: unknown): never {
5+
if (err instanceof Error) {
6+
console.error(`${message}: ${err.message}`);
7+
} else if (err !== undefined) {
8+
console.error(`${message}: ${String(err)}`);
9+
} else {
10+
console.error(message);
11+
}
12+
process.exit(1);
13+
}
14+
15+
process.on("unhandledRejection", (reason) => {
16+
fail("pdf2md worker unhandled rejection", reason);
17+
});
18+
19+
process.on("uncaughtException", (err) => {
20+
fail("pdf2md worker uncaught exception", err);
21+
});
22+
23+
const filePath = process.argv[2];
24+
25+
if (!filePath) {
26+
fail("No input file path supplied to pdf2md worker");
27+
}
28+
29+
if (!existsSync(filePath)) {
30+
fail(`Input file does not exist: ${filePath}`);
31+
}
32+
33+
try {
34+
const pdfBuffer = readFileSync(filePath);
35+
const markdown = await pdf2md(pdfBuffer);
36+
if (!markdown) {
37+
fail("pdf2md worker produced empty output");
38+
}
39+
process.stdout.write(markdown);
40+
process.exit(0);
41+
} catch (err) {
42+
fail("pdf2md worker failed conversion", err);
43+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import { expect } from "chai";
2+
import { EventEmitter } from "events";
3+
import { PassThrough } from "stream";
4+
import { makeCreateEmbeddingText } from "../createEmbeddingText.js";
5+
6+
type FakeWorkerProcess = EventEmitter & {
7+
stdout: PassThrough;
8+
stderr: PassThrough;
9+
killCalls: string[];
10+
kill: (signal: string) => void;
11+
};
12+
13+
function createFakeWorkerProcess(): FakeWorkerProcess {
14+
const worker = new EventEmitter() as FakeWorkerProcess;
15+
worker.stdout = new PassThrough();
16+
worker.stderr = new PassThrough();
17+
worker.killCalls = [];
18+
worker.kill = (signal: string) => {
19+
worker.killCalls.push(signal);
20+
};
21+
return worker;
22+
}
23+
24+
async function expectThrowsMessage(
25+
task: Promise<unknown>,
26+
expectedMessage: string
27+
): Promise<void> {
28+
let thrown: unknown;
29+
try {
30+
await task;
31+
} catch (err) {
32+
thrown = err;
33+
}
34+
expect(thrown).to.be.instanceOf(Error);
35+
expect((thrown as Error).message).to.equal(expectedMessage);
36+
}
37+
38+
describe("createEmbeddingText", () => {
39+
it("should convert PDF through worker process", async () => {
40+
const worker = createFakeWorkerProcess();
41+
let spawnCallCount = 0;
42+
const spawn = () => {
43+
spawnCallCount += 1;
44+
setImmediate(() => {
45+
worker.stdout.write("converted markdown");
46+
worker.emit("close", 0);
47+
});
48+
return worker as any;
49+
};
50+
51+
const createEmbeddingText = makeCreateEmbeddingText({
52+
spawn: spawn as any,
53+
existsSync: () => true,
54+
workerScriptPath: "/tmp/pdf2mdWorker.js",
55+
processExecPath: "/usr/local/bin/node",
56+
timeoutMs: 200,
57+
});
58+
59+
const result = await createEmbeddingText({
60+
record: {} as any,
61+
format: "PDF",
62+
filePath: "/tmp/test.pdf",
63+
url: "http://example.com/test.pdf",
64+
readonlyRegistry: {} as any,
65+
});
66+
67+
expect(result.text).to.equal("converted markdown");
68+
expect(spawnCallCount).to.equal(1);
69+
});
70+
71+
it("should wrap worker failure as conversion error", async () => {
72+
const worker = createFakeWorkerProcess();
73+
const spawn = () => {
74+
setImmediate(() => {
75+
worker.stderr.write("trailing junk");
76+
worker.emit("close", 1);
77+
});
78+
return worker as any;
79+
};
80+
81+
const createEmbeddingText = makeCreateEmbeddingText({
82+
spawn: spawn as any,
83+
existsSync: () => true,
84+
timeoutMs: 200,
85+
});
86+
87+
await expectThrowsMessage(
88+
createEmbeddingText({
89+
record: {} as any,
90+
format: "PDF",
91+
filePath: "/tmp/test.pdf",
92+
url: "http://example.com/test.pdf",
93+
readonlyRegistry: {} as any,
94+
}),
95+
"Failed to convert PDF to Markdown: pdf2md worker exited with code 1: trailing junk"
96+
);
97+
});
98+
99+
it("should timeout worker and fail conversion", async () => {
100+
const worker = createFakeWorkerProcess();
101+
const spawn = () => worker as any;
102+
103+
const createEmbeddingText = makeCreateEmbeddingText({
104+
spawn: spawn as any,
105+
existsSync: () => true,
106+
timeoutMs: 10,
107+
});
108+
109+
await expectThrowsMessage(
110+
createEmbeddingText({
111+
record: {} as any,
112+
format: "PDF",
113+
filePath: "/tmp/test.pdf",
114+
url: "http://example.com/test.pdf",
115+
readonlyRegistry: {} as any,
116+
}),
117+
"Failed to convert PDF to Markdown: pdf2md worker timed out after 10ms"
118+
);
119+
expect(worker.killCalls).to.deep.equal(["SIGKILL"]);
120+
});
121+
122+
it("should fail when file does not exist", async () => {
123+
const createEmbeddingText = makeCreateEmbeddingText({
124+
existsSync: () => false,
125+
});
126+
127+
await expectThrowsMessage(
128+
createEmbeddingText({
129+
record: {} as any,
130+
format: "PDF",
131+
filePath: "/tmp/missing.pdf",
132+
url: "http://example.com/test.pdf",
133+
readonlyRegistry: {} as any,
134+
}),
135+
"File not found: /tmp/missing.pdf"
136+
);
137+
});
138+
});

0 commit comments

Comments
 (0)