Skip to content

Commit 2571a96

Browse files
committed
backend/project: stream exec typing, debugging
1 parent 7b86786 commit 2571a96

File tree

4 files changed

+113
-41
lines changed

4 files changed

+113
-41
lines changed

src/packages/backend/exec-stream.ts

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,21 @@
33
* Core streaming logic that can be used by different services.
44
*/
55

6-
import { executeCode, asyncCache } from "./execute-code";
7-
import { abspath } from "./misc_node";
8-
import getLogger from "./logger";
6+
import { unreachable } from "@cocalc/util/misc";
97
import {
108
ExecuteCodeOutputAsync,
119
ExecuteCodeStats,
1210
ExecuteCodeStreamEvent,
1311
} from "@cocalc/util/types/execute-code";
12+
import { asyncCache, executeCode } from "./execute-code";
13+
import getLogger from "./logger";
14+
import { abspath } from "./misc_node";
15+
16+
export type StreamEvent = {
17+
type?: "job" | ExecuteCodeStreamEvent["type"];
18+
data?: ExecuteCodeStreamEvent["data"];
19+
error?: string;
20+
};
1421

1522
const logger = getLogger("backend:exec-stream");
1623

@@ -32,11 +39,11 @@ export interface ExecuteStreamOptions {
3239
verbose?: boolean;
3340
project_id?: string;
3441
debug?: string;
35-
stream: (event: any) => void;
42+
stream: (event: StreamEvent | null) => void;
3643
}
3744

