qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC V2 7/8] throttle: Add throttle group support


From: Benoît Canet
Subject: [Qemu-devel] [RFC V2 7/8] throttle: Add throttle group support
Date: Wed, 13 Aug 2014 16:23:58 +0200

The throttle group support uses a cooperative round-robin scheduling algorithm.

The principles of the algorithm are simple:
- Each BDS of the group is used as a token in a circular way.
- The active BDS computes whether a wait must be done and arms the right timer.
- If a wait must be done, the token timer will be armed so that the token
  becomes the next active BDS.

Signed-off-by: Benoit Canet <address@hidden>
---
 block.c                   | 191 ++++++++++++++++++++++++++++++++++++++++------
 block/qapi.c              |   7 +-
 block/throttle-groups.c   |   2 +-
 blockdev.c                |  18 ++++-
 hmp.c                     |   4 +-
 include/block/block.h     |   3 +-
 include/block/block_int.h |   9 ++-
 qapi/block-core.json      |   5 +-
 qemu-options.hx           |   1 +
 qmp-commands.hx           |   3 +-
 10 files changed, 208 insertions(+), 35 deletions(-)

diff --git a/block.c b/block.c
index 323fca5..6dd94fb 100644
--- a/block.c
+++ b/block.c
@@ -35,6 +35,7 @@
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -126,7 +127,9 @@ void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_lock(bs->throttle_state);
+    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_unlock(bs->throttle_state);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -153,34 +156,99 @@ static bool bdrv_start_throttled_reqs(BlockDriverState 
*bs)
     return drained;
 }
 
+static void bdrv_throttle_group_add(BlockDriverState *bs)
+{
+    int i;
+    BlockDriverState *token;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+
+        /* If the ThrottleGroup is new set the current BlockDriverState as
+         * token
+         */
+        if (!token) {
+            throttle_group_set_token(bs->throttle_state, bs, i);
+        }
+
+    }
+
+    throttle_group_register_bs(bs->throttle_state, bs);
+}
+
+static void bdrv_throttle_group_remove(BlockDriverState *bs)
+{
+    BlockDriverState *token;
+    int i;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+        /* if this bs is the current token set the next bs as token */
+        if (token == bs) {
+            token = throttle_group_next_bs(token);
+            /* take care of the case where bs is the only bs of the group */
+            if (token == bs) {
+                token = NULL;
+            }
+            throttle_group_set_token(bs->throttle_state, token, i);
+        }
+    }
+
+    /* remove the current bs from the list */
+    QLIST_REMOVE(bs, round_robin);
+}
+
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
+
+    throttle_group_lock(bs->throttle_state);
     bs->io_limits_enabled = false;
+    throttle_group_unlock(bs->throttle_state);
 
     bdrv_start_throttled_reqs(bs);
 
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_remove(bs);
+    throttle_group_unlock(bs->throttle_state);
+
+    throttle_group_unref(bs->throttle_state);
+    bs->throttle_state = NULL;
+
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
 static void bdrv_throttle_read_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, false);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, false);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[0]);
 }
 
 static void bdrv_throttle_write_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, true);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, true);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[1]);
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+    bs->throttle_state = throttle_group_incref(group ? group: bs->device_name);
+
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_add(bs);
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -188,6 +256,53 @@ void bdrv_io_limits_enable(BlockDriverState *bs)
                          bdrv_throttle_write_timer_cb,
                          bs);
     bs->io_limits_enabled = true;
+    throttle_group_unlock(bs->throttle_state);
+}
+
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* this bs is a part of the same group than the one we want */
+    if (throttle_group_compare(bs->throttle_state, group)) {
+        return;
+    }
+
+    /* need to change the group this bs belong to */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
+/* This implement the round robin policy and must be called under ThrottleGroup
+ * lock
+ */
+static BlockDriverState *bdrv_next_throttle_token(BlockDriverState *bs,
+                                                  bool is_write)
+{
+    BlockDriverState *token, *start;
+
+    start = token = throttle_group_token(bs->throttle_state, is_write);
+
+    /* get next bs round in round robin style */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If no IO are queued for scheduling on the next round robin token
+     * then decide the token is the current bs because chances are
+     * the current bs get the current request queued.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
 }
 
 /* This function makes an IO wait if needed
@@ -199,32 +314,63 @@ static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      unsigned int bytes,
                                      bool is_write)
 {
+    bool empty;
     bool armed;
+    bool token_queue_empty;
+    BlockDriverState *token;
 
+    throttle_group_lock(bs->throttle_state);
+    /* get the next bs to schedule */
+    token = bdrv_next_throttle_token(bs, is_write);
     /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
