Skip to content

Commit f21e9c1

Browse files
committed
Create signalling service based on yjs example
1 parent e9b5afe commit f21e9c1

File tree

4 files changed

+184
-0
lines changed

4 files changed

+184
-0
lines changed

apps/signalling-service/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "signalling-service",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "server.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"type": "module",
10+
"keywords": [],
11+
"author": "",
12+
"license": "ISC",
13+
"dependencies": {
14+
"lib0": "^0.2.98",
15+
"ws": "^8.18.0"
16+
}
17+
}

apps/signalling-service/pnpm-lock.yaml

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/signalling-service/server.js

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Referenced from https://github.com/yjs/y-webrtc/blob/master/bin/server.js
2+
3+
import { WebSocketServer } from "ws";
4+
import http from "http";
5+
import * as map from "lib0/map";
6+
7+
const wsReadyStateConnecting = 0;
8+
const wsReadyStateOpen = 1;
9+
const wsReadyStateClosing = 2;
10+
const wsReadyStateClosed = 3;
11+
12+
const pingTimeout = 30000;
13+
14+
const port = process.env.PORT || 4444;
15+
const wss = new WebSocketServer({ noServer: true });
16+
17+
const server = http.createServer((request, response) => {
18+
response.writeHead(200, {
19+
"Content-Type": "text/plain",
20+
"Access-Control-Allow-Origin": "*", // Allow all origins for testing
21+
});
22+
response.end("okay");
23+
});
24+
25+
const topics = new Map();
26+
27+
const send = (conn, message) => {
28+
if (
29+
conn.readyState !== wsReadyStateConnecting &&
30+
conn.readyState !== wsReadyStateOpen
31+
) {
32+
conn.close();
33+
}
34+
try {
35+
conn.send(JSON.stringify(message));
36+
} catch (e) {
37+
conn.close();
38+
}
39+
};
40+
41+
const onconnection = (conn) => {
42+
const subscribedTopics = new Set();
43+
let closed = false;
44+
let pongReceived = true;
45+
const pingInterval = setInterval(() => {
46+
if (!pongReceived) {
47+
conn.close();
48+
clearInterval(pingInterval);
49+
} else {
50+
pongReceived = false;
51+
try {
52+
conn.ping();
53+
} catch (e) {
54+
conn.close();
55+
}
56+
}
57+
}, pingTimeout);
58+
conn.on("pong", () => {
59+
pongReceived = true;
60+
});
61+
conn.on("close", () => {
62+
subscribedTopics.forEach((topicName) => {
63+
const subs = topics.get(topicName) || new Set();
64+
subs.delete(conn);
65+
if (subs.size === 0) {
66+
topics.delete(topicName);
67+
}
68+
});
69+
subscribedTopics.clear();
70+
closed = true;
71+
});
72+
conn.on("message", (message) => {
73+
if (typeof message === "string" || message instanceof Buffer) {
74+
message = JSON.parse(message.toString());
75+
}
76+
if (message && message.type && !closed) {
77+
switch (message.type) {
78+
case "subscribe":
79+
(message.topics || []).forEach((topicName) => {
80+
if (typeof topicName === "string") {
81+
const topic = map.setIfUndefined(
82+
topics,
83+
topicName,
84+
() => new Set()
85+
);
86+
topic.add(conn);
87+
subscribedTopics.add(topicName);
88+
}
89+
});
90+
break;
91+
case "unsubscribe":
92+
(message.topics || []).forEach((topicName) => {
93+
const subs = topics.get(topicName);
94+
if (subs) {
95+
subs.delete(conn);
96+
}
97+
});
98+
break;
99+
case "publish":
100+
if (message.topic) {
101+
const receivers = topics.get(message.topic);
102+
if (receivers) {
103+
message.clients = receivers.size;
104+
receivers.forEach((receiver) => send(receiver, message));
105+
}
106+
}
107+
break;
108+
case "ping":
109+
send(conn, { type: "pong" });
110+
}
111+
}
112+
});
113+
};
114+
115+
wss.on("connection", onconnection);
116+
117+
server.on("upgrade", (request, socket, head) => {
118+
const handleAuth = (ws) => {
119+
wss.emit("connection", ws, request);
120+
};
121+
wss.handleUpgrade(request, socket, head, handleAuth);
122+
});
123+
124+
server.listen(port);
125+
126+
console.log("Signaling server running on localhost:", port);

0 commit comments

Comments
 (0)