ubusd: add lookup command queuing support
authorDaniel Danzberger <daniel@dd-wrt.com>
Wed, 8 Jun 2022 11:12:29 +0000 (13:12 +0200)
committerFelix Fietkau <nbd@nbd.name>
Wed, 15 Jun 2022 17:59:40 +0000 (19:59 +0200)
Defers and continues a client's lookup command to avoid unnecessary
buffering under load.

Signed-off-by: Daniel Danzberger <daniel@dd-wrt.com>
Signed-off-by: Felix Fietkau <nbd@nbd.name>
ubusd.h
ubusd_main.c
ubusd_proto.c

diff --git a/ubusd.h b/ubusd.h
index c5d6d2a49e4162b55679a315e1c243cde8e513c0..f43b9360d3367db0a1b70cf2491326b306a54a12 100644 (file)
--- a/ubusd.h
+++ b/ubusd.h
@@ -41,6 +41,12 @@ struct ubus_msg_buf_list {
        struct ubus_msg_buf *msg;
 };
 
+struct ubus_client_cmd {
+       struct list_head list;
+       struct ubus_msg_buf *msg;
+       struct ubus_object *obj;
+};
+
 struct ubus_client {
        struct ubus_id id;
        struct uloop_fd sock;
@@ -53,6 +59,7 @@ struct ubus_client {
 
        struct list_head objects;
 
+       struct list_head cmd_queue;
        struct list_head tx_queue;
        unsigned int txq_ofs;
        unsigned int txq_len;
@@ -86,6 +93,7 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
 void ubusd_proto_free_client(struct ubus_client *cl);
 void ubus_proto_send_msg_from_blob(struct ubus_client *cl, struct ubus_msg_buf *ub,
                                   uint8_t type);
+int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd);
 
 typedef struct ubus_msg_buf *(*event_fill_cb)(void *priv, const char *id);
 void ubusd_event_init(void);
index 6b132ced03d1b48987bc3f2c59dbe8a5c3939cba..adbd2932be3d22a1317ca8a33be50fb30662f7cd 100644 (file)
@@ -32,6 +32,28 @@ static void handle_client_disconnect(struct ubus_client *cl)
        free(cl);
 }
 
+static void ubus_client_cmd_free(struct ubus_client_cmd *cmd)
+{
+       list_del(&cmd->list);
+       ubus_msg_free(cmd->msg);
+       free(cmd);
+}
+
+static void ubus_client_cmd_queue_process(struct ubus_client *cl)
+{
+       struct ubus_client_cmd *cmd, *tmp;
+
+       list_for_each_entry_safe(cmd, tmp, &cl->cmd_queue, list) {
+               int ret = ubusd_cmd_lookup(cl, cmd);
+
+               /* Stop if the last command caused buffering again */
+               if (ret == -2)
+                       break;
+
+               ubus_client_cmd_free(cmd);
+       }
+}
+
 static void client_cb(struct uloop_fd *sock, unsigned int events)
 {
        struct ubus_client *cl = container_of(sock, struct ubus_client, sock);
@@ -82,10 +104,15 @@ static void client_cb(struct uloop_fd *sock, unsigned int events)
                ubus_msg_list_free(ubl);
        }
 
