Skip to content

Commit 93e0fbf

Browse files
committed
refactor: replace Piscina with native worker_threads
Removes the piscina dependency in favor of a custom worker pool implementation using Node.js built-in worker_threads. This reduces external dependencies while maintaining the same async tokenization functionality.

Changes:
- Added workerPool.ts to manage worker thread lifecycle and message passing
- Updated tokenizer.worker.ts to handle parentPort messages directly
- Modified tokenizer.ts to use new run() function instead of Piscina
- Migrated unit tests from Jest to bun test for consistency
- Updated CI and Makefile to use bun test for coverage

The custom pool creates a single persistent worker at module load time and handles request/response matching via message IDs.
1 parent 4cc2333 commit 93e0fbf

File tree

7 files changed

+137
-58
lines changed

7 files changed

+137
-58
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ jobs:
7575
- uses: ./.github/actions/setup-cmux
7676

7777
- name: Run tests with coverage
78-
run: bun x jest --coverage --coverageReporters=lcov ${{ github.event.inputs.test_filter || 'src' }}
78+
run: bun test --coverage --coverage-reporter=lcov ${{ github.event.inputs.test_filter || 'src' }}
7979

8080
- name: Upload coverage to Codecov
8181
uses: codecov/codecov-action@v5

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,11 @@ check-deadcode: node_modules/.installed ## Check for potential dead code (manual
191191

192192
## Testing
193193
test-integration: node_modules/.installed ## Run all tests (unit + integration)
194-
@bun x jest src
194+
@bun test src
195195
@TEST_INTEGRATION=1 bun x jest tests
196196

197-
test-unit: node_modules/.installed build-main ## Run unit tests (requires build-main for worker compilation)
198-
@bun x jest src
197+
test-unit: node_modules/.installed ## Run unit tests
198+
@bun test src
199199

200200
test: test-unit ## Alias for test-unit
201201

bun.lock

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
"lru-cache": "^11.2.2",
2929
"markdown-it": "^14.1.0",
3030
"minimist": "^1.2.8",
31-
"piscina": "^5.1.3",
3231
"rehype-harden": "^1.1.5",
3332
"shescape": "^2.1.6",
3433
"source-map-support": "^0.5.21",
@@ -395,42 +394,6 @@
395394

396395
"@mermaid-js/parser": ["@mermaid-js/[email protected]", "", { "dependencies": { "langium": "3.3.1" } }, "sha512-lnjOhe7zyHjc+If7yT4zoedx2vo4sHaTmtkl1+or8BRTnCtDmcTpAjpzDSfCZrshM5bCoz0GyidzadJAH1xobA=="],
397396

