Skip to content

Commit 9fad045

Browse files
committed
Merge remote-tracking branch 'origin/master' into github-tests-report-status
2 parents 5a8f6b9 + 92554d0 commit 9fad045

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2258
-901
lines changed

src/.claude/settings.local.json

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@
44
"Bash(pnpm tsc:*)",
55
"Bash(pnpm build:*)",
66
"Bash(git add:*)",
7-
"Bash(git commit:*)"
7+
"Bash(git commit:*)",
8+
"Bash(node:*)",
9+
"Bash(grep:*)",
10+
"Bash(find:*)",
11+
"WebFetch(domain:github.com)",
12+
"WebFetch(domain:cocalc.com)",
13+
"WebFetch(domain:doc.cocalc.com)",
14+
"Bash(npm show:*)",
15+
"Bash(prettier -w:*)",
16+
"Bash(npx tsc:*)",
17+
"Bash(gh pr view:*)",
18+
"Bash(gh:*)"
819
],
920
"deny": []
1021
}

src/packages/conat/core/client.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,10 @@ export class Client extends EventEmitter {
510510
reconnection: true,
511511
});
512512

513-
this.conn.on("info", (info) => {
513+
this.conn.on("info", (info, ack) => {
514+
if (typeof ack == "function") {
515+
ack();
516+
}
514517
const firstTime = this.info == null;
515518
this.info = info;
516519
this.emit("info", info);

src/packages/conat/core/server.ts

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import {
5353
import { Patterns } from "./patterns";
5454
import { is_array } from "@cocalc/util/misc";
5555
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
56-
import { once } from "@cocalc/util/async-utils";
56+
import { once, until } from "@cocalc/util/async-utils";
5757
import {
5858
clusterLink,
5959
type ClusterLink,
@@ -940,7 +940,7 @@ export class ConatServer extends EventEmitter {
940940
this.subscriptions[id] = new Set<string>();
941941
}
942942

943-
socket.emit("info", { ...this.info(), user });
943+
this.sendInfo(socket, user);
944944

945945
socket.on("stats", ({ recv0 }) => {
946946
const s = this.stats[socket.id];
@@ -1086,6 +1086,38 @@ export class ConatServer extends EventEmitter {
10861086
});
10871087
};
10881088

1089+
sendInfo = async (socket, user) => {
1090+
// we send info with an ack because I think sometimes the initial "info"
1091+
// message gets dropped, leaving a broken hanging connection that never
1092+
// does anything (until the user explicitly refreshes their browser).
1093+
// I did see what is probably this in production frequently.
1094+
try {
1095+
await until(
1096+
async () => {
1097+
if (!socket.conn?.readyState.startsWith("o")) {
1098+
// logger.debug(`failed to send "info" message to ${socket.id}`);
1099+
// readyState not defined or not opened or opening, so connection must
1100+
// have been closed before success.
1101+
return true;
1102+
}
1103+
try {
1104+
await socket
1105+
.timeout(7500)
1106+
.emitWithAck("info", { ...this.info(), user });
1107+
return true;
1108+
} catch (err) {
1109+
// logger.debug(`error sending "info" message to ${socket.id}`, err);
1110+
return false;
1111+
}
1112+
},
1113+
{ min: 5000, max: 30000, timeout: 120_000 },
1114+
);
1115+
} catch {
1116+
// never ack'd "info" after a few minutes -- could just be an old client,
1117+
// so don't do anything at this point.
1118+
}
1119+
};
1120+
10891121
address = () => getServerAddress(this.options);
10901122

10911123
// create new client in the same process connected to this server.
@@ -1349,8 +1381,8 @@ export class ConatServer extends EventEmitter {
13491381
const usage = async () => {
13501382
return { [this.id]: this.usage.stats() };
13511383
};
1352-
// user has to explicitly refresh there browser after
1353-
// being disconnected this way
1384+
// user has to explicitly refresh their browser after
1385+
// being disconnected this way:
13541386
const disconnect = async (ids: string | string[]) => {
13551387
if (typeof ids == "string") {
13561388
ids = [ids];

src/packages/conat/files/read.ts

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-
4141
import { conat } from "@cocalc/conat/client";
4242
import { projectSubject } from "@cocalc/conat/names";
4343
import { type Subscription } from "@cocalc/conat/core/client";
44+
import { delay } from "awaiting";
45+
import { getLogger } from "@cocalc/conat/client";
46+
47+
const logger = getLogger("conat:files:read");
4448

4549
let subs: { [name: string]: Subscription } = {};
4650
export async function close({ project_id, compute_server_id, name = "" }) {
@@ -72,6 +76,7 @@ export async function createServer({
7276
compute_server_id,
7377
name,
7478
});
79+
logger.debug("createServer", { subject });
7580
const cn = await conat();
7681
const sub = await cn.subscribe(subject);
7782
subs[subject] = sub;
@@ -89,36 +94,65 @@ async function listen({ sub, createReadStream }) {
8994
}
9095

9196
async function handleMessage(mesg, createReadStream) {
97+
logger.debug("handleMessage", mesg.subject);
9298
try {
9399
await sendData(mesg, createReadStream);
94100
await mesg.respond(null, { headers: { done: true } });
95101
} catch (err) {
96-
// console.log("sending ERROR", err);
102+
logger.debug("handleMessage: ERROR", err);
97103
mesg.respondSync(null, { headers: { error: `${err}` } });
98104
}
99105
}
100106

101-
const MAX_CHUNK_SIZE = 16384 * 16 * 3;
107+
// 4MB -- chunks may be slightly bigger
108+
const CHUNK_SIZE = 4194304;
109+
const CHUNK_INTERVAL = 250;
102110

103111
function getSeqHeader(seq) {
104112
return { headers: { seq } };
105113
}
106114

107115
async function sendData(mesg, createReadStream) {
108116
const { path } = mesg.data;
117+
logger.debug("sendData: starting", { path });
109118
let seq = 0;
119+
const chunks: Buffer[] = [];
120+
let size = 0;
121+
const sendChunks = async () => {
122+
// Not only is waiting for the response useful to make sure somebody is listening,
123+
// we also use await here partly to space out the messages to avoid saturing
124+
// the websocket connection, since doing so would break everything
125+
// (heartbeats, etc.) and disconnect us, when transfering a large file.
126+
seq += 1;
127+
logger.debug("sendData: sending", { path, seq });
128+
const data = Buffer.concat(chunks);
129+
const { count } = await mesg.respond(data, getSeqHeader(seq));
130+
if (count == 0) {
131+
logger.debug("sendData: nobody is listening");
132+
// nobody is listening so don't waste effort sending...
133+
throw Error("receiver is gone");
134+
}
135+
size = 0;
136+
chunks.length = 0;
137+
// Delay a little just to give other messages a chance, so we don't get disconnected
138+
// e.g., due to lack of heartbeats. Also, this reduces the load on conat-router.
139+
await delay(CHUNK_INTERVAL);
140+
};
141+
110142
for await (let chunk of createReadStream(path, {
111-
highWaterMark: 16384 * 16 * 3,
143+
highWaterMark: CHUNK_SIZE,
112144
})) {
113-
// console.log("sending ", { seq, bytes: chunk.length });
114-
// We must break the chunk into smaller messages or it will
115-
// get bounced by conat...
116-
while (chunk.length > 0) {
117-
seq += 1;
118-
mesg.respondSync(chunk.slice(0, MAX_CHUNK_SIZE), getSeqHeader(seq));
119-
chunk = chunk.slice(MAX_CHUNK_SIZE);
145+
chunks.push(chunk);
146+
size += chunk.length;
147+
if (size >= CHUNK_SIZE) {
148+
// send it
149+
await sendChunks();
120150
}
121151
}
152+
if (size > 0) {
153+
await sendChunks();
154+
}
155+
logger.debug("sendData: done", { path }, "successfully sent ", seq, "chunks");
122156
}
123157

124158
export interface ReadFileOptions {
@@ -136,6 +170,7 @@ export async function* readFile({
136170
name = "",
137171
maxWait = 1000 * 60 * 10, // 10 minutes
138172
}: ReadFileOptions) {
173+
logger.debug("readFile", { project_id, compute_server_id, path });
139174
const cn = await conat();
140175
const subject = getSubject({
141176
project_id,

src/packages/conat/files/write.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ import {
7373
import { projectSubject } from "@cocalc/conat/names";
7474
import { type Subscription } from "@cocalc/conat/core/client";
7575
import { type Readable } from "node:stream";
76+
import { getLogger } from "@cocalc/conat/client";
77+
const logger = getLogger("conat:files:write");
7678

7779
function getWriteSubject({ project_id, compute_server_id }) {
7880
return projectSubject({
@@ -106,6 +108,7 @@ export async function createServer({
106108
createWriteStream: (path: string) => any;
107109
}) {
108110
const subject = getWriteSubject({ project_id, compute_server_id });
111+
logger.debug("createServer", { subject });
109112
let sub = subs[subject];
110113
if (sub != null) {
111114
return;
@@ -141,6 +144,7 @@ async function handleMessage({
141144
let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
142145
try {
143146
const { path, name, maxWait } = mesg.data;
147+
logger.debug("handleMessage", { path, name, maxWait });
144148
writeStream = await createWriteStream(path);
145149
// console.log("created writeStream");
146150
writeStream.on("error", (err) => {
@@ -166,12 +170,15 @@ async function handleMessage({
166170
writeStream.write(chunk);
167171
chunks += 1;
168172
bytes += chunk.length;
173+
logger.debug("handleMessage -- wrote", { path, name, bytes });
169174
// console.log("wrote ", bytes);
170175
}
171176
writeStream.end();
172177
writeStream.emit("rename");
173178
mesg.respondSync({ status: "success", bytes, chunks });
179+
logger.debug("handleMessage -- SUCCESS", { path, name });
174180
} catch (err) {
181+
logger.debug("handleMessage: ERROR", err);
175182
if (!error) {
176183
mesg.respondSync({ error: `${err}`, status: "error" });
177184
writeStream?.emit("remove");
@@ -194,6 +201,7 @@ export async function writeFile({
194201
stream,
195202
maxWait = 1000 * 60 * 10, // 10 minutes
196203
}): Promise<{ bytes: number; chunks: number }> {
204+
logger.debug("writeFile", { project_id, compute_server_id, path, maxWait });
197205
const name = randomId();
198206
try {
199207
function createReadStream() {

src/packages/conat/service/service.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,12 @@ export class ConatService extends EventEmitter {
210210
await mesg.respond(resp);
211211
} catch (err) {
212212
const data = { error: `${err}` };
213-
await mesg.respond(data);
213+
try {
214+
await mesg.respond(data);
215+
} catch (err2) {
216+
// do not crash on sending an error report:
217+
logger.debug("WARNING: unable to send error", this.name, err, err2);
218+
}
214219
}
215220
}
216221
};

src/packages/conat/socket/util.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export type Role = "client" | "server";
1313
// socketio and use those to manage things. This ping
1414
// is entirely a "just in case" backup if some event
1515
// were missed (e.g., a kill -9'd process...)
16-
export const PING_PONG_INTERVAL = 60000;
16+
export const PING_PONG_INTERVAL = 90000;
1717

1818
// We queue up unsent writes, but only up to a point (to not have a huge memory issue).
1919
// Any write beyond this size result in an exception.
@@ -24,8 +24,8 @@ export const PING_PONG_INTERVAL = 60000;
2424
export const DEFAULT_MAX_QUEUE_SIZE = 1000;
2525

2626
export let DEFAULT_COMMAND_TIMEOUT = 10_000;
27-
export let DEFAULT_KEEP_ALIVE = 90_000;
28-
export let DEFAULT_KEEP_ALIVE_TIMEOUT = 15_000;
27+
export let DEFAULT_KEEP_ALIVE = 25_000;
28+
export let DEFAULT_KEEP_ALIVE_TIMEOUT = 10_000;
2929

3030
export function setDefaultSocketTimeouts({
3131
command = DEFAULT_COMMAND_TIMEOUT,

src/packages/frontend/admin/_style.sass

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@
1313
td:first-child
1414
font-family: monospace
1515
font-weight: bold
16+
17+
.admin-llm-test-running-row
18+
background-color: #f0f0f0 !important

0 commit comments

Comments
 (0)