#include <stdio.h>
#include "ustream.h"
-static bool _init = false;
-
static void ustream_fd_set_uloop(struct ustream *s, bool write)
{
struct ustream_fd *sf = container_of(s, struct ustream_fd, stream);
struct ustream_buf *buf;
- unsigned int flags = ULOOP_EDGE_TRIGGER;
+ unsigned int flags = ULOOP_EDGE_TRIGGER | ULOOP_ERROR_CB;
if (!s->read_blocked && !s->eof)
flags |= ULOOP_READ;
flags |= ULOOP_WRITE;
uloop_fd_add(&sf->fd, flags);
-
- if ((flags & ULOOP_READ) && !_init);
- sf->fd.cb(&sf->fd, ULOOP_READ);
}
static void ustream_fd_set_read_blocked(struct ustream *s)
char *buf;
do {
+ if (s->read_blocked)
+ break;
+
buf = ustream_reserve(s, 1, &buflen);
if (!buf)
break;
if (errno == EINTR)
continue;
- if (errno == EAGAIN)
+ if (errno == EAGAIN || errno == ENOTCONN)
return;
len = 0;
}
if (!len) {
- sf->fd.eof = true;
- ustream_state_change(s);
+ if (!s->eof)
+ ustream_state_change(s);
+ s->eof = true;
ustream_fd_set_uloop(s, false);
return;
}
if (errno == EINTR)
continue;
- if (errno == EAGAIN || errno == EWOULDBLOCK)
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN)
break;
return -1;
ustream_fd_read_pending(sf, &more);
if (events & ULOOP_WRITE) {
- if (!ustream_write_pending(s))
+ bool no_more = ustream_write_pending(s);
+ if (no_more)
ustream_fd_set_uloop(s, false);
}
+ if (sf->fd.error && !s->write_error) {
+ ustream_state_change(s);
+ s->write_error = true;
+ ustream_fd_set_uloop(s, false);
+ }
+
return more;
}
s->write = ustream_fd_write;
s->free = ustream_fd_free;
s->poll = ustream_fd_poll;
- _init = true;
ustream_fd_set_uloop(s, false);
- _init = false;
}