398-
"@napi-rs/nice": ["@napi-rs/[email protected]", "", { "optionalDependencies": { "@napi-rs/nice-android-arm-eabi": "1.1.1", "@napi-rs/nice-android-arm64": "1.1.1", "@napi-rs/nice-darwin-arm64": "1.1.1", "@napi-rs/nice-darwin-x64": "1.1.1", "@napi-rs/nice-freebsd-x64": "1.1.1", "@napi-rs/nice-linux-arm-gnueabihf": "1.1.1", "@napi-rs/nice-linux-arm64-gnu": "1.1.1", "@napi-rs/nice-linux-arm64-musl": "1.1.1", "@napi-rs/nice-linux-ppc64-gnu": "1.1.1", "@napi-rs/nice-linux-riscv64-gnu": "1.1.1", "@napi-rs/nice-linux-s390x-gnu": "1.1.1", "@napi-rs/nice-linux-x64-gnu": "1.1.1", "@napi-rs/nice-linux-x64-musl": "1.1.1", "@napi-rs/nice-openharmony-arm64": "1.1.1", "@napi-rs/nice-win32-arm64-msvc": "1.1.1", "@napi-rs/nice-win32-ia32-msvc": "1.1.1", "@napi-rs/nice-win32-x64-msvc": "1.1.1" } }, "sha512-xJIPs+bYuc9ASBl+cvGsKbGrJmS6fAKaSZCnT0lhahT5rhA2VVy9/EcIgd2JhtEuFOJNx7UHNn/qiTPTY4nrQw=="],
399-
400-
"@napi-rs/nice-android-arm-eabi": ["@napi-rs/[email protected]", "", { "os": "android", "cpu": "arm" }, "sha512-kjirL3N6TnRPv5iuHw36wnucNqXAO46dzK9oPb0wj076R5Xm8PfUVA9nAFB5ZNMmfJQJVKACAPd/Z2KYMppthw=="],
401-
402-
"@napi-rs/nice-android-arm64": ["@napi-rs/[email protected]", "", { "os": "android", "cpu": "arm64" }, "sha512-blG0i7dXgbInN5urONoUCNf+DUEAavRffrO7fZSeoRMJc5qD+BJeNcpr54msPF6qfDD6kzs9AQJogZvT2KD5nw=="],
403-
404-
"@napi-rs/nice-darwin-arm64": ["@napi-rs/[email protected]", "", { "os": "darwin", "cpu": "arm64" }, "sha512-s/E7w45NaLqTGuOjC2p96pct4jRfo61xb9bU1unM/MJ/RFkKlJyJDx7OJI/O0ll/hrfpqKopuAFDV8yo0hfT7A=="],
405-
406-
"@napi-rs/nice-darwin-x64": ["@napi-rs/[email protected]", "", { "os": "darwin", "cpu": "x64" }, "sha512-dGoEBnVpsdcC+oHHmW1LRK5eiyzLwdgNQq3BmZIav+9/5WTZwBYX7r5ZkQC07Nxd3KHOCkgbHSh4wPkH1N1LiQ=="],
407-
408-
"@napi-rs/nice-freebsd-x64": ["@napi-rs/[email protected]", "", { "os": "freebsd", "cpu": "x64" }, "sha512-kHv4kEHAylMYmlNwcQcDtXjklYp4FCf0b05E+0h6nDHsZ+F0bDe04U/tXNOqrx5CmIAth4vwfkjjUmp4c4JktQ=="],
409-
410-
"@napi-rs/nice-linux-arm-gnueabihf": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "arm" }, "sha512-E1t7K0efyKXZDoZg1LzCOLxgolxV58HCkaEkEvIYQx12ht2pa8hoBo+4OB3qh7e+QiBlp1SRf+voWUZFxyhyqg=="],
411-
412-
"@napi-rs/nice-linux-arm64-gnu": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "arm64" }, "sha512-CIKLA12DTIZlmTaaKhQP88R3Xao+gyJxNWEn04wZwC2wmRapNnxCUZkVwggInMJvtVElA+D4ZzOU5sX4jV+SmQ=="],
413-
414-
"@napi-rs/nice-linux-arm64-musl": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "arm64" }, "sha512-+2Rzdb3nTIYZ0YJF43qf2twhqOCkiSrHx2Pg6DJaCPYhhaxbLcdlV8hCRMHghQ+EtZQWGNcS2xF4KxBhSGeutg=="],
415-
416-
"@napi-rs/nice-linux-ppc64-gnu": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "ppc64" }, "sha512-4FS8oc0GeHpwvv4tKciKkw3Y4jKsL7FRhaOeiPei0X9T4Jd619wHNe4xCLmN2EMgZoeGg+Q7GY7BsvwKpL22Tg=="],
417-
418-
"@napi-rs/nice-linux-riscv64-gnu": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "none" }, "sha512-HU0nw9uD4FO/oGCCk409tCi5IzIZpH2agE6nN4fqpwVlCn5BOq0MS1dXGjXaG17JaAvrlpV5ZeyZwSon10XOXw=="],
419-
420-
"@napi-rs/nice-linux-s390x-gnu": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "s390x" }, "sha512-2YqKJWWl24EwrX0DzCQgPLKQBxYDdBxOHot1KWEq7aY2uYeX+Uvtv4I8xFVVygJDgf6/92h9N3Y43WPx8+PAgQ=="],
421-
422-
"@napi-rs/nice-linux-x64-gnu": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "x64" }, "sha512-/gaNz3R92t+dcrfCw/96pDopcmec7oCcAQ3l/M+Zxr82KT4DljD37CpgrnXV+pJC263JkW572pdbP3hP+KjcIg=="],
423-
424-
"@napi-rs/nice-linux-x64-musl": ["@napi-rs/[email protected]", "", { "os": "linux", "cpu": "x64" }, "sha512-xScCGnyj/oppsNPMnevsBe3pvNaoK7FGvMjT35riz9YdhB2WtTG47ZlbxtOLpjeO9SqqQ2J2igCmz6IJOD5JYw=="],
425-
426-
"@napi-rs/nice-openharmony-arm64": ["@napi-rs/[email protected]", "", { "os": "none", "cpu": "arm64" }, "sha512-6uJPRVwVCLDeoOaNyeiW0gp2kFIM4r7PL2MczdZQHkFi9gVlgm+Vn+V6nTWRcu856mJ2WjYJiumEajfSm7arPQ=="],
427-
428-
"@napi-rs/nice-win32-arm64-msvc": ["@napi-rs/[email protected]", "", { "os": "win32", "cpu": "arm64" }, "sha512-uoTb4eAvM5B2aj/z8j+Nv8OttPf2m+HVx3UjA5jcFxASvNhQriyCQF1OB1lHL43ZhW+VwZlgvjmP5qF3+59atA=="],
429-
430-
"@napi-rs/nice-win32-ia32-msvc": ["@napi-rs/[email protected]", "", { "os": "win32", "cpu": "ia32" }, "sha512-CNQqlQT9MwuCsg1Vd/oKXiuH+TcsSPJmlAFc5frFyX/KkOh0UpBLEj7aoY656d5UKZQMQFP7vJNa1DNUNORvug=="],
431-
432-
"@napi-rs/nice-win32-x64-msvc": ["@napi-rs/[email protected]", "", { "os": "win32", "cpu": "x64" }, "sha512-vB+4G/jBQCAh0jelMTY3+kgFy00Hlx2f2/1zjMoH821IbplbWZOkLiTYXQkygNTzQJTq5cvwBDgn2ppHD+bglQ=="],
433-
434397
"@napi-rs/wasm-runtime": ["@napi-rs/[email protected]", "", { "dependencies": { "@emnapi/core": "^1.4.3", "@emnapi/runtime": "^1.4.3", "@tybys/wasm-util": "^0.10.0" } }, "sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ=="],
435398

