Skip to content

Commit 10810ce

Browse files
Various prebuild fixes (#20057)
* log it all
* fix failing image logs prebuild updates
* make sure we dispose of requests properly
* minor cleanup
* handle aborts gracefully
* Try using a tailing library for reading supervisor task logs
* Fix caching requests on Firefox
* Revert "Try using a tailing library for reading supervisor task logs"
  This reverts commit fdc3183.
* Change log order
  Make sure we wait for the line to be written first
* Tweak abortion error message
* Remove extra log line
1 parent e7ffef9 commit 10810ce

File tree

6 files changed

+41
-144
lines changed

6 files changed

+41
-144
lines changed

components/dashboard/src/data/prebuilds/prebuild-logs-emitter.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,29 +145,40 @@ function streamPrebuildLogs(
145145
};
146146

147147
let response: Response | undefined = undefined;
148+
let abortController: AbortController | undefined = undefined;
148149
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined = undefined;
149150
try {
151+
abortController = new AbortController();
152+
disposables.push({
153+
dispose: async () => {
154+
abortController?.abort();
155+
await reader?.cancel();
156+
},
157+
});
150158
console.debug("fetching from streamUrl: " + streamUrl);
151159
response = await fetch(streamUrl, {
152160
method: "GET",
153-
cache: "no-cache",
161+
cache: "reload",
154162
credentials: "include",
155-
keepalive: true,
156163
headers: {
157164
TE: "trailers", // necessary to receive stream status code
158165
},
166+
signal: abortController.signal,
159167
redirect: "follow",
160168
});
161169
reader = response.body?.getReader();
162170
if (!reader) {
163171
await retryBackoff("no reader");
164172
return;
165173
}
166-
disposables.push({ dispose: () => reader?.cancel() });
167174

168175
const decoder = new TextDecoder("utf-8");
169176
let chunk = await reader.read();
170177
while (!chunk.done) {
178+
if (disposables.disposed) {
179+
// stop reading when disposed
180+
return;
181+
}
171182
const msg = decoder.decode(chunk.value, { stream: true });
172183

173184
// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
@@ -199,6 +210,10 @@ function streamPrebuildLogs(
199210
return;
200211
}
201212
} catch (err) {
213+
if (err instanceof DOMException && err.name === "AbortError") {
214+
console.debug("stopped watching headless logs, not retrying: method got disposed of");
215+
return;
216+
}
202217
reader?.cancel().catch(console.debug);
203218
if (err.code === 400) {
204219
// sth is really off, and we _should not_ retry

components/public-api/typescript-common/src/prebuild-utils.ts

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* See License.AGPL.txt in the project root for license information.
55
*/
66

7-
import { Disposable, DisposableCollection, HEADLESS_LOG_STREAM_STATUS_CODE_REGEX } from "@gitpod/gitpod-protocol";
87
import { ApplicationError, ErrorCode, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error";
98

109
/**
@@ -44,134 +43,3 @@ export function getPrebuildErrorMessage(err: any) {
4443
}
4544
return `${PREBUILD_LOG_STREAM_ERROR}#${code}#${message}#${PREBUILD_LOG_STREAM_ERROR}`;
4645
}
47-
48-
const defaultBackoffTimes = 3;
49-
interface Options {
50-
includeCredentials: boolean;
51-
maxBackoffTimes?: number;
52-
onEnd?: () => void;
53-
}
54-
55-
/**
56-
* backoff fetch prebuild logs
57-
* @returns a function to cancel fetching
58-
*/
59-
export function onDownloadPrebuildLogsUrl(
60-
streamUrl: string,
61-
onLog: (message: string) => void,
62-
options: Options,
63-
): () => void {
64-
const disposables = new DisposableCollection();
65-
66-
// initializing non-empty here to use this as a stopping signal for the retries down below
67-
disposables.push(Disposable.NULL);
68-
69-
// retry configuration goes here
70-
const initialDelaySeconds = 1;
71-
const backoffFactor = 1.2;
72-
const maxBackoffSeconds = 5;
73-
let delayInSeconds = initialDelaySeconds;
74-
let currentBackoffTimes = 0;
75-
76-
const startWatchingLogs = async () => {
77-
const retryBackoff = async (reason: string, err?: Error) => {
78-
delayInSeconds = Math.min(delayInSeconds * backoffFactor, maxBackoffSeconds);
79-
console.debug("re-trying headless-logs because: " + reason, err);
80-
await new Promise((resolve) => {
81-
setTimeout(resolve, delayInSeconds * 1000);
82-
});
83-
if (disposables.disposed) {
84-
return; // and stop retrying
85-
}
86-
startWatchingLogs().catch(console.error);
87-
};
88-
89-
let response: Response | undefined = undefined;
90-
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined = undefined;
91-
try {
92-
currentBackoffTimes += 1;
93-
console.debug("fetching from streamUrl: " + streamUrl);
94-
response = await fetch(streamUrl, {
95-
method: "GET",
96-
cache: "no-cache",
97-
credentials: options.includeCredentials ? "include" : undefined,
98-
keepalive: true,
99-
headers: {
100-
TE: "trailers", // necessary to receive stream status code
101-
},
102-
redirect: "follow",
103-
});
104-
reader = response.body?.getReader();
105-
if (!reader) {
106-
await retryBackoff("no reader");
107-
return;
108-
}
109-
disposables.push({ dispose: () => reader?.cancel() });
110-
111-
const decoder = new TextDecoder("utf-8");
112-
let chunk = await reader.read();
113-
while (!chunk.done) {
114-
const msg = decoder.decode(chunk.value, { stream: true });
115-
116-
// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
117-
// So we resort to this hand-written solution:
118-
const matches = msg.match(HEADLESS_LOG_STREAM_STATUS_CODE_REGEX);
119-
const prebuildMatches = matchPrebuildError(msg);
120-
if (matches) {
121-
if (matches.length < 2) {
122-
console.debug("error parsing log stream status code. msg: " + msg);
123-
} else {
124-
const code = parseStatusCode(matches[1]);
125-
if (code !== 200) {
126-
throw new ApplicationError(
127-
ErrorCodes.INTERNAL_SERVER_ERROR,
128-
`prebuild log download status code: ${code}`,
129-
);
130-
}
131-
}
132-
} else if (prebuildMatches && prebuildMatches.code === ErrorCodes.HEADLESS_LOG_NOT_YET_AVAILABLE) {
133-
// reset backoff because this error is expected
134-
delayInSeconds = initialDelaySeconds;
135-
currentBackoffTimes = 0;
136-
throw prebuildMatches;
137-
} else {
138-
onLog(msg);
139-
}
140-
141-
chunk = await reader.read();
142-
}
143-
} catch (err) {
144-
if (currentBackoffTimes > (options.maxBackoffTimes ?? defaultBackoffTimes)) {
145-
console.debug("stopped watching headless logs, max backoff reached", err);
146-
return;
147-
}
148-
if (err.code === 400) {
149-
// sth is really off, and we _should not_ retry
150-
console.debug("stopped watching headless logs", err);
151-
return;
152-
}
153-
await retryBackoff("error while listening to stream", err);
154-
} finally {
155-
reader?.cancel().catch(console.debug);
156-
if (options.onEnd) {
157-
options.onEnd();
158-
}
159-
}
160-
};
161-
startWatchingLogs().catch(console.error);
162-
163-
return () => {
164-
disposables.dispose();
165-
};
166-
}
167-
168-
function parseStatusCode(code: string | undefined): number | undefined {
169-
try {
170-
if (!code) {
171-
return undefined;
172-
}
173-
return Number.parseInt(code);
174-
} catch (err) {
175-
return undefined;
176-
}
177-
}

components/server/src/workspace/headless-log-controller.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,14 @@ export class HeadlessLogController {
6969
const user = req.user as User; // verified by authenticateAndAuthorize
7070
await runWithSubjectId(SubjectId.fromUserId(user.id), async () => {
7171
try {
72-
const instanceId = req.params.instanceId;
73-
const terminalId = req.params.terminalId;
72+
const { instanceId, terminalId } = req.params;
7473

7574
const logCtx = { userId: user.id, instanceId };
7675
try {
7776
const head = {
7877
"Content-Type": "text/html; charset=utf-8", // is text/plain, but with that node.js won't stream...
7978
"Transfer-Encoding": "chunked",
80-
"Cache-Control": "no-cache, no-store, must-revalidate", // make sure stream are not re-used on reconnect
79+
"Cache-Control": "no-cache, no-store, must-revalidate", // make sure streams are not re-used on reconnect
8180
};
8281
res.writeHead(200, head);
8382

@@ -291,12 +290,11 @@ export class HeadlessLogController {
291290
return;
292291
}
293292

294-
const prebuildId = req.params.prebuildId;
293+
const { prebuildId, taskId } = req.params;
295294
if (!uuidValidate(prebuildId)) {
296295
res.status(400).send("prebuildId is invalid");
297296
return;
298297
}
299-
const { taskId } = req.params;
300298
const logCtx = { userId: user.id, prebuildId, taskId };
301299

302300
let firstChunk = true;

components/server/src/workspace/workspace-starter.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ export class WorkspaceStarter {
851851
try {
852852
if (workspace.type === "prebuild") {
853853
const prebuild = await this.workspaceDb.trace({ span }).findPrebuildByWorkspaceID(workspace.id);
854-
if (prebuild && prebuild.state !== "failed") {
854+
if (prebuild && prebuild.state !== "failed" && prebuild.projectId) {
855855
prebuild.state = "failed";
856856
prebuild.error = err.toString();
857857

@@ -860,6 +860,12 @@ export class WorkspaceStarter {
860860
type: HeadlessWorkspaceEventType.Failed,
861861
workspaceID: workspace.id,
862862
});
863+
await this.publisher.publishPrebuildUpdate({
864+
status: "failed",
865+
prebuildID: prebuild.id,
866+
projectID: prebuild.projectId,
867+
workspaceID: workspace.id,
868+
});
863869
}
864870
}
865871
} catch (err) {

components/supervisor/pkg/supervisor/services.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,7 @@ func (s *taskService) ListenToOutput(req *api.ListenToOutputRequest, srv api.Tas
11331133
if err == io.EOF {
11341134
if isClosed.Load() {
11351135
// We are done
1136+
log.Println("[prebuildlog]: closing because task state is done and we are done reading")
11361137
return nil
11371138
}
11381139
}
@@ -1149,8 +1150,10 @@ func (s *taskService) ListenToOutput(req *api.ListenToOutputRequest, srv api.Tas
11491150

11501151
select {
11511152
case <-srv.Context().Done():
1153+
log.Println("[prebuildlog]: closing because context done")
11521154
return nil
11531155
case <-s.willShutdownCtx.Done():
1156+
log.Println("[prebuildlog]: closing because willShutdownCtx fired")
11541157
return nil
11551158
case <-closedChannel:
11561159
continue

components/supervisor/pkg/supervisor/tasks.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,13 @@ func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup, successChan
323323
return true
324324
})
325325

326+
taskWatchWg := &sync.WaitGroup{}
327+
326328
go func(t *task, term *terminal.Term) {
327329
state, err := term.Wait()
330+
taskLog.Info("task terminal has been closed. Waiting for watch() to finish...")
331+
taskWatchWg.Wait()
332+
taskLog.Info("watch() has finished, setting task state to closed")
328333
if state != nil {
329334
if state.Success() {
330335
t.successChan <- taskSuccessful
@@ -341,11 +346,10 @@ func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup, successChan
341346

342347
t.successChan <- taskFailed(fmt.Sprintf("%s: %s", msg, t.lastOutput))
343348
}
344-
taskLog.Info("task terminal has been closed")
345349
tm.setTaskState(t, api.TaskState_closed)
346350
}(t, term)
347351

348-
tm.watch(t, term)
352+
tm.watch(t, term, taskWatchWg)
349353

350354
if t.command != "" {
351355
term.PTY.Write([]byte(t.command + "\n"))
@@ -446,7 +450,7 @@ func prebuildLogFileName(task *task, storeLocation string) string {
446450
return logs.PrebuildLogFileName(storeLocation, task.Id)
447451
}
448452

449-
func (tm *tasksManager) watch(task *task, term *terminal.Term) {
453+
func (tm *tasksManager) watch(task *task, term *terminal.Term, wg *sync.WaitGroup) {
450454
if !tm.config.isPrebuild() {
451455
return
452456
}
@@ -462,6 +466,9 @@ func (tm *tasksManager) watch(task *task, term *terminal.Term) {
462466
go func() {
463467
defer stdout.Close()
464468

469+
wg.Add(1)
470+
defer wg.Done()
471+
465472
var (
466473
fileName = prebuildLogFileName(task, tm.storeLocation)
467474
oldFileName = fileName + "-old"

0 commit comments

Comments
 (0)