+    bool must_wait = throttle_schedule_timer(bs->throttle_state,
+                                             &token->throttle_timers,
                                              is_write,
                                              &armed);
+    /* the timer got armed -> save the token */
+    if (armed) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
+    empty = qemu_co_queue_empty(&bs->throttled_reqs[is_write]);
+    throttle_group_unlock(bs->throttle_state);
 
     /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+    if (must_wait || !empty) {
         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
     }
 
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
+    throttle_group_lock(bs->throttle_state);
+    /* get the next bs to schedule */
+    token = bdrv_next_throttle_token(bs, is_write);
+    /* is there an IO to schedule in the round robin token ? */
+    token_queue_empty = qemu_co_queue_empty(&token->throttled_reqs[is_write]);
+    /* this IO will be executed, do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+    /* does the next IO queued must wait ? */
+    must_wait = throttle_schedule_timer(bs->throttle_state,
+                                        &token->throttle_timers,
+                                        is_write,
+                                        &armed);
+    /* If a timer was armed or an IO is to be scheduled in the next round robin
+     * token then save the token.
+     */
+    if (armed || !token_queue_empty) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
 
     /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write, &armed)) {
+    if (must_wait) {
+        throttle_group_unlock(bs->throttle_state);
         return;
     }
 
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    /* else schedule next request for execution */
+    if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    } else if(!token_queue_empty) {
+        throttle_fire_timer(&token->throttle_timers, is_write);
+    }
+    throttle_group_unlock(bs->throttle_state);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -1975,15 +2121,16 @@ static void bdrv_move_feature_fields(BlockDriverState 
*bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state,
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index f44f6b4..c1b92c3 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
 #include "qapi/qmp-output-visitor.h"
@@ -54,7 +55,11 @@ BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_lock(bs->throttle_state);
+        throttle_get_config(bs->throttle_state, &cfg);
+        throttle_group_unlock(bs->throttle_state);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index ea5baca..399ae5e 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -154,7 +154,7 @@ void throttle_group_register_bs(ThrottleState *ts, 
BlockDriverState *bs)
  */
 BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
 {
-    ThrottleState *ts = &bs->throttle_state;
+    ThrottleState *ts = bs->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
 
diff --git a/blockdev.c b/blockdev.c
index 48bd9a3..b9ed099 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -330,6 +330,7 @@ static DriveInfo *blockdev_init(const char *file, QDict 
*bs_opts,
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
     BlockDriver *drv = NULL;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -432,6 +433,8 @@ static DriveInfo *blockdev_init(const char *file, QDict 
*bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -490,7 +493,7 @@ static DriveInfo *blockdev_init(const char *file, QDict 
*bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(dinfo->bdrv);
+        bdrv_io_limits_enable(dinfo->bdrv, throttling_group);
         bdrv_set_io_limits(dinfo->bdrv, &cfg);
     }
 
@@ -679,6 +682,7 @@ DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType 
block_default_type)
 
     qemu_opt_rename(all_opts,
                     "iops_size", "throttling.iops-size");
+    qemu_opt_rename(all_opts, "group", "throttling.group");
 
     qemu_opt_rename(all_opts, "readonly", "read-only");
 
@@ -1689,7 +1693,9 @@ void qmp_block_set_io_throttle(const char *device, 
int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1741,9 +1747,11 @@ void qmp_block_set_io_throttle(const char *device, 
int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -2643,6 +2651,10 @@ QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index 4d1838e..c580b0e 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1165,7 +1165,9 @@ void hmp_block_set_io_throttle(Monitor *mon, const QDict 
*qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index f08471d..70fce04 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -190,8 +190,9 @@ void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 6066f63..fbf5d2e 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -334,12 +334,13 @@ struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers; 
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     uint64_t nr_bytes[BDRV_MAX_IOTYPE];
diff --git a/qapi/block-core.json b/qapi/block-core.json
index e378653..aa307a2 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -886,6 +886,9 @@
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+#
+# @group: #optional throttle group name (Since 2.2)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -897,7 +900,7 @@
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 1549625..f1ca6aa 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -433,6 +433,7 @@ DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 4be4765..2f25ac7 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1663,7 +1663,7 @@ EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = 
"device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = 
"device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1689,6 +1689,7 @@ Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example:
 
-- 
2.1.0.rc1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]