[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH] Support NBD client under win32/MinGW
From: Or Goshen
Subject: [Qemu-devel] [PATCH] Support NBD client under win32/MinGW
Date: Mon, 24 Feb 2014 11:13:21 +0200
From: Or Goshen <address@hidden>
---
aio-win32.c | 244 +++++++++++++++++++++++++++++++++++++++++----------
block/Makefile.objs | 4 +-
block/nbd-client.h | 2 +-
include/block/aio.h | 2 -
include/block/nbd.h | 2 +-
main-loop.c | 2 -
nbd.c | 4 +-
qemu-coroutine-io.c | 4 +-
8 files changed, 208 insertions(+), 56 deletions(-)
diff --git a/aio-win32.c b/aio-win32.c
index 23f4e5b..7f716b1 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -22,12 +22,76 @@
struct AioHandler {
EventNotifier *e;
+ IOHandler *io_read;
+ IOHandler *io_write;
EventNotifierHandler *io_notify;
GPollFD pfd;
int deleted;
+ void *opaque;
QLIST_ENTRY(AioHandler) node;
};
+void aio_set_fd_handler(AioContext *ctx,
+ int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ /* fd is a SOCKET in our case */
+ AioHandler *node;
+
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ if (node->pfd.fd == fd && !node->deleted)
+ break;
+ }
+
+ /* Are we deleting the fd handler? */
+ if (!io_read && !io_write) {
+ if (node) {
+ /* If the lock is held, just mark the node as deleted */
+ if (ctx->walking_handlers) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ } else {
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up after
+ * releasing the walking_handlers lock.
+ */
+ QLIST_REMOVE(node, node);
+ CloseHandle((HANDLE)node->e);
+ g_free(node);
+ }
+ }
+ } else {
+ if (node == NULL) {
+ /* Alloc and insert if it's not already there */
+ node = g_malloc0(sizeof(AioHandler));
+ node->pfd.fd = fd;
+ QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ }
+ /* Create event */
+ HANDLE event = WSACreateEvent();
+ long lNetworkEvents = 0;
+
+ if (node->io_read)
+ lNetworkEvents |= FD_READ;
+ if (node->io_write)
+ lNetworkEvents |= FD_WRITE;
+
+ WSAEventSelect(node->pfd.fd, event, lNetworkEvents);
+ node->e = (EventNotifier *)event;
+
+ /* Update handler with latest information */
+ node->pfd.events = (io_read != NULL ? G_IO_IN : 0);
+ node->pfd.events |= (io_write != NULL ? G_IO_OUT : 0);
+ node->opaque = opaque;
+ node->io_read = io_read;
+ node->io_write = io_write;
+ }
+
+ aio_notify(ctx);
+}
+
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
EventNotifierHandler *io_notify)
@@ -81,14 +145,88 @@ bool aio_pending(AioContext *ctx)
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ // HANDLE ?
if (node->pfd.revents && node->io_notify) {
return true;
}
+
+ // SOCKET ?
+ int revents;
+
+ revents = node->pfd.revents & node->pfd.events;
+ if ((revents & G_IO_IN) && node->io_read) {
+ return true;
+ }
+ if ((revents & G_IO_OUT) && node->io_write) {
+ return true;
+ }
}
return false;
}
+static bool aio_dispatch(AioContext *ctx)
+{
+ AioHandler *node;
+ bool progress = false;
+
+ /*
+ * We have to walk very carefully in case qemu_aio_set_fd_handler is
+ * called while we're walking.
+ */
+ node = QLIST_FIRST(&ctx->aio_handlers);
+ while (node) {
+ AioHandler *tmp = node;
+
+ ctx->walking_handlers++;
+
+ if (!node->deleted) {
+
+ // HANDLE ?
+ if (node->pfd.revents && node->io_notify) {
+ node->pfd.revents = 0;
+ node->io_notify(node->e);
+
+ /* aio_notify() does not count as progress */
+ if (node->e != &ctx->notifier) {
+ progress = true;
+ }
+ }
+
+ // SOCKET ?
+ int revents = node->pfd.revents & node->pfd.events;
+ node->pfd.revents = 0;
+
+ if ((revents & G_IO_IN) && node->io_read) {
+ node->io_read(node->opaque);
+
+ /* aio_notify() does not count as progress */
+ if (node->opaque != &ctx->notifier) {
+ progress = true;
+ }
+ }
+ if ((revents & G_IO_OUT) && node->io_write) {
+ node->io_write(node->opaque);
+ progress = true;
+ }
+ }
+
+ node = QLIST_NEXT(node, node);
+
+ ctx->walking_handlers--;
+
+ if (!ctx->walking_handlers && tmp->deleted) {
+ QLIST_REMOVE(tmp, node);
+ g_free(tmp);
+ }
+ }
+
+ /* Run our timers */
+ progress |= timerlistgroup_run_timers(&ctx->tlg);
+
+ return progress;
+}
+
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
@@ -96,6 +234,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
bool progress;
int count;
int timeout;
+ fd_set rfds, wfds;
+ struct timeval tv0 = { .tv_sec = 0, .tv_usec = 0};
progress = false;
@@ -109,41 +249,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
progress = true;
}
- /* Run timers */
- progress |= timerlistgroup_run_timers(&ctx->tlg);
-
- /*
- * Then dispatch any pending callbacks from the GSource.
- *
- * We have to walk very carefully in case qemu_aio_set_fd_handler is
- * called while we're walking.
- */
- node = QLIST_FIRST(&ctx->aio_handlers);
- while (node) {
- AioHandler *tmp;
-
- ctx->walking_handlers++;
-
- if (node->pfd.revents && node->io_notify) {
- node->pfd.revents = 0;
- node->io_notify(node->e);
-
- /* aio_notify() does not count as progress */
- if (node->e != &ctx->notifier) {
- progress = true;
- }
- }
-
- tmp = node;
- node = QLIST_NEXT(node, node);
-
- ctx->walking_handlers--;
-
- if (!ctx->walking_handlers && tmp->deleted) {
- QLIST_REMOVE(tmp, node);
- g_free(tmp);
- }
- }
+ progress = aio_dispatch(ctx);
if (progress && !blocking) {
return true;
@@ -151,12 +257,42 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++;
- /* fill fd sets */
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
count = 0;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (!node->deleted && node->io_notify) {
+ if (node->deleted)
+ continue;
+
+ /* HANDLE ? */
+ if (node->io_notify) {
events[count++] = event_notifier_get_handle(node->e);
}
+
+ /* SOCKET ? */
+ else if (node->io_read || node->io_write) {
+ if (node->io_read)
+ FD_SET ((SOCKET)node->pfd.fd, &rfds);
+ if (node->io_write)
+ FD_SET ((SOCKET)node->pfd.fd, &wfds);
+
+ events[count++] = (HANDLE)node->e;
+ }
+ }
+
+ if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ node->pfd.revents = 0;
+ if (FD_ISSET(node->pfd.fd, &rfds)) {
+ node->pfd.revents |= G_IO_IN;
+ blocking = false;
+ }
+
+ if (FD_ISSET(node->pfd.fd, &wfds)) {
+ node->pfd.revents |= G_IO_OUT;
+ blocking = false;
+ }
+ }
}
ctx->walking_handlers--;
@@ -184,6 +320,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++;
+ // Handle ?
if (!node->deleted &&
event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] &&
node->io_notify) {
@@ -195,6 +332,27 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
}
+ // SOCKET ?
+ if (!node->deleted &&
+ ((HANDLE)node->e == events[ret - WAIT_OBJECT_0])) {
+
+ // what happened ?
+ WSANETWORKEVENTS ev;
+ ev.lNetworkEvents = 0xC0FFEE;
+
+ WSAEnumNetworkEvents(node->pfd.fd, (HANDLE)node->e, &ev);
+
+ if ((ev.lNetworkEvents & FD_READ) != 0 && node->io_read) {
+ node->io_read(node->opaque);
+ progress = true;
+ }
+
+ if ((ev.lNetworkEvents & FD_WRITE) != 0 && node->io_write) {
+ node->io_write(node->opaque);
+ progress = true;
+ }
+ }
+
tmp = node;
node = QLIST_NEXT(node, node);
@@ -210,14 +368,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
events[ret - WAIT_OBJECT_0] = events[--count];
}
- if (blocking) {
- /* Run the timers a second time. We do this because otherwise aio_wait
- * will not note progress - and will stop a drain early - if we have
- * a timer that was not ready to run entering g_poll but is ready
- * after g_poll. This will only do anything if a timer has expired.
- */
- progress |= timerlistgroup_run_timers(&ctx->tlg);
- }
+ /* Run dispatch even if there were no readable fds to run timers */
+ if (aio_dispatch(ctx)) {
+ progress = true;
+ }
return progress;
}
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 4e8c91e..e28f916 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -1,4 +1,4 @@
-block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
+block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o nbd.o
block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
block-obj-y += qed-check.o
@@ -10,7 +10,7 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
ifeq ($(CONFIG_POSIX),y)
-block-obj-y += nbd.o nbd-client.o sheepdog.o
+block-obj-y += nbd-client.o sheepdog.o
block-obj-$(CONFIG_LIBISCSI) += iscsi.o
block-obj-$(CONFIG_CURL) += curl.o
block-obj-$(CONFIG_RBD) += rbd.o
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f2a6337..d02acc1 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -19,7 +19,7 @@
typedef struct NbdClientSession {
int sock;
uint32_t nbdflags;
- off_t size;
+ uint64_t size;
size_t blocksize;
CoMutex send_mutex;
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..effc8c2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -199,7 +199,6 @@ bool aio_pending(AioContext *ctx);
*/
bool aio_poll(AioContext *ctx, bool blocking);
-#ifdef CONFIG_POSIX
/* Register a file descriptor and associated callbacks. Behaves very similarly
* to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
* be invoked when using qemu_aio_wait().
@@ -212,7 +211,6 @@ void aio_set_fd_handler(AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
-#endif
/* Register an event notifier and associated callbacks. Behaves very similarly
* to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks
diff --git a/include/block/nbd.h b/include/block/nbd.h
index c90f5e4..7a84882 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -69,7 +69,7 @@ int unix_socket_outgoing(const char *path);
int unix_socket_incoming(const char *path);
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
- off_t *size, size_t *blocksize);
+ uint64_t *size, size_t *blocksize);
int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize);
ssize_t nbd_send_request(int csock, struct nbd_request *request);
ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
diff --git a/main-loop.c b/main-loop.c
index c3c9c28..0c82193 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -503,7 +503,6 @@ bool qemu_aio_wait(void)
return aio_poll(qemu_aio_context, true);
}
-#ifdef CONFIG_POSIX
void qemu_aio_set_fd_handler(int fd,
IOHandler *io_read,
IOHandler *io_write,
@@ -511,7 +510,6 @@ void qemu_aio_set_fd_handler(int fd,
{
aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, opaque);
}
-#endif
void qemu_aio_set_event_notifier(EventNotifier *notifier,
EventNotifierHandler *io_read)
diff --git a/nbd.c b/nbd.c
index 030f56b..475503d 100644
--- a/nbd.c
+++ b/nbd.c
@@ -149,7 +149,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
err = socket_error();
/* recoverable error */
- if (err == EINTR || (offset > 0 && err == EAGAIN)) {
+ if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
continue;
}
@@ -434,7 +434,7 @@ fail:
}
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
- off_t *size, size_t *blocksize)
+ uint64_t *size, size_t *blocksize)
{
char buf[256];
uint64_t magic, s;
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
index 054ca70..eb89817 100644
--- a/qemu-coroutine-io.c
+++ b/qemu-coroutine-io.c
@@ -34,13 +34,15 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt,
{
size_t done = 0;
ssize_t ret;
+ int err;
while (done < bytes) {
ret = iov_send_recv(sockfd, iov, iov_cnt,
offset + done, bytes - done, do_send);
if (ret > 0) {
done += ret;
} else if (ret < 0) {
- if (errno == EAGAIN) {
+ err = socket_error();
+ if (err == EAGAIN || err == EWOULDBLOCK) {
qemu_coroutine_yield();
} else if (done == 0) {
return -1;
--
1.7.9
- [Qemu-devel] [PATCH] Support NBD client under win32/MinGW,
Or Goshen <=