Skip to content

Commit 06f44f6

Browse files
committed
unetmsg: add subscriber update callback to notify about publish events
When services start publishing on a topic, this can be used to allow subscribers to query them. Signed-off-by: Felix Fietkau <[email protected]>
1 parent 6fcaf3d commit 06f44f6

File tree

4 files changed

+70
-10
lines changed

4 files changed

+70
-10
lines changed

package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,20 @@ function publish(name, request_cb)
2323
this.channel.request("publish", { name });
2424
}
2525

26-
function subscribe(name, message_cb)
26+
function subscribe(name, message_cb, update_cb)
2727
{
2828
if (!this.channel)
2929
this.connect();
3030

3131
if (type(name) == "string")
3232
name = [ name ];
3333

34+
let cb = {
35+
cb: message_cb,
36+
update: update_cb
37+
};
3438
for (let cur in name)
35-
this.cb_sub[cur] = message_cb;
39+
this.cb_sub[cur] = cb;
3640

3741
if (!this.channel)
3842
return;
@@ -109,6 +113,12 @@ function connect()
109113
const client_proto = {
110114
connect, publish, subscribe, send, request,
111115
close: function() {
116+
for (let sub in this.sub_cb) {
117+
if (!sub.timer)
118+
continue;
119+
sub.timer.cancel();
120+
delete sub.timer;
121+
}
112122
if (this.channel)
113123
this.channel.disconnect();
114124
this.connect_timer.cancel();
@@ -119,11 +129,29 @@ const client_proto = {
119129

120130
function handle_request(cl, req)
121131
{
122-
let cb;
132+
let data, cb;
123133

124134
switch (req.type) {
135+
case "publish":
136+
data = cl.cb_sub[req.args.name];
137+
if (!data || data.timer)
138+
break;
139+
140+
cb = data.update;
141+
if (!cb)
142+
return;
143+
144+
data.timer = uloop.timer(100, () => {
145+
delete data.timer;
146+
cb();
147+
});
148+
break;
125149
case "message":
126-
cb = cl.cb_sub[req.args.name];
150+
data = cl.cb_sub[req.args.name];
151+
if (!data)
152+
break;
153+
154+
cb = data.cb;
127155
if (cb)
128156
return cb(req);
129157
break;

package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
4040
cl_list[name] = core.pubsub_add(kind, name, proto({
4141
client: cl.id,
4242
}, pubsub_proto));
43+
44+
if (kind == "publish")
45+
core.handle_publish(cl_list[name], name);
4346
}
4447

4548
return 0;
@@ -101,8 +104,11 @@ function client_disconnect(id)
101104
return;
102105

103106
for (let kind in [ "publish", "subscribe" ])
104-
for (let name, data in cl[kind])
107+
for (let name, data in cl[kind]) {
108+
if (kind == "publish")
109+
core.handle_publish(data, name);
105110
core.pubsub_del(kind, name, data);
111+
}
106112

107113
delete clients[id];
108114
}

package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
9696
if (!name)
9797
return;
9898
if (args.enabled) {
99-
if (list[name])
99+
if (list[name]) {
100+
core.handle_publish(null, name);
100101
return 0;
102+
}
101103

102104
let allowed = net.peers[host].allowed == null;
103105
for (let cur in net.peers[host].allowed) {
@@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
114116
network: sock_data.network,
115117
name: host,
116118
}, pubsub_proto);
119+
core.handle_publish(null, name);
117120
list[name] = true;
118121
} else {
119122
if (!list[name])
120123
return 0;
124+
core.handle_publish(null, name);
121125
delete core["remote_" + msgtype][name][host];
122126
delete list[name];
123127
}

package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ function new_handle(list, name, data)
4747
function pubsub_add(kind, name, data)
4848
{
4949
let list = this[kind];
50-
if (!length(list[name])) {
51-
list[name] = {};
50+
if (!length(list[name]) || kind == "publish") {
51+
list[name] ??= {};
5252
remote.pubsub_set(kind, name, true);
5353
}
5454
return new_handle(this[kind], name, data);
@@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
5858
{
5959
let list = this[kind][name];
6060
delete list[data._id];
61-
if (!length(list))
62-
remote.pubsub_set(kind, name, false);
61+
if (!length(list) || kind == "publish")
62+
remote.pubsub_set(kind, name, length(list) > 0);
6363
}
6464

6565
function get_handles(handle, local, remote)
@@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
158158
return 0;
159159
}
160160

161+
function handle_publish(handle, name)
162+
{
163+
let local = this.subscribe[name];
164+
let handles = get_handles(handle, local);
165+
166+
for (let cur in handles) {
167+
if (!cur || !cur.get_channel)
168+
continue;
169+
170+
let chan = cur.get_channel();
171+
if (!chan)
172+
continue;
173+
174+
chan.request({
175+
method: "publish",
176+
return: "ignore",
177+
data: { name },
178+
});
179+
}
180+
}
181+
161182
function add_acl(type, user, data)
162183
{
163184
if (!data || !user)
@@ -199,6 +220,7 @@ const core_proto = {
199220
pubsub_del,
200221
handle_request,
201222
handle_message,
223+
handle_publish,
202224
dbg: function(msg) {
203225
if (this.debug_enabled)
204226
warn(msg);

0 commit comments

Comments
 (0)