Skip to content

Commit 17da968

Browse files
committed
stand-alone remote runtime with useMQ
1 parent b974313 commit 17da968

File tree

5 files changed

+66
-17
lines changed

5 files changed

+66
-17
lines changed

cpkernel/docker-compose.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Use postgres/example user/password credentials
2+
version: "3.5"
3+
4+
services:
5+
rabbitmq:
6+
image: rabbitmq:3-management-alpine
7+
restart: always
8+
volumes:
9+
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
10+
ports:
11+
# The AMQP port
12+
# - 5672:5672
13+
# the management UI
14+
- 15672:15672
15+
# STOMP-over-WebSockets
16+
- 15674:15674
17+
# 61613 STOMP clients, internally used by kernel.js
18+
- 61613:61613

cpkernel/rabbitmq/enabled_plugins

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[rabbitmq_management,rabbitmq_prometheus,rabbitmq_web_stomp].

cpkernel/src/kernel.ts

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import { v4 as uuidv4 } from "uuid";
1212
import crypto from "crypto";
1313
import Docker from "dockerode";
1414

15+
import Stomp from "stompjs";
16+
1517
import path from "path";
1618

1719
import os from "os";
@@ -558,16 +560,16 @@ async function createContainer(image, name, network) {
558560
});
559561
}
560562

561-
// var mq_client = Stomp.overTCP("rabbitmq", 61613);
562-
563563
class MyMqSocket {
564564
queue;
565-
constructor(queue) {
565+
mq_client;
566+
constructor(queue, mq_client) {
566567
this.queue = queue;
568+
this.mq_client = mq_client
567569
}
568570
send(obj) {
569571
// FIXME need to make sure it is connected
570-
// mq_client.send(this.queue, {}, obj);
572+
this.mq_client.send(this.queue, {}, obj);
571573
}
572574
}
573575

@@ -718,6 +720,7 @@ function handleIOPub_stream({ msgs, socket }) {
718720
}
719721
}
720722

