Skip to content

Commit e41df76

Browse files
committed
compact_before_queue
1 parent ecf3d7b commit e41df76

File tree

4 files changed

+73
-116
lines changed

4 files changed

+73
-116
lines changed

.github/workflows/docker-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,4 @@ jobs:
3636
push: true
3737
platforms: linux/amd64
3838
# tags: cdnbye/cbsignal_uws:latest
39-
tags: cdnbye/cbsignal_uws:5.3.1
39+
tags: cdnbye/cbsignal_uws:5.4.0

lib/broker/redis.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ async function getNodeClientCount(addr) {
6969
return Number(count)
7070
}
7171

72-
function setLocalPeer(peerId) {
72+
function setLocalPeer(peerId, extra) {
7373
if (!isAlive) return
74-
redisCli.set(keyForPeerId(peerId), selfAddr, "EX", PEER_EXPIRE_DUTATION);
74+
const value = extra ? `${selfAddr}:${extra}` : selfAddr;
75+
redisCli.set(keyForPeerId(peerId), value, "EX", PEER_EXPIRE_DUTATION);
7576
}
7677

7778
function delLocalPeer(peerId) {
@@ -169,7 +170,7 @@ async function getRemotePeerAddr(peerId) {
169170
}
170171
}
171172
// 如果是本节点
172-
if (addr === selfAddr) {
173+
if (addr.split(':')[0] === selfAddr) {
173174
return null
174175
}
175176
return addr

lib/hub.js

Lines changed: 67 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class Hub {
2222
doRegister(client) {
2323
// this.logger.info(`${client.peerId} doRegister`);
2424
this._map.set(client.peerId, client);
25-
redis.setLocalPeer(client.peerId);
25+
redis.setLocalPeer(client.peerId, client.isCompact ? '1' : undefined);
2626
}
2727

2828
doUnregister(peerId) {
@@ -57,7 +57,6 @@ class Hub {
5757
}
5858
const message = ProtoBuf.decode(b);
5959
const object = ProtoBuf.toObject(message);
60-
// this.sendMessageToLocalPeer(object.items);
6160
this.sendMessageToLocalPeerBatch(object.items);
6261
} else {
6362
// this.logger.info(`consume mq is empty!`);
@@ -79,37 +78,27 @@ class Hub {
7978
map.forEach((items, toPeerId) => {
8079
const client = this.getClient(toPeerId);
8180
if (client) {
82-
const data = items.map(item => {
83-
let json = JSON.parse(item.toString()); // pb => json
84-
if (client.isCompact) {
85-
// this.logger.warn(`origin: ${json}`);
86-
const compacted = this._compactSignalMsg(json);
87-
// this.logger.warn(`compacted: ${compacted}`);
88-
if (compacted) {
89-
// json.data = compacted;
90-
// json.from = json.from_peer_id;
91-
// json.from_peer_id = undefined;
92-
json = {
93-
data: compacted,
94-
from: json.from_peer_id,
95-
};
96-
// this.logger.warn(`compacted: ${raw}`);
97-
}
98-
}
99-
return json
100-
})
10181
let success;
102-
if (client.batchable && data.length > 1) {
82+
if (client.batchable && items.length > 1) {
10383
// 批量发送
84+
const data = items.map(item => {
85+
const json = JSON.parse(item.toString()); // pb => json
86+
json.action = undefined;
87+
if (json.from_peer_id) {
88+
json.from = json.from_peer_id;
89+
json.from_peer_id = undefined;
90+
}
91+
return json
92+
})
10493
success = client.sendMessage(JSON.stringify({
10594
action: 'signals',
10695
data,
10796
}));
10897
} else {
109-
for (let item of data) {
110-
item.action = 'signal';
111-
success = client.sendMessage(JSON.stringify(item));
112-
}
98+
items.forEach(item => {
99+
const json = JSON.parse(item.toString()); // pb => json
100+
success = client.sendMessage(JSON.stringify(json));
101+
})
113102
}
114103
if (!success && this.doUnregister(toPeerId)) {
115104
// this.logger.info(`sendMessage to ${item.toPeerId} failed, doUnregister`);
@@ -118,39 +107,6 @@ class Hub {
118107
})
119108
}
120109

121-
sendMessageToLocalPeer(items) {
122-
let client;
123-
for (let item of items) {
124-
if (client && client.peerId === item.toPeerId) {
125-
// this.logger.warn(`hit peer ${item.toPeerId} cache`);
126-
} else {
127-
client = this.getClient(item.toPeerId);
128-
}
129-
if (client) {
130-
// this.logger.info(`local peer ${item.toPeerId} found`);
131-
// compact
132-
let raw;
133-
if (client.isCompact) {
134-
const json = JSON.parse(item.data);
135-
const compacted = this._compactSignalMsg(json);
136-
if (compacted) {
137-
json.data = compacted;
138-
json.from = json.from_peer_id;
139-
json.from_peer_id = undefined;
140-
raw = JSON.stringify(json);
141-
// this.logger.warn(`compacted: ${raw}`);
142-
}
143-
} else {
144-
raw = item.data.toString();
145-
}
146-
const success = client.sendMessage(raw);
147-
if (!success && this.doUnregister(item.toPeerId)) {
148-
// this.logger.info(`sendMessage to ${item.toPeerId} failed, doUnregister`);
149-
}
150-
}
151-
}
152-
}
153-
154110
getClient(peerId) {
155111
return this._map.get(peerId)
156112
}
@@ -171,13 +127,6 @@ class Hub {
171127
return success
172128
}
173129

174-
_compactSignalMsg(msg) {
175-
if (msg.action === 'signal' && msg.data) {
176-
return compact(msg.data);
177-
}
178-
return null
179-
}
180-
181130
get numClient() {
182131
return this._map.size
183132
}
@@ -229,27 +178,21 @@ class Hub {
229178
}
230179
}
231180

232-
async processSignal(client, target, json, toPeerId, peerId) {
233-
const { data } = json
234-
if (!data) return
235-
if (data.candidate && !data.candidate.candidate) {
236-
return
237-
}
238-
if (target) {
239-
if (target.isCompact) {
240-
const compacted = compact(data);
241-
if (compacted) {
242-
// if (target.device === 'android') {
243-
// this.logger.warn(`origin: ${JSON.stringify(data)}`);
244-
// this.logger.warn(`compacted: ${compacted}`);
245-
// }
181+
_processJson(json, peerId, isCompact) {
182+
if (isCompact) {
183+
const compacted = compact(json.data);
184+
if (compacted) {
185+
// if (target.device === 'android') {
186+
// this.logger.warn(`origin: ${JSON.stringify(json.data)}`);
187+
// this.logger.warn(`compacted: ${compacted}`);
188+
// }
246189

247190
// const decompacted = decompact(compacted.data);
248191
// // this.logger.warn(`decompact: ${JSON.stringify(decompacted)}`);
249192
// // this.logger.warn(`------------------------------------------`);
250-
json.data = compacted;
251-
json.from_peer_id = undefined;
252-
json.from = peerId;
193+
json.data = compacted;
194+
json.from_peer_id = undefined;
195+
json.from = peerId;
253196
// const originData = (json.data.sdp || json.data.candidate.candidate)
254197
// try {
255198
// const result = compareStringArrays(originData.split("\r\n"), (decompacted.sdp || decompacted.candidate.candidate).split("\r\n"), {
@@ -273,25 +216,44 @@ class Hub {
273216
// console.error(JSON.stringify(json.data))
274217
// console.error(e)
275218
// }
276-
}
277219
}
278-
const success = this.sendJsonToClient(target, json);
220+
}
221+
return json
222+
}
223+
224+
async _getRemotePeerInfo(toPeerId) {
225+
const info = await redis.getRemotePeerAddr(toPeerId);
226+
if (info) {
227+
const [addr, extra] = info.split(':');
228+
const isCompact = !!extra;
229+
const node = await this.nodes.getNode(addr);
230+
if (node) {
231+
return { node: node, isCompact }
232+
}
233+
}
234+
return { node: undefined }
235+
}
236+
237+
async processSignal(client, target, json, toPeerId, peerId) {
238+
const { data } = json
239+
if (!data) return
240+
if (data.candidate && !data.candidate.candidate) {
241+
return
242+
}
243+
if (target) {
244+
const success = this.sendJsonToClient(target, this._processJson(json, peerId, target.isCompact));
279245
if (!success) {
280246
this._handlePeerNotFound(client, toPeerId, peerId);
281247
}
282248
return success
283249
}
284250
if (redis.getIsAlive()) {
285-
const addr = await redis.getRemotePeerAddr(toPeerId);
286-
// console.warn(`getRemotePeerAddr toPeerId ${toPeerId} addr ${addr}`)
287-
if (addr) {
288-
const node = await this.nodes.getNode(addr);
289-
if (node) {
290-
return node.sendMsgSignal(json, toPeerId)
291-
}
292-
this._handlePeerNotFound(client, toPeerId, peerId);
293-
return false
251+
const { node, isCompact } = await this._getRemotePeerInfo(toPeerId);
252+
if (node) {
253+
return node.sendMsgSignal(this._processJson(json, peerId, isCompact), toPeerId)
294254
}
255+
this._handlePeerNotFound(client, toPeerId, peerId);
256+
return false
295257
}
296258
// this.logger.info(`peer ${toPeerId} not found`);
297259
this._handlePeerNotFound(client, toPeerId, peerId);
@@ -315,16 +277,13 @@ class Hub {
315277
return this.sendJsonToClient(target, json);
316278
}
317279
if (redis.getIsAlive()) {
318-
const addr = await redis.getRemotePeerAddr(toPeerId);
319-
if (addr) {
320-
const node = await this.nodes.getNode(addr);
321-
if (node) {
322-
if (!node.sendMsgSignal(json, toPeerId)) {
323-
// return
324-
}
325-
} else {
326-
// this.logger.info(`node ${addr} not found`);
280+
const { node } = await this._getRemotePeerInfo(toPeerId);
281+
if (node) {
282+
if (!node.sendMsgSignal(json, toPeerId)) {
283+
// return
327284
}
285+
} else {
286+
// this.logger.info(`node ${addr} not found`);
328287
}
329288
}
330289
}
@@ -342,14 +301,11 @@ class Hub {
342301
}
343302
// polling to another worker
344303
if (redis.getIsAlive()) {
345-
const addr = await redis.getRemotePeerAddr(peerId);
346-
if (addr) {
347-
const node = await this.nodes.getNode(addr);
348-
if (node) {
349-
// this.logger.info(`polling to another worker, send peer not found`);
350-
if (!node.sendMsgSignal(msg, peerId)) {
351-
// this.logger.warn(`sendMsgSignal to remote ${addr} failed`);
352-
}
304+
const { node } = await this._getRemotePeerInfo(peerId);
305+
if (node) {
306+
// this.logger.info(`polling to another worker, send peer not found`);
307+
if (!node.sendMsgSignal(msg, peerId)) {
308+
// this.logger.warn(`sendMsgSignal to remote ${addr} failed`);
353309
}
354310
}
355311
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "cbsignal_uws",
3-
"version": "5.3.1",
3+
"version": "5.4.0",
44
"description": "SwarmCloud signaling server using uWebSockets.js",
55
"main": "index.js",
66
"bin": "index.js",

0 commit comments

Comments
 (0)