qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v3 4/7] colo: Introduce resource agent


From: Lukas Straub
Subject: [PATCH v3 4/7] colo: Introduce resource agent
Date: Tue, 4 Aug 2020 12:46:55 +0200

Introduce a resource agent which can be used to manage qemu COLO
in a pacemaker cluster.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 scripts/colo-resource-agent/colo | 1501 ++++++++++++++++++++++++++++++
 1 file changed, 1501 insertions(+)
 create mode 100755 scripts/colo-resource-agent/colo

diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
new file mode 100755
index 0000000000..fa70c0ae51
--- /dev/null
+++ b/scripts/colo-resource-agent/colo
@@ -0,0 +1,1501 @@
+#!/usr/bin/env python3
+
+# Resource agent for qemu COLO for use with Pacemaker CRM
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+
+import subprocess
+import sys
+import os
+import os.path
+import signal
+import socket
+import select
+import json
+import re
+import time
+import logging
+import logging.handlers
+
+# Constants: exit codes of the OCF resource agent API
+OCF_SUCCESS = 0            # action succeeded
+OCF_ERR_GENERIC = 1        # generic error
+OCF_ERR_ARGS = 2           # invalid arguments
+OCF_ERR_UNIMPLEMENTED = 3  # action not implemented
+OCF_ERR_PERM = 4           # insufficient permissions
+OCF_ERR_INSTALLED = 5      # required software not installed
+OCF_ERR_CONFIGURED = 6     # resource misconfigured
+OCF_NOT_RUNNING = 7        # resource is not running
+OCF_RUNNING_MASTER = 8     # resource is running in the master role
+OCF_FAILED_MASTER = 9      # resource failed in the master role
+
+# Get environment variables
+
+# Clone notification metadata (unset -> None for the first three, "" else)
+OCF_RESKEY_CRM_meta_notify_type = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_type")
+OCF_RESKEY_CRM_meta_notify_operation = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_operation")
+OCF_RESKEY_CRM_meta_notify_key_operation = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_key_operation")
+OCF_RESKEY_CRM_meta_notify_start_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_start_uname", "")
+OCF_RESKEY_CRM_meta_notify_stop_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_stop_uname", "")
+OCF_RESKEY_CRM_meta_notify_active_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_active_uname", "")
+OCF_RESKEY_CRM_meta_notify_promote_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_promote_uname", "")
+OCF_RESKEY_CRM_meta_notify_demote_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_demote_uname", "")
+OCF_RESKEY_CRM_meta_notify_master_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_master_uname", "")
+OCF_RESKEY_CRM_meta_notify_slave_uname = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify_slave_uname", "")
+
+# Logging / runtime environment
+HA_RSCTMP = os.getenv("HA_RSCTMP", "/run/resource-agents")
+HA_LOGFACILITY = os.getenv("HA_LOGFACILITY")
+HA_LOGFILE = os.getenv("HA_LOGFILE")
+HA_DEBUG = os.getenv("HA_debug", "0")
+HA_DEBUGLOG = os.getenv("HA_DEBUGLOG")
+OCF_RESOURCE_INSTANCE = os.getenv(
+    "OCF_RESOURCE_INSTANCE", "default-instance")
+
+# Operation and clone metadata.  The timeout stays a string (milliseconds);
+# the interval and the clone/master counts are converted to int here.
+OCF_RESKEY_CRM_meta_timeout = os.getenv(
+    "OCF_RESKEY_CRM_meta_timeout", "60000")
+OCF_RESKEY_CRM_meta_interval = int(os.getenv(
+    "OCF_RESKEY_CRM_meta_interval", "1"))
+OCF_RESKEY_CRM_meta_clone_max = int(os.getenv(
+    "OCF_RESKEY_CRM_meta_clone_max", "1"))
+OCF_RESKEY_CRM_meta_clone_node_max = int(os.getenv(
+    "OCF_RESKEY_CRM_meta_clone_node_max", "1"))
+OCF_RESKEY_CRM_meta_master_max = int(os.getenv(
+    "OCF_RESKEY_CRM_meta_master_max", "1"))
+OCF_RESKEY_CRM_meta_master_node_max = int(os.getenv(
+    "OCF_RESKEY_CRM_meta_master_node_max", "1"))
+OCF_RESKEY_CRM_meta_notify = os.getenv(
+    "OCF_RESKEY_CRM_meta_notify")
+OCF_RESKEY_CRM_meta_globally_unique = os.getenv(
+    "OCF_RESKEY_CRM_meta_globally_unique")
+
+# Node this operation runs on; falls back to the local host name
+HOSTNAME = os.getenv("OCF_RESKEY_CRM_meta_on_node", socket.gethostname())
+
+# The action comes from __OCF_ACTION, or else from the sole command-line
+# argument
+OCF_ACTION = os.getenv("__OCF_ACTION")
+if not OCF_ACTION and len(sys.argv) == 2:
+    OCF_ACTION = sys.argv[1]
+
+# Resource parameters
+# Defaults; the meta-data action advertises these same values
+OCF_RESKEY_qemu_binary_default = "qemu-system-x86_64"
+OCF_RESKEY_qemu_img_binary_default = "qemu-img"
+OCF_RESKEY_log_dir_default = HA_RSCTMP
+OCF_RESKEY_options_default = ""
+OCF_RESKEY_active_hidden_dir_default = ""
+OCF_RESKEY_listen_address_default = "0.0.0.0"
+OCF_RESKEY_base_port_default = "9000"
+OCF_RESKEY_checkpoint_interval_default = "20000"
+OCF_RESKEY_compare_timeout_default = "3000"
+OCF_RESKEY_expired_scan_cycle_default = "3000"
+OCF_RESKEY_max_queue_size_default = "1024"
+OCF_RESKEY_use_filter_rewriter_default = "true"
+OCF_RESKEY_vnet_hdr_default = "false"
+OCF_RESKEY_max_disk_errors_default = "1"
+OCF_RESKEY_monitor_timeout_default = "20000"
+OCF_RESKEY_yank_timeout_default = "10000"
+OCF_RESKEY_fail_fast_timeout_default = "5000"
+OCF_RESKEY_debug_default = "0"
+
+# Configured values: parameter <name> is read from OCF_RESKEY_<name>,
+# falling back to the default above.  All values are kept as strings.
+OCF_RESKEY_qemu_binary = os.getenv(
+    "OCF_RESKEY_qemu_binary", OCF_RESKEY_qemu_binary_default)
+OCF_RESKEY_qemu_img_binary = os.getenv(
+    "OCF_RESKEY_qemu_img_binary", OCF_RESKEY_qemu_img_binary_default)
+OCF_RESKEY_log_dir = os.getenv(
+    "OCF_RESKEY_log_dir", OCF_RESKEY_log_dir_default)
+OCF_RESKEY_options = os.getenv(
+    "OCF_RESKEY_options", OCF_RESKEY_options_default)
+OCF_RESKEY_active_hidden_dir = os.getenv(
+    "OCF_RESKEY_active_hidden_dir", OCF_RESKEY_active_hidden_dir_default)
+OCF_RESKEY_listen_address = os.getenv(
+    "OCF_RESKEY_listen_address", OCF_RESKEY_listen_address_default)
+OCF_RESKEY_base_port = os.getenv(
+    "OCF_RESKEY_base_port", OCF_RESKEY_base_port_default)
+OCF_RESKEY_checkpoint_interval = os.getenv(
+    "OCF_RESKEY_checkpoint_interval", OCF_RESKEY_checkpoint_interval_default)
+OCF_RESKEY_compare_timeout = os.getenv(
+    "OCF_RESKEY_compare_timeout", OCF_RESKEY_compare_timeout_default)
+OCF_RESKEY_expired_scan_cycle = os.getenv(
+    "OCF_RESKEY_expired_scan_cycle", OCF_RESKEY_expired_scan_cycle_default)
+OCF_RESKEY_max_queue_size = os.getenv(
+    "OCF_RESKEY_max_queue_size", OCF_RESKEY_max_queue_size_default)
+OCF_RESKEY_use_filter_rewriter = os.getenv(
+    "OCF_RESKEY_use_filter_rewriter", OCF_RESKEY_use_filter_rewriter_default)
+OCF_RESKEY_vnet_hdr = os.getenv(
+    "OCF_RESKEY_vnet_hdr", OCF_RESKEY_vnet_hdr_default)
+OCF_RESKEY_max_disk_errors = os.getenv(
+    "OCF_RESKEY_max_disk_errors", OCF_RESKEY_max_disk_errors_default)
+OCF_RESKEY_monitor_timeout = os.getenv(
+    "OCF_RESKEY_monitor_timeout", OCF_RESKEY_monitor_timeout_default)
+OCF_RESKEY_yank_timeout = os.getenv(
+    "OCF_RESKEY_yank_timeout", OCF_RESKEY_yank_timeout_default)
+OCF_RESKEY_fail_fast_timeout = os.getenv(
+    "OCF_RESKEY_fail_fast_timeout", OCF_RESKEY_fail_fast_timeout_default)
+OCF_RESKEY_debug = os.getenv(
+    "OCF_RESKEY_debug", OCF_RESKEY_debug_default)
+
+# Per-instance overlay images used by the secondary's replication driver
+ACTIVE_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir,
+                            OCF_RESOURCE_INSTANCE + "-active.qcow2")
+HIDDEN_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir,
+                            OCF_RESOURCE_INSTANCE + "-hidden.qcow2")
+
+# Per-instance sockets and pid file, kept in HA_RSCTMP
+QMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qmp.sock")
+HELPER_SOCK = os.path.join(HA_RSCTMP,
+                           OCF_RESOURCE_INSTANCE + "-helper.sock")
+COMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-compare.sock")
+COMP_OUT_SOCK = os.path.join(HA_RSCTMP,
+                             OCF_RESOURCE_INSTANCE + "-comp_out.sock")
+
+PID_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qemu.pid")
+
+# Per-instance log files, kept in the log_dir parameter
+QMP_LOG = os.path.join(OCF_RESKEY_log_dir,
+                       OCF_RESOURCE_INSTANCE + "-qmp.log")
+QEMU_LOG = os.path.join(OCF_RESKEY_log_dir,
+                        OCF_RESOURCE_INSTANCE + "-qemu.log")
+HELPER_LOG = os.path.join(OCF_RESKEY_log_dir,
+                          OCF_RESOURCE_INSTANCE + "-helper.log")
+
+# Start of this action; used to compute how much of the action timeout is
+# left when a QMP command times out
+START_TIME = time.time()
+# Set to True by yank()
+did_yank = False
+
+# Exception raised only by this resource agent itself
+class Error(Exception):
+    """Agent-internal failure.
+
+    Every raise site logs the reason before raising, so the exception is
+    raised without arguments.
+    """
+
+def setup_constants():
+    """Derive port numbers and qemu command lines from the parameters.
+
+    Sets the globals MIGRATE_PORT, MIRROR_PORT, COMPARE_IN_PORT and
+    NBD_PORT (base_port .. base_port + 3), QEMU_PRIMARY_CMDLINE,
+    QEMU_SECONDARY_CMDLINE and QEMU_DUMMY_CMDLINE.  For the monitor action
+    it also replaces OCF_RESKEY_CRM_meta_timeout with monitor_timeout.
+    """
+    # This function is called after the parameters were validated
+    global OCF_RESKEY_CRM_meta_timeout
+    if OCF_ACTION == "monitor":
+        # monitor works against the soft monitor_timeout instead of the
+        # action timeout passed in by Pacemaker
+        OCF_RESKEY_CRM_meta_timeout = OCF_RESKEY_monitor_timeout
+
+    global MIGRATE_PORT, MIRROR_PORT, COMPARE_IN_PORT, NBD_PORT
+    MIGRATE_PORT = int(OCF_RESKEY_base_port)
+    MIRROR_PORT = int(OCF_RESKEY_base_port) + 1
+    COMPARE_IN_PORT = int(OCF_RESKEY_base_port) + 2
+    NBD_PORT = int(OCF_RESKEY_base_port) + 3
+
+    # The command lines below are %-formatted against globals().  Paths and
+    # addresses are wrapped in single quotes for the shell; the user's
+    # options are inserted unquoted so that they split into arguments.
+    # NOTE(review): a value containing a single quote breaks the quoting.
+
+    # Primary: quorum node colo-disk0 with parent0 as its only child
+    global QEMU_PRIMARY_CMDLINE
+    QEMU_PRIMARY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=parent0"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait -no-shutdown"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    # Secondary: listening chardevs red0 (MIRROR_PORT) and red1
+    # (COMPARE_IN_PORT) are wired to filter-redirector objects on netdev hn0
+    global QEMU_SECONDARY_CMDLINE
+    QEMU_SECONDARY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -chardev socket,id=red0,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(MIRROR_PORT)s,server,nowait,nodelay"
+        " -chardev socket,id=red1,host='%(OCF_RESKEY_listen_address)s',"
+        "port=%(COMPARE_IN_PORT)s,server,nowait,nodelay"
+        " -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0"
+        " -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1") \
+        % globals()
+
+    if is_true(OCF_RESKEY_use_filter_rewriter):
+        QEMU_SECONDARY_CMDLINE += \
+            " -object filter-rewriter,id=rew0,netdev=hn0,queue=all"
+
+    # Secondary disk: replication driver (mode=secondary) over the active
+    # image, backed by the hidden image, backed by parent0; wrapped in the
+    # quorum node colo-disk0.  qemu then waits for incoming migration on
+    # MIGRATE_PORT.
+    QEMU_SECONDARY_CMDLINE += \
+        (" -drive if=none,node-name=childs0,top-id=colo-disk0,"
+        "driver=replication,mode=secondary,file.driver=qcow2,"
+        "file.file.filename='%(ACTIVE_IMAGE)s',file.backing.driver=qcow2,"
+        "file.backing.file.filename='%(HIDDEN_IMAGE)s',"
+        "file.backing.backing=parent0"
+        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
+        "vote-threshold=1,children.0=childs0"
+        " -incoming tcp:'%(OCF_RESKEY_listen_address)s':%(MIGRATE_PORT)s"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait -no-shutdown"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+    # Dummy: stopped (-S) qemu whose colo-disk0 is a null-co node
+    global QEMU_DUMMY_CMDLINE
+    QEMU_DUMMY_CMDLINE = \
+        ("'%(OCF_RESKEY_qemu_binary)s' %(OCF_RESKEY_options)s"
+        " -drive if=none,node-name=colo-disk0,driver=null-co -S"
+        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
+        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
+def qemu_colo_meta_data():
+    """Print the agent's OCF meta-data XML to stdout.
+
+    Parameter defaults are filled in from the OCF_RESKEY_*_default
+    constants so the advertised values match what the agent uses.
+    """
+    print("""\
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="colo">
+
+    <version>1.0</version>
+    <longdesc lang="en">
+Resource agent for qemu COLO. (https://wiki.qemu.org/Features/COLO)
+
+After defining the master/slave instance, the master score has to be
+manually set to show which node has up-to-date data. So you copy your
+image to one host (and create empty images the other host(s)) and then
+run "crm_master -r name_of_your_primitive -v 10" on that host.
+Also, you have to set 'notify=true' in the metadata attributes when
+defining the master/slave instance.
+
+Note:
+-If the instance is stopped cluster-wide, the resource agent will do a
+clean shutdown. Set the demote timeout to the time it takes for your
+guest to shutdown.
+-Colo replication is started from the monitor action. Set the monitor
+timeout to at least the time it takes for replication to start. You can
+set the monitor_timeout parameter for a soft timeout, which the resource
+agent tries to satisfy.
+-The resource agent may notify pacemaker about peer failure,
+these failures will show up with exitreason="Simulated failure".
+    </longdesc>
+    <shortdesc lang="en">Qemu COLO</shortdesc>
+
+    <parameters>
+
+    <parameter name="qemu_binary" unique="0" required="0">
+        <longdesc lang="en">qemu binary to use</longdesc>
+        <shortdesc lang="en">qemu binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_qemu_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="qemu_img_binary" unique="0" required="0">
+        <longdesc lang="en">qemu-img binary to use</longdesc>
+        <shortdesc lang="en">qemu-img binary</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_qemu_img_binary_default + """\"/>
+    </parameter>
+
+    <parameter name="log_dir" unique="0" required="0">
+        <longdesc lang="en">Directory to place logs in</longdesc>
+        <shortdesc lang="en">Log directory</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_log_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="options" unique="0" required="1">
+        <longdesc lang="en">
+Options to pass to qemu. These will be passed alongside COLO specific
+options, so you need to follow these conventions: The netdev should have
+id=hn0 and the disk controller drive=colo-disk0. The image node should
+have node-name=parent0, but should not be connected to the guest.
+Example:
+-vnc :0 -enable-kvm -cpu qemu64,+kvmclock -m 512 -netdev bridge,id=hn0
+-device e1000,netdev=hn0 -device virtio-blk,drive=colo-disk0
+-drive if=none,node-name=parent0,format=qcow2,file=/mnt/vms/vm01.qcow2
+        </longdesc>
+        <shortdesc lang="en">Options to pass to qemu.</shortdesc>
+    </parameter>
+
+    <parameter name="active_hidden_dir" unique="0" required="1">
+        <longdesc lang="en">
+Directory where the active and hidden images will be stored. It is
+recommended to put this on a ramdisk.
+        </longdesc>
+        <shortdesc lang="en">Path to active and hidden images</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_active_hidden_dir_default + """\"/>
+    </parameter>
+
+    <parameter name="listen_address" unique="0" required="0">
+        <longdesc lang="en">Address to listen on.</longdesc>
+        <shortdesc lang="en">Listen address</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_listen_address_default + """\"/>
+    </parameter>
+
+    <parameter name="base_port" unique="1" required="0">
+        <longdesc lang="en">
+4 tcp ports that are unique for each instance. (base_port to base_port + 3)
+        </longdesc>
+        <shortdesc lang="en">Ports to use</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_base_port_default + """\"/>
+    </parameter>
+
+    <parameter name="checkpoint_interval" unique="0" required="0">
+        <longdesc lang="en">
+Interval for regular checkpoints in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Interval for regular checkpoints</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_checkpoint_interval_default + """\"/>
+    </parameter>
+
+    <parameter name="compare_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Maximum time to hold a primary packet if secondary hasn't sent it yet,
+in milliseconds.
+You should also adjust "expired_scan_cycle" accordingly.
+        </longdesc>
+        <shortdesc lang="en">Compare timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_compare_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="expired_scan_cycle" unique="0" required="0">
+        <longdesc lang="en">
+Interval for checking for expired primary packets in milliseconds.
+        </longdesc>
+        <shortdesc lang="en">Expired packet check interval</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_expired_scan_cycle_default + """\"/>
+    </parameter>
+
+    <parameter name="max_queue_size" unique="0" required="0">
+        <longdesc lang="en">
+Maximum queue size for network packets.
+        </longdesc>
+        <shortdesc lang="en">Maximum queue size</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_max_queue_size_default + """\"/>
+    </parameter>
+
+    <parameter name="use_filter_rewriter" unique="0" required="0">
+        <longdesc lang="en">
+Use filter-rewriter to increase similarity between the VMs.
+        </longdesc>
+        <shortdesc lang="en">Use filter-rewriter</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_use_filter_rewriter_default + """\"/>
+    </parameter>
+
+    <parameter name="vnet_hdr" unique="0" required="0">
+        <longdesc lang="en">
+Set this to true if your system supports vnet_hdr and you enabled
+it on the tap netdev.
+        </longdesc>
+        <shortdesc lang="en">vnet_hdr support</shortdesc>
+        <content type="string" default=\"""" \
+            + OCF_RESKEY_vnet_hdr_default + """\"/>
+    </parameter>
+
+    <parameter name="max_disk_errors" unique="0" required="0">
+        <longdesc lang="en">
+Maximum disk read errors per monitor interval before marking the resource
+as failed. A write error is always fatal except if the value is 0.
+A value of 0 will disable disk error handling.
+Primary disk errors are only handled if there is a healthy secondary.
+        </longdesc>
+        <shortdesc lang="en">Maximum disk errors</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_max_disk_errors_default + """\"/>
+    </parameter>
+
+    <parameter name="monitor_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Soft timeout for monitor, in milliseconds.
+Must be lower than the monitor action timeout.
+        </longdesc>
+        <shortdesc lang="en">Monitor timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_monitor_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="yank_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Timeout for QMP commands after which to execute the "yank" command,
+in milliseconds.
+Must be lower than any of the action timeouts.
+        </longdesc>
+        <shortdesc lang="en">Yank timeout</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_yank_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="fail_fast_timeout" unique="0" required="0">
+        <longdesc lang="en">
+Timeout for QMP commands used in the stop and demote actions to speed
+up recovery from a hanging qemu, in milliseconds.
+Must be lower than any of the action timeouts.
+        </longdesc>
+        <shortdesc lang="en">Timeout for fast paths</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_fail_fast_timeout_default + """\"/>
+    </parameter>
+
+    <parameter name="debug" unique="0" required="0">
+        <longdesc lang="en">
+Control debugging:
+0: disable debugging
+1: log debug messages and qmp commands
+2: + dump core of hanging qemu
+        </longdesc>
+        <shortdesc lang="en">Control debugging</shortdesc>
+        <content type="integer" default=\"""" \
+            + OCF_RESKEY_debug_default + """\"/>
+    </parameter>
+
+    </parameters>
+
+    <actions>
+        <action name="start"        timeout="30s" />
+        <action name="stop"         timeout="10s" />
+        <action name="monitor"      timeout="30s" \
+            interval="1000ms" depth="0" role="Slave" />
+        <action name="monitor"      timeout="30s" \
+            interval="1001ms" depth="0" role="Master" />
+        <action name="notify"       timeout="30s" />
+        <action name="promote"      timeout="30s" />
+        <action name="demote"       timeout="120s" />
+        <action name="meta-data"    timeout="5s" />
+        <action name="validate-all" timeout="20s" />
+    </actions>
+
+</resource-agent>
+""")
+
+def logs_open():
+    """Create the agent logger ``log`` and the QMP trace logger ``qmp_log``.
+
+    ``log`` is named after the resource instance.  It logs at DEBUG when
+    the debug parameter is >= 1 or HA_debug is not "0", and at INFO
+    otherwise.  It writes to:
+    - stderr, when stdout is a tty;
+    - syslog, when HA_LOGFACILITY is set;
+    - HA_LOGFILE, when set;
+    - HA_DEBUGLOG, when set and different from HA_LOGFILE.
+
+    ``qmp_log`` writes bare messages to QMP_LOG when the debug parameter
+    is >= 1; otherwise it gets a NullHandler.
+    """
+    global log, qmp_log
+
+    log = logging.getLogger(OCF_RESOURCE_INSTANCE)
+    verbose = int(OCF_RESKEY_debug) >= 1 or HA_DEBUG != "0"
+    log.setLevel(logging.DEBUG if verbose else logging.INFO)
+
+    fmt = logging.Formatter("(%(name)s) %(levelname)s: %(message)s")
+
+    def attach(handler):
+        # Format and register each handler right after it is created
+        handler.setFormatter(fmt)
+        log.addHandler(handler)
+
+    if sys.stdout.isatty():
+        attach(logging.StreamHandler(stream=sys.stderr))
+    if HA_LOGFACILITY:
+        attach(logging.handlers.SysLogHandler("/dev/log"))
+    if HA_LOGFILE:
+        attach(logging.FileHandler(HA_LOGFILE))
+    if HA_DEBUGLOG and HA_DEBUGLOG != HA_LOGFILE:
+        attach(logging.FileHandler(HA_DEBUGLOG))
+
+    qmp_log = logging.getLogger("qmp_log")
+    qmp_log.setLevel(logging.DEBUG)
+    if int(OCF_RESKEY_debug) >= 1:
+        # WatchedFileHandler reopens QMP_LOG if it is rotated away
+        trace = logging.handlers.WatchedFileHandler(QMP_LOG)
+        trace.setFormatter(logging.Formatter("%(message)s"))
+    else:
+        trace = logging.NullHandler()
+    qmp_log.addHandler(trace)
+
+def rotate_logfile(logfile, numlogs):
+    """Rotate ``logfile`` so that at most ``numlogs`` generations remain.
+
+    Generations are named logfile, logfile.1, ..., logfile.(numlogs-1).
+    The oldest one is deleted and every other existing one is renamed to
+    the next higher suffix, oldest first.  Missing generations are
+    skipped.  Does nothing when ``numlogs`` <= 0.
+    """
+    oldest = numlogs - 1
+    for generation in reversed(range(numlogs)):
+        if generation == 0:
+            name = logfile
+        else:
+            name = "%s.%s" % (logfile, generation)
+        if not os.path.exists(name):
+            continue
+        if generation == oldest:
+            os.remove(name)
+        else:
+            os.rename(name, "%s.%s" % (logfile, generation + 1))
+
+def is_writable(_file):
+    """Return True if the current process has write access to ``_file``."""
+    writable = os.access(_file, os.W_OK)
+    return writable
+
+def is_executable_file(_file):
+    """Return True if ``_file`` is a regular file the process may execute."""
+    if not os.path.isfile(_file):
+        return False
+    return os.access(_file, os.X_OK)
+
+def is_true(var):
+    """Return True if ``var`` (converted to str) is an OCF "true" value.
+
+    The accepted spellings are exactly yes, true, 1, YES, TRUE, True, ja,
+    on and ON, the same set as ocf_is_true in ocf-shellfuncs.  The match
+    must be exact: a prefix match would treat values such as "10",
+    "onion" or "yesterday" as true.
+    """
+    return str(var) in ("yes", "true", "1", "YES", "TRUE", "True",
+                        "ja", "on", "ON")
+
+# Check if the binary exists and is executable
+def check_binary(binary):
+    """Return True if ``binary`` is executable as given or found on $PATH.
+
+    $PATH falls back to os.defpath when unset.  If no candidate is
+    executable, an error is logged and False is returned.
+    """
+    search_dirs = os.getenv("PATH", os.defpath).split(os.pathsep)
+    candidates = [binary] + [os.path.join(d, binary) for d in search_dirs]
+    if any(is_executable_file(path) for path in candidates):
+        return True
+    log.error("binary \"%s\" doesn't exist or not executable" % binary)
+    return False
+
+def run_command(commandline):
+    """Run ``commandline`` through the shell and raise Error on failure.
+
+    stdout and stderr are captured together; on a non-zero exit status
+    they are logged along with the command and its exit code.
+    """
+    proc = subprocess.Popen(commandline, shell=True,
+                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                            universal_newlines=True)
+    output, _ = proc.communicate()
+    if proc.returncode == 0:
+        return
+    log.error("command \"%s\" failed with code %s:\n%s"
+              % (commandline, proc.returncode, output))
+    raise Error()
+
+# Functions for setting and getting the master score to tell Pacemaker which
+# host has the most recent data
+def set_master_score(score):
+    """Set this node's master score with lifetime "forever".
+
+    A score of 0 deletes the attribute rather than storing 0.
+    Raises Error if crm_master fails.
+    """
+    if score != 0:
+        run_command("crm_master -q -l forever -v %s" % score)
+        return
+    run_command("crm_master -q -l forever -D")
+
+def set_remote_master_score(remote, score):
+    """Set node ``remote``'s master score with lifetime "forever".
+
+    A score of 0 deletes the attribute rather than storing 0.
+    Raises Error if crm_master fails.
+    """
+    if score == 0:
+        command = "crm_master -q -l forever -N '%s' -D" % remote
+    else:
+        command = "crm_master -q -l forever -N '%s' -v %s" % (remote, score)
+    run_command(command)
+
+def get_master_score():
+    """Return this node's master score as an int.
+
+    Returns 0 if crm_master exits non-zero; its stderr is discarded.
+    Output that is not an integer raises ValueError.
+    """
+    proc = subprocess.Popen("crm_master -q -G", shell=True,
+                            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+                            universal_newlines=True)
+    output, _ = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    return int(output.strip())
+
+def get_remote_master_score(remote):
+    """Return node ``remote``'s master score as an int.
+
+    Returns 0 if crm_master exits non-zero; its stderr is discarded.
+    Output that is not an integer raises ValueError.
+    """
+    command = "crm_master -q -N '%s' -G" % remote
+    proc = subprocess.Popen(command, shell=True,
+                            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+                            universal_newlines=True)
+    output, _ = proc.communicate()
+    if proc.returncode != 0:
+        return 0
+    return int(output.strip())
+
+# Tell Pacemaker that the remote resource failed
+def report_remote_failure(remote):
+    """Mark this resource instance as failed on node ``remote``.
+
+    Raises Error if crm_resource fails.
+    """
+    command = ("crm_resource --resource '%s' --fail --node '%s'"
+               % (OCF_RESOURCE_INSTANCE, remote))
+    run_command(command)
+
+def recv_line(fd):
+    """Read one line from socket ``fd`` and return it as a str.
+
+    The result includes the trailing newline.  If the peer closes the
+    connection first, it is whatever arrived before EOF ("" on immediate
+    EOF).
+
+    Bytes are read one at a time so that nothing past the newline is
+    consumed; oob_helper select()s on the same sockets.  They are decoded
+    as UTF-8 only once the line is complete, because decoding byte by byte
+    fails on multi-byte characters.
+    """
+    buf = bytearray()
+    while True:
+        byte = fd.recv(1)
+        buf += byte
+        if not byte or byte == b"\n":
+            break
+    return buf.decode()
+
+# Filter out events
+def read_answer(fd):
+    """Return the next QMP reply read from ``fd`` as a dict.
+
+    A reply is a JSON object with a "return" or "error" key.  Every line
+    read is logged to qmp_log; other lines (such as events) are discarded.
+    Raises Error if the connection is closed.
+    """
+    while True:
+        line = recv_line(fd)
+        qmp_log.debug(line.strip())
+        if not line:
+            log.error("qmp connection closed")
+            raise Error()
+        reply = json.loads(line)
+        if "return" in reply or "error" in reply:
+            return reply
+
+# Execute one or more qmp commands
+def qmp_execute(fd, commands, ignore_error=False, do_yank=True):
+    """Send each QMP command in ``commands`` over ``fd`` and read its reply.
+
+    Falsy entries in ``commands`` are skipped.  Returns the reply to the
+    last command sent, or None if no command was sent.
+
+    If a command times out and ``do_yank`` is set, the socket timeout is
+    raised to the time left in the action timeout minus 2 seconds (at
+    least 2 seconds), and yank() is used to unfreeze qemu before the
+    reply is read.
+
+    Raises Error on socket errors or a closed connection.  Unless
+    ``ignore_error`` is set, it also raises Error when qemu returns an
+    error reply.
+    """
+    # Stays None if every entry is skipped; avoids UnboundLocalError
+    answer = None
+    for command in commands:
+        if not command:
+            continue
+
+        try:
+            to_send = json.dumps(command)
+            fd.sendall(str.encode(to_send + "\n"))
+            qmp_log.debug(to_send)
+
+            answer = read_answer(fd)
+        except Exception as e:
+            if isinstance(e, socket.timeout) and do_yank:
+                log.warning("Command timed out, trying to unfreeze qemu")
+                new_timeout = max(2, (int(OCF_RESKEY_CRM_meta_timeout)/1000)
+                                  - (time.time() - START_TIME) - 2)
+                fd.settimeout(new_timeout)
+                try:
+                    # yank() returns the reply of the timed-out command if
+                    # it arrived while yanking; otherwise read it now
+                    answer = yank(fd)
+                    if not answer:
+                        answer = read_answer(fd)
+                except socket.error as e:
+                    log.error("while reading answer of timed out command: "
+                              "%s\n%s" % (json.dumps(command), e))
+                    raise Error()
+            elif isinstance(e, (socket.error, socket.timeout)):
+                log.error("while executing qmp command: %s\n%s"
+                          % (json.dumps(command), e))
+                raise Error()
+            else:
+                raise
+
+        if not ignore_error and ("error" in answer):
+            log.error("qmp command returned error:\n%s\n%s"
+                      % (json.dumps(command), json.dumps(answer)))
+            raise Error()
+
+    return answer
+
+# Open qemu qmp connection
+def qmp_open(fail_fast=False):
+    """Connect to the oob helper socket (HELPER_SOCK) and return the socket.
+
+    The helper forwards QMP commands to qemu.  The socket timeout is
+    fail_fast_timeout when ``fail_fast`` is set and yank_timeout otherwise
+    (both configured in milliseconds).  Raises Error if the connection
+    cannot be established.
+    """
+    if fail_fast:
+        timeout = int(OCF_RESKEY_fail_fast_timeout)/1000
+    else:
+        timeout = int(OCF_RESKEY_yank_timeout)/1000
+
+    fd = None
+    try:
+        fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        fd.settimeout(timeout)
+        fd.connect(HELPER_SOCK)
+    except socket.error as e:
+        # Don't leak the descriptor of a socket that never connected
+        if fd is not None:
+            fd.close()
+        log.error("while connecting to helper socket: %s" % e)
+        raise Error()
+
+    return fd
+
+def yank(fd):
+    """Unblock qemu by yanking its COLO-related yank instances.
+
+    Sets the global did_yank.  Runs the out-of-band query-yank command,
+    then yanks those instances that appear in the fixed set below.  If
+    yank reports DeviceNotFound (presumably because an instance vanished
+    between query and yank), both steps are repeated.
+
+    The reply to the command that originally timed out may arrive while
+    waiting for these replies; it is recognised by the missing "id" and
+    returned.  Otherwise None is returned.  Raises Error if query-yank or
+    yank fail.
+    """
+    global did_yank
+    did_yank = True
+    wanted = {"blockdev:nbd0", "chardev:mirror0", "chardev:comp_sec_in0",
+              "migration", "chardev:red0", "chardev:red1"}
+    ret = None
+    while True:
+        answer = qmp_execute(fd, [{"exec-oob": "query-yank", "id": "yank0"}],
+                             do_yank=False, ignore_error=True)
+        if "id" not in answer:
+            # This is the answer of the timed-out command
+            ret = answer
+            answer = read_answer(fd)
+        if "error" in answer:
+            log.error("While executing 'query-yank':\n%s" % json.dumps(answer))
+            raise Error()
+        # Keep qemu's ordering of the instances
+        instances = [n for n in answer["return"]["instances"] if n in wanted]
+        answer = qmp_execute(fd, [{"exec-oob": "yank", "id": "yank0",
+                                   "arguments": {"instances": instances}}],
+                             do_yank=False, ignore_error=True)
+        if "id" not in answer:
+            # This is the answer of the timed-out command
+            ret = answer
+            answer = read_answer(fd)
+        if "error" in answer:
+            if answer["error"]["class"] == "DeviceNotFound":
+                continue
+            else:
+                log.error("While executing 'yank':\n%s" % json.dumps(answer))
+                raise Error()
+        break
+
+    return ret
+
+def oob_helper_exec(client, cmd, events):
+    """Answer a helper-local ("exec-helper") command from ``client``.
+
+    Supported commands:
+    - get-events: return the events recorded under
+      cmd["arguments"]["event"], or an empty list if there are none.
+    - clear-events: forget all recorded events.
+    Anything else gets an error reply.  Every reply is sent to ``client``
+    as one JSON line.
+    """
+    action = cmd["exec-helper"]
+    if action == "get-events":
+        reply = {"return": events.get(cmd["arguments"]["event"], [])}
+    elif action == "clear-events":
+        events.clear()
+        reply = {"return": {}}
+    else:
+        reply = {"error": "Unknown helper command"}
+    client.sendall((json.dumps(reply) + "\n").encode())
+
+def oob_helper(qmp, server):
+    """Main loop of the detached oob helper process; never returns.
+
+    Multiplexes the QMP socket ``qmp`` with one client at a time accepted
+    on the listening socket ``server``:
+    - client lines with "exec-helper" are answered by oob_helper_exec();
+    - all other client lines are forwarded to qemu unchanged;
+    - QMP events are recorded per event name, capped at
+      max(100, max_disk_errors) entries each;
+    - other QMP lines are forwarded to the client, or dropped if none is
+      connected.
+
+    Exits via os._exit(0) when qemu closes the QMP socket.  On any other
+    exception it writes the exception text to HELPER_LOG and exits.
+    """
+    max_events = max(100, int(OCF_RESKEY_max_disk_errors))
+    # event name -> list of recorded event objects
+    events = {}
+    try:
+        # Detach from the agent's stdio and logging handlers
+        os.close(0)
+        os.close(1)
+        os.close(2)
+        logging.shutdown()
+
+        client = None
+        while True:
+            # While a client is connected, new connections are not accepted
+            # (they wait in the listen backlog)
+            if client:
+                watch = [client, qmp]
+            else:
+                watch = [server, qmp]
+            sel = select.select(watch, [], [])
+            try:
+                # "None in list" is False, so this is skipped without client
+                if client in sel[0]:
+                    cmd = recv_line(client)
+                    if len(cmd) == 0:
+                        # client socket was closed: wait for new client
+                        client.close()
+                        client = None
+                        continue
+                    else:
+                        parsed = json.loads(cmd)
+                        if ("exec-helper" in parsed):
+                            oob_helper_exec(client, parsed, events)
+                        else:
+                            qmp.sendall(str.encode(cmd))
+                if qmp in sel[0]:
+                    answer = recv_line(qmp)
+                    if len(answer) == 0:
+                        # qmp socket was closed: qemu died, exit
+                        os._exit(0)
+                    else:
+                        parsed = json.loads(answer)
+                        if ("event" in parsed):
+                            event = parsed["event"]
+                            if (event not in events):
+                                events[event] = []
+                            if len(events[event]) < max_events:
+                                events[event].append(parsed)
+                        elif client:
+                            client.sendall(str.encode(answer))
+                if server in sel[0]:
+                    client, client_addr = server.accept()
+            except socket.error as e:
+                # Best effort: socket errors are ignored and the loop goes on
+                pass
+    except Exception as e:
+        # stderr is closed, so the only place left to report to is a file
+        with open(HELPER_LOG, 'a') as f:
+            f.write(str(e) + "\n")
+    os._exit(0)
+
+# Fork off helper to keep the oob qmp connection open and to catch events
+def oob_helper_open():
+    """Start the detached oob helper process that runs oob_helper().
+
+    Steps:
+    1. Connect to qemu's QMP_SOCK and negotiate the "oob" capability.
+    2. Bind a fresh listening socket at HELPER_SOCK.
+    3. Hand both sockets to a double-forked child.
+
+    The parent then closes its copies and reaps the intermediate child.
+    Raises Error if any step fails; sockets opened so far are closed.
+    """
+    qmp = None
+    try:
+        qmp = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        qmp.connect(QMP_SOCK)
+        qmp_execute(qmp, [{"execute": "qmp_capabilities",
+                           "arguments": {"enable": ["oob"]}}])
+    except BaseException as e:
+        if qmp is not None:
+            qmp.close()
+        if isinstance(e, socket.error):
+            log.error("while connecting to qmp socket: %s" % e)
+            raise Error()
+        raise
+
+    server = None
+    try:
+        if os.path.exists(HELPER_SOCK):
+            os.unlink(HELPER_SOCK)
+        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        server.bind(HELPER_SOCK)
+        server.listen(1)
+    except socket.error as e:
+        qmp.close()
+        if server is not None:
+            server.close()
+        log.error("while opening helper socket: %s" % e)
+        raise Error()
+
+    qmp.set_inheritable(True)
+    server.set_inheritable(True)
+
+    try:
+        pid = os.fork()
+    except OSError as e:
+        qmp.close()
+        server.close()
+        log.error("while forking off oob helper: %s" % e)
+        raise Error()
+
+    if pid == 0:
+        # child 1: start a new session, fork off child 2 and exit, so that
+        # pid 1 becomes responsible for child 2.  The finally clause
+        # guarantees this process never returns into the agent's code,
+        # even if the second fork fails.
+        try:
+            os.setsid()
+            if os.fork() == 0:
+                # child 2: here the actual work is being done
+                oob_helper(qmp, server)
+        finally:
+            os._exit(0)
+
+    qmp.close()
+    server.close()
+    # Reap child 1 (it exits right away) so it does not linger as a zombie
+    try:
+        os.waitpid(pid, 0)
+    except ChildProcessError:
+        # Already reaped, e.g. if SIGCHLD is set to SIG_IGN
+        pass
+
+# Get the disk size of the (user supplied) parent disk
+def qmp_get_disk_size(fd):
+    block_nodes = qmp_execute(fd, [{"execute": "query-named-block-nodes", 
"arguments": {"flat": True}}])
+    for node in block_nodes["return"]:
+        if node["node-name"] == "parent0":
+            return node["image"]["virtual-size"]
+
+    log.error("Disk \"parent0\" not found")
+    raise Error()
+
+# Get the host of the nbd node
+def qmp_get_nbd_remote(fd):
+    block_nodes = qmp_execute(fd, [{"execute": "query-named-block-nodes", 
"arguments": {"flat": True}}])
+    for node in block_nodes["return"]:
+        if node["node-name"] == "nbd0":
+            url = str(node["image"]["filename"])
+            return str.split(url, "//")[1].split("/")[0].split(":")[0]
+    return None
+
+# Check if we are currently resyncing
+def qmp_check_resync(fd):
+    answer = qmp_execute(fd, [{"execute": "query-block-jobs"}])
+    for job in answer["return"]:
+        if job["device"] == "resync":
+            return job
+    return None
+
+def qmp_start_resync(fd, remote):
+    answer = qmp_execute(fd, [{"execute": "blockdev-add", "arguments": 
{"driver": "nbd", "node-name": "nbd0", "server": {"type": "inet", "host": 
str(remote), "port": str(NBD_PORT)}, "export": "parent0", "detect-zeroes": 
"on"}}], ignore_error = True)
+    if "error" in answer:
+        log.warning("Failed to add nbd node: %s" % json.dumps(answer))
+        log.warning("Assuming peer failure")
+        report_remote_failure(remote)
+    else:
+        qmp_execute(fd, [{"execute": "blockdev-mirror", "arguments": 
{"device": "colo-disk0", "job-id": "resync", "target": "nbd0", "sync": "full", 
"on-target-error": "report", "on-source-error": "ignore", "auto-dismiss": 
False}}])
+
+def qmp_cancel_resync(fd):
+    timeout = START_TIME + (int(OCF_RESKEY_yank_timeout)/1000)
+
+    if qmp_check_resync(fd)["status"] != "concluded":
+        qmp_execute(fd, [{"execute": "block-job-cancel", "arguments": 
{"device": "resync", "force": True}}], ignore_error = True)
+        # Wait for the block-job to finish
+        while time.time() < timeout:
+            if qmp_check_resync(fd)["status"] == "concluded":
+                break
+            log.debug("Waiting for block-job to finish in qmp_cancel_resync()")
+            time.sleep(1)
+        else:
+            log.warning("Timed out, trying to unfreeze qemu")
+            yank(fd)
+            while qmp_check_resync(fd)["status"] != "concluded":
+                log.debug("Waiting for block-job to finish")
+                time.sleep(1)
+
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}}
+        ])
+
+def qmp_start_colo(fd, remote):
+    # Check if we have a filter-rewriter
+    answer = qmp_execute(fd, [{"execute": "qom-list", "arguments": {"path": 
"/objects/rew0"}}], ignore_error = True)
+    if "error" in answer:
+        if answer["error"]["class"] == "DeviceNotFound":
+            have_filter_rewriter = False
+        else:
+            log.error("While checking for filter-rewriter:\n%s" \
+                        % json.dumps(answer))
+            raise Error()
+    else:
+        have_filter_rewriter = True
+
+    # Pause VM and cancel resync
+    qmp_execute(fd, [
+        {"execute": "stop"},
+        {"execute": "block-job-cancel", "arguments": {"device": "resync"}}
+        ])
+
+    # Wait for the block-job to finish
+    while qmp_check_resync(fd)["status"] != "concluded":
+        log.debug("Waiting for block-job to finish in qmp_start_colo()")
+        time.sleep(1)
+
+    # Add nbd to the quorum node
+    qmp_execute(fd, [
+        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", 
"node": "nbd0"}}
+        ])
+
+    # Connect mirror and compare_in to secondary
+    qmp_execute(fd, [
+        {"execute": "chardev-add", "arguments": {"id": "comp_pri_in0<", 
"backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": 
{"path": str(COMP_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_pri_in0>", 
"backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": 
{"path": str(COMP_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0<", 
"backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": 
{"path": str(COMP_OUT_SOCK)}}, "server": True}}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_out0>", 
"backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": 
{"path": str(COMP_OUT_SOCK)}}, "server": False}}}},
+        {"execute": "chardev-add", "arguments": {"id": "mirror0", "backend": 
{"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": 
str(remote), "port": str(MIRROR_PORT)}}, "server": False, "nodelay": True }}}},
+        {"execute": "chardev-add", "arguments": {"id": "comp_sec_in0", 
"backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": 
{"host": str(remote), "port": str(COMPARE_IN_PORT)}}, "server": False, 
"nodelay": True }}}}
+        ])
+
+    # Add the COLO filters
+    vnet_hdr_support = is_true(OCF_RESKEY_vnet_hdr)
+    if have_filter_rewriter:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-mirror", "id": "m0", "props": {"insert": "before", "position": 
"id=rew0", "netdev": "hn0", "queue": "tx", "outdev": "mirror0", 
"vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-redirector", "id": "redire0", "props": {"insert": "before", "position": 
"id=rew0", "netdev": "hn0", "queue": "rx", "indev": "comp_out0<", 
"vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-redirector", "id": "redire1", "props": {"insert": "before", "position": 
"id=rew0", "netdev": "hn0", "queue": "rx", "outdev": "comp_pri_in0<", 
"vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", 
"id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"colo-compare", "id": "comp0", "props": {"primary_in": "comp_pri_in0>", 
"secondary_in": "comp_sec_in0", "outdev": "comp_out0>", "iothread": 
"iothread1", "compare_timeout": int(OCF_RESKEY_compare_timeout), 
"expired_scan_cycle": int(OCF_RESKEY_expired_scan_cycle), "max_queue_size": 
int(OCF_RESKEY_max_queue_size), "vnet_hdr_support": vnet_hdr_support}}}
+            ])
+    else:
+        qmp_execute(fd, [
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-mirror", "id": "m0", "props": {"netdev": "hn0", "queue": "tx", 
"outdev": "mirror0", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-redirector", "id": "redire0", "props": {"netdev": "hn0", "queue": "rx", 
"indev": "comp_out0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"filter-redirector", "id": "redire1", "props": {"netdev": "hn0", "queue": "rx", 
"outdev": "comp_pri_in0<", "vnet_hdr_support": vnet_hdr_support}}},
+            {"execute": "object-add", "arguments": {"qom-type": "iothread", 
"id": "iothread1"}},
+            {"execute": "object-add", "arguments": {"qom-type": 
"colo-compare", "id": "comp0", "props": {"primary_in": "comp_pri_in0>", 
"secondary_in": "comp_sec_in0", "outdev": "comp_out0>", "iothread": 
"iothread1", "compare_timeout": int(OCF_RESKEY_compare_timeout), 
"expired_scan_cycle": int(OCF_RESKEY_expired_scan_cycle), "max_queue_size": 
int(OCF_RESKEY_max_queue_size), "vnet_hdr_support": vnet_hdr_support}}}
+            ])
+
+    # Start COLO
+    qmp_execute(fd, [
+        {"execute": "migrate-set-capabilities", "arguments": {"capabilities": 
[{"capability": "x-colo", "state": True }] }},
+        {"execute": "migrate-set-parameters", "arguments": 
{"x-checkpoint-delay": int(OCF_RESKEY_checkpoint_interval) }},
+        {"execute": "migrate", "arguments": {"uri": "tcp:%s:%s" % (remote, 
MIGRATE_PORT)}}
+        ])
+
+    # Wait for COLO to start
+    while qmp_execute(fd, [{"execute": "query-status"}])["return"]["status"] \
+            == "paused" \
+            or qmp_execute(fd, [{"execute": 
"query-colo-status"}])["return"]["mode"] \
+            != "primary" :
+        log.debug("Waiting for colo replication to start")
+        time.sleep(1)
+
+def qmp_primary_failover(fd):
+    qmp_execute(fd, [
+        {"execute": "object-del", "arguments": {"id": "m0"}},
+        {"execute": "object-del", "arguments": {"id": "redire0"}},
+        {"execute": "object-del", "arguments": {"id": "redire1"}},
+        {"execute": "x-colo-lost-heartbeat"},
+        {"execute": "object-del", "arguments": {"id": "comp0"}},
+        {"execute": "object-del", "arguments": {"id": "iothread1"}},
+        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", 
"child": "children.1"}},
+        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "mirror0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_sec_in0"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_pri_in0>"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_pri_in0<"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_out0>"}},
+        {"execute": "chardev-remove", "arguments": {"id": "comp_out0<"}}
+        ])
+
+def qmp_secondary_failover(fd):
+    qmp_execute(fd, [
+        {"execute": "nbd-server-stop"},
+        {"execute": "object-del", "arguments": {"id": "f2"}},
+        {"execute": "object-del", "arguments": {"id": "f1"}},
+        {"execute": "x-colo-lost-heartbeat"},
+        {"execute": "chardev-remove", "arguments": {"id": "red1"}},
+        {"execute": "chardev-remove", "arguments": {"id": "red0"}},
+        ])
+
+def qmp_is_colo_active(fd):
+    answer = qmp_execute(fd, [{"execute": "query-colo-status"}])
+
+    if answer["return"]["mode"] != "none":
+        return True
+    else:
+        return False
+
+# Check qemu health and colo role
+def qmp_check_health(fd, do_yank = True):
+    answer = qmp_execute(fd, [{"execute": "query-status"}], do_yank = do_yank)
+    vm_status = answer["return"]
+
+    answer = qmp_execute(fd, [{"execute": "query-colo-status"}], \
+                            do_yank = do_yank)
+    colo_status = answer["return"]
+
+    if vm_status["status"] == "inmigrate" \
+        or vm_status["status"] == "shutdown":
+        role = OCF_SUCCESS
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "none" \
+         and (colo_status["reason"] == "request" \
+              or colo_status["reason"] == "none"):
+        role = OCF_RUNNING_MASTER
+        replication = OCF_NOT_RUNNING
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "secondary":
+        role = OCF_SUCCESS
+        replication = OCF_SUCCESS
+
+    elif (vm_status["status"] == "running" \
+          or vm_status["status"] == "colo" \
+          or vm_status["status"] == "finish-migrate") \
+         and colo_status["mode"] == "primary":
+        role = OCF_RUNNING_MASTER
+        replication = OCF_SUCCESS
+
+    else:
+        log.error("Invalid qemu status:\nvm status: %s\ncolo status: %s" \
+                    % (vm_status, colo_status))
+        role = OCF_ERR_GENERIC
+        replication = OCF_ERR_GENERIC
+
+    return role, replication
+
+# Sanity checks: check parameters, files, binaries, etc.
+def qemu_colo_validate_all():
+    # Check resource parameters
+    if not str.isdigit(OCF_RESKEY_base_port):
+        log.error("base_port needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_checkpoint_interval):
+        log.error("checkpoint_interval needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_compare_timeout):
+        log.error("compare_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_expired_scan_cycle):
+        log.error("expired_scan_cycle needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_max_queue_size):
+        log.error("max_queue_size needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_max_disk_errors):
+        log.error("max_disk_errors needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_monitor_timeout):
+        log.error("monitor_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_yank_timeout):
+        log.error("yank_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_fail_fast_timeout):
+        log.error("fail_fast_timeout needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not str.isdigit(OCF_RESKEY_debug):
+        log.error("debug needs to be a number")
+        return OCF_ERR_CONFIGURED
+
+    if not OCF_RESKEY_active_hidden_dir:
+        log.error("active_hidden_dir needs to be specified")
+        return OCF_ERR_CONFIGURED
+
+    # Check resource meta configuration
+    if OCF_ACTION != "stop":
+        if OCF_RESKEY_CRM_meta_master_max != 1:
+            log.error("only one master allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_max > 2:
+            log.error("maximum 2 clones allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_master_node_max != 1:
+            log.error("only one master per node allowed")
+            return OCF_ERR_CONFIGURED
+
+        if OCF_RESKEY_CRM_meta_clone_node_max != 1:
+            log.error("only one clone per node allowed")
+            return OCF_ERR_CONFIGURED
+
+    # Check if notify is enabled
+    if OCF_ACTION != "stop" and OCF_ACTION != "monitor":
+        if not is_true(OCF_RESKEY_CRM_meta_notify) \
+           and not OCF_RESKEY_CRM_meta_notify_start_uname:
+            log.error("notify needs to be enabled")
+            return OCF_ERR_CONFIGURED
+
+    # Check that globally-unique is disabled
+    if is_true(OCF_RESKEY_CRM_meta_globally_unique):
+        log.error("globally-unique needs to be disabled")
+        return OCF_ERR_CONFIGURED
+
+    # Check binaries
+    if not check_binary(OCF_RESKEY_qemu_binary):
+        return OCF_ERR_INSTALLED
+
+    if not check_binary(OCF_RESKEY_qemu_img_binary):
+        return OCF_ERR_INSTALLED
+
+    # Check paths and files
+    if not is_writable(OCF_RESKEY_active_hidden_dir) \
+        or not os.path.isdir(OCF_RESKEY_active_hidden_dir):
+        log.error("active and hidden image directory missing or not writable")
+        return OCF_ERR_PERM
+
+    return OCF_SUCCESS
+
+# Check if qemu is running
+def check_pid():
+    if not os.path.exists(PID_FILE):
+        return OCF_NOT_RUNNING, None
+
+    fd = open(PID_FILE, "r")
+    pid = int(str.strip(fd.readline()))
+    fd.close()
+    try:
+        os.kill(pid, 0)
+    except OSError:
+        log.info("qemu is not running")
+        return OCF_NOT_RUNNING, pid
+    else:
+        return OCF_SUCCESS, pid
+
+def qemu_colo_monitor(fail_fast = False):
+    status, pid = check_pid()
+    if status != OCF_SUCCESS:
+        return status, OCF_NOT_RUNNING
+
+    fd = qmp_open(fail_fast)
+
+    role, replication = qmp_check_health(fd, do_yank = not fail_fast)
+    if role != OCF_SUCCESS and role != OCF_RUNNING_MASTER:
+        return role, replication
+
+    colo_events = qmp_execute(fd, [{"exec-helper": "get-events", "arguments": 
{"event": "COLO_EXIT"}}], do_yank = False)
+    for event in colo_events["return"]:
+        if event["data"]["reason"] == "error":
+            if replication == OCF_SUCCESS:
+                replication = OCF_ERR_GENERIC
+
+    if did_yank and replication == OCF_SUCCESS:
+        replication = OCF_ERR_GENERIC
+
+    peer_disk_errors = 0
+    local_disk_errors = 0
+    quorum_events = qmp_execute(fd, [{"exec-helper": "get-events", 
"arguments": {"event": "QUORUM_REPORT_BAD"}}], do_yank = False)
+    for event in quorum_events["return"]:
+        if event["data"]["node-name"] == "nbd0":
+            if event["data"]["type"] == "read":
+                peer_disk_errors += 1
+            else:
+                peer_disk_errors += int(OCF_RESKEY_max_disk_errors)
+        else:
+            if event["data"]["type"] == "read":
+                local_disk_errors += 1
+            else:
+                local_disk_errors += int(OCF_RESKEY_max_disk_errors)
+
+    if int(OCF_RESKEY_max_disk_errors) != 0:
+        if peer_disk_errors >= int(OCF_RESKEY_max_disk_errors):
+            log.error("Peer disk error")
+            if replication == OCF_SUCCESS:
+                replication = OCF_ERR_GENERIC
+
+        if local_disk_errors >= int(OCF_RESKEY_max_disk_errors):
+            if replication == OCF_SUCCESS:
+                log.error("Local disk error")
+                role = OCF_ERR_GENERIC
+            else:
+                log.warning("Local disk error")
+
+    if not fail_fast and OCF_RESKEY_CRM_meta_interval != 0:
+        # This isn't a probe monitor
+        block_job = qmp_check_resync(fd)
+        if block_job:
+            if "error" in block_job:
+                log.error("resync error: %s" % block_job["error"])
+                peer = qmp_get_nbd_remote(fd)
+                qmp_cancel_resync(fd)
+                report_remote_failure(peer)
+            elif block_job["ready"] == True:
+                log.info("resync done, starting colo")
+                peer = qmp_get_nbd_remote(fd)
+                qmp_start_colo(fd, peer)
+                # COLO started, our secondary now can be promoted if the
+                # primary fails
+                set_remote_master_score(peer, 100)
+            else:
+                pct_done = (float(block_job["offset"]) \
+                            / float(block_job["len"])) * 100
+                log.info("resync %.1f%% done" % pct_done)
+        else:
+            if replication == OCF_ERR_GENERIC:
+                if role == OCF_RUNNING_MASTER:
+                    log.error("Replication error")
+                    peer = qmp_get_nbd_remote(fd)
+                    if peer:
+                        report_remote_failure(peer)
+                else:
+                    log.warning("Replication error")
+        qmp_execute(fd, [{"exec-helper": "clear-events"}], do_yank = False)
+
+    fd.close()
+
+    return role, replication
+
+def qemu_colo_start():
+    if check_pid()[0] == OCF_SUCCESS:
+        log.info("qemu is already running")
+        return OCF_SUCCESS
+
+    rotate_logfile(QMP_LOG, 8)
+    rotate_logfile(QEMU_LOG, 8)
+
+    run_command(QEMU_DUMMY_CMDLINE)
+    oob_helper_open()
+    fd = qmp_open()
+    disk_size = qmp_get_disk_size(fd)
+    fd.close()
+    _qemu_colo_stop(OCF_SUCCESS, False)
+
+    run_command("'%s' create -q -f qcow2 %s %s" \
+            % (OCF_RESKEY_qemu_img_binary, ACTIVE_IMAGE, disk_size))
+    run_command("'%s' create -q -f qcow2 %s %s" \
+            % (OCF_RESKEY_qemu_img_binary, HIDDEN_IMAGE, disk_size))
+
+    run_command(QEMU_SECONDARY_CMDLINE)
+    oob_helper_open()
+
+    fd = qmp_open()
+    qmp_execute(fd, [
+        {"execute": "nbd-server-start", "arguments": {"addr": {"type": "inet", 
"data": {"host": str(OCF_RESKEY_listen_address), "port": str(NBD_PORT)}}}},
+        {"execute": "nbd-server-add", "arguments": {"device": "parent0", 
"writable": True}}
+        ])
+    fd.close()
+
+    return OCF_SUCCESS
+
+def env_do_shutdown_guest():
+    return OCF_RESKEY_CRM_meta_notify_active_uname \
+           and OCF_RESKEY_CRM_meta_notify_stop_uname \
+           and str.strip(OCF_RESKEY_CRM_meta_notify_active_uname) \
+               == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+
+def env_find_secondary():
+    # slave(s) =
+    # OCF_RESKEY_CRM_meta_notify_slave_uname
+    # - OCF_RESKEY_CRM_meta_notify_stop_uname
+    # + OCF_RESKEY_CRM_meta_notify_start_uname
+    # Filter out hosts that are stopping and ourselves
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_slave_uname, " "):
+        if host:
+            for stopping_host \
+                in str.split(OCF_RESKEY_CRM_meta_notify_stop_uname, " "):
+                if host == stopping_host:
+                    break
+            else:
+                if host != HOSTNAME:
+                    # we found a valid secondary
+                    return host
+
+    for host in str.split(OCF_RESKEY_CRM_meta_notify_start_uname, " "):
+        if host != HOSTNAME:
+            # we found a valid secondary
+            return host
+
+    # we found no secondary
+    return None
+
+# Stop qemu, cleanly if possible, otherwise by force.
+#
+# monstatus is the role from a previous monitor:
+#   OCF_NOT_RUNNING                   - return immediately
+#   OCF_RUNNING_MASTER or OCF_SUCCESS - try a clean stop first
+#   anything else                     - go straight to the force stop
+#
+# Clean stop:
+#   - with shutdown_guest, send system_powerdown and send "quit" once
+#     query-status reports "shutdown";
+#   - otherwise send "quit" right away.
+# It then polls check_pid() once per second until the deadline
+# START_TIME + OCF_RESKEY_CRM_meta_timeout/1000 - 5, i.e. the operation
+# timeout treated as milliseconds, minus 5.
+#
+# Force stop: SIGTERM, then SIGKILL after 2 seconds (or SIGSEGV alone when
+# debug >= 2). It then waits without a bound until check_pid() reports
+# OCF_NOT_RUNNING.
+#
+# Any exception during the clean stop is logged and leads to the force stop.
+# Always returns OCF_SUCCESS.
+def _qemu_colo_stop(monstatus, shutdown_guest):
+    # stop action must do everything possible to stop the resource
+    try:
+        timeout = START_TIME + (int(OCF_RESKEY_CRM_meta_timeout)/1000) - 5
+        force_stop = False
+
+        if monstatus == OCF_NOT_RUNNING:
+            log.info("resource is already stopped")
+            return OCF_SUCCESS
+        elif monstatus == OCF_RUNNING_MASTER or monstatus == OCF_SUCCESS:
+            force_stop = False
+        else:
+            force_stop = True
+
+        if not force_stop:
+            # True once "quit" was sent and fd was closed
+            executed_quit = False
+
+            fd = qmp_open(fail_fast = True)
+            if shutdown_guest:
+                qmp_execute(fd, [{"execute": "system_powerdown"}], \
+                                    do_yank = False)
+            else:
+                qmp_execute(fd, [{"execute": "quit"}], do_yank = False)
+                fd.close()
+                executed_quit = True
+
+            # wait for qemu to stop
+            # NOTE(review): if the deadline passes before "quit" was sent, fd
+            # is left open here.
+            while time.time() < timeout:
+                status, pid = check_pid()
+                if status == OCF_NOT_RUNNING:
+                    # qemu stopped
+                    return OCF_SUCCESS
+                elif status == OCF_SUCCESS and not executed_quit:
+                    # guest powerdown in progress: quit qemu once the guest
+                    # has shut down
+                    vmstatus = qmp_execute(fd, [{"execute": "query-status"}], \
+                                        do_yank = False)
+                    if vmstatus["return"]["status"] == "shutdown":
+                        qmp_execute(fd, [{"execute": "quit"}], do_yank = False)
+                        fd.close()
+                        executed_quit = True
+                    log.debug("Waiting for guest to shutdown")
+                    time.sleep(1)
+                elif status == OCF_SUCCESS and executed_quit:
+                    log.debug("Waiting for qemu to stop")
+                    time.sleep(1)
+                else:
+                    # something went wrong, force stop instead
+                    break
+
+            # NOTE: also reached through the "break" above, not only on timeout
+            log.warning("clean stop timeout reached")
+    except Exception as e:
+        log.warning("error while stopping: %s" % e)
+
+    log.info("force stopping qemu")
+
+    status, pid = check_pid()
+    if status == OCF_NOT_RUNNING:
+        return OCF_SUCCESS
+    try:
+        if int(OCF_RESKEY_debug) >= 2:
+            # presumably to get a core dump for debugging - TODO confirm
+            os.kill(pid, signal.SIGSEGV)
+        else:
+            os.kill(pid, signal.SIGTERM)
+            time.sleep(2)
+            os.kill(pid, signal.SIGKILL)
+    except Exception:
+        # e.g. the process already exited after SIGTERM
+        pass
+
+    # no upper bound: the stop action must not return while qemu still runs
+    while check_pid()[0] != OCF_NOT_RUNNING:
+        time.sleep(1)
+
+    return OCF_SUCCESS
+
+def qemu_colo_stop():
+    shutdown_guest = env_do_shutdown_guest()
+
+    try:
+        role, replication = qemu_colo_monitor(fail_fast = True)
+    except Exception:
+        role, replication = OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
+    status = _qemu_colo_stop(role, shutdown_guest)
+
+    if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+        if str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname) \
+            and str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname) != 
HOSTNAME:
+            # We where primary and the secondary is to be promoted.
+            # We are going to be out of date.
+            set_master_score(0)
+        else:
+            if role == OCF_RUNNING_MASTER:
+                # We where a healthy primary but had no healty secondary or it
+                # was stopped as well. So we have up-to-date data.
+                set_master_score(10)
+            else:
+                # We where a unhealthy primary but also had no healty 
secondary.
+                # So we still should have up-to-date data.
+                set_master_score(5)
+    else:
+        if get_master_score() > 10:
+            if role == OCF_SUCCESS:
+                if shutdown_guest:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary and both where stopped. So we have up-to-date 
data
+                    # too.
+                    set_master_score(10)
+                else:
+                    # We where a healthy secondary and (probably) had a healthy
+                    # primary still running. So we are now out of date.
+                    set_master_score(0)
+            else:
+                # We where a unhealthy secondary. So we are now out of date.
+                set_master_score(0)
+
+    return status
+
+def qemu_colo_notify():
+    action = "%s-%s" % (OCF_RESKEY_CRM_meta_notify_type, \
+                        OCF_RESKEY_CRM_meta_notify_operation)
+
+    if action == "post-start":
+        if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
+            peer = str.strip(OCF_RESKEY_CRM_meta_notify_start_uname)
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+
+    elif action == "pre-stop":
+        if not env_do_shutdown_guest() \
+           and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname) \
+           and HOSTNAME != str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+            fd = qmp_open()
+            peer = qmp_get_nbd_remote(fd)
+            log.debug("our peer: %s" % peer)
+            if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
+                if qmp_check_resync(fd):
+                    qmp_cancel_resync(fd)
+                elif qmp_is_colo_active(fd):
+                    qmp_primary_failover(fd)
+                qmp_execute(fd, [{"exec-helper": 
"clear-events"}],do_yank=False)
+            fd.close()
+
+    elif action == "post-stop" \
+         and OCF_RESKEY_CRM_meta_notify_key_operation == "stonith" \
+         and (HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname)
+            or str.strip(OCF_RESKEY_CRM_meta_notify_promote_uname)):
+        peer = str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname)
+        set_remote_master_score(peer, 0)
+
+    return OCF_SUCCESS
+
+def qemu_colo_promote():
+    role, replication = qemu_colo_monitor()
+
+    if role == OCF_SUCCESS and replication == OCF_NOT_RUNNING:
+        status = _qemu_colo_stop(OCF_SUCCESS, False)
+        if status != OCF_SUCCESS:
+            return status
+
+        rotate_logfile(QMP_LOG, 8)
+        rotate_logfile(QEMU_LOG, 8)
+        run_command(QEMU_PRIMARY_CMDLINE)
+        oob_helper_open()
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            fd = qmp_open()
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+            fd.close()
+        return OCF_SUCCESS
+    elif role == OCF_SUCCESS and replication != OCF_NOT_RUNNING:
+        fd = qmp_open()
+        qmp_secondary_failover(fd)
+        set_master_score(101)
+
+        peer = env_find_secondary()
+        if peer:
+            qmp_start_resync(fd, peer)
+            # The secondary has inconsistent data until resync is finished
+            set_remote_master_score(peer, 0)
+        qmp_execute(fd, [{"exec-helper": "clear-events"}], do_yank=False)
+        fd.close()
+        return OCF_SUCCESS
+    else:
+        return OCF_ERR_GENERIC
+
+def qemu_colo_demote():
+    status = qemu_colo_stop()
+    if status != OCF_SUCCESS:
+        return status
+    return qemu_colo_start()
+
+
+if OCF_ACTION == "meta-data":
+    qemu_colo_meta_data()
+    exit(OCF_SUCCESS)
+
+logs_open()
+
+status = qemu_colo_validate_all()
+# Exit here if our sanity checks fail, but try to continue if we need to stop
+if status != OCF_SUCCESS and OCF_ACTION != "stop":
+    exit(status)
+
+setup_constants()
+
+try:
+    if OCF_ACTION == "start":
+        status = qemu_colo_start()
+    elif OCF_ACTION == "stop":
+        status = qemu_colo_stop()
+    elif OCF_ACTION == "monitor":
+        status = qemu_colo_monitor()[0]
+    elif OCF_ACTION == "notify":
+        status = qemu_colo_notify()
+    elif OCF_ACTION == "promote":
+        status = qemu_colo_promote()
+    elif OCF_ACTION == "demote":
+        status = qemu_colo_demote()
+    elif OCF_ACTION == "validate-all":
+        status = qemu_colo_validate_all()
+    else:
+        status = OCF_ERR_UNIMPLEMENTED
+except Error:
+    exit(OCF_ERR_GENERIC)
+else:
+    exit(status)
--
2.20.1

Attachment: pgpxJkA3jwxPD.pgp
Description: OpenPGP digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]