diff --git a/src/test/json-lines-stream.test.ts b/src/test/json-lines-stream.test.ts new file mode 100644 index 0000000..88d7785 --- /dev/null +++ b/src/test/json-lines-stream.test.ts @@ -0,0 +1,96 @@ +import * as assert from "node:assert"; + +import { JsonLinesStream } from "../utils/json-lines-stream.js"; + +const setup = () => { + const stream = new JsonLinesStream(); + + const callbackCalls: unknown[] = []; + stream.onJson((data: unknown) => { + callbackCalls.push(data); + }); + + return { + stream, + callbackCalls, + }; +}; + +suite("JsonLinesStream Test Suite", () => { + test("should parse and emit complete JSON lines messages", () => { + const { stream, callbackCalls } = setup(); + + // Test with multiple JSON objects in a single write + const testData = Buffer.from('{"key1":"value1"}\n{"key2":"value2"}\n'); + + stream.write(testData); + + assert.strictEqual(callbackCalls.length, 2); + assert.deepStrictEqual(callbackCalls[0], { key1: "value1" }); + assert.deepStrictEqual(callbackCalls[1], { key2: "value2" }); + }); + + test("should handle incomplete JSON lines messages across multiple writes", () => { + const { stream, callbackCalls } = setup(); + + // First write with partial message + const firstChunk = Buffer.from('{"key":"value'); + stream.write(firstChunk); + + // Shouldn't emit anything yet + assert.strictEqual(callbackCalls.length, 0); + + // Complete the message in second write + const secondChunk = Buffer.from('1"}\n'); + stream.write(secondChunk); + + // Now it should emit the complete message + assert.strictEqual(callbackCalls.length, 1); + assert.deepStrictEqual(callbackCalls[0], { key: "value1" }); + }); + + test("should handle multiple messages in chunks", () => { + const { stream, callbackCalls } = setup(); + + // Write first message and part of second + const firstChunk = Buffer.from('{"first":1}\n{"second":'); + stream.write(firstChunk); + + // First message should be emitted + assert.strictEqual(callbackCalls.length, 1); + assert.deepStrictEqual(callbackCalls[0], { first: 1 }); + + // Complete second message and add third + const secondChunk = Buffer.from('2}\n{"third":3}\n'); + stream.write(secondChunk); + + // Should have all three messages now + assert.strictEqual(callbackCalls.length, 3); + assert.deepStrictEqual(callbackCalls[1], { second: 2 }); + assert.deepStrictEqual(callbackCalls[2], { third: 3 }); + }); + + test("should ignore invalid JSON lines", () => { + const { stream, callbackCalls } = setup(); + + const testData = Buffer.from('not json\n{"valid":true}\n{invalid}\n'); + + stream.write(testData); + + // Should only emit the valid JSON object + assert.strictEqual(callbackCalls.length, 1); + assert.deepStrictEqual(callbackCalls[0], { valid: true }); + }); + + test("should handle empty lines", () => { + const { stream, callbackCalls } = setup(); + + const testData = Buffer.from('\n\n{"key":"value"}\n\n'); + + stream.write(testData); + + // Should only emit the valid JSON object + assert.strictEqual(callbackCalls.length, 1); + assert.deepStrictEqual(callbackCalls[0], { key: "value" }); + }); +}); diff --git a/src/utils/container-status.ts b/src/utils/container-status.ts index 782323a..4b1a09e 100644 --- a/src/utils/container-status.ts +++ b/src/utils/container-status.ts @@ -4,6 +4,7 @@ import type { Disposable, LogOutputChannel } from "vscode"; import * as z from "zod/v4-mini"; import { createEmitter } from "./emitter.ts"; +import { JsonLinesStream } from "./json-lines-stream.ts"; export type ContainerStatus = "running" | "stopping" | "stopped"; @@ -63,14 +64,6 @@ const DockerEventsSchema = z.object({ }), }); -function safeJsonParse(text: string): unknown { - try { - return JSON.parse(text); - } catch { - return undefined; - } -} - function listenToContainerStatus( containerName: string, outputChannel: LogOutputChannel, @@ -130,32 +123,33 @@ function listenToContainerStatus( throw new Error("Failed to get stdout from docker events process"); } - dockerEvents.stdout.on("data", (data: Buffer) => { - const lines = data.toString().split("\n").filter(Boolean); - for (const line of lines) { - const json = safeJsonParse(line); - const parsed = DockerEventsSchema.safeParse(json); - if (!parsed.success) { - continue; - } + const jsonlStream = new JsonLinesStream(); + jsonlStream.onJson((json) => { + const parsed = DockerEventsSchema.safeParse(json); + if (!parsed.success) { + return; + } - if (parsed.data.Actor.Attributes.name !== containerName) { - continue; - } + if (parsed.data.Actor.Attributes.name !== containerName) { + return; + } - switch (parsed.data.Action) { - case "start": - onStatusChange("running"); - break; - case "kill": - onStatusChange("stopping"); - break; - case "die": - onStatusChange("stopped"); - break; - } + outputChannel.debug(`[container.status]: ${parsed.data.Action}`); + + switch (parsed.data.Action) { + case "start": + onStatusChange("running"); + break; + case "kill": + onStatusChange("stopping"); + break; + case "die": + onStatusChange("stopped"); + break; } }); + + dockerEvents.stdout.pipe(jsonlStream); } catch (error) { // If we can't spawn the process, try again after a delay scheduleRestart(); diff --git a/src/utils/json-lines-stream.ts b/src/utils/json-lines-stream.ts new file mode 100644 index 0000000..f337421 --- /dev/null +++ b/src/utils/json-lines-stream.ts @@ -0,0 +1,52 @@ +import { Writable } from "node:stream"; + +/** + * Safely parses a JSON string, returning undefined if parsing fails. + * @param str - The JSON string to parse. + * @returns The parsed object or undefined if invalid. + */ +export function safeJsonParse(str: string): unknown { + try { + return JSON.parse(str); + } catch { + return undefined; + } +} + +/** + * Writable stream that buffers data until a newline, + * parses each line as JSON, and emits the parsed object. + */ +export class JsonLinesStream extends Writable { + constructor() { + let buffer = ""; + super({ + write: (chunk, _encoding, callback) => { + buffer += String(chunk); + + let newlineIndex = buffer.indexOf("\n"); + while (newlineIndex !== -1) { + const line = buffer.substring(0, newlineIndex).trim(); + buffer = buffer.substring(newlineIndex + 1); + + const json = safeJsonParse(line); + if (json !== undefined) { + this.emit("json", json); + } + + newlineIndex = buffer.indexOf("\n"); + } + + callback(); + }, + }); + } + + /** + * Registers a listener for parsed JSON objects. + * @param listener - Function called with each parsed object. + */ + onJson(callback: (json: unknown) => void) { + this.on("json", callback); + } +}