Skip to content

Commit caa65ca

Browse files
Merge branch 'master' into express-5-8529
2 parents 4501a47 + 9b82b9c commit caa65ca

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2925
-435
lines changed

src/packages/backend/exec-stream.test.ts

Lines changed: 963 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Backend exec-stream functionality for streaming code execution.
3+
* Core streaming logic that can be used by different services.
4+
*/
5+
6+
import { unreachable } from "@cocalc/util/misc";
7+
import {
8+
ExecuteCodeOutput,
9+
ExecuteCodeOutputAsync,
10+
ExecuteCodeStats,
11+
ExecuteCodeStreamEvent,
12+
} from "@cocalc/util/types/execute-code";
13+
import { asyncCache, executeCode } from "./execute-code";
14+
import getLogger from "./logger";
15+
import { abspath } from "./misc_node";
16+
17+
export type StreamEvent = {
18+
type?: "job" | ExecuteCodeStreamEvent["type"];
19+
data?: ExecuteCodeStreamEvent["data"];
20+
error?: string;
21+
};
22+
23+
const logger = getLogger("backend:exec-stream");
24+
25+
const MONITOR_STATS_LENGTH_MAX = 100; // Max stats entries
26+
27+
function truncStats(stats: ExecuteCodeStats): ExecuteCodeStats {
28+
return stats.slice(stats.length - MONITOR_STATS_LENGTH_MAX);
29+
}
30+
31+
export interface ExecuteStreamOptions {
32+
command?: string;
33+
args?: string[];
34+
path?: string;
35+
compute_server_id?: number;
36+
bash?: boolean;
37+
env?: { [key: string]: string };
38+
timeout?: number;
39+
max_output?: number;
40+
verbose?: boolean;
41+
project_id?: string;
42+
debug?: string;
43+
stream: (event: StreamEvent | null) => void;
44+
waitForCompletion?: boolean;
45+
}
46+
47+
export async function executeStream(
48+
options: ExecuteStreamOptions,
49+
): Promise<ExecuteCodeOutput | undefined> {
50+
const { stream, debug, project_id, waitForCompletion, ...opts } = options;
51+
52+
// Log debug message for debugging purposes
53+
if (debug) {
54+
logger.debug(`executeStream: ${debug}`);
55+
}
56+
57+
let job: ExecuteCodeOutput | undefined;
58+
59+
try {
60+
let done = false;
61+
let stats: ExecuteCodeStats = [];
62+
63+
// Create streaming callback, passed into execute-code::executeCode call
64+
const streamCB = (event: ExecuteCodeStreamEvent) => {
65+
if (done) {
66+
logger.debug(
67+
`executeStream: ignoring event type=${event.type} because stream is done`,
68+
);
69+
return;
70+
}
71+
72+
logger.debug(`executeStream: received event type=${event.type}`);
73+
74+
switch (event.type) {
75+
case "stdout":
76+
stream({
77+
type: "stdout",
78+
data: event.data,
79+
});
80+
break;
81+
82+
case "stderr":
83+
stream({
84+
type: "stderr",
85+
data: event.data,
86+
});
87+
break;
88+
89+
case "stats":
90+
// Stats are accumulated in the stats array for the final result
91+
if (
92+
event.data &&
93+
typeof event.data === "object" &&
94+
"timestamp" in event.data
95+
) {
96+
stats.push(event.data as ExecuteCodeStats[0]);
97+
// Keep stats array bounded
98+
if (stats.length > MONITOR_STATS_LENGTH_MAX) {
99+
stats.splice(0, stats.length - MONITOR_STATS_LENGTH_MAX);
100+
}
101+
stream({
102+
type: "stats",
103+
data: event.data,
104+
});
105+
}
106+
break;
107+
108+
case "done":
109+
logger.debug(`executeStream: processing done event`);
110+
const result = event.data as ExecuteCodeOutputAsync;
111+
// Include accumulated stats in final result
112+
result.stats = truncStats(stats);
113+
stream({
114+
type: "done",
115+
data: result,
116+
});
117+
done = true;
118+
stream(null); // End the stream
119+
break;
120+
121+
case "error":
122+
logger.debug(`executeStream: processing error event`);
123+
stream({ error: event.data as string });
124+
done = true;
125+
stream(null);
126+
break;
127+
128+
default:
129+
unreachable(event.type);
130+
}
131+
};
132+
133+
// Start an async execution job with streaming callback
134+
job = await executeCode({
135+
command: opts.command || "",
136+
path: !!opts.compute_server_id ? opts.path : abspath(opts.path ?? ""),
137+
...opts,
138+
async_call: true, // Force async mode for streaming
139+
streamCB, // Add the streaming callback
140+
});
141+
142+
if (job?.type !== "async") {
143+
stream({ error: "Failed to create async job for streaming" });
144+
stream(null);
145+
return undefined;
146+
}
147+
148+
// Send initial job info with full async structure
149+
// Get the current job status from cache in case it completed immediately
150+
const currentJob = asyncCache.get(job.job_id);
151+
const initialJobInfo: ExecuteCodeOutputAsync = {
152+
type: "async",
153+
job_id: job.job_id,
154+
pid: job.pid,
155+
status: currentJob?.status ?? job.status,
156+
start: job.start,
157+
stdout: currentJob?.stdout ?? "",
158+
stderr: currentJob?.stderr ?? "",
159+
exit_code: currentJob?.exit_code ?? 0, // Default to 0, will be updated when job completes
160+
stats: currentJob?.stats ?? [],
161+
};
162+
163+
stream({
164+
type: "job",
165+
data: initialJobInfo,
166+
});
167+
168+
// If job already completed, send done event immediately
169+
if (currentJob && currentJob.status !== "running") {
170+
logger.debug(
171+
`executeStream: job ${job.job_id} already completed, sending done event`,
172+
);
173+
stream({
174+
type: "done",
175+
data: currentJob,
176+
});
177+
done = true;
178+
stream(null);
179+
return currentJob;
180+
}
181+
182+
// Stats monitoring is now handled by execute-code.ts via streamCB
183+
} catch (err) {
184+
stream({ error: `${err}` });
185+
stream(null); // End the stream
186+
return undefined;
187+
}
188+
189+
// Return the job object so caller can wait for completion if desired
190+
return job;
191+
}

0 commit comments

Comments
 (0)