Skip to content

Commit 57e99d3

Browse files
committed
Merge branch 'master' into fs2
2 parents a897a87 + 14d7a27 commit 57e99d3

File tree

4 files changed

+57
-22
lines changed

4 files changed

+57
-22
lines changed

src/packages/conat/files/read.ts

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-
4141
import { conat } from "@cocalc/conat/client";
4242
import { projectSubject } from "@cocalc/conat/names";
4343
import { type Subscription } from "@cocalc/conat/core/client";
44+
import { delay } from "awaiting";
45+
import { getLogger } from "@cocalc/conat/client";
46+
47+
const logger = getLogger("conat:files:read");
4448

4549
let subs: { [name: string]: Subscription } = {};
4650
export async function close({ project_id, compute_server_id, name = "" }) {
@@ -72,6 +76,7 @@ export async function createServer({
7276
compute_server_id,
7377
name,
7478
});
79+
logger.debug("createServer", { subject });
7580
const cn = await conat();
7681
const sub = await cn.subscribe(subject);
7782
subs[subject] = sub;
@@ -89,36 +94,65 @@ async function listen({ sub, createReadStream }) {
8994
}
9095

9196
async function handleMessage(mesg, createReadStream) {
97+
logger.debug("handleMessage", mesg.subject);
9298
try {
9399
await sendData(mesg, createReadStream);
94100
await mesg.respond(null, { headers: { done: true } });
95101
} catch (err) {
96-
// console.log("sending ERROR", err);
102+
logger.debug("handleMessage: ERROR", err);
97103
mesg.respondSync(null, { headers: { error: `${err}` } });
98104
}
99105
}
100106

101-
const MAX_CHUNK_SIZE = 16384 * 16 * 3;
107+
// 4MB -- chunks may be slightly bigger
108+
const CHUNK_SIZE = 4194304;
109+
const CHUNK_INTERVAL = 250;
102110

103111
function getSeqHeader(seq) {
104112
return { headers: { seq } };
105113
}
106114

107115
async function sendData(mesg, createReadStream) {
108116
const { path } = mesg.data;
117+
logger.debug("sendData: starting", { path });
109118
let seq = 0;
119+
const chunks: Buffer[] = [];
120+
let size = 0;
121+
const sendChunks = async () => {
122+
// Not only is waiting for the response useful to make sure somebody is listening,
123+
// we also use await here partly to space out the messages to avoid saturing
124+
// the websocket connection, since doing so would break everything
125+
// (heartbeats, etc.) and disconnect us, when transfering a large file.
126+
seq += 1;
127+
logger.debug("sendData: sending", { path, seq });
128+
const data = Buffer.concat(chunks);
129+
const { count } = await mesg.respond(data, getSeqHeader(seq));
130+
if (count == 0) {
131+
logger.debug("sendData: nobody is listening");
132+
// nobody is listening so don't waste effort sending...
133+
throw Error("receiver is gone");
134+
}
135+
size = 0;
136+
chunks.length = 0;
137+
// Delay a little just to give other messages a chance, so we don't get disconnected
138+
// e.g., due to lack of heartbeats. Also, this reduces the load on conat-router.
139+
await delay(CHUNK_INTERVAL);
140+
};
141+
110142
for await (let chunk of createReadStream(path, {
111-
highWaterMark: 16384 * 16 * 3,
143+
highWaterMark: CHUNK_SIZE,
112144
})) {
113-
// console.log("sending ", { seq, bytes: chunk.length });
114-
// We must break the chunk into smaller messages or it will
115-
// get bounced by conat...
116-
while (chunk.length > 0) {
117-
seq += 1;
118-
mesg.respondSync(chunk.slice(0, MAX_CHUNK_SIZE), getSeqHeader(seq));
119-
chunk = chunk.slice(MAX_CHUNK_SIZE);
145+
chunks.push(chunk);
146+
size += chunk.length;
147+
if (size >= CHUNK_SIZE) {
148+
// send it
149+
await sendChunks();
120150
}
121151
}
152+
if (size > 0) {
153+
await sendChunks();
154+
}
155+
logger.debug("sendData: done", { path }, "successfully sent ", seq, "chunks");
122156
}
123157

124158
export interface ReadFileOptions {
@@ -136,6 +170,7 @@ export async function* readFile({
136170
name = "",
137171
maxWait = 1000 * 60 * 10, // 10 minutes
138172
}: ReadFileOptions) {
173+
logger.debug("readFile", { project_id, compute_server_id, path });
139174
const cn = await conat();
140175
const subject = getSubject({
141176
project_id,

src/packages/conat/files/write.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ import {
7373
import { projectSubject } from "@cocalc/conat/names";
7474
import { type Subscription } from "@cocalc/conat/core/client";
7575
import { type Readable } from "node:stream";
76+
import { getLogger } from "@cocalc/conat/client";
77+
const logger = getLogger("conat:files:write");
7678

7779
function getWriteSubject({ project_id, compute_server_id }) {
7880
return projectSubject({
@@ -106,6 +108,7 @@ export async function createServer({
106108
createWriteStream: (path: string) => any;
107109
}) {
108110
const subject = getWriteSubject({ project_id, compute_server_id });
111+
logger.debug("createServer", { subject });
109112
let sub = subs[subject];
110113
if (sub != null) {
111114
return;
@@ -141,6 +144,7 @@ async function handleMessage({
141144
let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
142145
try {
143146
const { path, name, maxWait } = mesg.data;
147+
logger.debug("handleMessage", { path, name, maxWait });
144148
writeStream = await createWriteStream(path);
145149
// console.log("created writeStream");
146150
writeStream.on("error", (err) => {
@@ -166,12 +170,15 @@ async function handleMessage({
166170
writeStream.write(chunk);
167171
chunks += 1;
168172
bytes += chunk.length;
173+
logger.debug("handleMessage -- wrote", { path, name, bytes });
169174
// console.log("wrote ", bytes);
170175
}
171176
writeStream.end();
172177
writeStream.emit("rename");
173178
mesg.respondSync({ status: "success", bytes, chunks });
179+
logger.debug("handleMessage -- SUCCESS", { path, name });
174180
} catch (err) {
181+
logger.debug("handleMessage: ERROR", err);
175182
if (!error) {
176183
mesg.respondSync({ error: `${err}`, status: "error" });
177184
writeStream?.emit("remove");
@@ -194,6 +201,7 @@ export async function writeFile({
194201
stream,
195202
maxWait = 1000 * 60 * 10, // 10 minutes
196203
}): Promise<{ bytes: number; chunks: number }> {
204+
logger.debug("writeFile", { project_id, compute_server_id, path, maxWait });
197205
const name = randomId();
198206
try {
199207
function createReadStream() {

src/packages/jupyter/kernel/kernel.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -725,10 +725,7 @@ export class JupyterKernel
725725
dbg("queue is empty");
726726
return;
727727
}
728-
dbg(
729-
`queue has ${n} items; ensure kernel running`,
730-
this._execute_code_queue,
731-
);
728+
dbg(`queue has ${n} items; ensure kernel running`);
732729
try {
733730
await this.ensure_running();
734731
await this._execute_code_queue[0].go();

src/packages/project/conat/connection.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,9 @@ async function callHub({
7070
timeout?: number;
7171
}) {
7272
const subject = `hub.project.${project_id}.${service}`;
73-
try {
74-
const data = { name, args };
75-
const resp = await client.request(subject, data, { timeout });
76-
return resp.data;
77-
} catch (err) {
78-
err.message = `${err.message} - callHub: subject='${subject}', name='${name}', `;
79-
throw err;
80-
}
73+
const data = { name, args };
74+
const resp = await client.request(subject, data, { timeout });
75+
return resp.data;
8176
}
8277

8378
async function versionCheckLoop(client) {

0 commit comments

Comments
 (0)