436399
"@nodelib/fs.scandir": ["@nodelib/[email protected]", "", { "dependencies": { "@nodelib/fs.stat": "2.0.5", "run-parallel": "^1.1.9" } }, "sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g=="],
@@ -2341,8 +2304,6 @@
23412304

23422305
"pirates": ["[email protected]", "", {}, "sha512-TfySrs/5nm8fQJDcBDuUng3VOUKsd7S+zqvbOTiGXHfxX4wK31ard+hoNuvkicM/2YFzlpDgABOevKSsB4G/FA=="],
23432306

2344-
"piscina": ["[email protected]", "", { "optionalDependencies": { "@napi-rs/nice": "^1.0.4" } }, "sha512-0u3N7H4+hbr40KjuVn2uNhOcthu/9usKhnw5vT3J7ply79v3D3M8naI00el9Klcy16x557VsEkkUQaHCWFXC/g=="],
2345-
23462307
"pkg-dir": ["[email protected]", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="],
23472308

23482309
"pkg-types": ["[email protected]", "", { "dependencies": { "confbox": "^0.2.2", "exsolve": "^1.0.7", "pathe": "^2.0.3" } }, "sha512-SIqCzDRg0s9npO5XQ3tNZioRY1uK06lA41ynBC1YmFTmnY6FjUjVt6s4LoADmwoig1qqD0oK8h1p/8mlMx8Oig=="],

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
"lru-cache": "^11.2.2",
7070
"markdown-it": "^14.1.0",
7171
"minimist": "^1.2.8",
72-
"piscina": "^5.1.3",
7372
"rehype-harden": "^1.1.5",
7473
"shescape": "^2.1.6",
7574
"source-map-support": "^0.5.21",

src/utils/main/tokenizer.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ import assert from "@/utils/assert";
22
import CRC32 from "crc-32";
33
import { LRUCache } from "lru-cache";
44
import { getAvailableTools, getToolSchemas } from "@/utils/tools/toolDefinitions";
5-
import Piscina from "piscina";
6-
import { createRequire } from "module";
75
import type { CountTokensInput } from "./tokenizer.worker";
86
import { models, type ModelName } from "ai-tokenizer";
7+
import { run } from "./workerPool";
98

109
/**
1110
* Public tokenizer interface exposed to callers.
12-
* countTokens is async because the heavy lifting happens in a Piscina worker.
11+
* countTokens is async because the heavy lifting happens in a worker thread.
1312
*/
1413
export interface Tokenizer {
1514
encoding: string;
@@ -26,13 +25,6 @@ const DEFAULT_WARM_MODELS = [
2625
"anthropic:claude-sonnet-4-5",
2726
] as const;
2827

29-
const requireForResolve = createRequire(__filename);
30-
const workerPath = requireForResolve.resolve("./tokenizer.worker");
31-
const tokenizerPool = new Piscina({
32-
filename: workerPath,
33-
idleTimeout: Infinity,
34-
});
35-
3628
const encodingPromises = new Map<ModelName, Promise<string>>();
3729
const inFlightCounts = new Map<string, Promise<number>>();
3830
const tokenCountCache = new LRUCache<string, number>({
@@ -59,8 +51,7 @@ function normalizeModelKey(modelName: string): ModelName {
5951
function resolveEncoding(modelName: ModelName): Promise<string> {
6052
let promise = encodingPromises.get(modelName);
6153
if (!promise) {
62-
promise = tokenizerPool
63-
.run(modelName, { name: "encodingName" })
54+
promise = run<string>("encodingName", modelName)
6455
.then((result: unknown) => {
6556
assert(
6657
typeof result === "string" && result.length > 0,
@@ -97,8 +88,7 @@ async function countTokensInternal(modelName: ModelName, text: string): Promise<
9788
let pending = inFlightCounts.get(key);
9889
if (!pending) {
9990
const payload: CountTokensInput = { modelName, input: text };
100-
pending = tokenizerPool
101-
.run(payload, { name: "countTokens" })
91+
pending = run<number>("countTokens", payload)
10292
.then((value: unknown) => {
10393
assert(
10494
typeof value === "number" && Number.isFinite(value) && value >= 0,

src/utils/main/tokenizer.worker.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import assert from "node:assert";
2+
import { parentPort } from "node:worker_threads";
23
import { Tokenizer, models } from "ai-tokenizer";
34
import type { ModelName } from "ai-tokenizer";
45
import * as encoding from "ai-tokenizer/encoding";
@@ -38,3 +39,36 @@ export function encodingName(modelName: ModelName): string {
3839
assert(model, `Unknown tokenizer model '${modelName}'`);
3940
return model.encoding;
4041
}
42+
43+
// Handle messages from main thread
44+
if (parentPort) {
45+
parentPort.on("message", (message: { messageId: number; taskName: string; data: unknown }) => {
46+
try {
47+
let result: unknown;
48+
49+
switch (message.taskName) {
50+
case "countTokens":
51+
result = countTokens(message.data as CountTokensInput);
52+
break;
53+
case "encodingName":
54+
result = encodingName(message.data as ModelName);
55+
break;
56+
default:
57+
throw new Error(`Unknown task: ${message.taskName}`);
58+
}
59+
60+
parentPort!.postMessage({
61+
messageId: message.messageId,
62+
result,
63+
});
64+
} catch (error) {
65+
parentPort!.postMessage({
66+
messageId: message.messageId,
67+
error: {
68+
message: error instanceof Error ? error.message : String(error),
69+
stack: error instanceof Error ? error.stack : undefined,
70+
},
71+
});
72+
}
73+
});
74+
}

src/utils/main/workerPool.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { Worker } from "node:worker_threads";
2+
import { createRequire } from "node:module";
3+
4+
interface WorkerRequest {
5+
messageId: number;
6+
taskName: string;
7+
data: unknown;
8+
}
9+
10+
interface WorkerSuccessResponse {
11+
messageId: number;
12+
result: unknown;
13+
}
14+
15+
interface WorkerErrorResponse {
16+
messageId: number;
17+
error: {
18+
message: string;
19+
stack?: string;
20+
};
21+
}
22+
23+
type WorkerResponse = WorkerSuccessResponse | WorkerErrorResponse;
24+
25+
let messageIdCounter = 0;
26+
const pendingPromises = new Map<
27+
number,
28+
{ resolve: (value: unknown) => void; reject: (error: Error) => void }
29+
>();
30+
31+
// Create the worker at module load time
32+
// This is safe because the heavy ai-tokenizer imports happen in the worker thread, not the main thread
33+
const requireForResolve = createRequire(__filename);
34+
const workerPath = requireForResolve.resolve("./tokenizer.worker");
35+
const worker = new Worker(workerPath);
36+
37+
// Handle messages from worker
38+
worker.on("message", (response: WorkerResponse) => {
39+
const pending = pendingPromises.get(response.messageId);
40+
if (!pending) {
41+
console.error(`[workerPool] No pending promise for messageId ${response.messageId}`);
42+
return;
43+
}
44+
45+
pendingPromises.delete(response.messageId);
46+
47+
if ("error" in response) {
48+
const error = new Error(response.error.message);
49+
error.stack = response.error.stack;
50+
pending.reject(error);
51+
} else {
52+
pending.resolve(response.result);
53+
}
54+
});
55+
56+
// Handle worker errors
57+
worker.on("error", (error) => {
58+
console.error("[workerPool] Worker error:", error);
59+
// Reject all pending promises
60+
for (const pending of pendingPromises.values()) {
61+
pending.reject(error);
62+
}
63+
pendingPromises.clear();
64+
});
65+
66+
// Handle worker exit
67+
worker.on("exit", (code) => {
68+
if (code !== 0) {
69+
console.error(`[workerPool] Worker stopped with exit code ${code}`);
70+
const error = new Error(`Worker stopped with exit code ${code}`);
71+
for (const pending of pendingPromises.values()) {
72+
pending.reject(error);
73+
}
74+
pendingPromises.clear();
75+
}
76+
});
77+
78+
/**
79+
* Run a task on the worker thread
80+
* @param taskName The name of the task to run (e.g., "countTokens", "encodingName")
81+
* @param data The data to pass to the task
82+
* @returns A promise that resolves with the task result
83+
*/
84+
export function run<T>(taskName: string, data: unknown): Promise<T> {
85+
const messageId = messageIdCounter++;
86+
const request: WorkerRequest = { messageId, taskName, data };
87+
88+
return new Promise<T>((resolve, reject) => {
89+
pendingPromises.set(messageId, {
90+
resolve: resolve as (value: unknown) => void,
91+
reject,
92+
});
93+
worker.postMessage(request);
94+
});
95+
}

0 commit comments

Comments
 (0)