unetmsg: add subscriber update callback to notify about publish events
authorFelix Fietkau <nbd@nbd.name>
Mon, 5 May 2025 11:55:42 +0000 (13:55 +0200)
committerFelix Fietkau <nbd@nbd.name>
Mon, 5 May 2025 11:57:08 +0000 (13:57 +0200)
When services start publishing on a topic, this can be used to allow
subscribers to query them.

Signed-off-by: Felix Fietkau <nbd@nbd.name>
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc

index 06c927297e0aa77e9cb7930de6ca5d8158d0b157..293763572fa25e7b8278f4f0175ad3ab9976ff04 100644 (file)
@@ -23,7 +23,7 @@ function publish(name, request_cb)
        this.channel.request("publish", { name });
 }
 
-function subscribe(name, message_cb)
+function subscribe(name, message_cb, update_cb)
 {
        if (!this.channel)
                this.connect();
@@ -31,8 +31,12 @@ function subscribe(name, message_cb)
        if (type(name) == "string")
                name = [ name ];
 
+       let cb = {
+               cb: message_cb,
+               update: update_cb
+       };
        for (let cur in name)
-               this.cb_sub[cur] = message_cb;
+               this.cb_sub[cur] = cb;
 
        if (!this.channel)
                return;
@@ -109,6 +113,12 @@ function connect()
 const client_proto = {
        connect, publish, subscribe, send, request,
        close: function() {
+               for (let sub in this.sub_cb) {
+                       if (!sub.timer)
+                               continue;
+                       sub.timer.cancel();
+                       delete sub.timer;
+               }
                if (this.channel)
                        this.channel.disconnect();
                this.connect_timer.cancel();
@@ -119,11 +129,29 @@ const client_proto = {
 
 function handle_request(cl, req)
 {
-       let cb;
+       let data, cb;
 
        switch (req.type) {
+       case "publish":
+               data = cl.cb_sub[req.args.name];
+               if (!data || data.timer)
+                       break;
+
+               cb = data.update;
+               if (!cb)
+                       return;
+
+               data.timer = uloop.timer(100, () => {
+                       delete data.timer;
+                       cb();
+               });
+               break;
        case "message":
-               cb = cl.cb_sub[req.args.name];
+               data = cl.cb_sub[req.args.name];
+               if (!data)
+                       break;
+
+               cb = data.cb;
                if (cb)
                        return cb(req);
                break;
index 8b428821520539a34dade5c738c609b4ec181be7..6da745a7709c09b5ec812e21e3d3054856ca33fa 100644 (file)
@@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
                cl_list[name] = core.pubsub_add(kind, name, proto({
                        client: cl.id,
                }, pubsub_proto));
+
+               if (kind == "publish")
+                       core.handle_publish(cl_list[name], name);
        }
 
        return 0;
@@ -101,8 +104,11 @@ function client_disconnect(id)
                return;
 
        for (let kind in [ "publish", "subscribe" ])
-               for (let name, data in cl[kind])
+               for (let name, data in cl[kind]) {
+                       if (kind == "publish")
+                               core.handle_publish(data, name);
                        core.pubsub_del(kind, name, data);
+               }
 
        delete clients[id];
 }
index edc034343b0458d3f0bfb0cc9d9c09ca6f017e86..18ee2a3684c948d2ea5878a532142b50f969f04d 100644 (file)
@@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
                if (!name)
                        return;
                if (args.enabled) {
-                       if (list[name])
+                       if (list[name]) {
+                               core.handle_publish(null, name);
                                return 0;
+                       }
 
                        let allowed = net.peers[host].allowed == null;
                        for (let cur in net.peers[host].allowed) {
@@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
                                network: sock_data.network,
                                name: host,
                        }, pubsub_proto);
+                       core.handle_publish(null, name);
                        list[name] = true;
                } else {
                        if (!list[name])
                                return 0;
+                       core.handle_publish(null, name);
                        delete core["remote_" + msgtype][name][host];
                        delete list[name];
                }
index 393a6ea47adf5b65f84cad65f813cec3c2b385c0..b81acb908eb49097f9c4aea1e7dd8aa4cd0e6f0c 100644 (file)
@@ -47,8 +47,8 @@ function new_handle(list, name, data)
 function pubsub_add(kind, name, data)
 {
        let list = this[kind];
-       if (!length(list[name])) {
-               list[name] = {};
+       if (!length(list[name]) || kind == "publish") {
+               list[name] ??= {};
                remote.pubsub_set(kind, name, true);
        }
        return new_handle(this[kind], name, data);
@@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
 {
        let list = this[kind][name];
        delete list[data._id];
-       if (!length(list))
-               remote.pubsub_set(kind, name, false);
+       if (!length(list) || kind == "publish")
+               remote.pubsub_set(kind, name, length(list) > 0);
 }
 
 function get_handles(handle, local, remote)
@@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
        return 0;
 }
 
+function handle_publish(handle, name)
+{
+       let local = this.subscribe[name];
+       let handles = get_handles(handle, local);
+
+       for (let cur in handles) {
+               if (!cur || !cur.get_channel)
+                       continue;
+
+               let chan = cur.get_channel();
+               if (!chan)
+                       continue;
+
+               chan.request({
+                       method: "publish",
+                       return: "ignore",
+                       data: { name },
+               });
+       }
+}
+
 function add_acl(type, user, data)
 {
        if (!data || !user)
@@ -199,6 +220,7 @@ const core_proto = {
        pubsub_del,
        handle_request,
        handle_message,
+       handle_publish,
        dbg: function(msg) {
                if (this.debug_enabled)
                        warn(msg);