summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Fietkau2025-05-05 11:55:42 +0000
committerFelix Fietkau2025-05-05 11:57:08 +0000
commit06f44f69b6fe33b07d85c5b6ed0eb49a199a532e (patch)
treeb7d6965e972e2ab844ce1592f2cd3b867dc708eb
parent6fcaf3d589f9cd5c9e5cdf89cc0e651a97175174 (diff)
downloadopenwrt-06f44f69b6fe33b07d85c5b6ed0eb49a199a532e.tar.gz
unetmsg: add subscriber update callback to notify about publish events
When services start publishing on a topic, this can be used to allow subscribers to query them. Signed-off-by: Felix Fietkau <nbd@nbd.name>
-rw-r--r--package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc36
-rw-r--r--package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc8
-rw-r--r--package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc6
-rw-r--r--package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc30
4 files changed, 70 insertions, 10 deletions
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
index 06c927297e..293763572f 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
@@ -23,7 +23,7 @@ function publish(name, request_cb)
this.channel.request("publish", { name });
}
-function subscribe(name, message_cb)
+function subscribe(name, message_cb, update_cb)
{
if (!this.channel)
this.connect();
@@ -31,8 +31,12 @@ function subscribe(name, message_cb)
if (type(name) == "string")
name = [ name ];
+ let cb = {
+ cb: message_cb,
+ update: update_cb
+ };
for (let cur in name)
- this.cb_sub[cur] = message_cb;
+ this.cb_sub[cur] = cb;
if (!this.channel)
return;
@@ -109,6 +113,12 @@ function connect()
const client_proto = {
connect, publish, subscribe, send, request,
close: function() {
+ for (let sub in this.sub_cb) {
+ if (!sub.timer)
+ continue;
+ sub.timer.cancel();
+ delete sub.timer;
+ }
if (this.channel)
this.channel.disconnect();
this.connect_timer.cancel();
@@ -119,11 +129,29 @@ const client_proto = {
function handle_request(cl, req)
{
- let cb;
+ let data, cb;
switch (req.type) {
+ case "publish":
+ data = cl.cb_sub[req.args.name];
+ if (!data || data.timer)
+ break;
+
+ cb = data.update;
+ if (!cb)
+ return;
+
+ data.timer = uloop.timer(100, () => {
+ delete data.timer;
+ cb();
+ });
+ break;
case "message":
- cb = cl.cb_sub[req.args.name];
+ data = cl.cb_sub[req.args.name];
+ if (!data)
+ break;
+
+ cb = data.cb;
if (cb)
return cb(req);
break;
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
index 8b42882152..6da745a770 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
@@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
cl_list[name] = core.pubsub_add(kind, name, proto({
client: cl.id,
}, pubsub_proto));
+
+ if (kind == "publish")
+ core.handle_publish(cl_list[name], name);
}
return 0;
@@ -101,8 +104,11 @@ function client_disconnect(id)
return;
for (let kind in [ "publish", "subscribe" ])
- for (let name, data in cl[kind])
+ for (let name, data in cl[kind]) {
+ if (kind == "publish")
+ core.handle_publish(data, name);
core.pubsub_del(kind, name, data);
+ }
delete clients[id];
}
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
index edc034343b..18ee2a3684 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
@@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
if (!name)
return;
if (args.enabled) {
- if (list[name])
+ if (list[name]) {
+ core.handle_publish(null, name);
return 0;
+ }
let allowed = net.peers[host].allowed == null;
for (let cur in net.peers[host].allowed) {
@@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
network: sock_data.network,
name: host,
}, pubsub_proto);
+ core.handle_publish(null, name);
list[name] = true;
} else {
if (!list[name])
return 0;
+ core.handle_publish(null, name);
delete core["remote_" + msgtype][name][host];
delete list[name];
}
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
index 393a6ea47a..b81acb908e 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
@@ -47,8 +47,8 @@ function new_handle(list, name, data)
function pubsub_add(kind, name, data)
{
let list = this[kind];
- if (!length(list[name])) {
- list[name] = {};
+ if (!length(list[name]) || kind == "publish") {
+ list[name] ??= {};
remote.pubsub_set(kind, name, true);
}
return new_handle(this[kind], name, data);
@@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
{
let list = this[kind][name];
delete list[data._id];
- if (!length(list))
- remote.pubsub_set(kind, name, false);
+ if (!length(list) || kind == "publish")
+ remote.pubsub_set(kind, name, length(list) > 0);
}
function get_handles(handle, local, remote)
@@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
return 0;
}
+function handle_publish(handle, name)
+{
+ let local = this.subscribe[name];
+ let handles = get_handles(handle, local);
+
+ for (let cur in handles) {
+ if (!cur || !cur.get_channel)
+ continue;
+
+ let chan = cur.get_channel();
+ if (!chan)
+ continue;
+
+ chan.request({
+ method: "publish",
+ return: "ignore",
+ data: { name },
+ });
+ }
+}
+
function add_acl(type, user, data)
{
if (!data || !user)
@@ -199,6 +220,7 @@ const core_proto = {
pubsub_del,
handle_request,
handle_message,
+ handle_publish,
dbg: function(msg) {
if (this.debug_enabled)
warn(msg);