723+
let mq_client = null;
721724
export class CodePodKernel {
722725
lang;
723726
startupFile;
@@ -727,6 +730,8 @@ export class CodePodKernel {
727730
sessionId;
728731
wire;
729732
socket;
733+
mq_socket;
734+
useMQ;
730735
mapEval({ code, namespace }) { }
731736
mapAddImport({
732737
from,
@@ -749,9 +754,10 @@ export class CodePodKernel {
749754
name: string;
750755
}) { }
751756
constructor() { }
752-
async init({ sessionId, socket }) {
757+
async init({ sessionId, socket, useMQ }) {
753758
console.log("=== INIT!!");
754759
this.sessionId = sessionId;
760+
this.useMQ = useMQ;
755761
let network = process.env["KERNEL_NETWORK"] || "codepod";
756762
let name = `cpkernel_${network}_${sessionId}_${this.lang}`;
757763
// await removeContainer(name);
@@ -774,7 +780,26 @@ export class CodePodKernel {
774780
console.log("connecting to zmq ..");
775781
// this.wire = new ZmqWire(JSON.parse(readFileSync(this.fname)), ip);
776782
this.wire = new ZmqWire(spec, "127.0.0.1");
777-
// this.mq_socket = new MyMqSocket(sessionId);
783+
784+
if (useMQ) {
785+
if (!mq_client) {
786+
mq_client = Stomp.overTCP("localhost", 61613);
787+
mq_client.connect(
788+
"guest",
789+
"guest",
790+
function () {
791+
console.log("connected");
792+
},
793+
function () {
794+
console.log("error");
795+
throw new Error("Cannot connect to RabbitMQ server");
796+
},
797+
"/"
798+
);
799+
}
800+
this.mq_socket = new MyMqSocket(sessionId, mq_client);
801+
}
802+
778803
if (socket) {
779804
// listen to IOPub here
780805
this.addSocket(socket);
@@ -809,7 +834,9 @@ export class CodePodKernel {
809834
}
810835
this.socket = socket;
811836
// DEBUG
812-
// socket = this.mq_socket;
837+
if (this.useMQ) {
838+
socket = this.mq_socket;
839+
}
813840
this.wire.setOnIOPub((topic, msgs) => {
814841
// console.log("-----", topic, msgs);
815842
// iracket's topic seems to be an ID. I should use msg type instead
@@ -846,7 +873,7 @@ export class CodePodKernel {
846873
this.wire.setOnShell((msgs) => {
847874
// DEBUG
848875
// socket = this.mq_socket;
849-
socket = this.socket;
876+
// socket = this.socket;
850877
switch (msgs.header.msg_type) {
851878
case "execute_reply":
852879
{
@@ -1145,22 +1172,24 @@ export async function createKernel({
11451172
lang,
11461173
sessionId,
11471174
socket,
1175+
useMQ
11481176
}: {
11491177
lang: string;
11501178
sessionId: string;
11511179
socket: any;
1180+
useMQ: boolean;
11521181
}) {
11531182
let kernels = detectKernels()
11541183
console.log("===", "createKernel", lang);
11551184
switch (lang) {
11561185
case "julia":
1157-
return await new JuliaKernel({ kernelJson: kernels["julia"] }).init({ sessionId, socket });
1186+
return await new JuliaKernel({ kernelJson: kernels["julia"] }).init({ sessionId, socket, useMQ });
11581187
case "javascript":
1159-
return await new JavascriptKernel({ kernelJson: kernels["javascript"] }).init({ sessionId, socket });
1188+
return await new JavascriptKernel({ kernelJson: kernels["javascript"] }).init({ sessionId, socket, useMQ });
11601189
case "racket":
1161-
return await new RacketKernel({ kernelJson: kernels["racket"] }).init({ sessionId, socket });
1190+
return await new RacketKernel({ kernelJson: kernels["racket"] }).init({ sessionId, socket, useMQ });
11621191
case "python":
1163-
return await new PythonKernel({ kernelJson: kernels["python"] }).init({ sessionId, socket });
1192+
return await new PythonKernel({ kernelJson: kernels["python"] }).init({ sessionId, socket, useMQ });
11641193
default:
11651194
console.log("ERROR: language not implemented", lang);
11661195
// throw new Error(`Language not valid: ${lang}`);

cpkernel/src/server.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ export function startSocketServer() {
2626
// listenOnKernelManagement(socket);
2727
// listenOnSessionManagement(socket);
2828
// listenOnRunCode(socket);
29-
listenOnMessage(socket);
29+
// useMQ=true when creating a stand-alone runtime server
30+
listenOnMessage(socket, true);
3031
});
3132

3233
http_server.listen({ port: 14321 }, () => {

cpkernel/src/socket.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ async function killKernel({ sessionId, lang }) {
152152
}
153153
}
154154

155-
async function getSessionKernel({ sessionId, lang, socket }) {
155+
async function getSessionKernel({ sessionId, lang, socket, useMQ }) {
156156
if (!sessionId || !lang) {
157157
console.log("sesisonId or lang is undefined", sessionId, lang);
158158
return null;
@@ -173,18 +173,18 @@ async function getSessionKernel({ sessionId, lang, socket }) {
173173
// FIXME what if the process never finish?
174174
session[lang] = "spawning";
175175
console.log("spawning kernel ..");
176-
let kernel = await createKernel({ lang, sessionId, socket });
176+
let kernel = await createKernel({ lang, sessionId, socket, useMQ });
177177
console.log("returning the newly spawned kernel");
178178
session[lang] = kernel;
179179
return kernel;
180180
}
181181

182-
export function listenOnMessage(socket) {
182+
export function listenOnMessage(socket, useMQ = false) {
183183
socket.on("message", async (msg) => {
184184
let { type, payload } = JSON.parse(msg.toString());
185185
if (type === "ping") return;
186186
let { sessionId, lang } = payload;
187-
let kernel = await getSessionKernel({ sessionId, lang, socket });
187+
let kernel = await getSessionKernel({ sessionId, lang, socket, useMQ });
188188
if (!kernel) {
189189
console.log("ERROR: kernel error");
190190
return;

0 commit comments

Comments
 (0)