3845
export async function executeStream(options: ExecuteStreamOptions) {
39-
const { stream, debug, project_id: reqProjectId, ...opts } = options;
46+
const { stream, debug, project_id, ...opts } = options;
4047

4148
// Log debug message for debugging purposes
4249
if (debug) {
@@ -47,7 +54,7 @@ export async function executeStream(options: ExecuteStreamOptions) {
4754
let done = false;
4855
let stats: ExecuteCodeStats = [];
4956

50-
// Create streaming callback
57+
// Create streaming callback, passed into the execute-code::executeCode call
5158
const streamCB = (event: ExecuteCodeStreamEvent) => {
5259
if (done) {
5360
logger.debug(
@@ -65,12 +72,14 @@ export async function executeStream(options: ExecuteStreamOptions) {
6572
data: event.data,
6673
});
6774
break;
75+
6876
case "stderr":
6977
stream({
7078
type: "stderr",
7179
data: event.data,
7280
});
7381
break;
82+
7483
case "stats":
7584
// Stats are accumulated in the stats array for the final result
7685
if (
@@ -89,6 +98,7 @@ export async function executeStream(options: ExecuteStreamOptions) {
8998
});
9099
}
91100
break;
101+
92102
case "done":
93103
logger.debug(`executeStream: processing done event`);
94104
const result = event.data as ExecuteCodeOutputAsync;
@@ -101,12 +111,16 @@ export async function executeStream(options: ExecuteStreamOptions) {
101111
done = true;
102112
stream(null); // End the stream
103113
break;
114+
104115
case "error":
105116
logger.debug(`executeStream: processing error event`);
106117
stream({ error: event.data as string });
107118
done = true;
108119
stream(null);
109120
break;
121+
122+
default:
123+
unreachable(event.type);
110124
}
111125
};
112126

@@ -161,8 +175,8 @@ export async function executeStream(options: ExecuteStreamOptions) {
161175

162176
// Stats monitoring is now handled by execute-code.ts via streamCB
163177
// No need for duplicate monitoring here
164-
} catch (err) {
165-
stream({ error: `${err}` });
166-
stream(null); // End the stream
167-
}
178+
} catch (err) {
179+
stream({ error: `${err}` });
180+
stream(null); // End the stream
181+
}
168182
}

src/packages/backend/execute-code.ts

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@ import { envToInt } from "@cocalc/backend/misc/env-to-number";
2323
import { aggregate } from "@cocalc/util/aggregate";
2424
import { callback_opts } from "@cocalc/util/async-utils";
2525
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
26-
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
26+
import {
27+
to_json,
28+
trunc,
29+
trunc_middle,
30+
uuid,
31+
walltime,
32+
} from "@cocalc/util/misc";
2733
import {
2834
ExecuteCodeOutputAsync,
2935
ExecuteCodeOutputBlocking,
@@ -239,7 +245,15 @@ async function executeCodeNoAggregate(
239245
const child = doSpawn(
240246
{ ...opts, origCommand, job_id, job_config },
241247
async (err, result) => {
242-
log.debug("async/doSpawn returned", { err, result });
248+
log.debug("async/doSpawn returned", {
249+
err,
250+
result: {
251+
type: result?.type,
252+
stdout: trunc_middle(result?.stdout),
253+
stderr: trunc_middle(result?.stderr),
254+
exit_code: result?.exit_code,
255+
},
256+
});
243257
try {
244258
const info: Omit<
245259
ExecuteCodeOutputAsync,
@@ -428,6 +442,45 @@ function doSpawn(
428442
log.debug("listening for stdout, stderr and exit_code...");
429443
}
430444

445+
// Batching mechanism for streaming to reduce message frequency -- otherwise there could be 100 msg/s to process
446+
let streamBatchTimer: NodeJS.Timeout | undefined;
447+
const streamBuffer = { stdout: "", stderr: "" };
448+
449+
// Send batched stream data
450+
const sendBatchedStream = () => {
451+
if (!opts.streamCB) return;
452+
453+
const hasStdout = streamBuffer.stdout.length > 0;
454+
const hasStderr = streamBuffer.stderr.length > 0;
455+
456+
if (hasStdout || hasStderr) {
457+
// Send stdout if available
458+
if (hasStdout) {
459+
opts.streamCB({ type: "stdout", data: streamBuffer.stdout });
460+
streamBuffer.stdout = "";
461+
}
462+
// Send stderr if available
463+
if (hasStderr) {
464+
opts.streamCB({ type: "stderr", data: streamBuffer.stderr });
465+
streamBuffer.stderr = "";
466+
}
467+
}
468+
};
469+
470+
// Flush any remaining buffered data and clean up
471+
const flushStreamBuffer = () => {
472+
if (streamBatchTimer) {
473+
clearInterval(streamBatchTimer);
474+
streamBatchTimer = undefined;
475+
}
476+
sendBatchedStream();
477+
};
478+
479+
// Start batch timer if streaming is enabled, every 100ms
480+
if (opts.streamCB) {
481+
streamBatchTimer = setInterval(sendBatchedStream, 100);
482+
}
483+
431484
function update_async(
432485
job_id: string | undefined,
433486
aspect: "stdout" | "stderr" | "pid",
@@ -456,16 +509,16 @@ function doSpawn(
456509
if (stdout.length < opts.max_output) {
457510
const newData = data.slice(0, opts.max_output - stdout.length);
458511
stdout += newData;
459-
// Stream only the new portion
512+
// Buffer the new portion for batched streaming
460513
if (opts.streamCB && stdout.length > prevLength) {
461-
opts.streamCB({ type: "stdout", data: newData });
514+
streamBuffer.stdout += newData;
462515
}
463516
}
464517
} else {
465518
stdout += data;
466-
// Stream the new data
519+
// Buffer the new data for batched streaming
467520
if (opts.streamCB) {
468-
opts.streamCB({ type: "stdout", data });
521+
streamBuffer.stdout += data;
469522
}
470523
}
471524
update_async(opts.job_id, "stdout", stdout);
@@ -478,16 +531,16 @@ function doSpawn(
478531
if (stderr.length < opts.max_output) {
479532
const newData = data.slice(0, opts.max_output - stderr.length);
480533
stderr += newData;
481-
// Stream only the new portion
534+
// Buffer the new portion for batched streaming
482535
if (opts.streamCB && stderr.length > prevLength) {
483-
opts.streamCB({ type: "stderr", data: newData });
536+
streamBuffer.stderr += newData;
484537
}
485538
}
486539
} else {
487540
stderr += data;
488-
// Stream the new data
541+
// Buffer the new data for batched streaming
489542
if (opts.streamCB) {
490-
opts.streamCB({ type: "stderr", data });
543+
streamBuffer.stderr += data;
491544
}
492545
}
493546
update_async(opts.job_id, "stderr", stderr);
@@ -524,8 +577,9 @@ function doSpawn(
524577
stderr += to_json(err);
525578
// a fundamental issue, we were not running some code
526579
ran_code = false;
527-
// For streaming, send error event immediately
580+
// For streaming, flush buffer and send error event
528581
if (opts.streamCB && opts.async_call && opts.job_id) {
582+
flushStreamBuffer(); // Flush any buffered data first
529583
const errorResult: ExecuteCodeOutputAsync = {
530584
type: "async",
531585
job_id: opts.job_id,
@@ -579,6 +633,11 @@ function doSpawn(
579633
// finally finish up – this will also terminate the monitor
580634
callback_done = true;
581635

636+
// Flush any remaining buffered stream data before finishing
637+
if (opts.streamCB) {
638+
flushStreamBuffer();
639+
}
640+
582641
if (timer != null) {
583642
clearTimeout(timer);
584643
timer = undefined;

src/packages/frontend/frame-editors/latex-editor/build-command.tsx

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,19 @@
77
Customization and selection of the build command.
88
*/
99

10-
import { Alert, Select, Form, Input } from "antd";
11-
import { List } from "immutable";
1210
import { SaveOutlined } from "@ant-design/icons";
11+
import { Alert, Form, Input, Select } from "antd";
12+
import { List } from "immutable";
13+
1314
import { Button } from "@cocalc/frontend/antd-bootstrap";
1415
import { React } from "@cocalc/frontend/app-framework";
1516
import { Icon, Loading, Paragraph } from "@cocalc/frontend/components";
1617
import { split } from "@cocalc/util/misc";
1718
import { Actions } from "./actions";
1819
import {
19-
build_command as latexmk_build_command,
2020
Engine,
2121
ENGINES,
22+
build_command as latexmk_build_command,
2223
} from "./latexmk";
2324

2425
// cmd could be undefined -- https://github.com/sagemathinc/cocalc/issues/3290
@@ -185,7 +186,7 @@ export const BuildCommand: React.FC<Props> = React.memo((props: Props) => {
185186
return (
186187
<Button
187188
disabled={!dirty}
188-
bsSize={"small"}
189+
bsSize={"xsmall"}
189190
bsStyle={dirty ? "success" : undefined}
190191
title={"Saves the modified command (or just hit the 'Return' key)"}
191192
onClick={() => handle_build_change()}

src/packages/project/exec-stream.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ Project-side exec-stream service that handles streaming execution requests.
33
Similar to how the project API service works, but specifically for streaming exec.
44
*/
55

6-
import { executeStream as backendExecuteStream } from "@cocalc/backend/exec-stream";
6+
import { executeStream, StreamEvent } from "@cocalc/backend/exec-stream";
7+
import { Message, Subscription } from "@cocalc/conat/core/client";
78
import { projectSubject } from "@cocalc/conat/names";
89
import { connectToConat } from "@cocalc/project/conat/connection";
910
import { project_id } from "@cocalc/project/data";
1011
import { getLogger } from "@cocalc/project/logger";
1112

1213
const logger = getLogger("project:exec-stream");
1314

15+
16+
1417
export function init() {
1518
serve();
1619
}
@@ -24,31 +27,26 @@ async function serve() {
2427
service: "exec-stream",
2528
});
2629

27-
logger.debug(`serve: creating exec-stream service for project ${project_id}`);
30+
logger.debug(
31+
`serve: creating exec-stream service for project ${project_id} and subject='${subject}'`,
32+
);
2833
const api = await cn.subscribe(subject, { queue: "q" });
29-
logger.debug(`serve: subscribed to subject='${subject}'`);
3034
await listen(api, subject);
3135
}
3236

33-
async function listen(api, _subject) {
37+
async function listen(api: Subscription, subject: string) {
38+
logger.debug(`Listening on subject='${subject}'`);
39+
3440
for await (const mesg of api) {
3541
handleMessage(mesg);
3642
}
3743
}
3844

39-
async function handleMessage(mesg) {
45+
async function handleMessage(mesg: Message) {
4046
const options = mesg.data;
4147

4248
let seq = 0;
43-
const respond = ({
44-
type,
45-
data,
46-
error,
47-
}: {
48-
type?: string;
49-
data?: any;
50-
error?: string;
51-
}) => {
49+
const respond = ({ type, data, error }: StreamEvent) => {
5250
mesg.respondSync({ type, data, error, seq });
5351
seq += 1;
5452
};
@@ -61,7 +59,7 @@ async function handleMessage(mesg) {
6159
mesg.respondSync(null);
6260
};
6361

64-
const stream = (event?) => {
62+
const stream = (event: StreamEvent) => {
6563
if (done) return;
6664
if (event != null) {
6765
respond(event);
@@ -80,7 +78,7 @@ async function handleMessage(mesg) {
8078
const { stream: _, project_id: reqProjectId, ...opts } = options;
8179

8280
// Call the backend executeStream function
83-
await backendExecuteStream({
81+
await executeStream({
8482
...opts,
8583
project_id: reqProjectId,
8684
stream,

0 commit comments

Comments
 (0)