Skip to content

Commit 58c387a

Browse files
committed
api/v2/exec: tweak monitoring data + api endpoint
1 parent 4382784 commit 58c387a

File tree

8 files changed

+167
-62
lines changed

8 files changed

+167
-62
lines changed

src/packages/backend/execute-code.test.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
/*
2+
* This file is part of CoCalc: Copyright © 2024 Sagemath, Inc.
3+
* License: MS-RSL – see LICENSE.md for details
4+
*/
5+
16
process.env.COCALC_PROJECT_MONITOR_INTERVAL_S = "1";
27

38
import { executeCode } from "./execute-code";
@@ -267,19 +272,29 @@ describe("async", () => {
267272
if (typeof job_id !== "string") return;
268273
await new Promise((done) => setTimeout(done, 5500));
269274
// now we check up on the job
270-
const s = await executeCode({ async_get: job_id });
275+
const s = await executeCode({ async_get: job_id, async_stats: true });
271276
expect(s.type).toEqual("async");
272277
if (s.type !== "async") return;
273278
expect(s.elapsed_s).toBeGreaterThan(5);
274279
expect(s.exit_code).toBe(0);
275280
expect(s.pid).toBeGreaterThan(1);
276281
expect(s.stats).toBeDefined();
277282
if (!Array.isArray(s.stats)) return;
278-
const last_stat = s.stats[s.stats.length - 2];
279-
expect(last_stat.cpu_pct).toBeGreaterThan(10);
280-
expect(last_stat.cpu_secs).toBeGreaterThan(1);
281-
expect(last_stat.mem_rss).toBeGreaterThan(1);
283+
const pcts = Math.max(...s.stats.map((s) => s.cpu_pct));
284+
const secs = Math.max(...s.stats.map((s) => s.cpu_secs));
285+
const mems = Math.max(...s.stats.map((s) => s.mem_rss));
286+
expect(pcts).toBeGreaterThan(10);
287+
expect(secs).toBeGreaterThan(1);
288+
expect(mems).toBeGreaterThan(1);
282289
expect(s.stdout).toEqual("foo\nbar\n");
290+
// now without stats, after retrieving it
291+
const s2 = await executeCode({ async_get: job_id });
292+
if (s2.type !== "async") return;
293+
expect(s2.stats).toBeUndefined();
294+
// and check, that this is not removing stats entirely
295+
const s3 = await executeCode({ async_get: job_id, async_stats: true });
296+
if (s3.type !== "async") return;
297+
expect(Array.isArray(s3.stats)).toBeTruthy();
283298
},
284299
10 * 1000,
285300
);

src/packages/backend/execute-code.ts

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
//########################################################################
2-
// This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3-
// License: MS-RSL – see LICENSE.md for details
4-
//########################################################################
1+
/*
2+
* This file is part of CoCalc: Copyright © 2020–2024 Sagemath, Inc.
3+
* License: MS-RSL – see LICENSE.md for details
4+
*/
55

66
// Execute code in a subprocess.
77

@@ -21,6 +21,7 @@ import getLogger from "@cocalc/backend/logger";
2121
import { envToInt } from "@cocalc/backend/misc/env-to-number";
2222
import { aggregate } from "@cocalc/util/aggregate";
2323
import { callback_opts } from "@cocalc/util/async-utils";
24+
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
2425
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
2526
import {
2627
ExecuteCodeOutputAsync,
@@ -38,10 +39,15 @@ import { ProcessStats } from "./process-stats";
3839

3940
const log = getLogger("execute-code");
4041

41-
const ASYNC_CACHE_MAX = envToInt("COCALC_PROJECT_ASYNC_EXEC_CACHE_MAX", 100);
42-
const ASYNC_CACHE_TTL_S = envToInt("COCALC_PROJECT_ASYNC_EXEC_TTL_S", 60 * 60);
42+
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
43+
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
44+
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 60 * 60);
4345
// for async execution, every that many secs check up on the child-tree
44-
const MONITOR_INTERVAL_S = envToInt("COCALC_PROJECT_MONITOR_INTERVAL_S", 60);
46+
const MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);
47+
const MONITOR_STATS_LENGTH_MAX = envToInt(
48+
`${PREFIX}_MONITOR_STATS_LENGTH_MAX`,
49+
100,
50+
);
4551

