[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flu
From: |
Leonardo Bras |
Subject: |
[PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush |
Date: |
Wed, 22 Sep 2021 19:24:22 -0300 |
Implement the new optional callbacks io_async_write and io_async_flush on
QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is
available in the host kernel and TCP sockets are used.
qio_channel_socket_writev() contents were moved to a helper function
__qio_channel_socket_writev() which accepts an extra 'flags' argument.
This helper function is used to implement qio_channel_socket_writev(), with
flags = 0, keeping its behavior unchanged, and
qio_channel_socket_async_writev() with flags = MSG_ZEROCOPY.
qio_channel_socket_async_flush() was implemented by reading the socket's error
queue, which will have information on MSG_ZEROCOPY send completion.
There is no need to worry about re-sending packets in case any error happens, as
MSG_ZEROCOPY only works with TCP, which will retransmit if any error occurs.
Notes on using async_write():
- As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
some caution is necessary to avoid overwriting any buffer before it's sent.
If something like this happens, a newer version of the buffer may be sent
instead.
- If this is a problem, it's recommended to use async_flush() before freeing or
re-using the buffer.
- When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require
a larger amount than is usually available to a non-root user.
- If the required amount of locked memory is not available, it falls back to
buffer copying behavior, and synchronous sending.
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
include/io/channel-socket.h | 2 +
include/io/channel.h | 1 +
io/channel-socket.c | 176 ++++++++++++++++++++++++++++++++++--
3 files changed, 169 insertions(+), 10 deletions(-)
diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..4d1be0637a 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@ struct QIOChannelSocket {
socklen_t localAddrLen;
struct sockaddr_storage remoteAddr;
socklen_t remoteAddrLen;
+ ssize_t async_queued;
+ ssize_t async_sent;
};
diff --git a/include/io/channel.h b/include/io/channel.h
index 74f2e3ae8a..611bb2ea26 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -31,6 +31,7 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
#define QIO_CHANNEL_ERR_BLOCK -2
+/* Returned by the socket writev helper when sendmsg() fails with ENOBUFS */
+#define QIO_CHANNEL_ERR_NOBUFS -3
typedef enum QIOChannelFeature QIOChannelFeature;
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..c67832d0bb 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,9 +26,23 @@
#include "io/channel-watch.h"
#include "trace.h"
#include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
#define SOCKET_MAX_FDS 16
+#ifdef CONFIG_LINUX
+/*
+ * Zerocopy callbacks. They are installed by qio_channel_socket_connect_sync()
+ * (only under CONFIG_LINUX) but defined further down in this file. They rely
+ * on MSG_ZEROCOPY and <linux/errqueue.h>, so they only exist on Linux.
+ */
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp);
+#endif
+
SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
Error **errp)
@@ -55,6 +69,8 @@ qio_channel_socket_new(void)
sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
sioc->fd = -1;
+ sioc->async_queued = 0;
+ sioc->async_sent = 0;
ioc = QIO_CHANNEL(sioc);
qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
Error **errp)
{
int fd;
+ int ret, v = 1;
trace_qio_channel_socket_connect_sync(ioc, addr);
fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
return -1;
}
+#ifdef CONFIG_LINUX
+ if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+ return 0;
+ }
+
+ ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+ if (ret >= 0) {
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ klass->io_async_writev = qio_channel_socket_async_writev;
+ klass->io_async_flush = qio_channel_socket_async_flush;
+ }
+#endif
+
return 0;
}
@@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
return ret;
}
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ int flags,
+ Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
ssize_t ret;
@@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
}
retry:
- ret = sendmsg(sioc->fd, &msg, 0);
+ ret = sendmsg(sioc->fd, &msg, flags);
if (ret <= 0) {
- if (errno == EAGAIN) {
+ switch (errno) {
+ case EAGAIN:
return QIO_CHANNEL_ERR_BLOCK;
- }
- if (errno == EINTR) {
+ case EINTR:
goto retry;
+ case ENOBUFS:
+ return QIO_CHANNEL_ERR_NOBUFS;
}
+
error_setg_errno(errp, errno,
"Unable to write to socket");
return -1;
}
return ret;
}
+
+/*
+ * Regular (copying) io_writev implementation: a plain sendmsg() with no flags.
+ *
+ * Returns the number of bytes written, QIO_CHANNEL_ERR_BLOCK if the socket
+ * would block, or -1 with @errp set.
+ */
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int *fds,
+                                         size_t nfds,
+                                         Error **errp)
+{
+    ssize_t ret;
+
+    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
+        /*
+         * The helper returns QIO_CHANNEL_ERR_NOBUFS without setting @errp so
+         * that the zerocopy path can fall back to copying. A plain write has
+         * no fallback, so keep the pre-refactor contract: hard error, errp set.
+         */
+        error_setg_errno(errp, ENOBUFS,
+                         "Unable to write to socket");
+        return -1;
+    }
+
+    return ret;
+}
+
+#ifdef CONFIG_LINUX
+/*
+ * io_async_writev implementation: send @iov with MSG_ZEROCOPY.
+ *
+ * The kernel may keep using the pages of @iov after this returns. Callers
+ * must not modify or free the buffers until qio_channel_socket_async_flush()
+ * has accounted for this send.
+ *
+ * Only sends that actually went out with MSG_ZEROCOPY are counted in
+ * sioc->async_queued. A failed or blocked (QIO_CHANNEL_ERR_BLOCK) attempt
+ * produces no completion notification, and counting it would make the next
+ * flush wait for an event that never arrives.
+ *
+ * If sendmsg() fails with ENOBUFS, the buffer is re-sent with a plain copying
+ * write and the zerocopy callbacks are removed.
+ *
+ * Returns the number of bytes written, QIO_CHANNEL_ERR_BLOCK, or -1 with
+ * @errp set.
+ */
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    QIOChannelClass *klass;
+    ssize_t ret;
+
+    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+                                      errp);
+    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
+        /*
+         * Not enough locked memory available to the process.
+         * Fallback to default sync callback.
+         *
+         * The helper returns QIO_CHANNEL_ERR_NOBUFS without setting @errp,
+         * so warn unconditionally rather than touching *errp.
+         */
+        warn_report("Process can't lock enough memory for using MSG_ZEROCOPY, "
+                    "falling back to non-zerocopy");
+
+        /*
+         * NOTE(review): this clears the callbacks on the QIOChannelClass,
+         * which is shared by every QIOChannelSocket, not only @ioc. It also
+         * removes io_async_flush while earlier zerocopy sends may still be
+         * outstanding; a per-object flag would be safer.
+         */
+        klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = NULL;
+        klass->io_async_flush = NULL;
+
+        /* Re-send current buffer (copying path, no notification expected) */
+        return qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+    if (ret > 0) {
+        /* One completion notification will be queued for this sendmsg() */
+        sioc->async_queued++;
+    }
+
+    return ret;
+}
+#endif
+
+
+#ifdef CONFIG_LINUX
+/*
+ * io_async_flush implementation: wait until every MSG_ZEROCOPY send counted
+ * in sioc->async_queued has been reported as completed on the socket's error
+ * queue.
+ *
+ * Each completion notification carries an inclusive range of send ids
+ * [ee_info, ee_data], and the size of that range is added to
+ * sioc->async_sent.
+ *
+ * If no notification arrives within 250ms, the wait is abandoned without
+ * reporting an error (heuristic kept from the original implementation). The
+ * counters are left untouched, so a later flush still accounts for late
+ * notifications.
+ *
+ * On failure @errp is set and the function returns early.
+ */
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct pollfd pfd;
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+    int ret;
+
+    while (sioc->async_sent < sioc->async_queued) {
+        /*
+         * recvmsg() overwrites msg_controllen with the amount of control
+         * data it returned, so the buffer must be re-armed on every pass.
+         */
+        memset(control, 0, sizeof(control));
+        msg.msg_control = control;
+        msg.msg_controllen = sizeof(control);
+
+        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (ret < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            if (errno != EAGAIN) {
+                error_setg_errno(errp, errno,
+                                 "Unable to read errqueue");
+                break;
+            }
+
+            /*
+             * Nothing on errqueue, wait. POLLERR/POLLHUP/POLLNVAL are always
+             * reported in revents, so no events are requested. POLLERR is
+             * how the kernel signals that the error queue became readable.
+             */
+            pfd.fd = sioc->fd;
+            pfd.events = 0;
+            pfd.revents = 0;
+            ret = poll(&pfd, 1, 250);
+            if (ret == 0) {
+                /* Timeout: see the heuristic described above. */
+                break;
+            }
+            if (ret < 0) {
+                if (errno == EINTR) {
+                    continue;
+                }
+                error_setg_errno(errp, errno,
+                                 "Poll error");
+                break;
+            }
+            if (pfd.revents & POLLERR) {
+                int sock_err = 0;
+                socklen_t len = sizeof(sock_err);
+
+                /*
+                 * POLLERR is also raised for a pending socket error, which
+                 * recvmsg(MSG_ERRQUEUE) does not consume. Report it instead
+                 * of spinning on EAGAIN.
+                 */
+                if (getsockopt(sioc->fd, SOL_SOCKET, SO_ERROR,
+                               &sock_err, &len) == 0 && sock_err != 0) {
+                    error_setg_errno(errp, sock_err,
+                                     "Error on socket");
+                    break;
+                }
+                continue;
+            }
+            /* POLLHUP or POLLNVAL with nothing left on the error queue */
+            error_setg_errno(errp, (pfd.revents & POLLNVAL) ? EBADF : EPIPE,
+                             "Poll error");
+            break;
+        }
+
+        /* INET sockets may be IPv4 or IPv6; accept both error cmsg kinds */
+        cm = CMSG_FIRSTHDR(&msg);
+        if (!cm ||
+            !((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
+              (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            break;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != 0) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            break;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            /* ee_origin is not an errno value; report it as plain data */
+            error_setg(errp, "Error not from zerocopy (origin %u)",
+                       (unsigned int)serr->ee_origin);
+            break;
+        }
+
+        /* No errors: count the inclusive range of completed send ids */
+        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+    }
+}
+#endif
+
+
#else /* WIN32 */
static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
const struct iovec *iov,
--
2.33.0
[PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush,
Leonardo Bras <=
Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush, Peter Xu, 2021/09/28
[PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages., Leonardo Bras, 2021/09/22