[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [RFC PATCH 1/4] qemu-char: add the "1-server-N-client" supp
From: |
Wei Wang |
Subject: |
[Qemu-devel] [RFC PATCH 1/4] qemu-char: add the "1-server-N-client" support |
Date: |
Wed, 9 Nov 2016 21:47:46 -0500 |
This patch enables a qemu server socket to be connected by multiple
client sockets.
Signed-off-by: Wei Wang <address@hidden>
---
include/sysemu/char.h | 64 ++++++-
qapi-schema.json | 3 +-
qemu-char.c | 512 ++++++++++++++++++++++++++++++++++++++------------
3 files changed, 456 insertions(+), 123 deletions(-)
diff --git a/include/sysemu/char.h b/include/sysemu/char.h
index ee7e554..ff5dda6 100644
--- a/include/sysemu/char.h
+++ b/include/sysemu/char.h
@@ -58,17 +58,24 @@ struct ParallelIOArg {
typedef void IOEventHandler(void *opaque, int event);
+#define MAX_CLIENTS 256
+#define ANONYMOUS_CLIENT (~((uint64_t)0))
struct CharDriverState {
QemuMutex chr_write_lock;
void (*init)(struct CharDriverState *s);
int (*chr_write)(struct CharDriverState *s, const uint8_t *buf, int len);
+ int (*chr_write_n)(struct CharDriverState *s, uint64_t id, const uint8_t
*buf, int len);
int (*chr_sync_read)(struct CharDriverState *s,
const uint8_t *buf, int len);
+ int (*chr_sync_read_n)(struct CharDriverState *s, uint64_t id,
+ const uint8_t *buf, int len);
GSource *(*chr_add_watch)(struct CharDriverState *s, GIOCondition cond);
void (*chr_update_read_handler)(struct CharDriverState *s);
int (*chr_ioctl)(struct CharDriverState *s, int cmd, void *arg);
int (*get_msgfds)(struct CharDriverState *s, int* fds, int num);
+ int (*get_msgfds_n)(struct CharDriverState *s, uint64_t id, int* fds, int
num);
int (*set_msgfds)(struct CharDriverState *s, int *fds, int num);
+ int (*set_msgfds_n)(struct CharDriverState *s, uint64_t id, int *fds, int
num);
int (*chr_add_client)(struct CharDriverState *chr, int fd);
int (*chr_wait_connected)(struct CharDriverState *chr, Error **errp);
IOEventHandler *chr_event;
@@ -77,6 +84,7 @@ struct CharDriverState {
void *handler_opaque;
void (*chr_close)(struct CharDriverState *chr);
void (*chr_disconnect)(struct CharDriverState *chr);
+ void (*chr_disconnect_n)(struct CharDriverState *chr, uint64_t id);
void (*chr_accept_input)(struct CharDriverState *chr);
void (*chr_set_echo)(struct CharDriverState *chr, bool echo);
void (*chr_set_fe_open)(struct CharDriverState *chr, int fe_open);
@@ -91,7 +99,10 @@ struct CharDriverState {
int explicit_be_open;
int avail_connections;
int is_mux;
- guint fd_in_tag;
+ guint fd_in_tag[MAX_CLIENTS];
+ uint64_t max_connections;
+ unsigned long *conn_bitmap;
+ uint64_t conn_id;
QemuOpts *opts;
bool replay;
QTAILQ_ENTRY(CharDriverState) next;
@@ -281,6 +292,20 @@ int qemu_chr_fe_write(CharDriverState *s, const uint8_t
*buf, int len);
int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int len);
/**
+ * @qemu_chr_fe_write_all_n:
+ *
+ * Write data to the selected character backend from the front end.
+ *
+ * @id the connection id of the character backend
+ * @buf the data
+ * @len the number of bytes to send
+ *
+ * Returns: the number of bytes consumed
+ */
+int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id,
+ const uint8_t *buf, int len);
+
+/**
* @qemu_chr_fe_read_all:
*
* Read data to a buffer from the back end.
@@ -293,6 +318,20 @@ int qemu_chr_fe_write_all(CharDriverState *s, const
uint8_t *buf, int len);
int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len);
/**
+ * @qemu_chr_fe_read_all_n:
+ *
+ * Read data to a buffer from the selected back end.
+ *
+ * @id the connection id
+ * @buf the data buffer
+ * @len the number of bytes to read
+ *
+ * Returns: the number of bytes read
+ */
+int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id,
+ uint8_t *buf, int len);
+
+/**
* @qemu_chr_fe_ioctl:
*
* Issue a device specific ioctl to a backend. This function is thread-safe.
@@ -331,6 +370,19 @@ int qemu_chr_fe_get_msgfd(CharDriverState *s);
*/
int qemu_chr_fe_get_msgfds(CharDriverState *s, int *fds, int num);
+
+/**
+ * @qemu_chr_fe_get_msgfds_n:
+ *
+ * The multi-client version of @qemu_chr_fe_get_msgfds.
+ *
+ * Returns: -1 if fd passing isn't supported or there are no pending file
+ * descriptors. If file descriptors are returned, subsequent calls to
+ * this function will return -1 until a client sends a new set of file
+ * descriptors.
+ */
+int qemu_chr_fe_get_msgfds_n(CharDriverState *s, uint64_t id, int *fds, int
num);
+
/**
* @qemu_chr_fe_set_msgfds:
*
@@ -345,6 +397,16 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int *fds,
int num);
int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num);
/**
+ * @qemu_chr_fe_set_msgfds_n:
+ *
+ * The multi-client version of @qemu_chr_fe_set_msgfds.
+ *
+ * Returns: -1 if fd passing isn't supported.
+ */
+int qemu_chr_fe_set_msgfds_n(CharDriverState *s, uint64_t id, int *fds, int
num);
+
+
+/**
* @qemu_chr_fe_claim:
*
* Claim a backend before using it, should be called before calling
diff --git a/qapi-schema.json b/qapi-schema.json
index 5658723..9bb5d7d 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -3327,7 +3327,8 @@
'*wait' : 'bool',
'*nodelay' : 'bool',
'*telnet' : 'bool',
- '*reconnect' : 'int' },
+ '*reconnect' : 'int' ,
+ '*connections' : 'uint64' },
'base': 'ChardevCommon' }
##
diff --git a/qemu-char.c b/qemu-char.c
index 5f82ebb..dfad6d1 100644
--- a/qemu-char.c
+++ b/qemu-char.c
@@ -265,6 +265,35 @@ static int qemu_chr_fe_write_buffer(CharDriverState *s,
const uint8_t *buf, int
return res;
}
+static int qemu_chr_fe_write_buffer_n(CharDriverState *s, uint64_t id,
+ const uint8_t *buf, int len, int *offset)
+{
+ int res = 0;
+ *offset = 0;
+
+ qemu_mutex_lock(&s->chr_write_lock);
+ while (*offset < len) {
+ retry:
+ res = s->chr_write_n(s, id, buf + *offset, len - *offset);
+ if (res < 0 && errno == EAGAIN) {
+ g_usleep(100);
+ goto retry;
+ }
+
+ if (res <= 0) {
+ break;
+ }
+
+ *offset += res;
+ }
+ if (*offset > 0) {
+ qemu_chr_fe_write_log(s, buf, *offset);
+ }
+ qemu_mutex_unlock(&s->chr_write_lock);
+
+ return res;
+}
+
int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len)
{
int ret;
@@ -317,6 +346,31 @@ int qemu_chr_fe_write_all(CharDriverState *s, const
uint8_t *buf, int len)
return offset;
}
+int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id,
+ const uint8_t *buf, int len)
+{
+ int offset;
+ int res;
+
+ if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
+ replay_char_write_event_load(&res, &offset);
+ assert(offset <= len);
+ qemu_chr_fe_write_buffer_n(s, id, buf, offset, &offset);
+ return res;
+ }
+
+ res = qemu_chr_fe_write_buffer_n(s, id, buf, len, &offset);
+
+ if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
+ replay_char_write_event_save(res, offset);
+ }
+
+ if (res < 0) {
+ return res;
+ }
+ return offset;
+}
+
int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len)
{
int offset = 0, counter = 10;
@@ -325,7 +379,7 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf,
int len)
if (!s->chr_sync_read) {
return 0;
}
-
+
if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
return replay_char_read_all_load(buf);
}
@@ -362,6 +416,52 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf,
int len)
return offset;
}
+int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id,
+ uint8_t *buf, int len)
+{
+ int offset = 0, counter = 10;
+ int res;
+
+ if (!s->chr_sync_read_n) {
+ return 0;
+ }
+
+ if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
+ return replay_char_read_all_load(buf);
+ }
+
+ while (offset < len) {
+ retry:
+ res = s->chr_sync_read_n(s, id, buf + offset, len - offset);
+ if (res == -1 && errno == EAGAIN) {
+ g_usleep(100);
+ goto retry;
+ }
+
+ if (res == 0) {
+ break;
+ }
+
+ if (res < 0) {
+ if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
+ replay_char_read_all_save_error(res);
+ }
+ return res;
+ }
+
+ offset += res;
+
+ if (!counter--) {
+ break;
+ }
+ }
+
+ if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
+ replay_char_read_all_save_buf(buf, offset);
+ }
+ return offset;
+}
+
int qemu_chr_fe_ioctl(CharDriverState *s, int cmd, void *arg)
{
int res;
@@ -417,11 +517,23 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int *fds,
int len)
return s->get_msgfds ? s->get_msgfds(s, fds, len) : -1;
}
+int qemu_chr_fe_get_msgfds_n(CharDriverState *s,
+ uint64_t id, int *fds, int len)
+{
+ return s->get_msgfds_n ? s->get_msgfds_n(s, id, fds, len) : -1;
+}
+
int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num)
{
return s->set_msgfds ? s->set_msgfds(s, fds, num) : -1;
}
+int qemu_chr_fe_set_msgfds_n(CharDriverState *s,
+ uint64_t id, int *fds, int num)
+{
+ return s->set_msgfds_n ? s->set_msgfds_n(s, id, fds, num) : -1;
+}
+
int qemu_chr_add_client(CharDriverState *s, int fd)
{
return s->chr_add_client ? s->chr_add_client(s, fd) : -1;
@@ -951,12 +1063,19 @@ static void io_remove_watch_poll(guint tag)
static void remove_fd_in_watch(CharDriverState *chr)
{
- if (chr->fd_in_tag) {
- io_remove_watch_poll(chr->fd_in_tag);
- chr->fd_in_tag = 0;
+ if (chr->fd_in_tag[0]) {
+ io_remove_watch_poll(chr->fd_in_tag[0]);
+ chr->fd_in_tag[0] = 0;
}
}
+static void remove_fd_in_watch_n(CharDriverState *chr, uint64_t id)
+{
+ if (chr->fd_in_tag[id]) {
+ io_remove_watch_poll(chr->fd_in_tag[id]);
+ chr->fd_in_tag[id] = 0;
+ }
+}
static int io_channel_send_full(QIOChannel *ioc,
const void *buf, size_t len,
@@ -1063,7 +1182,7 @@ static void fd_chr_update_read_handler(CharDriverState
*chr)
remove_fd_in_watch(chr);
if (s->ioc_in) {
- chr->fd_in_tag = io_add_watch_poll(s->ioc_in,
+ chr->fd_in_tag[0] = io_add_watch_poll(s->ioc_in,
fd_chr_read_poll,
fd_chr_read, chr);
}
@@ -1410,8 +1529,8 @@ static void pty_chr_state(CharDriverState *chr, int
connected)
s->connected = 1;
s->open_tag = g_idle_add(qemu_chr_be_generic_open_func, chr);
}
- if (!chr->fd_in_tag) {
- chr->fd_in_tag = io_add_watch_poll(s->ioc,
+ if (!chr->fd_in_tag[0]) {
+ chr->fd_in_tag[0] = io_add_watch_poll(s->ioc,
pty_chr_read_poll,
pty_chr_read, chr);
}
@@ -2558,7 +2677,7 @@ static void udp_chr_update_read_handler(CharDriverState
*chr)
remove_fd_in_watch(chr);
if (s->ioc) {
- chr->fd_in_tag = io_add_watch_poll(s->ioc,
+ chr->fd_in_tag[0] = io_add_watch_poll(s->ioc,
udp_chr_read_poll,
udp_chr_read, chr);
}
@@ -2605,20 +2724,21 @@ static CharDriverState
*qemu_chr_open_udp(QIOChannelSocket *sioc,
/* TCP Net console */
typedef struct {
- QIOChannel *ioc; /* Client I/O channel */
- QIOChannelSocket *sioc; /* Client master channel */
+ QIOChannel *ioc[MAX_CLIENTS]; /* Client I/O channels */
+ QIOChannelSocket *sioc[MAX_CLIENTS]; /* Client master channels */
QIOChannelSocket *listen_ioc;
guint listen_tag;
QCryptoTLSCreds *tls_creds;
- int connected;
+ int connected[MAX_CLIENTS];
int max_size;
int do_telnetopt;
int do_nodelay;
int is_unix;
- int *read_msgfds;
- size_t read_msgfds_num;
- int *write_msgfds;
- size_t write_msgfds_num;
+ int *read_msgfds[MAX_CLIENTS];
+ size_t read_msgfds_num[MAX_CLIENTS];
+ int *write_msgfds[MAX_CLIENTS];
+ size_t write_msgfds_num[MAX_CLIENTS];
+ uint64_t connections;
SocketAddress *addr;
bool is_listen;
@@ -2634,7 +2754,7 @@ static gboolean socket_reconnect_timeout(gpointer opaque);
static void qemu_chr_socket_restart_timer(CharDriverState *chr)
{
TCPCharDriver *s = chr->opaque;
- assert(s->connected == 0);
+ assert(s->connected[0] == 0);
s->reconnect_timer = g_timeout_add_seconds(s->reconnect_time,
socket_reconnect_timeout, chr);
}
@@ -2660,16 +2780,16 @@ static gboolean tcp_chr_accept(QIOChannel *chan,
static int tcp_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{
TCPCharDriver *s = chr->opaque;
- if (s->connected) {
- int ret = io_channel_send_full(s->ioc, buf, len,
- s->write_msgfds,
- s->write_msgfds_num);
+ if (s->connected[0]) {
+ int ret = io_channel_send_full(s->ioc[0], buf, len,
+ s->write_msgfds[0],
+ s->write_msgfds_num[0]);
/* free the written msgfds, no matter what */
- if (s->write_msgfds_num) {
- g_free(s->write_msgfds);
- s->write_msgfds = 0;
- s->write_msgfds_num = 0;
+ if (s->write_msgfds_num[0]) {
+ g_free(s->write_msgfds[0]);
+ s->write_msgfds[0] = 0;
+ s->write_msgfds_num[0] = 0;
}
return ret;
@@ -2679,11 +2799,41 @@ static int tcp_chr_write(CharDriverState *chr, const
uint8_t *buf, int len)
}
}
+/* Called with chr_write_lock held. */
+static int tcp_chr_write_n(CharDriverState *chr, uint64_t id,
+ const uint8_t *buf, int len)
+{
+ TCPCharDriver *s = chr->opaque;
+ if (s->connected[id]) {
+ int ret = io_channel_send_full(s->ioc[id], buf, len,
+ s->write_msgfds[id],
+ s->write_msgfds_num[id]);
+
+ /* free the written msgfds, no matter what */
+ if (s->write_msgfds_num[id]) {
+ g_free(s->write_msgfds[id]);
+ s->write_msgfds[id] = 0;
+ s->write_msgfds_num[id] = 0;
+ }
+
+ return ret;
+ } else {
+ /* XXX: indicate an error ? */
+ return len;
+ }
+}
+
static int tcp_chr_read_poll(void *opaque)
{
CharDriverState *chr = opaque;
TCPCharDriver *s = chr->opaque;
- if (!s->connected)
+ uint64_t id;
+
+ for (id = 0; id < s->connections; id++) {
+ if (s->connected[id])
+ break;
+ }
+ if (id == s->connections)
return 0;
s->max_size = qemu_chr_be_can_write(chr);
return s->max_size;
@@ -2742,54 +2892,107 @@ static void tcp_chr_process_IAC_bytes(CharDriverState
*chr,
static int tcp_get_msgfds(CharDriverState *chr, int *fds, int num)
{
TCPCharDriver *s = chr->opaque;
- int to_copy = (s->read_msgfds_num < num) ? s->read_msgfds_num : num;
+ int to_copy = (s->read_msgfds_num[0] < num) ? s->read_msgfds_num[0] : num;
assert(num <= TCP_MAX_FDS);
if (to_copy) {
int i;
- memcpy(fds, s->read_msgfds, to_copy * sizeof(int));
+ memcpy(fds, s->read_msgfds[0], to_copy * sizeof(int));
/* Close unused fds */
- for (i = to_copy; i < s->read_msgfds_num; i++) {
- close(s->read_msgfds[i]);
+ for (i = to_copy; i < s->read_msgfds_num[0]; i++) {
+ close(s->read_msgfds[0][i]);
}
- g_free(s->read_msgfds);
- s->read_msgfds = 0;
- s->read_msgfds_num = 0;
+ g_free(s->read_msgfds[0]);
+ s->read_msgfds[0] = 0;
+ s->read_msgfds_num[0] = 0;
}
return to_copy;
}
+static int tcp_get_msgfds_n(CharDriverState *chr, uint64_t id,
+ int *fds, int num)
+{
+ TCPCharDriver *s = chr->opaque;
+ int to_copy = (s->read_msgfds_num[id] < num) ? s->read_msgfds_num[id] :
num;
+
+ assert(num <= TCP_MAX_FDS);
+
+ if (to_copy) {
+ int i;
+
+ memcpy(fds, s->read_msgfds[id], to_copy * sizeof(int));
+
+ /* Close unused fds */
+ for (i = to_copy; i < s->read_msgfds_num[id]; i++) {
+ close(s->read_msgfds[id][i]);
+ }
+
+ g_free(s->read_msgfds[id]);
+ s->read_msgfds[id] = 0;
+ s->read_msgfds_num[id] = 0;
+ }
+
+ return to_copy;
+}
+
static int tcp_set_msgfds(CharDriverState *chr, int *fds, int num)
{
TCPCharDriver *s = chr->opaque;
/* clear old pending fd array */
- g_free(s->write_msgfds);
- s->write_msgfds = NULL;
- s->write_msgfds_num = 0;
+ g_free(s->write_msgfds[0]);
+ s->write_msgfds[0] = NULL;
+ s->write_msgfds_num[0] = 0;
- if (!s->connected ||
- !qio_channel_has_feature(s->ioc,
+ if (!s->connected[0] ||
+ !qio_channel_has_feature(s->ioc[0],
QIO_CHANNEL_FEATURE_FD_PASS)) {
return -1;
}
if (num) {
- s->write_msgfds = g_new(int, num);
- memcpy(s->write_msgfds, fds, num * sizeof(int));
+ s->write_msgfds[0] = g_new(int, num);
+ memcpy(s->write_msgfds[0], fds, num * sizeof(int));
}
- s->write_msgfds_num = num;
+ s->write_msgfds_num[0] = num;
return 0;
}
-static ssize_t tcp_chr_recv(CharDriverState *chr, char *buf, size_t len)
+static int tcp_set_msgfds_n(CharDriverState *chr, uint64_t id,
+ int *fds, int num)
+{
+ TCPCharDriver *s = chr->opaque;
+
+ /* clear old pending fd array */
+ g_free(s->write_msgfds[id]);
+ s->write_msgfds[id] = NULL;
+ s->write_msgfds_num[id] = 0;
+
+ if (!s->connected[id] ||
+ !qio_channel_has_feature(s->ioc[id],
+ QIO_CHANNEL_FEATURE_FD_PASS)) {
+ return -1;
+ }
+
+ if (num) {
+ s->write_msgfds[id] = g_new(int, num);
+ memcpy(s->write_msgfds[id], fds, num * sizeof(int));
+ }
+
+ s->write_msgfds_num[id] = num;
+
+ return 0;
+}
+
+static ssize_t tcp_chr_recv(CharDriverState *chr, uint64_t id,
+ char *buf, size_t len)
{
TCPCharDriver *s = chr->opaque;
struct iovec iov = { .iov_base = buf, .iov_len = len };
@@ -2798,12 +3001,12 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, char
*buf, size_t len)
int *msgfds = NULL;
size_t msgfds_num = 0;
- if (qio_channel_has_feature(s->ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
- ret = qio_channel_readv_full(s->ioc, &iov, 1,
+ if (qio_channel_has_feature(s->ioc[id], QIO_CHANNEL_FEATURE_FD_PASS)) {
+ ret = qio_channel_readv_full(s->ioc[id], &iov, 1,
&msgfds, &msgfds_num,
NULL);
} else {
- ret = qio_channel_readv_full(s->ioc, &iov, 1,
+ ret = qio_channel_readv_full(s->ioc[id], &iov, 1,
NULL, NULL,
NULL);
}
@@ -2817,20 +3020,20 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, char
*buf, size_t len)
if (msgfds_num) {
/* close and clean read_msgfds */
- for (i = 0; i < s->read_msgfds_num; i++) {
- close(s->read_msgfds[i]);
+ for (i = 0; i < s->read_msgfds_num[id]; i++) {
+ close(s->read_msgfds[id][i]);
}
- if (s->read_msgfds_num) {
- g_free(s->read_msgfds);
+ if (s->read_msgfds_num[id]) {
+ g_free(s->read_msgfds[id]);
}
- s->read_msgfds = msgfds;
- s->read_msgfds_num = msgfds_num;
+ s->read_msgfds[id] = msgfds;
+ s->read_msgfds_num[id] = msgfds_num;
}
- for (i = 0; i < s->read_msgfds_num; i++) {
- int fd = s->read_msgfds[i];
+ for (i = 0; i < s->read_msgfds_num[id]; i++) {
+ int fd = s->read_msgfds[id][i];
if (fd < 0) {
continue;
}
@@ -2849,47 +3052,47 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, char
*buf, size_t len)
static GSource *tcp_chr_add_watch(CharDriverState *chr, GIOCondition cond)
{
TCPCharDriver *s = chr->opaque;
- return qio_channel_create_watch(s->ioc, cond);
+ return qio_channel_create_watch(s->ioc[0], cond);
}
-static void tcp_chr_free_connection(CharDriverState *chr)
+static void tcp_chr_free_connection(CharDriverState *chr, uint64_t id)
{
TCPCharDriver *s = chr->opaque;
int i;
- if (!s->connected) {
+ if (!s->connected[id]) {
return;
}
- if (s->read_msgfds_num) {
- for (i = 0; i < s->read_msgfds_num; i++) {
- close(s->read_msgfds[i]);
+ if (s->read_msgfds_num[id]) {
+ for (i = 0; i < s->read_msgfds_num[id]; i++) {
+ close(s->read_msgfds[id][i]);
}
- g_free(s->read_msgfds);
- s->read_msgfds = NULL;
- s->read_msgfds_num = 0;
+ g_free(s->read_msgfds[id]);
+ s->read_msgfds[id] = NULL;
+ s->read_msgfds_num[id] = 0;
}
- tcp_set_msgfds(chr, NULL, 0);
- remove_fd_in_watch(chr);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
+ tcp_set_msgfds_n(chr, id, NULL, 0);
+ remove_fd_in_watch_n(chr, id);
+ object_unref(OBJECT(s->sioc[id]));
+ s->sioc[id] = NULL;
+ object_unref(OBJECT(s->ioc[id]));
+ s->ioc[id] = NULL;
g_free(chr->filename);
chr->filename = NULL;
- s->connected = 0;
+ s->connected[id] = 0;
}
-static void tcp_chr_disconnect(CharDriverState *chr)
+static void tcp_chr_disconnect_n(CharDriverState *chr, uint64_t id)
{
TCPCharDriver *s = chr->opaque;
- if (!s->connected) {
+ if (!s->connected[id]) {
return;
}
- tcp_chr_free_connection(chr);
+ tcp_chr_free_connection(chr, id);
if (s->listen_ioc) {
s->listen_tag = qio_channel_add_watch(
@@ -2903,23 +3106,34 @@ static void tcp_chr_disconnect(CharDriverState *chr)
}
}
+static void tcp_chr_disconnect(CharDriverState *chr)
+{
+ tcp_chr_disconnect_n(chr, 0);
+}
+
static gboolean tcp_chr_read(QIOChannel *chan, GIOCondition cond, void *opaque)
{
CharDriverState *chr = opaque;
TCPCharDriver *s = chr->opaque;
uint8_t buf[READ_BUF_LEN];
int len, size;
+ uint64_t id;
- if (!s->connected || s->max_size <= 0) {
+ for (id = 0; id < s->connections; id++) {
+ if (s->ioc[id] == chan)
+ break;
+ }
+
+ if ((id == s->connections) || !s->connected[id] || s->max_size <= 0) {
return TRUE;
}
len = sizeof(buf);
if (len > s->max_size)
len = s->max_size;
- size = tcp_chr_recv(chr, (void *)buf, len);
+ size = tcp_chr_recv(chr, id, (void *)buf, len);
if (size == 0 || size == -1) {
/* connection closed */
- tcp_chr_disconnect(chr);
+ tcp_chr_disconnect_n(chr, id);
} else if (size > 0) {
if (s->do_telnetopt)
tcp_chr_process_IAC_bytes(chr, s, buf, &size);
@@ -2935,33 +3149,52 @@ static int tcp_chr_sync_read(CharDriverState *chr,
const uint8_t *buf, int len)
TCPCharDriver *s = chr->opaque;
int size;
- if (!s->connected) {
+ if (!s->connected[0]) {
+ return 0;
+ }
+
+ size = tcp_chr_recv(chr, 0, (void *) buf, len);
+ if (size == 0) {
+ /* connection closed */
+ tcp_chr_disconnect_n(chr, 0);
+ }
+
+ return size;
+}
+
+static int tcp_chr_sync_read_n(CharDriverState *chr, uint64_t id,
+ const uint8_t *buf, int len)
+{
+ TCPCharDriver *s = chr->opaque;
+ int size;
+
+ if (!s->connected[id]) {
return 0;
}
- size = tcp_chr_recv(chr, (void *) buf, len);
+ size = tcp_chr_recv(chr, id, (void *) buf, len);
if (size == 0) {
/* connection closed */
- tcp_chr_disconnect(chr);
+ tcp_chr_disconnect_n(chr, id);
}
return size;
}
-static void tcp_chr_connect(void *opaque)
+static void tcp_chr_connect(void *opaque, uint64_t id)
{
CharDriverState *chr = opaque;
TCPCharDriver *s = chr->opaque;
g_free(chr->filename);
chr->filename = sockaddr_to_str(
- &s->sioc->localAddr, s->sioc->localAddrLen,
- &s->sioc->remoteAddr, s->sioc->remoteAddrLen,
+ &s->sioc[id]->localAddr, s->sioc[id]->localAddrLen,
+ &s->sioc[id]->remoteAddr, s->sioc[id]->remoteAddrLen,
s->is_listen, s->is_telnet);
- s->connected = 1;
- if (s->ioc) {
- chr->fd_in_tag = io_add_watch_poll(s->ioc,
+ s->connected[id] = 1;
+ if (s->ioc[id]) {
+ chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id],
tcp_chr_read_poll,
tcp_chr_read, chr);
}
@@ -2971,16 +3204,18 @@ static void tcp_chr_connect(void *opaque)
static void tcp_chr_update_read_handler(CharDriverState *chr)
{
TCPCharDriver *s = chr->opaque;
+ uint64_t id;
- if (!s->connected) {
- return;
- }
+ for (id = 0; id < s->connections; id++) {
+ if (!s->connected[id])
+ continue;
- remove_fd_in_watch(chr);
- if (s->ioc) {
- chr->fd_in_tag = io_add_watch_poll(s->ioc,
- tcp_chr_read_poll,
- tcp_chr_read, chr);
+ remove_fd_in_watch_n(chr, id);
+ if (s->ioc[id]) {
+ chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id],
+ tcp_chr_read_poll,
+ tcp_chr_read, chr);
+ }
}
}
@@ -3002,14 +3237,14 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel *ioc,
if (ret == QIO_CHANNEL_ERR_BLOCK) {
ret = 0;
} else {
- tcp_chr_disconnect(init->chr);
+ tcp_chr_disconnect_n(init->chr, 0);
return FALSE;
}
}
init->buflen -= ret;
if (init->buflen == 0) {
- tcp_chr_connect(init->chr);
+ tcp_chr_connect(init->chr, 0);
return FALSE;
}
@@ -3018,7 +3253,7 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel *ioc,
return TRUE;
}
-static void tcp_chr_telnet_init(CharDriverState *chr)
+static void tcp_chr_telnet_init(CharDriverState *chr, uint64_t id)
{
TCPCharDriver *s = chr->opaque;
TCPCharDriverTelnetInit *init =
@@ -3045,7 +3280,7 @@ static void tcp_chr_telnet_init(CharDriverState *chr)
#undef IACSET
qio_channel_add_watch(
- s->ioc, G_IO_OUT,
+ s->ioc[id], G_IO_OUT,
tcp_chr_telnet_init_io,
init, NULL);
}
@@ -3059,18 +3294,18 @@ static void tcp_chr_tls_handshake(Object *source,
TCPCharDriver *s = chr->opaque;
if (err) {
- tcp_chr_disconnect(chr);
+ tcp_chr_disconnect_n(chr, 0);
} else {
if (s->do_telnetopt) {
- tcp_chr_telnet_init(chr);
+ tcp_chr_telnet_init(chr, 0);
} else {
- tcp_chr_connect(chr);
+ tcp_chr_connect(chr, 0);
}
}
}
-static void tcp_chr_tls_init(CharDriverState *chr)
+static void tcp_chr_tls_init(CharDriverState *chr, uint64_t id)
{
TCPCharDriver *s = chr->opaque;
QIOChannelTLS *tioc;
@@ -3078,21 +3313,21 @@ static void tcp_chr_tls_init(CharDriverState *chr)
if (s->is_listen) {
tioc = qio_channel_tls_new_server(
- s->ioc, s->tls_creds,
+ s->ioc[id], s->tls_creds,
NULL, /* XXX Use an ACL */
&err);
} else {
tioc = qio_channel_tls_new_client(
- s->ioc, s->tls_creds,
+ s->ioc[id], s->tls_creds,
s->addr->u.inet.data->host,
&err);
}
if (tioc == NULL) {
error_free(err);
- tcp_chr_disconnect(chr);
+ tcp_chr_disconnect_n(chr, id);
}
- object_unref(OBJECT(s->ioc));
- s->ioc = QIO_CHANNEL(tioc);
+ object_unref(OBJECT(s->ioc[id]));
+ s->ioc[id] = QIO_CHANNEL(tioc);
qio_channel_tls_handshake(tioc,
tcp_chr_tls_handshake,
@@ -3100,36 +3335,52 @@ static void tcp_chr_tls_init(CharDriverState *chr)
NULL);
}
+static int find_avail_ioc(TCPCharDriver *s, uint64_t *id)
+{
+ uint64_t i;
+
+ for(i = 0; i < MAX_CLIENTS; i++) {
+ if (s->ioc[i] == NULL) {
+ *id = i;
+ return 0;
+ }
+ }
+ return -1;
+}
static int tcp_chr_new_client(CharDriverState *chr, QIOChannelSocket *sioc)
{
TCPCharDriver *s = chr->opaque;
- if (s->ioc != NULL) {
- return -1;
- }
+ uint64_t id;
- s->ioc = QIO_CHANNEL(sioc);
+ if(find_avail_ioc(s, &id) < 0)
+ return -1;
+
+ s->ioc[id] = QIO_CHANNEL(sioc);
object_ref(OBJECT(sioc));
- s->sioc = sioc;
+ s->sioc[id] = sioc;
object_ref(OBJECT(sioc));
+ if(chr->conn_bitmap != NULL)
+ set_bit(id, chr->conn_bitmap);
- qio_channel_set_blocking(s->ioc, false, NULL);
+ qio_channel_set_blocking(s->ioc[id], false, NULL);
if (s->do_nodelay) {
- qio_channel_set_delay(s->ioc, false);
+ qio_channel_set_delay(s->ioc[id], false);
}
+/*
if (s->listen_tag) {
g_source_remove(s->listen_tag);
s->listen_tag = 0;
}
-
+*/
if (s->tls_creds) {
- tcp_chr_tls_init(chr);
+ tcp_chr_tls_init(chr, id);
} else {
if (s->do_telnetopt) {
- tcp_chr_telnet_init(chr);
+ tcp_chr_telnet_init(chr, id);
} else {
- tcp_chr_connect(chr);
+ tcp_chr_connect(chr, id);
}
}
@@ -3178,7 +3429,7 @@ static int tcp_chr_wait_connected(CharDriverState *chr,
Error **errp)
/* It can't wait on s->connected, since it is set asynchronously
* in TLS and telnet cases, only wait for an accepted socket */
- while (!s->ioc) {
+ while (!s->ioc[0]) {
if (s->is_listen) {
fprintf(stderr, "QEMU waiting for connection on: %s\n",
chr->filename);
@@ -3211,9 +3462,11 @@ int qemu_chr_wait_connected(CharDriverState *chr, Error
**errp)
static void tcp_chr_close(CharDriverState *chr)
{
TCPCharDriver *s = chr->opaque;
+ uint64_t id;
- tcp_chr_free_connection(chr);
-
+ for (id = 0; id < s->connections; id++) {
+ tcp_chr_free_connection(chr, id);
+ }
if (s->reconnect_timer) {
g_source_remove(s->reconnect_timer);
s->reconnect_timer = 0;
@@ -3721,6 +3974,7 @@ static void qemu_chr_parse_socket(QemuOpts *opts,
ChardevBackend *backend,
bool is_telnet = qemu_opt_get_bool(opts, "telnet", false);
bool do_nodelay = !qemu_opt_get_bool(opts, "delay", true);
int64_t reconnect = qemu_opt_get_number(opts, "reconnect", 0);
+ uint64_t connections = qemu_opt_get_number(opts, "connections", 1);
const char *path = qemu_opt_get(opts, "path");
const char *host = qemu_opt_get(opts, "host");
const char *port = qemu_opt_get(opts, "port");
@@ -3758,6 +4012,8 @@ static void qemu_chr_parse_socket(QemuOpts *opts,
ChardevBackend *backend,
sock->has_reconnect = true;
sock->reconnect = reconnect;
sock->tls_creds = g_strdup(tls_creds);
+ sock->has_connections = true;
+ sock->connections = connections;
addr = g_new0(SocketAddress, 1);
if (path) {
@@ -4241,6 +4497,9 @@ QemuOptsList qemu_chardev_opts = {
},{
.name = "logappend",
.type = QEMU_OPT_BOOL,
+ },{
+ .name = "connections",
+ .type = QEMU_OPT_NUMBER,
},
{ /* end of list */ }
},
@@ -4413,6 +4672,7 @@ static CharDriverState *qmp_chardev_open_socket(const
char *id,
bool is_telnet = sock->has_telnet ? sock->telnet : false;
bool is_waitconnect = sock->has_wait ? sock->wait : false;
int64_t reconnect = sock->has_reconnect ? sock->reconnect : 0;
+ uint64_t connections = sock->has_connections ? sock->connections : 1;
ChardevCommon *common = qapi_ChardevSocket_base(sock);
QIOChannelSocket *sioc = NULL;
@@ -4426,6 +4686,7 @@ static CharDriverState *qmp_chardev_open_socket(const
char *id,
s->is_listen = is_listen;
s->is_telnet = is_telnet;
s->do_nodelay = do_nodelay;
+ s->connections = connections;
if (sock->tls_creds) {
Object *creds;
creds = object_resolve_path_component(
@@ -4461,6 +4722,15 @@ static CharDriverState *qmp_chardev_open_socket(const
char *id,
s->addr = QAPI_CLONE(SocketAddress, sock->addr);
+ if (sock->connections > 1) {
+ chr->conn_bitmap = bitmap_new(sock->connections);
+ chr->max_connections = sock->connections;
+ chr->chr_write_n = tcp_chr_write_n;
+ chr->chr_sync_read_n = tcp_chr_sync_read_n;
+ chr->get_msgfds_n = tcp_get_msgfds_n;
+ chr->set_msgfds_n = tcp_set_msgfds_n;
+ chr->chr_disconnect_n = tcp_chr_disconnect_n;
+ }
chr->opaque = s;
chr->chr_wait_connected = tcp_chr_wait_connected;
chr->chr_write = tcp_chr_write;
@@ -4478,10 +4748,12 @@ static CharDriverState *qmp_chardev_open_socket(const
char *id,
chr->filename = SocketAddress_to_str("disconnected:",
addr, is_listen, is_telnet);
+ chr->conn_id = ANONYMOUS_CLIENT;
if (is_listen) {
if (is_telnet) {
s->do_telnetopt = 1;
}
+ chr->conn_id = 0;
} else if (reconnect > 0) {
s->reconnect_time = reconnect;
}
@@ -4502,11 +4774,9 @@ static CharDriverState *qmp_chardev_open_socket(const
char *id,
qemu_chr_wait_connected(chr, errp) < 0) {
goto error;
}
- if (!s->ioc) {
- s->listen_tag = qio_channel_add_watch(
+ s->listen_tag = qio_channel_add_watch(
QIO_CHANNEL(s->listen_ioc), G_IO_IN,
tcp_chr_accept, chr, NULL);
- }
} else if (qemu_chr_wait_connected(chr, errp) < 0) {
goto error;
}
--
2.7.4