Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/test/json-lines-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import * as assert from "node:assert";

import { JsonLinesStream } from "../utils/json-lines-stream.js";

const setup = () => {
const stream = new JsonLinesStream();

const callbackCalls: unknown[] = [];
stream.onJson((data: unknown) => {
callbackCalls.push(data);
});

return {
stream,
callbackCalls,
};
};

suite("JsonLinesStream Test Suite", () => {
test("should parse and emit complete JSON lines messages", () => {
const { stream, callbackCalls } = setup();

// Test with multiple JSON objects in a single write
const testData = Buffer.from('{"key1":"value1"}\n{"key2":"value2"}\n');

stream.write(testData);

assert.strictEqual(callbackCalls.length, 2);
assert.deepStrictEqual(callbackCalls[0], { key1: "value1" });
assert.deepStrictEqual(callbackCalls[1], { key2: "value2" });
});

test("should handle incomplete JSON lines messages across multiple writes", () => {
const { stream, callbackCalls } = setup();

// First write with partial message
const firstChunk = Buffer.from('{"key":"value');
stream.write(firstChunk);

// Shouldn't emit anything yet
assert.strictEqual(callbackCalls.length, 0);

// Complete the message in second write
const secondChunk = Buffer.from('1"}\n');
stream.write(secondChunk);

// Now it should emit the complete message
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { key: "value1" });
});

test("should handle multiple messages in chunks", () => {
const { stream, callbackCalls } = setup();

// Write first message and part of second
const firstChunk = Buffer.from('{"first":1}\n{"second":');
stream.write(firstChunk);

// First message should be emitted
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { first: 1 });

// Complete second message and add third
const secondChunk = Buffer.from('2}\n{"third":3}\n');
stream.write(secondChunk);

// Should have all three messages now
assert.strictEqual(callbackCalls.length, 3);
assert.deepStrictEqual(callbackCalls[1], { second: 2 });
assert.deepStrictEqual(callbackCalls[2], { third: 3 });
});

test("should ignore invalid JSON lines", () => {
const { stream, callbackCalls } = setup();

const testData = Buffer.from('not json\n{"valid":true}\n{invalid}\n');

stream.write(testData);

// Should only emit the valid JSON object
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { valid: true });
});

test("should handle empty lines", () => {
const { stream, callbackCalls } = setup();

const testData = Buffer.from('\n\n{"key":"value"}\n\n');

stream.write(testData);

// Should only emit the valid JSON object
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { key: "value" });
});
});
54 changes: 24 additions & 30 deletions src/utils/container-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Disposable, LogOutputChannel } from "vscode";
import * as z from "zod/v4-mini";

import { createEmitter } from "./emitter.ts";
import { JsonLinesStream } from "./json-lines-stream.ts";

export type ContainerStatus = "running" | "stopping" | "stopped";

Expand Down Expand Up @@ -63,14 +64,6 @@ const DockerEventsSchema = z.object({
}),
});

function safeJsonParse(text: string): unknown {
try {
return JSON.parse(text);
} catch {
return undefined;
}
}

function listenToContainerStatus(
containerName: string,
outputChannel: LogOutputChannel,
Expand Down Expand Up @@ -130,32 +123,33 @@ function listenToContainerStatus(
throw new Error("Failed to get stdout from docker events process");
}

dockerEvents.stdout.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter(Boolean);
for (const line of lines) {
const json = safeJsonParse(line);
const parsed = DockerEventsSchema.safeParse(json);
if (!parsed.success) {
continue;
}
const jsonlStream = new JsonLinesStream();
jsonlStream.onJson((json) => {
const parsed = DockerEventsSchema.safeParse(json);
if (!parsed.success) {
return;
}

if (parsed.data.Actor.Attributes.name !== containerName) {
continue;
}
if (parsed.data.Actor.Attributes.name !== containerName) {
return;
}

switch (parsed.data.Action) {
case "start":
onStatusChange("running");
break;
case "kill":
onStatusChange("stopping");
break;
case "die":
onStatusChange("stopped");
break;
}
outputChannel.debug(`[container.status]: ${parsed.data.Action}`);

switch (parsed.data.Action) {
case "start":
onStatusChange("running");
break;
case "kill":
onStatusChange("stopping");
break;
case "die":
onStatusChange("stopped");
break;
}
});

dockerEvents.stdout.pipe(jsonlStream);
} catch (error) {
// If we can't spawn the process, try again after a delay
scheduleRestart();
Expand Down
52 changes: 52 additions & 0 deletions src/utils/json-lines-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Writable } from "node:stream";

/**
* Safely parses a JSON string, returning undefined if parsing fails.
* @param str - The JSON string to parse.
* @returns The parsed object or undefined if invalid.
*/
export function safeJsonParse(str: string): unknown {
try {
return JSON.parse(str);
} catch {
return undefined;
}
}

/**
* Writable stream that buffers data until a newline,
* parses each line as JSON, and emits the parsed object.
*/
export class JsonLinesStream extends Writable {
constructor() {
let buffer = "";
super({
write: (chunk, _encoding, callback) => {
buffer += String(chunk);

let newlineIndex = buffer.indexOf("\n");
while (newlineIndex !== -1) {
const line = buffer.substring(0, newlineIndex).trim();
buffer = buffer.substring(newlineIndex + 1);

const json = safeJsonParse(line);
if (json !== undefined) {
this.emit("json", json);
}

newlineIndex = buffer.indexOf("\n");
}

callback();
},
});
}

/**
* Registers a listener for parsed JSON objects.
* @param listener - Function called with each parsed object.
*/
onJson(callback: (json: unknown) => void) {
this.on("json", callback);
}
}
Loading