2 * ustream - library for stream buffer management
4 * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org>
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
27 #define CB_PENDING_READ (1 << 0)
29 static void ustream_init_buf(struct ustream_buf
*buf
, int len
)
34 memset(buf
, 0, sizeof(*buf
));
35 buf
->data
= buf
->tail
= buf
->head
;
36 buf
->end
= buf
->head
+ len
;
40 static void ustream_add_buf(struct ustream_buf_list
*l
, struct ustream_buf
*buf
)
51 l
->data_tail
= l
->head
;
54 static bool ustream_can_alloc(struct ustream_buf_list
*l
)
56 if (l
->max_buffers
<= 0)
59 return (l
->buffers
< l
->max_buffers
);
62 static int ustream_alloc_default(struct ustream
*s
, struct ustream_buf_list
*l
)
64 struct ustream_buf
*buf
;
66 if (!ustream_can_alloc(l
))
69 buf
= malloc(sizeof(*buf
) + l
->buffer_len
+ s
->string_data
);
73 ustream_init_buf(buf
, l
->buffer_len
);
74 ustream_add_buf(l
, buf
);
79 static void ustream_free_buffers(struct ustream_buf_list
*l
)
81 struct ustream_buf
*buf
= l
->head
;
84 struct ustream_buf
*next
= buf
->next
;
94 void ustream_free(struct ustream
*s
)
99 uloop_timeout_cancel(&s
->state_change
);
100 ustream_free_buffers(&s
->r
);
101 ustream_free_buffers(&s
->w
);
104 static void ustream_state_change_cb(struct uloop_timeout
*t
)
106 struct ustream
*s
= container_of(t
, struct ustream
, state_change
);
109 ustream_free_buffers(&s
->w
);
114 void ustream_init_defaults(struct ustream
*s
)
116 #define DEFAULT_SET(_f, _default) \
122 DEFAULT_SET(s
->r
.alloc
, ustream_alloc_default
);
123 DEFAULT_SET(s
->w
.alloc
, ustream_alloc_default
);
125 DEFAULT_SET(s
->r
.min_buffers
, 1);
126 DEFAULT_SET(s
->r
.max_buffers
, 1);
127 DEFAULT_SET(s
->r
.buffer_len
, 4096);
129 DEFAULT_SET(s
->w
.min_buffers
, 2);
130 DEFAULT_SET(s
->w
.max_buffers
, -1);
131 DEFAULT_SET(s
->w
.buffer_len
, 256);
135 s
->state_change
.cb
= ustream_state_change_cb
;
136 s
->write_error
= false;
147 static bool ustream_should_move(struct ustream_buf_list
*l
, struct ustream_buf
*buf
, int len
)
152 /* nothing to squeeze */
153 if (buf
->data
== buf
->head
)
156 maxlen
= buf
->end
- buf
->head
;
157 offset
= buf
->data
- buf
->head
;
159 /* less than half is available */
160 if (offset
> maxlen
/ 2)
163 /* less than 32 bytes data but takes more than 1/4 space */
164 if (buf
->tail
- buf
->data
< 32 && offset
> maxlen
/ 4)
167 /* more buf is already in list or can be allocated */
168 if (buf
!= l
->tail
|| ustream_can_alloc(l
))
171 /* no need to move if len is available at the tail */
172 return (buf
->end
- buf
->tail
< len
);
175 static void ustream_free_buf(struct ustream_buf_list
*l
, struct ustream_buf
*buf
)
180 if (buf
== l
->data_tail
)
181 l
->data_tail
= buf
->next
;
186 if (--l
->buffers
>= l
->min_buffers
) {
192 ustream_init_buf(buf
, buf
->end
- buf
->head
);
193 ustream_add_buf(l
, buf
);
196 static void __ustream_set_read_blocked(struct ustream
*s
, unsigned char val
)
198 bool changed
= !!s
->read_blocked
!= !!val
;
200 s
->read_blocked
= val
;
202 s
->set_read_blocked(s
);
205 void ustream_set_read_blocked(struct ustream
*s
, bool set
)
207 unsigned char val
= s
->read_blocked
& ~READ_BLOCKED_USER
;
210 val
|= READ_BLOCKED_USER
;
212 __ustream_set_read_blocked(s
, val
);
215 void ustream_consume(struct ustream
*s
, int len
)
217 struct ustream_buf
*buf
= s
->r
.head
;
222 s
->r
.data_bytes
-= len
;
223 if (s
->r
.data_bytes
< 0)
227 struct ustream_buf
*next
= buf
->next
;
228 int buf_len
= buf
->tail
- buf
->data
;
236 ustream_free_buf(&s
->r
, buf
);
240 __ustream_set_read_blocked(s
, s
->read_blocked
& ~READ_BLOCKED_FULL
);
243 static void ustream_fixup_string(struct ustream
*s
, struct ustream_buf
*buf
)
251 static bool ustream_prepare_buf(struct ustream
*s
, struct ustream_buf_list
*l
, int len
)
253 struct ustream_buf
*buf
;
257 if (ustream_should_move(l
, buf
, len
)) {
258 int len
= buf
->tail
- buf
->data
;
260 memmove(buf
->head
, buf
->data
, len
);
261 buf
->data
= buf
->head
;
262 buf
->tail
= buf
->data
+ len
;
265 ustream_fixup_string(s
, buf
);
267 /* some chunks available at the tail */
268 if (buf
->tail
!= buf
->end
)
270 /* next buf available */
272 l
->data_tail
= buf
->next
;
277 if (!ustream_can_alloc(l
))
280 if (l
->alloc(s
, l
) < 0)
283 l
->data_tail
= l
->tail
;
287 char *ustream_reserve(struct ustream
*s
, int len
, int *maxlen
)
289 struct ustream_buf
*buf
;
291 if (!ustream_prepare_buf(s
, &s
->r
, len
)) {
292 __ustream_set_read_blocked(s
, s
->read_blocked
| READ_BLOCKED_FULL
);
297 buf
= s
->r
.data_tail
;
298 *maxlen
= buf
->end
- buf
->tail
;
302 void ustream_fill_read(struct ustream
*s
, int len
)
304 struct ustream_buf
*buf
= s
->r
.data_tail
;
307 s
->r
.data_bytes
+= len
;
312 maxlen
= buf
->end
- buf
->tail
;
318 ustream_fixup_string(s
, buf
);
320 s
->r
.data_tail
= buf
;
324 if (s
->notify_read
) {
325 if (s
->pending_cb
& CB_PENDING_READ
)
328 s
->pending_cb
|= CB_PENDING_READ
;
329 s
->notify_read(s
, s
->r
.data_bytes
);
330 s
->pending_cb
&= ~CB_PENDING_READ
;
334 char *ustream_get_read_buf(struct ustream
*s
, int *buflen
)
340 len
= s
->r
.head
->tail
- s
->r
.head
->data
;
342 data
= s
->r
.head
->data
;
351 int ustream_read(struct ustream
*s
, char *buf
, int buflen
)
358 chunk
= ustream_get_read_buf(s
, &chunk_len
);
361 if (chunk_len
> buflen
- len
)
362 chunk_len
= buflen
- len
;
363 memcpy(buf
+ len
, chunk
, chunk_len
);
364 ustream_consume(s
, chunk_len
);
366 } while (len
< buflen
);
371 static void ustream_write_error(struct ustream
*s
)
374 ustream_state_change(s
);
375 s
->write_error
= true;
378 bool ustream_write_pending(struct ustream
*s
)
380 struct ustream_buf
*buf
= s
->w
.head
;
386 while (buf
&& s
->w
.data_bytes
) {
387 struct ustream_buf
*next
= buf
->next
;
388 int maxlen
= buf
->tail
- buf
->data
;
390 len
= s
->write(s
, buf
->data
, maxlen
, !!buf
->next
);
392 ustream_write_error(s
);
400 s
->w
.data_bytes
-= len
;
406 ustream_free_buf(&s
->w
, buf
);
411 s
->notify_write(s
, wr
);
413 if (s
->eof
&& wr
&& !s
->w
.data_bytes
)
414 ustream_state_change(s
);
416 return !s
->w
.data_bytes
;
419 static int ustream_write_buffered(struct ustream
*s
, const char *data
, int len
, int wr
)
421 struct ustream_buf_list
*l
= &s
->w
;
422 struct ustream_buf
*buf
;
426 if (!ustream_prepare_buf(s
, &s
->w
, len
))
431 maxlen
= buf
->end
- buf
->tail
;
435 memcpy(buf
->tail
, data
, maxlen
);
440 l
->data_bytes
+= maxlen
;
446 int ustream_write(struct ustream
*s
, const char *data
, int len
, bool more
)
448 struct ustream_buf_list
*l
= &s
->w
;
454 if (!l
->data_bytes
) {
455 wr
= s
->write(s
, data
, len
, more
);
460 ustream_write_error(s
);
468 return ustream_write_buffered(s
, data
, len
, wr
);
471 #define MAX_STACK_BUFLEN 256
473 int ustream_vprintf(struct ustream
*s
, const char *format
, va_list arg
)
475 struct ustream_buf_list
*l
= &s
->w
;
478 int wr
, maxlen
, buflen
;
483 if (!l
->data_bytes
) {
484 buf
= alloca(MAX_STACK_BUFLEN
);
486 maxlen
= vsnprintf(buf
, MAX_STACK_BUFLEN
, format
, arg2
);
488 if (maxlen
< MAX_STACK_BUFLEN
) {
489 wr
= s
->write(s
, buf
, maxlen
, false);
491 ustream_write_error(s
);
499 return ustream_write_buffered(s
, buf
, maxlen
, wr
);
501 buf
= malloc(maxlen
+ 1);
504 wr
= vsnprintf(buf
, maxlen
+ 1, format
, arg
);
505 wr
= ustream_write(s
, buf
, wr
, false);
511 if (!ustream_prepare_buf(s
, l
, 1))
514 buf
= l
->data_tail
->tail
;
515 buflen
= l
->data_tail
->end
- buf
;
518 maxlen
= vsnprintf(buf
, buflen
, format
, arg2
);
525 l
->data_tail
->tail
+= wr
;
530 buf
= malloc(maxlen
+ 1);
533 maxlen
= vsnprintf(buf
, maxlen
+ 1, format
, arg
);
534 wr
= ustream_write_buffered(s
, buf
+ wr
, maxlen
- wr
, wr
);
540 int ustream_printf(struct ustream
*s
, const char *format
, ...)
548 va_start(arg
, format
);
549 ret
= ustream_vprintf(s
, format
, arg
);