Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ CHECK_CONST_EXISTS(KERN_ARND sys/sysctl.h EVENT__HAVE_DECL_KERN_ARND)
CHECK_CONST_EXISTS(KERN_RANDOM sys/sysctl.h EVENT__HAVE_DECL_KERN_RANDOM)
CHECK_CONST_EXISTS(RANDOM_UUID sys/sysctl.h EVENT__HAVE_DECL_RANDOM_UUID)
CHECK_SYMBOL_EXISTS(F_SETFD fcntl.h EVENT__HAVE_SETFD)
CHECK_CONST_EXISTS(SO_TIMESTAMP sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMP)
CHECK_CONST_EXISTS(SO_TIMESTAMPNS sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMPNS)

CHECK_TYPE_SIZE(fd_mask EVENT__HAVE_FD_MASK)

Expand Down
179 changes: 174 additions & 5 deletions buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,14 @@ advance_last_with_data(struct evbuffer *buf)
int
evbuffer_commit_space(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs)
{
return evbuffer_commit_space_with_timespec(buf, vec, n_vecs, NULL);
}

int
evbuffer_commit_space_with_timespec(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs,
const struct timespec *ts)
{
struct evbuffer_chain *chain, **firstchainp, **chainp;
int result = -1;
Expand All @@ -744,6 +752,10 @@ evbuffer_commit_space(struct evbuffer *buf,
goto done;
buf->last->off += vec[0].iov_len;
added = vec[0].iov_len;
if (ts && added && buf->last->timestamp.valid == 0) {
buf->last->timestamp.ts = *ts;
buf->last->timestamp.valid = 1;
}
if (added)
advance_last_with_data(buf);
goto okay;
Expand Down Expand Up @@ -773,6 +785,10 @@ evbuffer_commit_space(struct evbuffer *buf,
for (i=0; i<n_vecs; ++i) {
(*chainp)->off += vec[i].iov_len;
added += vec[i].iov_len;
if (ts && vec[i].iov_len && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = *ts;
(*chainp)->timestamp.valid = 1;
}
Comment on lines 785 to +791
if (vec[i].iov_len) {
buf->last_with_datap = chainp;
}
Expand Down Expand Up @@ -940,6 +956,7 @@ APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
tmp->off = chain->off;
tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_IMMUTABLE;
tmp->buffer = chain->buffer;
tmp->timestamp = chain->timestamp;
evbuffer_chain_insert(dst, tmp);
}
}
Expand Down Expand Up @@ -1145,6 +1162,7 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
chain->timestamp.valid = 0;
break;
} else
evbuffer_chain_free(chain);
Expand Down Expand Up @@ -1386,6 +1404,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}

if (CHAIN_PINNED(chain)) {
/* Pinned chain case: expand in-place by appending data from
* subsequent chains. Timestamps from subsequent chains being
* consolidated are intentionally discarded; only this chain's
* timestamp is preserved as it contains the oldest data.
*/
size_t old_off = chain->off;
if (CHAIN_SPACE_LEN(chain) < size - chain->off) {
/* not enough room at end of chunk. */
Expand All @@ -1397,6 +1420,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
size -= old_off;
chain = chain->next;
} else if (chain->buffer_len - chain->misalign >= (size_t)size) {
/* Sufficient space case: expand in-place without reallocation
* by appending data from subsequent chains. Timestamps from
* subsequent chains being consolidated are intentionally discarded;
* only this chain's timestamp is preserved.
*/
/* already have enough space in the first chain */
size_t old_off = chain->off;
buffer = chain->buffer + chain->misalign + chain->off;
Expand All @@ -1411,11 +1439,21 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}
buffer = tmp->buffer;
tmp->off = size;
/* Preserve timestamp from the original first chain */
if (chain->timestamp.valid) {
tmp->timestamp = chain->timestamp;
}
buf->first = tmp;
}

/* TODO(niels): deal with buffers that point to NULL like sendfile */

/* Note: When consolidating multiple chains into one during pullup,
* only the timestamp from the first (oldest) chain is preserved.
* Timestamps from subsequent chains are intentionally discarded.
* This design choice keeps the API simple by tracking only the
* receipt time of the oldest data in the buffer.
*/
/* Copy and free every chunk that will be entirely pulled into tmp */
last_with_data = *buf->last_with_datap;
for (; chain != NULL && (size_t)size >= chain->off; chain = next) {
Expand Down Expand Up @@ -2276,10 +2314,41 @@ get_n_bytes_readable_on_socket(evutil_socket_t fd)
#endif
}