-       /* prevent further ULOOP_WRITE events if we don't have data
-        * to send anymore */
-       if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE))
-               uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+       if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) {
+               /* Process queued commands */
+               ubus_client_cmd_queue_process(cl);
+
+               /* prevent further ULOOP_WRITE events if we don't have data
+                * to send anymore */
+               if (list_empty(&cl->tx_queue))
+                       uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
+       }
 
 retry:
        if (!sock->eof && cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) {
index b20f91c793982159515d8afe8bae92e101223dda..48de9b96141c8011061ac9ae7d9885d5a0eb6b12 100644 (file)
@@ -186,16 +186,56 @@ static void ubusd_send_obj(struct ubus_client *cl, struct ubus_msg_buf *ub, stru
                ubus_proto_send_msg_from_blob(cl, ub, UBUS_MSG_DATA);
 }
 
-static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
+static int ubus_client_cmd_queue_add(struct ubus_client *cl,
+                                       struct ubus_msg_buf *msg,
+                                       struct ubus_object *obj)
 {
-       struct ubus_object *obj;
+       struct ubus_client_cmd *cmd = malloc(sizeof(*cmd));
+
+       if (cmd) {
+               cmd->msg = msg;
+               cmd->obj = obj;
+               list_add_tail(&cmd->list, &cl->cmd_queue);
+               return -2;
+       }
+       return UBUS_STATUS_UNKNOWN_ERROR;
+}
+
+static int __ubusd_handle_lookup(struct ubus_client *cl,
+                               struct ubus_msg_buf *ub,
+                               struct blob_attr **attr,
+                               struct ubus_client_cmd *cmd)
+{
+       struct ubus_object *obj = NULL;
        char *objpath;
        bool found = false;
        int len;
 
        if (!attr[UBUS_ATTR_OBJPATH]) {
-               avl_for_each_element(&path, obj, path)
-                       ubusd_send_obj(cl, ub, obj);
+               if (cmd)
+                       obj = cmd->obj;
+
+               /* Start from beginning or continue from the last object */
+               if (obj == NULL)
+                       obj = avl_first_element(&path, obj, path);
+
+               avl_for_element_range(obj, avl_last_element(&path, obj, path), obj, path) {
+                       /* Keep sending objects until buffering starts */
+                       if (list_empty(&cl->tx_queue)) {
+                               ubusd_send_obj(cl, ub, obj);
+                       } else {
+                               /* Queue command and continue on the next call */
+                               int ret;
+
+                               if (cmd == NULL) {
+                                       ret = ubus_client_cmd_queue_add(cl, ub, obj);
+                               } else {
+                                       cmd->obj = obj;
+                                       ret = -2;
+                               }
+                               return ret;
+                       }
+               }
                return 0;
        }
 
@@ -230,6 +270,40 @@ static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub,
        return 0;
 }
 
+static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
+{
+       int rc;
+
+       if (list_empty(&cl->tx_queue))
+               rc = __ubusd_handle_lookup(cl, ub, attr, NULL);
+       else
+               rc = ubus_client_cmd_queue_add(cl, ub, NULL);
+
+       return rc;
+}
+
+int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd)
+{
+       struct ubus_msg_buf *ub = cmd->msg;
+       struct blob_attr **attr;
+       int ret;
+
+       attr = ubus_parse_msg(ub->data, blob_raw_len(ub->data));
+       ret = __ubusd_handle_lookup(cl, ub, attr, cmd);
+
+       if (ret != -2) {
+               struct ubus_msg_buf *retmsg = cl->retmsg;
+               int *retmsg_data = blob_data(blob_data(retmsg->data));
+
+               retmsg->hdr.seq = ub->hdr.seq;
+               retmsg->hdr.peer = ub->hdr.peer;
+
+               *retmsg_data = htonl(ret);
+               ubus_msg_send(cl, retmsg);
+       }
+       return ret;
+}
+
 static void
 ubusd_forward_invoke(struct ubus_client *cl, struct ubus_object *obj,
                     const char *method, struct ubus_msg_buf *ub,
@@ -458,6 +532,10 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
        else
                ret = UBUS_STATUS_INVALID_COMMAND;
 
+       /* Command has not been completed yet and got queued */
+       if (ret == -2)
+               return;
+
        ubus_msg_free(ub);
 
        if (ret == -1)
@@ -495,6 +573,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
                goto free;
 
        INIT_LIST_HEAD(&cl->objects);
+       INIT_LIST_HEAD(&cl->cmd_queue);
        INIT_LIST_HEAD(&cl->tx_queue);
        cl->sock.fd = fd;
        cl->sock.cb = cb;