Skip to content

Commit 994b6ec

Browse files
[supervisor] Make sure we read the last chunk of message when task is closed (#20062)
* [supervisor] Make sure we read the last chunk of message when task is closed * [server, dashboard] Prebuild logs: stream as UInt8Array instead of string, and make sure we send the last chunk before closing the response * Make log buffer more efficient Merging done with `TypedArray.prototype.set()` as suggested here: https://stackoverflow.com/a/73074813/10199319 * [dashboard] PrebuildTaskTab: use hash(err.message) as toastId * [server] prebuild logs: Only call res.end after a timeout, so clients can continue reading from the stream * [dashboard] prebuild logs emitter: If we receive code "200", forward prefixChunk, and stop streaming * fixup! [dashboard] PrebuildTaskTab: use hash(err.message) as toastId --------- Co-authored-by: Filip Troníček <[email protected]>
1 parent a4c1a31 commit 994b6ec

File tree

11 files changed

+193
-226
lines changed

11 files changed

+193
-226
lines changed

components/dashboard/src/components/PrebuildLogs.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ export default function PrebuildLogs(props: PrebuildLogsProps) {
103103
if (!content) {
104104
return;
105105
}
106-
logsEmitter.emit("logs", content.text);
106+
logsEmitter.emit("logs", content.data);
107107
},
108108
}),
109109
);
@@ -173,7 +173,12 @@ export default function PrebuildLogs(props: PrebuildLogsProps) {
173173
<div className="rounded-xl overflow-hidden bg-gray-100 dark:bg-gray-800 flex flex-col mb-8">
174174
<div className="h-96 flex">
175175
<Suspense fallback={<div />}>
176-
<WorkspaceLogs classes="h-full w-full" logsEmitter={logsEmitter} errorMessage={error?.message} />
176+
<WorkspaceLogs
177+
taskId="undefined"
178+
classes="h-full w-full"
179+
logsEmitter={logsEmitter}
180+
errorMessage={error?.message}
181+
/>
177182
</Suspense>
178183
</div>
179184
<div className="w-full bottom-0 h-20 px-6 bg-gray-50 dark:bg-gray-800 border-t border-gray-200 dark:border-gray-600 flex flex-row items-center space-x-2">

components/dashboard/src/components/WorkspaceLogs.tsx

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const lightTheme: ITheme = {
2626
};
2727

2828
export interface Props {
29+
taskId: string;
2930
logsEmitter: EventEmitter;
3031
errorMessage?: string;
3132
classes?: string;
@@ -56,14 +57,14 @@ export default function WorkspaceLogs({ logsEmitter, errorMessage, classes, xter
5657
terminal.loadAddon(fitAddon);
5758
terminal.open(xTermParentRef.current);
5859

59-
let logBuffer = "";
60+
let logBuffer = new Uint8Array();
6061
let isWriting = false;
6162

6263
const processNextLog = () => {
6364
if (isWriting || logBuffer.length === 0) return;
6465

65-
const logs = logBuffer.slice(0, MAX_CHUNK_SIZE);
66-
logBuffer = logBuffer.slice(logs.length);
66+
const logs = logBuffer.subarray(0, MAX_CHUNK_SIZE);
67+
logBuffer = logBuffer.subarray(logs.length);
6768
if (logs) {
6869
isWriting = true;
6970
terminal.write(logs, () => {
@@ -73,16 +74,20 @@ export default function WorkspaceLogs({ logsEmitter, errorMessage, classes, xter
7374
}
7475
};
7576

76-
const logListener = (logs: string) => {
77+
const logListener = (logs: Uint8Array) => {
7778
if (!logs) return;
7879

79-
logBuffer += logs;
80+
const newBuffer = new Uint8Array(logBuffer.length + logs.length);
81+
newBuffer.set(logBuffer);
82+
newBuffer.set(logs, logBuffer.length);
83+
logBuffer = newBuffer;
84+
8085
processNextLog();
8186
};
8287

8388
const resetListener = () => {
8489
terminal.clear();
85-
logBuffer = "";
90+
logBuffer = new Uint8Array();
8691
isWriting = false;
8792
};
8893

components/dashboard/src/data/prebuilds/prebuild-logs-emitter.ts

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,16 +80,14 @@ export function usePrebuildLogsEmitter(prebuild: PlainMessage<Prebuild>, taskId:
8080
const disposables = new DisposableCollection();
8181
disposables.push(
8282
streamPrebuildLogs(
83+
taskId,
8384
task.logUrl,
84-
(msg) => {
85-
const error = matchPrebuildError(msg);
86-
if (!error) {
87-
emitter.emit("logs", msg);
88-
} else {
89-
emitter.emit("logs-error", error);
90-
}
85+
(chunk) => {
86+
emitter.emit("logs", chunk);
87+
},
88+
(err) => {
89+
emitter.emit("logs-error", err);
9190
},
92-
async () => false,
9391
() => {
9492
emitter.markReachedEnd();
9593
},
@@ -110,9 +108,10 @@ export function usePrebuildLogsEmitter(prebuild: PlainMessage<Prebuild>, taskId:
110108
}
111109

112110
function streamPrebuildLogs(
111+
taskId: string,
113112
streamUrl: string,
114-
onLog: (chunk: string) => void,
115-
checkIsDone: () => Promise<boolean>,
113+
onLog: (chunk: Uint8Array) => void,
114+
onError: (err: Error) => void,
116115
onEnd?: () => void,
117116
): DisposableCollection {
118117
const disposables = new DisposableCollection();
@@ -127,10 +126,6 @@ function streamPrebuildLogs(
127126
let delayInSeconds = initialDelaySeconds;
128127

129128
const startWatchingLogs = async () => {
130-
if (await checkIsDone()) {
131-
return;
132-
}
133-
134129
const retryBackoff = async (reason: string, err?: Error) => {
135130
delayInSeconds = Math.min(delayInSeconds * backoffFactor, maxBackoffSeconds);
136131

@@ -145,25 +140,21 @@ function streamPrebuildLogs(
145140
};
146141

147142
let response: Response | undefined = undefined;
148-
let abortController: AbortController | undefined = undefined;
149143
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined = undefined;
150144
try {
151-
abortController = new AbortController();
152145
disposables.push({
153146
dispose: async () => {
154-
abortController?.abort();
155147
await reader?.cancel();
156148
},
157149
});
158150
console.debug("fetching from streamUrl: " + streamUrl);
159151
response = await fetch(streamUrl, {
160152
method: "GET",
161-
cache: "reload",
153+
cache: "no-store", // we don't want the browser to a) look at the cache, or b) update the cache (which would interrupt any running fetches to that resource!)
162154
credentials: "include",
163155
headers: {
164156
TE: "trailers", // necessary to receive stream status code
165157
},
166-
signal: abortController.signal,
167158
redirect: "follow",
168159
});
169160
reader = response.body?.getReader();
@@ -174,41 +165,51 @@ function streamPrebuildLogs(
174165

175166
const decoder = new TextDecoder("utf-8");
176167
let chunk = await reader.read();
168+
let received200 = false;
177169
while (!chunk.done) {
178170
if (disposables.disposed) {
179171
// stop reading when disposed
180172
return;
181173
}
182-
const msg = decoder.decode(chunk.value, { stream: true });
183174

184175
// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
185176
// So we resort to this hand-written solution:
177+
const msg = decoder.decode(chunk.value, { stream: true });
186178
const matches = msg.match(HEADLESS_LOG_STREAM_STATUS_CODE_REGEX);
187179
const prebuildMatches = matchPrebuildError(msg);
188180
if (matches) {
189181
if (matches.length < 2) {
190182
console.debug("error parsing log stream status code. msg: " + msg);
191183
} else {
184+
const prefix = msg.substring(0, matches.index);
185+
if (prefix) {
186+
const prefixChunk = new TextEncoder().encode(prefix);
187+
onLog(prefixChunk);
188+
}
192189
const code = parseStatusCode(matches[1]);
193190
if (code !== 200) {
194191
throw new StreamError(code);
195192
}
193+
if (code === 200) {
194+
received200 = true;
195+
break;
196+
}
197+
}
198+
} else if (prebuildMatches) {
199+
if (prebuildMatches.code === ErrorCodes.HEADLESS_LOG_NOT_YET_AVAILABLE) {
200+
// reset backoff because this error is expected
201+
delayInSeconds = initialDelaySeconds;
202+
throw prebuildMatches;
196203
}
197-
} else if (prebuildMatches && prebuildMatches.code === ErrorCodes.HEADLESS_LOG_NOT_YET_AVAILABLE) {
198-
// reset backoff because this error is expected
199-
delayInSeconds = initialDelaySeconds;
200-
throw prebuildMatches;
204+
onError(prebuildMatches);
201205
} else {
202-
onLog(msg);
206+
onLog(chunk.value);
203207
}
204208

205209
chunk = await reader.read();
206210
}
211+
console.info("[stream] end of stream", { received200 });
207212
reader.cancel();
208-
209-
if (await checkIsDone()) {
210-
return;
211-
}
212213
} catch (err) {
213214
if (err instanceof DOMException && err.name === "AbortError") {
214215
console.debug("stopped watching headless logs, not retrying: method got disposed of");

components/dashboard/src/prebuilds/detail/PrebuildTaskTab.tsx

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,35 +30,21 @@ export const PrebuildTaskTab = memo(({ taskId, prebuild }: Props) => {
3030
const history = useHistory();
3131

3232
useEffect(() => {
33-
const errorListener = (err: Error) => {
34-
if (err?.name === "AbortError") {
35-
return;
36-
}
37-
if (err instanceof ApplicationError && err.code === ErrorCodes.NOT_FOUND) {
38-
// We don't want to show a toast for this error, we handle it in the UI
39-
return;
40-
}
41-
if (err?.message) {
42-
toast("Fetching logs failed: " + err.message);
43-
}
44-
};
45-
46-
const logErrorListener = (err: ApplicationError) => {
33+
const logErrorListener = async (err: ApplicationError) => {
4734
if (err.code === ErrorCodes.NOT_FOUND) {
4835
setError(err);
4936
return;
5037
}
5138

52-
const toastId = crypto.randomUUID();
39+
const digest = await crypto.subtle.digest("sha256", new TextEncoder().encode(err.message + ":" + err.code));
40+
const toastId = new TextDecoder().decode(digest);
5341
toast("Fetching logs failed: " + err.message, { autoHide: false, id: toastId });
5442
setActiveToasts((prev) => new Set(prev).add(toastId));
5543
};
5644

57-
logEmitter.on("error", errorListener);
5845
logEmitter.on("logs-error", logErrorListener);
5946

6047
return () => {
61-
logEmitter.removeListener("error", errorListener);
6248
logEmitter.removeListener("logs-error", logErrorListener);
6349
setError(undefined);
6450
};
@@ -109,6 +95,7 @@ export const PrebuildTaskTab = memo(({ taskId, prebuild }: Props) => {
10995
key={prebuild.id + taskId}
11096
classes="w-full h-full"
11197
xtermClasses="absolute top-0 left-0 bottom-0 right-0 ml-6 my-0 mt-4"
98+
taskId={taskId}
11299
logsEmitter={logEmitter}
113100
/>
114101
</Suspense>

components/dashboard/src/start/StartWorkspace.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,7 @@ function ImageBuildView(props: ImageBuildViewProps) {
794794
if (!content) {
795795
return;
796796
}
797-
logsEmitter.emit("logs", content.text);
797+
logsEmitter.emit("logs", content.data);
798798
},
799799
});
800800

@@ -807,7 +807,7 @@ function ImageBuildView(props: ImageBuildViewProps) {
807807
return (
808808
<StartPage title="Building Image" phase={props.phase} workspaceId={props.workspaceId}>
809809
<Suspense fallback={<div />}>
810-
<WorkspaceLogs logsEmitter={logsEmitter} errorMessage={props.error?.message} />
810+
<WorkspaceLogs taskId="image-build" logsEmitter={logsEmitter} errorMessage={props.error?.message} />
811811
</Suspense>
812812
{!!props.onStartWithDefaultImage && (
813813
<>

components/gitpod-protocol/src/protocol.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -974,9 +974,7 @@ export namespace WorkspaceImageBuild {
974974
maxSteps?: number;
975975
}
976976
export interface LogContent {
977-
text: string;
978-
upToLine?: number;
979-
isDiff?: boolean;
977+
data: Uint8Array;
980978
}
981979
export type LogCallback = (info: StateInfo, content: LogContent | undefined) => void;
982980
export namespace LogLine {

components/server/src/prebuilds/prebuild-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ export class PrebuildManager {
654654
userId: string,
655655
prebuildId: string,
656656
taskId: string,
657-
onLog: (message: string) => Promise<void>,
657+
onLog: (chunk: Uint8Array) => Promise<void>,
658658
): Promise<{ taskUrl: string } | undefined> {
659659
const prebuild = await this.getPrebuild({}, userId, prebuildId);
660660
const organizationId = prebuild?.info.teamId;

0 commit comments

Comments
 (0)