/* TODO(niels): should this function return ev_ssize_t and take ev_ssize_t
* as howmuch? */
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
/**
* Reads data from a socket optionally with kernel timestamp support.
*
* @param buf the evbuffer to populate
* @param fd the file descriptor to use
* @param howmuch the amount of data to read (this will be adjusted;
* see below)
* @param use_recvmsg try to use recvmsg to read the data (and try to
* read kernel timestamps)
*
* If howmuch is less than 0 or greater than EVBUFFER_MAX_READ we'll try
* to read EVBUFFER_MAX_READ bytes (there may be less available on the
*socket)
*
* If use_recvmsg is nonzero, it attempts to retrieve the SO_TIMESTAMPNS or
* SO_TIMESTAMP ancillary data from recvmsg() and associate it with the buffer
* data. The
* timestamp is stored per-chain, assigned only to the first chain
* that doesn't already have a valid timestamp.
*
* NOTE: Multiple recvmsg() calls may append data into the same chain when
* evbuffer_expand_fast_() finds existing chains with free space. This is
* intentional for memory efficiency and avoids excessive chain allocation.
* As a result, a chain may contain data from multiple recvmsg() calls with
* different timestamps; only the timestamp of the first recvmsg() call that
* filled that chain is retained. After draining data from the buffer,
* evbuffer_get_timestamp() will return the timestamp of the first chain,
* which may not reflect the actual timestamp of the remaining bytes if they
* were received in a later recvmsg() call.
*
* TODO(niels): should this function return ev_ssize_t and take ev_ssize_t
* as howmuch?
*/
static int
evbuffer_read_impl_(struct evbuffer *buf, evutil_socket_t fd, int howmuch, int use_recvmsg)
{
struct evbuffer_chain **chainp;
int n;
Expand All @@ -2291,6 +2360,9 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
struct evbuffer_chain *chain;
unsigned char *p;
#endif
struct timespec ts;
int ts_found = 0;
memset(&ts, 0, sizeof(ts));

EVBUFFER_LOCK(buf);

Expand Down Expand Up @@ -2342,7 +2414,65 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
n = bytesRead;
}
#else
n = readv(fd, vecs, nvecs);
if (use_recvmsg) {
struct msghdr msg;
/* Control message buffer for cmsg data.
* Sized to accommodate timestamp messages (SCM_TIMESTAMPNS,
* SCM_TIMESTAMP) plus additional space (256 bytes) for other
* possible cmsg entries (SCM_RIGHTS, SCM_CREDENTIALS, etc.)
* to prevent truncation via MSG_CTRUNC which would silently
* lose timestamp information. */
#define EVBUFFER_RECVMSG_CTRLFN_SZ \
(CMSG_SPACE(sizeof(struct timespec)) + \
CMSG_SPACE(sizeof(struct timeval)) + 256)
unsigned char control[EVBUFFER_RECVMSG_CTRLFN_SZ];
#undef EVBUFFER_RECVMSG_CTRLFN_SZ

/* Setup message header */
memset(&msg, 0, sizeof(msg));
msg.msg_iov = vecs;
msg.msg_iovlen = nvecs;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);

/* Receive with ancillary data */
n = recvmsg(fd, &msg, 0);

if (n > 0) {
/* Check if control data was truncated */
if (!(msg.msg_flags & MSG_CTRUNC)) {
struct cmsghdr *cmsg;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET)
continue;
#if EVENT__HAVE_DECL_SO_TIMESTAMPNS
if (cmsg->cmsg_type == SCM_TIMESTAMPNS) {
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timespec)))
continue;
ts = *(struct timespec *)CMSG_DATA(cmsg);
ts_found = 1;
break;
}
#endif

#if EVENT__HAVE_DECL_SO_TIMESTAMP
if (cmsg->cmsg_type == SCM_TIMESTAMP) {
struct timeval *tv;
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timeval)))
continue;
tv = (struct timeval *)CMSG_DATA(cmsg);
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000L;
ts_found = 1;
break;
}
#endif
}
}
}
} else {
n = readv(fd, vecs, nvecs);
}
#endif
}

Expand Down Expand Up @@ -2387,8 +2517,16 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
if ((ev_ssize_t)space < remaining) {
(*chainp)->off += space;
remaining -= (int)space;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
} else {
(*chainp)->off += remaining;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
buf->last_with_datap = chainp;
break;
}
Expand All @@ -2409,6 +2547,37 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
return result;
}

int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
return evbuffer_read_impl_(buf, fd, howmuch, 0);
}

int
evbuffer_read_with_timestamp(
struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
return evbuffer_read_impl_(buf, fd, howmuch, 1);
}

int evbuffer_get_timestamp(
struct evbuffer *buf, struct timespec *timestamp)
{
int result = -1;
if (!timestamp) {
return -1;
}
EVBUFFER_LOCK(buf);
{
if (buf->first && buf->first->timestamp.valid) {
*timestamp = buf->first->timestamp.ts;
result = 0;
}
}
EVBUFFER_UNLOCK(buf);
return result;
}

#ifdef USE_IOVEC_IMPL
static inline int
evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
Expand Down
6 changes: 6 additions & 0 deletions bufferevent-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ struct bufferevent_private {
} conn_address;

struct evdns_getaddrinfo_request *dns_request;

/** Flag: set if receive timestamps are enabled */
unsigned recv_timestamps_enabled : 1;
};

/** Possible operations for a control callback. */
Expand Down Expand Up @@ -452,6 +455,9 @@ EVENT2_EXPORT_SYMBOL
void
bufferevent_socket_set_conn_address_(struct bufferevent *bev, struct sockaddr *addr, size_t addrlen);

EVENT2_EXPORT_SYMBOL
int be_socket_enable_timestamps_(evutil_socket_t fd);


/** Internal use: We have just successfully read data into an inbuf, so
* reset the read timeout (if any). */
Expand Down
Loading