
Commit 6fa9d38

fix: partial line handling in docker events listener (#8)
Improves the handling of stdout data from the docker events process by buffering incomplete lines and only processing complete lines. This prevents errors when event data arrives in chunks that do not align with line boundaries.
1 parent 2ff0f6b commit 6fa9d38
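
As a quick illustration of the buffering behavior described above (a minimal sketch using the JsonLinesStream class added in this commit; the variable names, sample payload, and import path are illustrative, not part of the commit):

    import { JsonLinesStream } from "./src/utils/json-lines-stream.js";

    const stream = new JsonLinesStream();
    stream.onJson((event) => console.log("complete event:", event));

    // Event data may arrive in chunks that split a JSON line in two.
    stream.write(Buffer.from('{"Action":"start')); // no newline yet: nothing is emitted
    stream.write(Buffer.from('"}\n'));             // line completed: the parsed object is emitted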

File tree

3 files changed: +172 −30 lines changed


src/test/json-lines-stream.test.ts

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+import * as assert from "node:assert";
+
+import { JsonLinesStream } from "../utils/json-lines-stream.js";
+
+const setup = () => {
+  const stream = new JsonLinesStream();
+
+  const callbackCalls: unknown[] = [];
+  stream.onJson((data: unknown) => {
+    callbackCalls.push(data);
+  });
+
+  return {
+    stream,
+    callbackCalls,
+  };
+};
+
+suite("JsonLinesStream Test Suite", () => {
+  test("should parse and emit complete JSON lines messages", () => {
+    const { stream, callbackCalls } = setup();
+
+    // Test with multiple JSON objects in a single write
+    const testData = Buffer.from('{"key1":"value1"}\n{"key2":"value2"}\n');
+
+    stream.write(testData);
+
+    assert.strictEqual(callbackCalls.length, 2);
+    assert.deepStrictEqual(callbackCalls[0], { key1: "value1" });
+    assert.deepStrictEqual(callbackCalls[1], { key2: "value2" });
+  });
+
+  test("should handle incomplete JSON lines messages across multiple writes", () => {
+    const { stream, callbackCalls } = setup();
+
+    // First write with partial message
+    const firstChunk = Buffer.from('{"key":"value');
+    stream.write(firstChunk);
+
+    // Shouldn't emit anything yet
+    assert.strictEqual(callbackCalls.length, 0);
+
+    // Complete the message in second write
+    const secondChunk = Buffer.from('1"}\n');
+    stream.write(secondChunk);
+
+    // Now it should emit the complete message
+    assert.strictEqual(callbackCalls.length, 1);
+    assert.deepStrictEqual(callbackCalls[0], { key: "value1" });
+  });
+
+  test("should handle multiple messages in chunks", () => {
+    const { stream, callbackCalls } = setup();
+
+    // Write first message and part of second
+    const firstChunk = Buffer.from('{"first":1}\n{"second":');
+    stream.write(firstChunk);
+
+    // First message should be emitted
+    assert.strictEqual(callbackCalls.length, 1);
+    assert.deepStrictEqual(callbackCalls[0], { first: 1 });
+
+    // Complete second message and add third
+    const secondChunk = Buffer.from('2}\n{"third":3}\n');
+    stream.write(secondChunk);
+
+    // Should have all three messages now
+    assert.strictEqual(callbackCalls.length, 3);
+    assert.deepStrictEqual(callbackCalls[1], { second: 2 });
+    assert.deepStrictEqual(callbackCalls[2], { third: 3 });
+  });
+
+  test("should ignore invalid JSON lines", () => {
+    const { stream, callbackCalls } = setup();
+
+    const testData = Buffer.from('not json\n{"valid":true}\n{invalid}\n');
+
+    stream.write(testData);
+
+    // Should only emit the valid JSON object
+    assert.strictEqual(callbackCalls.length, 1);
+    assert.deepStrictEqual(callbackCalls[0], { valid: true });
+  });
+
+  test("should handle empty lines", () => {
+    const { stream, callbackCalls } = setup();
+
+    const testData = Buffer.from('\n\n{"key":"value"}\n\n');
+
+    stream.write(testData);
+
+    // Should only emit the valid JSON object
+    assert.strictEqual(callbackCalls.length, 1);
+    assert.deepStrictEqual(callbackCalls[0], { key: "value" });
+  });
+});

src/utils/container-status.ts

Lines changed: 24 additions & 30 deletions
@@ -4,6 +4,7 @@ import type { Disposable, LogOutputChannel } from "vscode";
 import * as z from "zod/v4-mini";

 import { createEmitter } from "./emitter.ts";
+import { JsonLinesStream } from "./json-lines-stream.ts";

 export type ContainerStatus = "running" | "stopping" | "stopped";

@@ -64,14 +65,6 @@ const DockerEventsSchema = z.object({
   }),
 });

-function safeJsonParse(text: string): unknown {
-  try {
-    return JSON.parse(text);
-  } catch {
-    return undefined;
-  }
-}
-
 function listenToContainerStatus(
   containerName: string,
   outputChannel: LogOutputChannel,
@@ -131,32 +124,33 @@ function listenToContainerStatus(
       throw new Error("Failed to get stdout from docker events process");
     }

-    dockerEvents.stdout.on("data", (data: Buffer) => {
-      const lines = data.toString().split("\n").filter(Boolean);
-      for (const line of lines) {
-        const json = safeJsonParse(line);
-        const parsed = DockerEventsSchema.safeParse(json);
-        if (!parsed.success) {
-          continue;
-        }
+    const jsonlStream = new JsonLinesStream();
+    jsonlStream.onJson((json) => {
+      const parsed = DockerEventsSchema.safeParse(json);
+      if (!parsed.success) {
+        return;
+      }

-        if (parsed.data.Actor.Attributes.name !== containerName) {
-          continue;
-        }
+      if (parsed.data.Actor.Attributes.name !== containerName) {
+        return;
+      }

-        switch (parsed.data.Action) {
-          case "start":
-            onStatusChange("running");
-            break;
-          case "kill":
-            onStatusChange("stopping");
-            break;
-          case "die":
-            onStatusChange("stopped");
-            break;
-        }
+      outputChannel.debug(`[container.status]: ${parsed.data.Action}`);
+
+      switch (parsed.data.Action) {
+        case "start":
+          onStatusChange("running");
+          break;
+        case "kill":
+          onStatusChange("stopping");
+          break;
+        case "die":
+          onStatusChange("stopped");
+          break;
       }
     });
+
+    dockerEvents.stdout.pipe(jsonlStream);
   } catch (error) {
     // If we can't spawn the process, try again after a delay
     scheduleRestart();

src/utils/json-lines-stream.ts

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+import { Writable } from "node:stream";
+
+/**
+ * Safely parses a JSON string, returning undefined if parsing fails.
+ * @param str - The JSON string to parse.
+ * @returns The parsed object or undefined if invalid.
+ */
+export function safeJsonParse(str: string): unknown {
+  try {
+    return JSON.parse(str);
+  } catch {
+    return undefined;
+  }
+}
+
+/**
+ * Writable stream that buffers data until a newline,
+ * parses each line as JSON, and emits the parsed object.
+ */
+export class JsonLinesStream extends Writable {
+  constructor() {
+    let buffer = "";
+    super({
+      write: (chunk, _encoding, callback) => {
+        buffer += String(chunk);
+
+        let newlineIndex = buffer.indexOf("\n");
+        while (newlineIndex !== -1) {
+          const line = buffer.substring(0, newlineIndex).trim();
+          buffer = buffer.substring(newlineIndex + 1);
+
+          const json = safeJsonParse(line);
+          if (json !== undefined) {
+            this.emit("json", json);
+          }
+
+          newlineIndex = buffer.indexOf("\n");
+        }
+
+        callback();
+      },
+    });
+  }
+
+  /**
+   * Registers a listener for parsed JSON objects.
+   * @param callback - Function called with each parsed object.
+   */
+  onJson(callback: (json: unknown) => void) {
+    this.on("json", callback);
+  }
+}
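
For context, the class is consumed by piping a child process's stdout into it, as container-status.ts does above. A rough sketch of that wiring (the exact docker events invocation and filenames here are assumptions; only the pipe and onJson usage come from this commit):

    import { spawn } from "node:child_process";
    import { JsonLinesStream } from "./json-lines-stream.js";

    // Assumed invocation: `docker events --format "{{json .}}"` prints one JSON object per line.
    const dockerEvents = spawn("docker", ["events", "--format", "{{json .}}"]);

    const jsonlStream = new JsonLinesStream();
    jsonlStream.onJson((json) => {
      // Each callback receives a fully parsed event, even if the line arrived split across chunks.
      console.log(json);
    });

    dockerEvents.stdout.pipe(jsonlStream);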
