summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Fietkau2024-12-31 11:54:06 +0000
committerFelix Fietkau2025-01-02 12:40:21 +0000
commitafa57cce0aff82f4a7a0e509d4387ebc23dd3be7 (patch)
tree5e8a55021cdbf8a6241b21ef81df2bbf1c6bdabf
parentd996988ae55b6af12e4611fdd758a2f0ad27a9a1 (diff)
downloadubus-afa57cce0aff82f4a7a0e509d4387ebc23dd3be7.tar.gz
libubus: add support for using channels
A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. Signed-off-by: Felix Fietkau <nbd@nbd.name>
-rw-r--r--libubus-io.c5
-rw-r--r--libubus-obj.c26
-rw-r--r--libubus-req.c6
-rw-r--r--libubus-sub.c6
-rw-r--r--libubus.c74
-rw-r--r--libubus.h21
-rw-r--r--ubusmsg.h2
7 files changed, 128 insertions, 12 deletions
diff --git a/libubus-io.c b/libubus-io.c
index d190b67..16c1c14 100644
--- a/libubus-io.c
+++ b/libubus-io.c
@@ -401,6 +401,9 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path)
struct blob_attr *buf;
int ret = UBUS_STATUS_UNKNOWN_ERROR;
+ if (ubus_context_is_channel(ctx))
+ return -1;
+
if (!path)
path = UBUS_UNIX_SOCKET;
@@ -435,7 +438,7 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path)
goto out_free;
ctx->local_id = hdr.hdr.peer;
- if (!ctx->local_id)
+ if (ctx->local_id <= UBUS_CLIENT_ID_CHANNEL)
goto out_free;
ret = UBUS_STATUS_OK;
diff --git a/libubus-obj.c b/libubus-obj.c
index 29cbb2b..4a56110 100644
--- a/libubus-obj.c
+++ b/libubus-obj.c
@@ -57,12 +57,13 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
.fd = -1,
.req_fd = fd,
};
-
+ ubus_handler_t handler;
int method;
int ret;
bool no_reply = false;
- if (!obj) {
+ if ((!obj && !ubus_context_is_channel(ctx)) ||
+ (!ctx->request_handler && ubus_context_is_channel(ctx))) {
ret = UBUS_STATUS_NOT_FOUND;
goto send;
}
@@ -77,6 +78,12 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
req.peer = hdr->peer;
req.seq = hdr->seq;
+
+ if (ubus_context_is_channel(ctx)) {
+ handler = ctx->request_handler;
+ goto found;
+ }
+
req.object = obj->id;
if (attrbuf[UBUS_ATTR_USER] && attrbuf[UBUS_ATTR_GROUP]) {
req.acl.user = blobmsg_get_string(attrbuf[UBUS_ATTR_USER]);
@@ -86,8 +93,10 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
for (method = 0; method < obj->n_methods; method++)
if (!obj->methods[method].name ||
!strcmp(obj->methods[method].name,
- blob_data(attrbuf[UBUS_ATTR_METHOD])))
+ blob_data(attrbuf[UBUS_ATTR_METHOD]))) {
+ handler = obj->methods[method].handler;
goto found;
+ }
/* not found */
ret = UBUS_STATUS_METHOD_NOT_FOUND;
@@ -99,9 +108,8 @@ found:
goto send;
}
- ret = obj->methods[method].handler(ctx, obj, &req,
- blob_data(attrbuf[UBUS_ATTR_METHOD]),
- attrbuf[UBUS_ATTR_DATA]);
+ ret = handler(ctx, obj, &req, blob_data(attrbuf[UBUS_ATTR_METHOD]),
+ attrbuf[UBUS_ATTR_DATA]);
if (req.req_fd >= 0)
close(req.req_fd);
if (req.deferred || no_reply)
@@ -211,6 +219,9 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj)
struct ubus_request req;
int ret;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
if (obj->name && obj->type) {
@@ -258,6 +269,9 @@ int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj)
struct ubus_request req;
int ret;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
diff --git a/libubus-req.c b/libubus-req.c
index 474aac2..3e8d55c 100644
--- a/libubus-req.c
+++ b/libubus-req.c
@@ -280,6 +280,9 @@ __ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj,
const char *type, struct blob_attr *msg,
struct ubus_notify_request *req, bool reply)
{
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
memset(req, 0, sizeof(*req));
blob_buf_init(&b, 0);
@@ -496,6 +499,9 @@ void __hidden ubus_process_req_msg(struct ubus_context *ctx, struct ubus_msghdr_
int __ubus_monitor(struct ubus_context *ctx, const char *type)
{
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_MONITOR, type, b.head, NULL, NULL, 1000);
}
diff --git a/libubus-sub.c b/libubus-sub.c
index 80d1f1a..127a2de 100644
--- a/libubus-sub.c
+++ b/libubus-sub.c
@@ -79,6 +79,9 @@ int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s
struct ubus_object *obj = &s->obj;
int ret;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
INIT_LIST_HEAD(&s->list);
obj->methods = &watch_method;
obj->n_methods = 1;
@@ -104,6 +107,9 @@ __ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint
{
struct ubus_request req;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
blob_put_int32(&b, UBUS_ATTR_TARGET, id);
diff --git a/libubus.c b/libubus.c
index c9c3c6e..5d22660 100644
--- a/libubus.c
+++ b/libubus.c
@@ -14,6 +14,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
+#include <fcntl.h>
#include <libubox/blob.h>
#include <libubox/blobmsg.h>
@@ -98,9 +99,12 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
ubus_process_req_msg(ctx, buf, fd);
break;
- case UBUS_MSG_INVOKE:
case UBUS_MSG_UNSUBSCRIBE:
case UBUS_MSG_NOTIFY:
+ if (ubus_context_is_channel(ctx))
+ break;
+ /* fallthrough */
+ case UBUS_MSG_INVOKE:
if (ctx->stack_depth) {
ubus_queue_msg(ctx, buf);
break;
@@ -111,6 +115,9 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
ctx->stack_depth--;
break;
case UBUS_MSG_MONITOR:
+ if (ubus_context_is_channel(ctx))
+ break;
+
if (ctx->monitor_cb)
ctx->monitor_cb(ctx, buf->hdr.seq, buf->data);
break;
@@ -163,6 +170,9 @@ int ubus_lookup(struct ubus_context *ctx, const char *path,
{
struct ubus_lookup_request lookup;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
if (path)
blob_put_string(&b, UBUS_ATTR_OBJPATH, path);
@@ -193,6 +203,9 @@ int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id)
{
struct ubus_request req;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
blob_buf_init(&b, 0);
if (path)
blob_put_string(&b, UBUS_ATTR_OBJPATH, path);
@@ -230,6 +243,9 @@ int ubus_register_event_handler(struct ubus_context *ctx,
struct blob_buf b2 = {};
int ret;
+ if (ubus_context_is_channel(ctx))
+ return UBUS_STATUS_INVALID_ARGUMENT;
+
if (!obj->id) {
obj->methods = &event_method;
obj->n_methods = 1;
@@ -281,7 +297,8 @@ static void ubus_default_connection_lost(struct ubus_context *ctx)
uloop_end();
}
-int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
+static int
+__ubus_ctx_init(struct ubus_context *ctx)
{
uloop_init();
memset(ctx, 0, sizeof(*ctx));
@@ -298,8 +315,16 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
INIT_LIST_HEAD(&ctx->requests);
INIT_LIST_HEAD(&ctx->pending);
- INIT_LIST_HEAD(&ctx->auto_subscribers);
avl_init(&ctx->objects, ubus_cmp_id, false, NULL);
+ return 0;
+}
+
+int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
+{
+ if (__ubus_ctx_init(ctx))
+ return -1;
+
+ INIT_LIST_HEAD(&ctx->auto_subscribers);
if (ubus_reconnect(ctx, path)) {
free(ctx->msgbuf.data);
ctx->msgbuf.data = NULL;
@@ -309,6 +334,49 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
return 0;
}
+int ubus_channel_connect(struct ubus_context *ctx, int fd,
+ ubus_handler_t handler)
+{
+ if (__ubus_ctx_init(ctx))
+ return -1;
+
+ if (ctx->sock.fd >= 0) {
+ if (ctx->sock.registered)
+ uloop_fd_delete(&ctx->sock);
+
+ close(ctx->sock.fd);
+ }
+
+ ctx->sock.eof = false;
+ ctx->sock.error = false;
+ ctx->sock.fd = fd;
+ ctx->local_id = UBUS_CLIENT_ID_CHANNEL;
+ ctx->request_handler = handler;
+
+ fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC);
+
+ return 0;
+}
+
+int ubus_channel_create(struct ubus_context *ctx, int *remote_fd,
+ ubus_handler_t handler)
+{
+ int sfd[2];
+
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, sfd))
+ return -1;
+
+ if (ubus_channel_connect(ctx, sfd[0], handler) < 0) {
+ close(sfd[0]);
+ close(sfd[1]);
+ return -1;
+ }
+
+ *remote_fd = sfd[1];
+
+ return 0;
+}
+
static void ubus_auto_reconnect_cb(struct uloop_timeout *timeout)
{
struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer);
diff --git a/libubus.h b/libubus.h
index fcf62c8..aa9263c 100644
--- a/libubus.h
+++ b/libubus.h
@@ -177,8 +177,15 @@ struct ubus_context {
uint32_t msgbuf_data_len;
int msgbuf_reduction_counter;
- struct list_head auto_subscribers;
- struct ubus_event_handler auto_subscribe_event_handler;
+ union {
+ struct {
+ struct list_head auto_subscribers;
+ struct ubus_event_handler auto_subscribe_event_handler;
+ };
+ struct {
+ ubus_handler_t request_handler;
+ };
+ };
};
struct ubus_object_data {
@@ -253,6 +260,16 @@ struct ubus_context *ubus_connect(const char *path);
int ubus_connect_ctx(struct ubus_context *ctx, const char *path);
void ubus_auto_connect(struct ubus_auto_conn *conn);
int ubus_reconnect(struct ubus_context *ctx, const char *path);
+int ubus_channel_connect(struct ubus_context *ctx, int fd,
+ ubus_handler_t handler);
+int ubus_channel_create(struct ubus_context *ctx, int *remote_fd,
+ ubus_handler_t handler);
+
+static inline bool
+ubus_context_is_channel(struct ubus_context *ctx)
+{
+ return ctx->local_id == UBUS_CLIENT_ID_CHANNEL;
+}
/* call this only for struct ubus_context pointers returned by ubus_connect() */
void ubus_free(struct ubus_context *ctx);
diff --git a/ubusmsg.h b/ubusmsg.h
index b2df8dc..8858511 100644
--- a/ubusmsg.h
+++ b/ubusmsg.h
@@ -21,6 +21,8 @@
#define UBUS_MSG_CHUNK_SIZE 65536
+#define UBUS_CLIENT_ID_CHANNEL 1
+
#define UBUS_SYSTEM_OBJECT_EVENT 1
#define UBUS_SYSTEM_OBJECT_ACL 2
#define UBUS_SYSTEM_OBJECT_MONITOR 3