4652
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
4753
max: ASYNC_CACHE_MAX,
@@ -56,6 +62,8 @@ function asyncCacheUpdate(job_id: string, upd) {
5662
const obj = asyncCache.get(job_id);
5763
if (Array.isArray(obj?.stats) && Array.isArray(upd.stats)) {
5864
obj.stats.push(...upd.stats);
65+
// truncate to $MONITOR_STATS_LENGTH_MAX, by discarding the inital entries
66+
obj.stats = obj.stats.slice(obj.stats.length - MONITOR_STATS_LENGTH_MAX);
5967
}
6068
asyncCache.set(job_id, { ...obj, ...upd });
6169
}
@@ -73,7 +81,14 @@ export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
7381
(opts: ExecuteCodeOptionsWithCallback): void => {
7482
(async () => {
7583
try {
76-
opts.cb?.(undefined, await executeCodeNoAggregate(opts));
84+
let data = await executeCodeNoAggregate(opts);
85+
if (isExecuteCodeOptionsAsyncGet(opts) && data.type === "async") {
86+
// stats could contain a lot of data. we only return it if requested.
87+
if (opts.async_stats !== true) {
88+
data = { ...data, stats: undefined };
89+
}
90+
}
91+
opts.cb?.(undefined, data);
7792
} catch (err) {
7893
opts.cb?.(err);
7994
}
@@ -101,7 +116,7 @@ async function executeCodeNoAggregate(
101116
}
102117

103118
opts.args ??= [];
104-
opts.timeout ??= 10;
119+
opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
105120
opts.ulimit_timeout ??= true;
106121
opts.err_on_exit ??= true;
107122
opts.verbose ??= true;
@@ -166,15 +181,15 @@ async function executeCodeNoAggregate(
166181
if (opts.async_call) {
167182
// we return an ID, the caller can then use it to query the status
168183
opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
169-
opts.timeout ??= 10 * 60;
184+
opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
170185
const job_id = uuid();
171-
const start = new Date();
186+
const start = Date.now();
172187
const job_config: ExecuteCodeOutputAsync = {
173188
type: "async",
174-
stdout: `Process started running at ${start.toISOString()}`,
189+
stdout: "",
175190
stderr: "",
176191
exit_code: 0,
177-
start: start.getTime(),
192+
start,
178193
job_id,
179194
status: "running",
180195
};
@@ -184,15 +199,14 @@ async function executeCodeNoAggregate(
184199
{ ...opts, origCommand, job_id, job_config },
185200
async (err, result) => {
186201
try {
187-
const started = asyncCache.get(job_id)?.start ?? 0;
188202
const info: Omit<
189203
ExecuteCodeOutputAsync,
190204
"stdout" | "stderr" | "exit_code"
191205
> = {
192206
job_id,
193207
type: "async",
194-
elapsed_s: (Date.now() - started) / 1000,
195-
start: start.getTime(),
208+
elapsed_s: (Date.now() - start) / 1000,
209+
start,
196210
status: "error",
197211
};
198212
if (err) {
@@ -311,21 +325,20 @@ function doSpawn(
311325
let stdout_is_done = false;
312326
let killed = false;
313327
let callback_done = false;
314-
let monitorRef: NodeJS.Timer | null = null;
315328
let timer: NodeJS.Timeout | undefined = undefined;
316329

317330
// periodically check up on the child process tree and record stats
318331
// this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
319-
async function setupMonitor() {
332+
async function startMonitor() {
320333
const pid = child.pid;
321-
const job_id = opts.job_id;
322-
if (job_id == null || pid == null) return;
334+
const { job_id, job_config } = opts;
335+
if (job_id == null || pid == null || job_config == null) return;
323336
const monitor = new ProcessStats();
324337
await monitor.init();
325338
await new Promise((done) => setTimeout(done, 1000));
326339
if (callback_done) return;
327340

328-
monitorRef = setInterval(async () => {
341+
while (true) {
329342
const { procs } = await monitor.processes(Date.now());
330343
// reconstruct process tree
331344
const children: { [pid: number]: number[] } = {};
@@ -337,26 +350,26 @@ function doSpawn(
337350
// we only consider those, which are the pid itself or one of its children
338351
const { rss, pct_cpu, cpu_secs } = sumChildren(procs, children, pid);
339352

340-
let obj = asyncCache.get(job_id);
341-
obj ??= opts.job_config; // in case the cache "forgot" about it
342-
if (obj != null) {
343-
obj.pid = pid;
344-
obj.stats ??= [];
345-
obj.stats.push({
346-
timestamp: Date.now(),
347-
mem_rss: rss,
348-
cpu_pct: pct_cpu,
349-
cpu_secs,
350-
});
351-
asyncCache.set(job_id, obj);
352-
}
353-
}, 1000 * MONITOR_INTERVAL_S);
354-
}
353+
// ?? fallback, in case the cache "forgot" about it
354+
const obj = asyncCache.get(job_id) ?? job_config;
355+
obj.pid = pid;
356+
obj.stats ??= [];
357+
obj.stats.push({
358+
timestamp: Date.now(),
359+
mem_rss: rss,
360+
cpu_pct: pct_cpu,
361+
cpu_secs,
362+
});
363+
asyncCache.set(job_id, obj);
364+
365+
// initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
366+
const elapsed_s = (Date.now() - job_config.start) / 1000;
367+
// i.e. after 6 minutes, we check every minute
368+
const next_s = Math.max(1, Math.floor(elapsed_s / 6));
369+
const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
370+
await new Promise((done) => setTimeout(done, wait_s * 1000));
355371

356-
function clearMonitor() {
357-
if (monitorRef != null) {
358-
clearInterval(monitorRef);
359-
monitorRef = null;
372+
if (callback_done) return;
360373
}
361374
}
362375

@@ -436,7 +449,8 @@ function doSpawn(
436449
});
437450

438451
if (opts.job_id && child.pid) {
439-
setupMonitor();
452+
// we don't await it, it runs until $callback_done is true
453+
startMonitor();
440454
}
441455

442456
const finish = (err?) => {
@@ -449,9 +463,8 @@ function doSpawn(
449463
// we already finished up.
450464
return;
451465
}
452-
// finally finish up.
466+
// finally finish up – this will also terminate the monitor
453467
callback_done = true;
454-
clearMonitor();
455468

456469
if (timer != null) {
457470
clearTimeout(timer);

src/packages/next/lib/api/schema/exec.ts

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
/*
2+
* This file is part of CoCalc: Copyright © 2024 Sagemath, Inc.
3+
* License: MS-RSL – see LICENSE.md for details
4+
*/
5+
16
import { z } from "../framework";
27

8+
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
39
import { FailedAPIOperationSchema } from "./common";
410
import { ComputeServerIdSchema } from "./compute/common";
511
import { ProjectIdSchema } from "./projects/common";
@@ -35,7 +41,7 @@ const ExecInputSchemaBlocking = ExecInputCommon.merge(
3541
timeout: z
3642
.number()
3743
.min(0)
38-
.default(60)
44+
.default(PROJECT_EXEC_DEFAULT_TIMEOUT_S)
3945
.optional()
4046
.describe("Number of seconds before this shell command times out."),
4147
max_output: z
@@ -58,10 +64,12 @@ const ExecInputSchemaBlocking = ExecInputCommon.merge(
5864
),
5965
uid: z
6066
.number()
67+
.min(0)
6168
.optional()
6269
.describe("Set the `UID` identity of the spawned process."),
6370
gid: z
6471
.number()
72+
.min(0)
6573
.optional()
6674
.describe("Set the `GID` identity of the spawned process."),
6775
aggregate: z
@@ -119,6 +127,12 @@ as well as a status field indicating if the job is still running or has complete
119127
Start time and duration are returned as well.
120128
121129
Results are cached temporarily in the project.`),
130+
async_stats: z
131+
.boolean()
132+
.optional()
133+
.describe(
134+
`If true, retrieve recorded statistics (CPU/memory) of the process and its child processes.`,
135+
),
122136
}),
123137
);
124138

@@ -147,14 +161,75 @@ const ExecOutputBlocking = z.object({
147161
const ExecOutputAsync = ExecOutputBlocking.extend({
148162
type: z.literal("async"),
149163
job_id: z.string().describe("The ID identifying the async operation"),
150-
start: z
151-
.number()
152-
.optional()
153-
.describe("UNIX timestamp, when the execution started"),
164+
start: z.number().describe("UNIX timestamp, when the execution started"),
154165
elapsed_s: z.string().optional().describe("How long the execution took"),
155166
status: z // AsyncStatus
156167
.union([z.literal("running"), z.literal("completed"), z.literal("error")])
157168
.describe("Status of the async operation"),
169+
pid: z.number().min(0).describe("Process ID"),
170+
stats: z
171+
.array(
172+
z.object({
173+
timestamp: z.number().describe("UNIX epoch timestamp"),
174+
mem_rss: z
175+
.number()
176+
.describe(
177+
"Sum of residual memory usage of that process and its children.",
178+
),
179+
cpu_pct: z
180+
.number()
181+
.describe(
182+
"Sum of percentage CPU usage of that process and its children.",
183+
),
184+
cpu_secs: z
185+
.number()
186+
.describe(
187+
"Sum of CPU time usage (user+system) of that process and its children.",
188+
),
189+
}),
190+
)
191+
.optional()
192+
.describe(
193+
`Recorded metrics about the process. Each entry has a timestamp and corresponding cpu and memory usage, of that process and children. Initially, the sampling frequency is higher, but then it is spaced out. The total number of samples is truncated, discarding the oldest ones.
194+
195+
You can visualize the data this way:
196+
197+
\`\`\`python
198+
import matplotlib.pyplot as plt
199+
from datetime import datetime
200+
201+
# Extract stats data
202+
timestamps = [stat['timestamp'] for stat in data['stats']]
203+
mem_rss = [stat['mem_rss'] for stat in data['stats']]
204+
cpu_pct = [stat['cpu_pct'] for stat in data['stats']]
205+
206+
# Convert timestamps to datetime objects
207+
timestamps = [datetime.fromtimestamp(ts / 1000) for ts in timestamps]
208+
209+
# Create plots
210+
fig, ax1 = plt.subplots()
211+
212+
# Memory usage
213+
ax1.plot(timestamps, mem_rss, color='blue', label='Memory (RSS)')
214+
ax1.set_xlabel('Time')
215+
ax1.set_ylabel('Memory (MB)', color='blue')
216+
ax1.tick_params(axis='y', labelcolor='blue')
217+
218+
# CPU utilization (secondary axis)
219+
ax2 = ax1.twinx()
220+
ax2.plot(timestamps, cpu_pct, color='red', label='CPU (%)')
221+
ax2.set_ylabel('CPU (%)', color='red')
222+
ax2.tick_params(axis='y', labelcolor='red')
223+
224+
# Add labels and legend
225+
plt.title('Job Stats')
226+
plt.legend(loc='upper left')
227+
228+
# Display the plot
229+
plt.show()
230+
\`\`\`
231+
`,
232+
),
158233
});
159234

160235
export const ExecOutputSchema = z.union([

0 commit comments

Comments
 (0)