Skip to content

Commit 126e8c1

Browse files
committed
fix: address PR #5354 review feedback
- Fixed duplicate 'codeIndex' sections in all 17 non-English locale files - Implemented memory management safeguards in WorkerPool: - Added queue size limits (default: 1000 tasks) - Added memory threshold monitoring (90% limit) - Added worker health checks with stale worker detection - Added status reporting method for monitoring - Made throttling interval configurable in CodeIndexStateManager - Added comprehensive worker message protocol documentation - Updated all WorkerPool instantiations to use new options-based constructor - Added tests for memory management features - Fixed unhandled promise rejection in health check test - Fixed ESLint warnings about Function type usage
1 parent ace9510 commit 126e8c1

26 files changed

+547
-600
lines changed

src/services/code-index/__tests__/state-manager-throttling.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import { CodeIndexStateManager, IndexingState } from "../state-manager"
44
// Mock vscode module
55
vi.mock("vscode", () => ({
66
EventEmitter: class EventEmitter {
7-
private listeners: Map<string, Function[]> = new Map()
7+
private listeners: Map<string, Array<(...args: any[]) => void>> = new Map()
88

9-
event = (listener: Function) => {
9+
event = (listener: (...args: any[]) => void) => {
1010
if (!this.listeners.has("event")) {
1111
this.listeners.set("event", [])
1212
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"
2+
import { WorkerPool } from "../workers/worker-pool"
3+
import { EventEmitter } from "events"
4+
5+
// Mock worker_threads module
6+
vi.mock("worker_threads")
7+
8+
// Mock os module
9+
vi.mock("os", () => ({
10+
cpus: vi.fn(() => new Array(4)),
11+
}))
12+
13+
describe("WorkerPool - Memory Management", () => {
14+
let pool: WorkerPool | null = null
15+
let mockWorkers: any[] = []
16+
let Worker: any
17+
18+
beforeEach(async () => {
19+
vi.clearAllTimers()
20+
vi.clearAllMocks()
21+
mockWorkers = []
22+
pool = null
23+
24+
// Get the mocked Worker class
25+
const workerThreads = await import("worker_threads")
26+
Worker = workerThreads.Worker
27+
28+
// Reset and re-implement the Worker mock
29+
vi.mocked(Worker).mockClear()
30+
vi.mocked(Worker).mockImplementation((filename: string | URL) => {
31+
const worker = new EventEmitter()
32+
;(worker as any).terminate = vi.fn().mockResolvedValue(undefined)
33+
;(worker as any).postMessage = vi.fn()
34+
;(worker as any).scriptPath = filename
35+
mockWorkers.push(worker)
36+
37+
// Simulate worker ready after a short delay
38+
setTimeout(() => worker.emit("online"), 10)
39+
40+
return worker as any
41+
})
42+
})
43+
44+
afterEach(async () => {
45+
if (pool) {
46+
try {
47+
await pool.shutdown()
48+
} catch (e) {
49+
// Ignore shutdown errors in tests
50+
}
51+
pool = null
52+
}
53+
54+
vi.clearAllMocks()
55+
vi.clearAllTimers()
56+
mockWorkers = []
57+
})
58+
59+
describe("queue size limits", () => {
60+
it("should reject tasks when queue is full", async () => {
61+
pool = new WorkerPool("/path/to/worker.js", {
62+
maxWorkers: 1,
63+
maxQueueSize: 2,
64+
})
65+
66+
// Fill up the worker
67+
const task1 = pool.execute({ type: "task1" })
68+
69+
// Fill up the queue
70+
const task2 = pool.execute({ type: "task2" })
71+
const task3 = pool.execute({ type: "task3" })
72+
73+
// This should be rejected
74+
await expect(pool.execute({ type: "task4" })).rejects.toThrow("Worker pool queue is full")
75+
76+
// Complete tasks to avoid hanging
77+
await new Promise((resolve) => setTimeout(resolve, 20))
78+
mockWorkers[0].emit("message", { success: true, data: "result1" })
79+
mockWorkers[0].emit("message", { success: true, data: "result2" })
80+
mockWorkers[0].emit("message", { success: true, data: "result3" })
81+
82+
await Promise.all([task1, task2, task3])
83+
})
84+
})
85+
86+
describe("memory threshold", () => {
87+
it("should reject tasks when memory usage is too high", async () => {
88+
// Mock process.memoryUsage to return high memory
89+
const originalMemoryUsage = process.memoryUsage
90+
process.memoryUsage = vi.fn().mockReturnValue({
91+
heapUsed: 600 * 1024 * 1024, // 600MB
92+
heapTotal: 800 * 1024 * 1024,
93+
external: 0,
94+
arrayBuffers: 0,
95+
rss: 900 * 1024 * 1024,
96+
}) as any
97+
98+
pool = new WorkerPool("/path/to/worker.js", {
99+
maxWorkers: 2,
100+
memoryThresholdMB: 500, // 500MB threshold
101+
})
102+
103+
await expect(pool.execute({ type: "test" })).rejects.toThrow(/Memory usage too high/)
104+
105+
// Restore original
106+
process.memoryUsage = originalMemoryUsage
107+
})
108+
})
109+
110+
describe("health checks", () => {
111+
it("should perform periodic health checks", async () => {
112+
vi.useFakeTimers()
113+
const consoleDebugSpy = vi.spyOn(console, "debug").mockImplementation(() => {})
114+
115+
pool = new WorkerPool("/path/to/worker.js", {
116+
maxWorkers: 2,
117+
healthCheckIntervalMs: 1000,
118+
})
119+
120+
// Advance time to trigger health check
121+
vi.advanceTimersByTime(1000)
122+
123+
// Health check should log status
124+
expect(consoleDebugSpy).toHaveBeenCalledWith(expect.stringContaining("[WorkerPool] Status"))
125+
126+
consoleDebugSpy.mockRestore()
127+
vi.useRealTimers()
128+
})
129+
130+
it("should replace stale workers", async () => {
131+
vi.useFakeTimers()
132+
const consoleWarnSpy = vi.spyOn(console, "warn").mockImplementation(() => {})
133+
134+
pool = new WorkerPool("/path/to/worker.js", {
135+
maxWorkers: 1,
136+
healthCheckIntervalMs: 1000,
137+
})
138+
139+
// Start a task and handle the rejection
140+
const taskPromise = pool.execute({ type: "test" }).catch((e: Error) => e)
141+
142+
// Wait for task to be assigned
143+
await vi.advanceTimersByTimeAsync(20)
144+
145+
// Advance time past stale threshold (1 minute)
146+
await vi.advanceTimersByTimeAsync(61000)
147+
148+
// Task should be rejected due to timeout
149+
const result = await taskPromise
150+
expect(result).toBeInstanceOf(Error)
151+
expect((result as Error).message).toBe("Worker task timed out")
152+
153+
// Should log warning about stale worker
154+
expect(consoleWarnSpy).toHaveBeenCalledWith(expect.stringContaining("Worker appears to be stale"))
155+
156+
consoleWarnSpy.mockRestore()
157+
vi.useRealTimers()
158+
})
159+
})
160+
161+
describe("status reporting", () => {
162+
it("should provide pool status", async () => {
163+
pool = new WorkerPool("/path/to/worker.js", {
164+
maxWorkers: 2,
165+
maxQueueSize: 100,
166+
memoryThresholdMB: 512,
167+
})
168+
169+
const status = pool.getStatus()
170+
171+
expect(status).toMatchObject({
172+
activeWorkers: 0,
173+
availableWorkers: 2,
174+
queueLength: 0,
175+
maxQueueSize: 100,
176+
memoryThresholdMB: 512,
177+
})
178+
expect(status.memoryUsageMB).toBeGreaterThan(0)
179+
})
180+
})
181+
})

src/services/code-index/__tests__/worker-pool.spec.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ describe("WorkerPool", () => {
6060

6161
describe("initialization", () => {
6262
it("should create workers with specified concurrency", () => {
63-
pool = new WorkerPool("/path/to/worker.js", 4)
63+
pool = new WorkerPool("/path/to/worker.js", { maxWorkers: 4 })
6464
expect(Worker).toHaveBeenCalledTimes(4)
6565
expect(Worker).toHaveBeenCalledWith("/path/to/worker.js")
6666
})
@@ -82,7 +82,7 @@ describe("WorkerPool", () => {
8282

8383
describe("task execution", () => {
8484
beforeEach(() => {
85-
pool = new WorkerPool("/path/to/worker.js", 2)
85+
pool = new WorkerPool("/path/to/worker.js", { maxWorkers: 2 })
8686
})
8787

8888
it("should execute tasks and return results", async () => {
@@ -97,7 +97,7 @@ describe("WorkerPool", () => {
9797

9898
// Simulate worker response
9999
const worker = mockWorkers[0]
100-
const messageHandler = worker.listeners("message")[0] as Function
100+
const messageHandler = worker.listeners("message")[0] as (...args: any[]) => void
101101
messageHandler(expectedResult)
102102

103103
const result = await resultPromise
@@ -142,7 +142,7 @@ describe("WorkerPool", () => {
142142
expect(mockWorkers[1].postMessage).toHaveBeenCalledTimes(1)
143143

144144
// Complete first task
145-
const messageHandler0 = mockWorkers[0].listeners("message")[0] as Function
145+
const messageHandler0 = mockWorkers[0].listeners("message")[0] as (...args: any[]) => void
146146
messageHandler0({ success: true, data: "task1" })
147147

148148
// Wait for queue processing
@@ -154,7 +154,7 @@ describe("WorkerPool", () => {
154154

155155
// Complete remaining tasks
156156
messageHandler0({ success: true, data: "task3" })
157-
const messageHandler1 = mockWorkers[1].listeners("message")[0] as Function
157+
const messageHandler1 = mockWorkers[1].listeners("message")[0] as (...args: any[]) => void
158158
messageHandler1({ success: true, data: "task2" })
159159

160160
const results = await Promise.all(promises)
@@ -206,7 +206,7 @@ describe("WorkerPool", () => {
206206

207207
describe("shutdown", () => {
208208
beforeEach(() => {
209-
pool = new WorkerPool("/path/to/worker.js", 2)
209+
pool = new WorkerPool("/path/to/worker.js", { maxWorkers: 2 })
210210
})
211211

212212
it("should terminate all workers", async () => {
@@ -218,7 +218,7 @@ describe("WorkerPool", () => {
218218

219219
it("should reject pending tasks on shutdown", async () => {
220220
// Create a new pool with limited workers
221-
const testPool = new WorkerPool("/path/to/worker.js", 1)
221+
const testPool = new WorkerPool("/path/to/worker.js", { maxWorkers: 1 })
222222

223223
// Wait for workers to be ready
224224
await new Promise((resolve) => setTimeout(resolve, 20))
@@ -272,7 +272,7 @@ describe("WorkerPool", () => {
272272

273273
describe("error handling", () => {
274274
beforeEach(() => {
275-
pool = new WorkerPool("/path/to/worker.js", 1)
275+
pool = new WorkerPool("/path/to/worker.js", { maxWorkers: 1 })
276276
})
277277

278278
it("should handle worker initialization errors", async () => {
@@ -316,7 +316,7 @@ describe("WorkerPool", () => {
316316

317317
describe("performance", () => {
318318
it("should process tasks concurrently", async () => {
319-
pool = new WorkerPool("/path/to/worker.js", 3)
319+
pool = new WorkerPool("/path/to/worker.js", { maxWorkers: 3 })
320320

321321
const tasks = Array(6)
322322
.fill(null)
@@ -331,7 +331,7 @@ describe("WorkerPool", () => {
331331
// Complete first batch of tasks
332332
for (let i = 0; i < 3; i++) {
333333
const worker = mockWorkers[i]
334-
const handler = worker.listeners("message")[0] as Function
334+
const handler = worker.listeners("message")[0] as (...args: any[]) => void
335335
handler({ success: true, data: `result${i}` })
336336
}
337337

@@ -341,7 +341,7 @@ describe("WorkerPool", () => {
341341
// Complete second batch
342342
for (let i = 0; i < 3; i++) {
343343
const worker = mockWorkers[i]
344-
const handler = worker.listeners("message")[0] as Function
344+
const handler = worker.listeners("message")[0] as (...args: any[]) => void
345345
handler({ success: true, data: `result${i + 3}` })
346346
}
347347

src/services/code-index/manager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ export class CodeIndexManager {
5252
private constructor(workspacePath: string, context: vscode.ExtensionContext) {
5353
this.workspacePath = workspacePath
5454
this.context = context
55+
// Create state manager with default throttle interval (500ms)
56+
// Can be customized by passing a different value to the constructor
5557
this._stateManager = new CodeIndexStateManager()
5658
}
5759

src/services/code-index/processors/scanner.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ export class DirectoryScanner implements IDirectoryScanner {
4242
) {
4343
// Initialize worker pool for file processing
4444
try {
45-
this.workerPool = new WorkerPool(
46-
path.join(__dirname, "../workers/file-processor.worker.js"),
47-
PARSING_CONCURRENCY,
48-
)
45+
this.workerPool = new WorkerPool(path.join(__dirname, "../workers/file-processor.worker.js"), {
46+
maxWorkers: PARSING_CONCURRENCY,
47+
})
4948
} catch (error) {
5049
console.warn(
5150
"[DirectoryScanner] Failed to initialize worker pool, falling back to main thread processing:",

src/services/code-index/state-manager.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ export class CodeIndexStateManager {
1313
// Throttling properties
1414
private _throttleTimer?: NodeJS.Timeout
1515
private _pendingUpdate?: ReturnType<typeof this.getCurrentStatus>
16-
private readonly THROTTLE_INTERVAL_MS = 500
16+
private readonly throttleIntervalMs: number
17+
18+
constructor(throttleIntervalMs: number = 500) {
19+
this.throttleIntervalMs = throttleIntervalMs
20+
}
1721

1822
// --- Public API ---
1923

@@ -128,7 +132,7 @@ export class CodeIndexStateManager {
128132
this._pendingUpdate = undefined
129133
}
130134
this._throttleTimer = undefined
131-
}, this.THROTTLE_INTERVAL_MS)
135+
}, this.throttleIntervalMs)
132136
}
133137

134138
public dispose(): void {

0 commit comments

Comments
 (0)