#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
import ganeti.rpc.node as rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  ComputeIPolicyDiskSizesViolation, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


# Per-template string placed in front of ".diskN" when GenerateDiskTemplate
# asks _GenerateUniqueNames for new volume/file names. Templates that are
# not listed here get no generated names at all.
_DISK_TEMPLATE_NAME_PREFIX = dict([
  (constants.DT_PLAIN, ""),
  (constants.DT_RBD, ".rbd"),
  (constants.DT_EXT, ".ext"),
  (constants.DT_FILE, ".file"),
  (constants.DT_SHARED_FILE, ".sharedfile"),
  ])


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create exactly one block device (without its children) on a node.

  The children of C{device} are not created here; the caller must have
  created them beforehand.

  @param lu: the logical unit on whose behalf the RPC is issued
  @param node_uuid: UUID of the node on which the device is created
  @type instance: L{objects.Instance}
  @param instance: the instance owning the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: extra 'metadata' attached to the device (represented as an
      LVM tag)
  @type force_open: boolean
  @param force_open: forwarded to L{backend.BlockdevCreate}; it tells
      whether we run on the primary node and affects both the child
      assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: whether exclusive_storage is active for the node
  @raise errors.OpExecError: presumably raised by the RPC result's
      C{Raise} on failure (callers catch this type) -- TODO confirm

  """
  rpc_result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                           device.size, instance.name,
                                           force_open, info, excl_stor)
  failure_msg = ("Can't create block device %s on node %s for instance %s" %
                 (device, lu.cfg.GetNodeName(node_uuid), instance.name))
  rpc_result.Raise(failure_msg)


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device whose
      CreateOnSecondary() returns True
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @rtype: list of tuples
  @return: C{(node_uuid, device)} pairs of the created devices
  @raise errors.DeviceCreationError: if creating any device fails; its
      C{created_devices} lists what was created before the failure

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError as err:
    # Propagate upwards, adding what this level created so the caller can
    # undo it. A bare "raise" keeps the original traceback ("raise err"
    # would reset it under Python 2).
    err.created_devices.extend(created_devices)
    raise
  except errors.OpExecError as err:
    raise errors.DeviceCreationError(str(err), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Tell whether exclusive_storage is in effect for the node with this UUID.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if the configuration has no node with the
      given UUID

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is not None:
    return IsExclusiveStorageEnabledNode(cfg, node_info)
  raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                             errors.ECODE_NOENT)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Annotate C{device} and create its whole device tree on a node.

  Thin wrapper around L{_CreateBlockDevInner}: it annotates the root device
  with its disk parameters and looks up the node's exclusive_storage
  setting before delegating.

  @return: whatever L{_CreateBlockDevInner} returns (list of created
      C{(node_uuid, device)} pairs)

  """
  [annotated_disk] = AnnotateDiskParams(instance, [device], lu.cfg)
  exclusive = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, annotated_disk,
                              force_create, info, force_open, exclusive)


def _UndoCreateDisks(lu, disks_created, instance):
  """Remove the devices previously created by L{CreateDisks}.

  Used on error paths to roll back L{CreateDisks}. Removal failures are
  only logged as warnings; the remaining devices are still processed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the C{(node_uuid, disk)} pairs returned by
      L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for node_uuid, created_disk in disks_created:
    removal = lu.rpc.call_blockdev_remove(node_uuid, (created_disk, instance))
    warning_text = ("Failed to remove newly-created disk %s on node %s" %
                    (created_disk, lu.cfg.GetNodeName(node_uuid)))
    removal.Warn(warning_text, logging.warning)


def CreateDisks(lu, instance, disk_template=None,
                to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Since the instance may not have been saved to the config file yet, this
  function can not query the config file for the instance's disks; in that
  case they need to be passed as an argument.

  This function is also used by the disk template conversion mechanism to
  create the new disks of the instance. Since the instance will have the
  old template at the time we create the new disks, the new template must
  be passed as an extra argument.

  If creating any device fails, every device created so far by this call
  is removed (best effort, failures only logged) before the error is
  raised.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type disk_template: string
  @param disk_template: if provided, overrides the instance's disk_template
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @rtype: list of tuples
  @return: C{(node_uuid, disk)} pairs describing the created devices, to be
      used to call L{_UndoCreateDisks}
  @raise errors.OpExecError: if no template is given and the disks use
      mixed templates, or if creating a device fails

  """
  info = GetInstanceInfoText(instance)

  if disks is None:
    disks = lu.cfg.GetInstanceDisks(instance.uuid)

  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    # We cannot use config's 'GetInstanceNodes' here as 'CreateDisks'
    # is used by 'LUInstanceCreate' and the instance object is not
    # stored in the config yet.
    all_node_uuids = []
    for disk in disks:
      all_node_uuids.extend(disk.all_nodes)
    all_node_uuids = set(all_node_uuids)
    # ensure that primary node is always the first
    all_node_uuids.discard(pnode_uuid)
    all_node_uuids = [pnode_uuid] + list(all_node_uuids)
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disk_template is None:
    disk_template = utils.GetDiskTemplate(disks)
    if disk_template == constants.DT_MIXED:
      raise errors.OpExecError("Creating disk for '%s' instances "
                               "only possible with explicit disk template."
                               % (constants.DT_MIXED,))

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), disk_template)

  # The storage directory is derived from the first disk's path (presumably
  # shared by all disks of the instance). With no disks there is nothing to
  # create, and indexing disks[0] would raise IndexError.
  if disk_template in constants.DTS_FILEBASED and disks:
    file_storage_dir = os.path.dirname(disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Force creation (and opening) only on the primary node.
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError as e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  Only the requested template is evaluated. For templates that do not
  allocate space in volume groups, the disks are not inspected at all, so
  they do not need to carry the C{IDISK_VG}/C{IDISK_SIZE} keys.

  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list of dicts
  @param disks: disk definitions; for LVM-backed templates each must have
      the C{constants.IDISK_VG} and C{constants.IDISK_SIZE} keys
  @rtype: dict
  @return: mapping from volume group name to the space required in it
      (empty for templates without volume group usage)
  @raise errors.ProgrammerError: if the template's size requirement is
      unknown

  """
  def _compute(disks, payload):
    """Sum the sizes per volume group, adding C{payload} for each disk.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Extra space needed per disk on top of its size; None means the template
  # does not consume volume group space.
  per_disk_payload = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: 0,
    # DRBD_META_SIZE is added for the DRBD metadata of each disk
    constants.DT_DRBD8: constants.DRBD_META_SIZE,
    constants.DT_FILE: None,
    constants.DT_SHARED_FILE: None,
    constants.DT_GLUSTER: None,
    }

  if disk_template not in per_disk_payload:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  payload = per_disk_payload[disk_template]
  if payload is None:
    return {}
  return _compute(disks, payload)


def ComputeDisks(disks, disk_template, default_vg):
  """Normalize the user-supplied disk definitions of an instance.

  Every input dict is validated (access mode, size) and turned into a new
  dict holding the size, mode, volume group, name and type. Optional keys
  (metavg, adopt, spindles, and access where the template supports it) are
  copied when present. For the ExtStorage template, the provider and all
  non-standard (ext-)parameters are copied as well.

  @type disks: list of dictionaries
  @param disks: The disks' input dictionary
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @type default_vg: string
  @param default_vg: The default_vg to assume
  @rtype: list of dictionaries
  @return: The computed disks
  @raise errors.OpPrereqError: on an invalid access mode, a missing or
      non-integer size, or a misused provider parameter

  """
  # Optional keys copied verbatim when the input has them.
  passthrough_keys = [
    constants.IDISK_METAVG,
    constants.IDISK_ADOPT,
    constants.IDISK_SPINDLES,
    ]

  result = []
  for disk_spec in disks:
    mode = disk_spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                 errors.ECODE_INVAL)

    raw_size = disk_spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    CheckDiskExtProvider(disk_spec, disk_template)

    name = disk_spec.get(constants.IDISK_NAME, None)
    # The literal value "none" (any case) means "no name".
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None

    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: disk_spec.get(constants.IDISK_VG, default_vg),
      constants.IDISK_NAME: name,
      constants.IDISK_TYPE: disk_template,
      }
    new_disk.update((key, disk_spec[key]) for key in passthrough_keys
                    if key in disk_spec)

    if (disk_template in constants.DTS_HAVE_ACCESS and
        constants.IDISK_ACCESS in disk_spec):
      new_disk[constants.IDISK_ACCESS] = disk_spec[constants.IDISK_ACCESS]

    # ExtStorage: the provider is mandatory and every non-standard key is an
    # ext-param that must be carried along.
    if disk_template == constants.DT_EXT:
      new_disk[constants.IDISK_PROVIDER] = disk_spec[constants.IDISK_PROVIDER]
      new_disk.update((key, value) for key, value in disk_spec.items()
                      if key not in constants.IDISK_PARAMS)

    result.append(new_disk)

  return result


def ComputeDisksInfo(disks, disk_template, default_vg, ext_params):
  """Compute the new instance's disk definitions for a template conversion.

  The current disks are reduced to their standard disk parameters, merged
  with the user-supplied C{ext_params} and normalized through
  L{ComputeDisks}. The result is then completed with data carried over
  from the existing disks:
    - the access parameter, for target templates that support it;
    - the volume group in use, for LVM-based targets (plain <-> drbd).

  @type disks: list of {objects.Disk}
  @param disks: The current disks of the instance
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @type default_vg: string
  @param default_vg: The default volume group to assume
  @type ext_params: dict
  @param ext_params: The extstorage parameters
  @rtype: list of dictionaries
  @return: The computed disks' information for the new template
  @raise errors.OpPrereqError: when an ExtStorage disk would be converted
      to ExtStorage with the provider it already uses (besides whatever
      L{ComputeDisks} raises)

  """
  # ext_params may set the provider but no other standard disk parameter.
  for key in ext_params:
    assert (key == constants.IDISK_PROVIDER or
            key not in constants.IDISK_PARAMS), \
      "Invalid extstorage parameter '%s'" % key

  # Keep only the standard disk parameters of each current disk, then
  # overlay the user-provided ext_params.
  inst_disks = []
  for disk_obj in disks:
    disk_dict = objects.Disk.ToDict(disk_obj)
    filtered = dict((key, value) for key, value in disk_dict.items()
                    if key in constants.IDISK_PARAMS)
    filtered.update(ext_params)
    inst_disks.append(filtered)

  new_disks = ComputeDisks(inst_disks, disk_template, default_vg)

  for old_disk, new_disk in zip(disks, new_disks):
    # ExtStorage -> ExtStorage only makes sense with a different provider.
    if (disk_template == constants.DT_EXT and
        old_disk.dev_type == disk_template):
      provider = new_disk[constants.IDISK_PROVIDER]
      if provider == old_disk.params[constants.IDISK_PROVIDER]:
        raise errors.OpPrereqError("Not converting, '%s' of type ExtStorage"
                                   " already using provider '%s'" %
                                   (old_disk.iv_name, provider),
                                   errors.ECODE_INVAL)

    if (disk_template in constants.DTS_HAVE_ACCESS and
        constants.IDISK_ACCESS in old_disk.params):
      new_disk[constants.IDISK_ACCESS] = old_disk.params[constants.IDISK_ACCESS]

    if disk_template in constants.DTS_LVM:
      if old_disk.dev_type == constants.DT_PLAIN:
        new_disk[constants.IDISK_VG] = old_disk.logical_id[0]
      elif old_disk.dev_type == constants.DT_DRBD8:
        # The data LV is the first child of a DRBD disk.
        new_disk[constants.IDISK_VG] = old_disk.children[0].logical_id[0]

  return new_disks


def CalculateFileStorageDir(disk_type, cfg, instance_name,
                            file_storage_dir=None):
  """Calculate final instance file storage dir.

  The result is the cluster's configured storage directory for the
  template, optionally followed by C{file_storage_dir}, and (for every
  file-based template except Gluster) followed by the instance name.

  @type disk_type: disk template
  @param disk_type: L{constants.DT_FILE}, L{constants.DT_SHARED_FILE}, or
                    L{constants.DT_GLUSTER}
  @type cfg: ConfigWriter
  @param cfg: the configuration that is to be used.
  @type instance_name: string
  @param instance_name: name of the instance this disk is for.
  @type file_storage_dir: path
  @param file_storage_dir: the path below the configured base.

  @rtype: string
  @return: The file storage directory for the instance, or C{None} if
      C{disk_type} is not file-based
  @raise errors.OpPrereqError: if the cluster has no storage directory
      configured for C{disk_type}

  """
  if disk_type not in constants.DTS_FILEBASED:
    return None

  if disk_type == constants.DT_FILE:
    base_dir = cfg.GetFileStorageDir()
  elif disk_type == constants.DT_SHARED_FILE:
    base_dir = cfg.GetSharedFileStorageDir()
  elif disk_type == constants.DT_GLUSTER:
    base_dir = cfg.GetGlusterStorageDir()
  else:
    base_dir = None

  if not base_dir:
    raise errors.OpPrereqError(
      "Cluster file storage dir for {tpl} storage type not defined".format(
        tpl=repr(disk_type)
      ),
      errors.ECODE_STATE)

  path_parts = [base_dir]
  if file_storage_dir is not None:
    path_parts.append(file_storage_dir)
  # Gluster paths are not per-instance directories.
  if disk_type != constants.DT_GLUSTER:
    path_parts.append(instance_name)

  if len(path_parts) == 1:
    return path_parts[0]
  return utils.PathJoin(*path_parts) # pylint: disable=W0142


def CheckRADOSFreeSpace():
  """Check disk size requirements inside the RADOS cluster.

  This is intentionally a no-op: the RADOS cluster is assumed to always
  have enough space, so there is nothing to verify.

  @rtype: None

  """
  return None


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, forthcoming=False):
  """Build a DRBD8 disk object together with its data and metadata LVs.

  Allocates a port, a shared secret, UUIDs and DRBD minors through the
  cluster configuration.

  @param vgnames: two volume group names, for the data and the metadata LV
  @param names: two LV names, for the data and the metadata LV
  @rtype: L{objects.Disk}
  @return: the DRBD8 disk, whose children are the data LV (sized C{size})
      and the metadata LV (sized C{constants.DRBD_META_SIZE})

  """
  assert len(vgnames) == len(names) == 2

  def _NewUuid():
    return lu.cfg.GenerateUniqueID(lu.proc.GetECId())

  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  (data_vg, meta_vg) = vgnames
  (data_name, meta_name) = names

  data_lv = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                         logical_id=(data_vg, data_name),
                         nodes=[primary_uuid, secondary_uuid],
                         params={}, forthcoming=forthcoming)
  data_lv.uuid = _NewUuid()

  meta_lv = objects.Disk(dev_type=constants.DT_PLAIN,
                         size=constants.DRBD_META_SIZE,
                         logical_id=(meta_vg, meta_name),
                         nodes=[primary_uuid, secondary_uuid],
                         params={}, forthcoming=forthcoming)
  meta_lv.uuid = _NewUuid()

  # The DRBD device's UUID is needed before the disk object exists, since
  # the minors are allocated against it.
  drbd_uuid = _NewUuid()
  minors = lu.cfg.AllocateDRBDMinor([primary_uuid, secondary_uuid], drbd_uuid)
  assert len(minors) == 2
  (primary_minor, secondary_minor) = minors

  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      primary_minor, secondary_minor,
                                      shared_secret),
                          children=[data_lv, meta_lv],
                          nodes=[primary_uuid, secondary_uuid],
                          iv_name=iv_name, params={},
                          forthcoming=forthcoming)
  drbd_dev.uuid = drbd_uuid
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, forthcoming=False):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @type template_name: string
  @param template_name: the disk template to generate disks for
  @type instance_uuid: string
  @param instance_uuid: UUID of the instance (used in Gluster logical IDs)
  @type primary_node_uuid: string
  @param primary_node_uuid: UUID of the instance's primary node
  @type secondary_node_uuids: list of strings
  @param secondary_node_uuids: exactly one UUID for DRBD8, empty for all
      other templates
  @type disk_info: list of dicts
  @param disk_info: per-disk parameters (presumably as produced by
      L{ComputeDisks})
  @param file_storage_dir: directory holding file-based disks
  @param file_driver: driver stored in the logical ID of file-based disks
  @type base_index: int
  @param base_index: index of the first generated disk
  @param feedback_fn: progress reporting function (called once per disk
      for non-DRBD templates)
  @param full_disk_params: filled disk parameters, used to get the DRBD
      default metadata VG
  @type forthcoming: boolean
  @param forthcoming: value of the generated disks' forthcoming flag
  @rtype: list of L{objects.Disk}
  @return: the generated disks
  @raise errors.ProgrammerError: on a wrong number of secondary nodes, an
      unknown template, or an ExtStorage disk without provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two LV names (data, meta) per disk, laid out consecutively.
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      forthcoming=forthcoming)
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.dev_type = template_name
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])
    disk_nodes = []

    # Every logical_id_fn takes (idx, disk_index, disk): the position in
    # disk_info, the absolute disk index and the disk's parameter dict.
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

      disk_nodes = [primary_node_uuid]

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      # Use the idx argument rather than the enclosing function's loop
      # variable, which the lambda previously picked up by late binding.
      logical_id_fn = \
        lambda idx, _1, _2: (file_driver,
                             "%s/%s" % (file_storage_dir, names[idx]))
      if template_name == constants.DT_FILE:
        disk_nodes = [primary_node_uuid]

    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      # Add IDISK_ACCESS param to disk params
      if (template_name in constants.DTS_HAVE_ACCESS and
          constants.IDISK_ACCESS in disk):
        params[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              # a fresh list per disk, so disks do not
                              # share (alias) one node list object
                              nodes=list(disk_nodes),
                              spindles=disk.get(constants.IDISK_SPINDLES),
                              forthcoming=forthcoming)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CommitDisks(disks):
  """Recursively remove the forthcoming flag from disks and their children.

  @type disks: list of L{objects.Disk} or None
  @param disks: the disks to commit; C{None} is accepted and treated as an
      empty list, so disks whose C{children} attribute is unset (None) can
      be committed without error

  """
  for disk in disks or []:
    disk.forthcoming = False
    CommitDisks(disk.children)


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Validate the spindles option against the exclusive_storage setting.

  Spindles must not be given (non-None) when exclusive storage is off; when
  it is on and C{required} is set, they must be given.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given but should not be,
      or are required but missing

  """
  has_spindles = diskdict.get(constants.IDISK_SPINDLES) is not None

  if has_spindles and not es_flag:
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)

  if es_flag and required and not has_spindles:
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


def CheckDiskExtProvider(diskdict, disk_template):
  """Check that the provider parameter is used only, and always, with ext.

  For the ExtStorage template the provider key must be present (not None).
  For any other template a (truthy) provider is rejected.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type disk_template: string
  @param disk_template: the desired template of this disk
  @raise errors.OpPrereqError: when the parameter is used in the wrong way

  """
  ext_provider = diskdict.get(constants.IDISK_PROVIDER, None)

  if disk_template == constants.DT_EXT:
    if ext_provider is None:
      raise errors.OpPrereqError("Missing provider for template '%s'" %
                                 constants.DT_EXT, errors.ECODE_INVAL)
  elif ext_provider:
    raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                               " disk template, not %s" %
                               (constants.IDISK_PROVIDER, constants.DT_EXT,
                                disk_template), errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  Disks are recreated either on the instance's current nodes, on an
  explicitly given list of nodes (C{nodes}), or on nodes chosen by an
  iallocator. Only the parameters in L{_MODIFYABLE} may be changed while
  recreating.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters that may be changed while recreating the disks
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    constants.IDISK_ACCESS,
    constants.IDISK_TYPE,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success, C{self.op.node_uuids} and C{self.op.nodes} are replaced by
    the nodes selected by the iallocator.

    @raise errors.OpPrereqError: if the iallocator cannot compute nodes

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disk_template = self.cfg.GetInstanceDiskTemplate(self.instance.uuid)
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      constants.IDISK_TYPE: d.dev_type
      } for d in self.cfg.GetInstanceDisks(self.instance.uuid)]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        group_name=None,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == \
      len(self.cfg.GetInstanceNodes(self.instance.uuid))

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Check and normalize the opcode arguments.

    Converts the deprecated list-of-indices form of C{disks} into
    C{(index, params)} pairs, and rejects duplicate disk indices and
    parameters that cannot be changed while recreating.

    @raise errors.OpPrereqError: on invalid arguments

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(list(map(compat.fst, self.op.disks)))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declare the locks needed by this LU.

    Explicitly given nodes are locked directly; with an iallocator the
    node group is locked first and its nodes are added in L{DeclareLocks}.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True
    self.dont_collate_locks[locking.LEVEL_NODE_RES] = True

  def DeclareLocks(self, level):
    """Compute the locks for the given level.

    @param level: the locking level being declared

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: the master node plus all instance nodes, for both pre and post
        hooks

    """
    nl = [self.cfg.GetMasterNode()] + \
      list(self.cfg.GetInstanceNodes(self.instance.uuid))
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    that the requested disk indices are valid, and (when nodes are being
    replaced) that all disks are recreated.

    @raise errors.OpPrereqError: if any prerequisite is not met

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
      if len(self.op.node_uuids) != len(inst_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(inst_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      disks = self.cfg.GetInstanceDisks(instance.uuid)
      assert (not utils.AnyDiskOfType(disks, [constants.DT_DRBD8]) or
              len(self.op.node_uuids) == 2)
      assert (not utils.AnyDiskOfType(disks, [constants.DT_PLAIN]) or
              len(self.op.node_uuids) == 1)
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if not instance.disks:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    # Compare against a real list: under Python 3 a list never compares
    # equal to a range object, which would reject every request here
    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != list(range(len(instance.disks)))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = self.cfg.GetInstanceNodes(instance.uuid)
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    Applies the requested parameter changes (and, for DRBD disks being moved,
    new logical IDs with freshly allocated minors) to the configuration,
    moves the instance to the new nodes if requested, creates the disks and
    optionally wipes them.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    for idx, disk in enumerate(inst_disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                disk.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = inst_disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
      self.cfg.Update(disk, feedback_fn)

    # change primary node, if needed
    if self.op.node_uuids:
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")
      self.instance.primary_node = self.op.node_uuids[0]
      self.cfg.Update(self.instance, feedback_fn)
      for disk in inst_disks:
        self.cfg.SetDiskNodes(disk.uuid, self.op.node_uuids)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
    assert mylocks.issuperset(frozenset(inst_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(inst_disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Query node information for the given nodes and LVM volume group.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the C{node_info} RPC for C{node_uuids}

  """
  storage_units = rpc.PrepareStorageUnitsForNodes(
    lu.cfg, [(constants.ST_LVM_VG, vg)], node_uuids)
  # Only the cluster's default hypervisor (with its parameters) is queried
  hv_name = lu.cfg.GetHypervisorType()
  hv_specs = [(hv_name, lu.cfg.GetClusterInfo().hvparams[hv_name])]
  return lu.rpc.call_node_info(node_uuids, storage_units, hv_specs)


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Verify that one node has enough free space in a volume group.

  @type node_name: string
  @param node_name: the name of the node (used in error messages)
  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, storage_reports, _) = node_info
  vg_report = utils.storage.LookupSpaceInfoByStorageType(
      storage_reports, constants.ST_LVM_VG)
  if not vg_report:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)

  free_space = vg_report.get("storage_free", None)
  if not isinstance(free_space, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, free_space),
                               errors.ECODE_ENVIRON)

  if free_space < requested:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, free_space),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Ensure every given node has enough free space in one volume group.

  A single node info RPC is issued for all nodes; each node's answer is
  then validated in turn. The first failing node aborts the check.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  answers = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    answer = answers[uuid]
    answer.Raise("Cannot get current information from node %s" % name,
                 prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, answer.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Ensure the given nodes have enough free space in each requested VG.

  Every volume group in C{req_sizes} is checked separately on all nodes;
  the first shortfall (or failure to query a node) raises.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: mapping of volume group name to the amount of disk in
      MiB required in it
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg_name in req_sizes:
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, req_sizes[vg_name])


def _DiskSizeInBytesToMebibytes(lu, size):
  """Convert a size in bytes to MiB, rounding up.

  A warning is logged through C{lu} when C{size} is not an exact multiple
  of 1 MiB. It reports how many bytes of the last (partial) MiB will not
  be wiped.

  @param lu: object providing C{LogWarning}
  @param size: size in bytes
  @return: the size in MiB, rounded up

  """
  one_mib = 1024 * 1024
  whole_mibs, leftover = divmod(size, one_mib)

  if not leftover:
    return whole_mibs

  lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                " to not overwrite existing data (%s bytes will not be"
                " wiped)", one_mib - leftover)
  return whole_mibs + 1


def _CalcEta(time_taken, written, total_size):
  """Estimate the remaining time from the progress made so far.

  Assumes the remaining data is written at the same average rate as the
  data written so far.

  @param time_taken: The time taken so far
  @param written: amount written so far (must be non-zero)
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  seconds_per_unit = time_taken / float(written)
  still_to_write = total_size - written
  return still_to_write * seconds_per_unit


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node before wiping and
  resumed afterwards, also when wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from offset 0
  @note: failures of the pause and wipe RPCs are propagated via
    C{result.Raise}; failures to resume synchronization are only logged

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(inst_disks)]

  # A real list (not a lazy map(), which is an iterator under Python 3) is
  # passed to the pause/resume RPCs
  disk_objs = [disk for (_, disk, _) in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (disk_objs, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  # Report the real disk index; "disks" may be a subset of the instance disks
  for (idx, _, _), success in zip(disks, result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # Clamp to at least 1: for very small disks the percentage truncates to
      # 0, the offset would never advance and _CalcEta would divide by zero.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once per minute
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (disk_objs, instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for (idx, _, _), success in zip(disks, result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def ImageDisks(lu, instance, image, disks=None):
  """Dumps an image onto instance disks.

  Disk synchronization is paused on the primary node while imaging and
  resumed afterwards, also when imaging fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should image
  @type image: string
  @param image: the image to dump onto the disks (passed as-is to the
    C{blockdev_image} RPC)
  @type disks: None or list of ints
  @param disks: disk indices; if None, only the first disk (index 0) is
    imaged

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  if disks is None:
    disks = [(0, inst_disks[0])]
  else:
    disks = [(idx, inst_disks[idx]) for idx in disks]

  # A real list (not a lazy map(), which is an iterator under Python 3) is
  # passed to the pause/resume RPCs
  disk_objs = [disk for (_, disk) in disks]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (disk_objs, instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  # Report the real disk index, not the position in the selected subset
  for (idx, _), success in zip(disks, result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device) in disks:
      lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
                 idx, instance.name, node_name)

      result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
                                          image, device.size)
      result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
                   (idx, instance.name, node_name))
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (disk_objs, instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
                    " node '%s': %s", instance.name, node_name,
                    result.fail_msg)
    else:
      for (idx, _), success in zip(disks, result.payload):
        if not success:
          lu.LogWarning("Failed to resume synchronization of disk '%d' of"
                        " instance '%s'", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that undoes disk creation on failure.

  If wiping raises L{errors.OpExecError}, the disks recorded in C{cleanup}
  are removed again and the exception is re-raised. Other exception types
  propagate without any cleanup.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: re-raised from L{WipeDisks} after cleanup

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise


def ExpandCheckDisks(instance_disks, disks):
  """Resolve which of an instance's disks an operation should act on.

  @type instance_disks: list of L{objects.Disk}
  @param instance_disks: all disks of the instance
  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks, or None to select all instance disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on
  @raise errors.ProgrammerError: if a selected disk (matched by UUID) does
      not belong to the instance

  """
  if disks is None:
    return instance_disks

  inst_disks_uuids = [d.uuid for d in instance_disks]
  disks_uuids = [d.uuid for d in disks]
  foreign = set(disks_uuids) - set(inst_disks_uuids)
  if foreign:
    raise errors.ProgrammerError("Can only act on disks belonging to the"
                                 " target instance: expected a subset of %s,"
                                 " got %s" % (inst_disks_uuids, disks_uuids))
  return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  Only disks whose C{dev_type} is in C{constants.DTS_INT_MIRROR} are polled,
  via the C{blockdev_getmirrorstatus} RPC on the primary node.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are polled
  @type disks: list of L{objects.Disk} or None
  @param disks: disks to wait for (checked with L{ExpandCheckDisks}); None
      means all instance disks, an empty list returns immediately
  @type oneshot: boolean
  @param oneshot: poll only once instead of waiting for the sync to finish
      (up to 10 extra one-second polls are still made if disks look degraded)
  @rtype: boolean
  @return: False if, in the last poll, some disk was degraded without an
      ongoing sync; True otherwise
  @raise errors.RemoteError: if the primary node cannot be contacted 10
      times in a row

  """
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  # Nothing to wait for: no instance disks, or an explicit empty selection
  if not inst_disks or disks is not None and not disks:
    return True

  disks = [d for d in ExpandCheckDisks(inst_disks, disks)
           if d.dev_type in constants.DTS_INT_MIRROR]

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  # Consecutive RPC failures; reset after every successful poll
  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      # Degraded while not syncing means the disk will not recover by itself
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        # NOTE(review): max_time ends up as the value for the *last* syncing
        # disk, not the maximum over all disks, despite its name — confirm
        # whether that is intended
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
          max_time = 5 # sleep at least a bit between retries
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    # Sleep for the reported estimate, but never more than a minute
    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance (every node of each
  disk's node tree).

  Every shutdown failure is logged as a warning. Whether it affects the
  return value depends on the node:
    - primary node: ignored if C{ignore_primary} is true
    - other nodes: ignored only if the RPC result reports the node offline

  When C{disks} is None, the instance's disks are also marked inactive in
  the configuration, so the caller should re-read the instance
  configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are shut down
  @type disks: list of L{objects.Disk} or None
  @param disks: disks to shut down (all instance disks if None)
  @type ignore_primary: boolean
  @param ignore_primary: if true, errors on the primary node do not make
      the function return False
  @rtype: boolean
  @return: True if no non-ignorable shutdown failure occurred

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None, req_states=None):
  """Shutdown block devices of an instance after checking its state.

  The instance state is validated with C{CheckInstanceState} (against
  C{req_states}, defaulting to C{INSTANCE_DOWN}) before delegating to
  L{ShutdownInstanceDisks}. The return value of the shutdown is discarded.

  """
  allowed_states = INSTANCE_DOWN if req_states is None else req_states
  CheckInstanceState(lu, instance, allowed_states,
                     msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes: a first pass assembles every
  device on every node in secondary mode, a second pass assembles the
  devices on the primary node in primary mode.

  If C{disks} is None, the instance's disks are marked active in the
  configuration before assembling. If any assembly step fails (taking
  C{ignore_secondaries} and offline secondaries into account), the disks are
  marked inactive again.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @rtype: tuple
  @return: always a tuple C{(disks_ok, device_info, payloads)}, where
      C{disks_ok} is False if the operation failed; C{device_info} is a
      list with one C{(primary_node_name, instance_visible_name, dev_path)}
      tuple per disk (C{dev_path} is None if the primary-node assembly of
      that disk failed); and C{payloads} holds the payloads of the
      primary-node (second pass) assemble RPCs

  """
  device_info = []
  disks_ok = True
  payloads = []

  if disks is None:
    # only mark instance disks as active if all disks are affected
    instance = lu.cfg.MarkInstanceDisksActive(instance.uuid)

  inst_disks = lu.cfg.GetInstanceDisks(instance.uuid)
  disks = ExpandCheckDisks(inst_disks, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  # NOTE(review): idx is the position within `disks`, which equals the
  # instance disk index only when all disks are assembled (disks=None);
  # confirm what the backend expects when a subset is passed.
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        # work on a copy so the configured size is not modified
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, False, idx)
      msg = result.fail_msg
      if msg:
        secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance.uuid)
        # failures on offline secondaries never fail the operation
        is_offline_secondary = (node_uuid in secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, True, idx)
      payloads.append(result.payload)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _, __ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # NOTE(review): this marks the disks inactive even when only a subset of
  # disks was requested (in which case they were not marked active above);
  # confirm this asymmetry is intended.
  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info, payloads


def StartInstanceDisks(lu, instance, force):
  """Assemble all disks of an instance, aborting on failure.

  Secondary-node errors are tolerated when C{force} is true. If assembly
  fails, the disks are shut down again and an error is raised; when C{force}
  was explicitly false (not None), a hint about C{--force} is logged first.

  Modifies the configuration of the instance, so the caller should re-read the
  instance configuration, if needed.

  @raise errors.OpExecError: if the disks could not be assembled

  """
  (assembled, _, _) = AssembleInstanceDisks(lu, instance,
                                            ignore_secondaries=force)
  if assembled:
    return

  ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node and node resource locks are computed later.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    self.dont_collate_locks[locking.LEVEL_NODE] = True
    self.dont_collate_locks[locking.LEVEL_NODE_RES] = True

  def DeclareLocks(self, level):
    """Lock the instance's nodes, and the same set of node resources.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + \
      list(self.cfg.GetInstanceNodes(self.instance.uuid))
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that all its nodes are
    online, that the disk type is growable, that the requested size change
    is not negative, and that free space and instance policy allow it.

    @raise errors.OpPrereqError: if any of the checks fails

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.cfg.GetInstanceNodes(self.instance.uuid))
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk))

    if self.disk.dev_type not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError(
          "Instance's disk layout %s does not support"
          " growing" % self.disk.dev_type, errors.ECODE_INVAL)

    if self.op.absolute:
      # amount is the desired final size
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # amount is the increment to add
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

    self._CheckIPolicy(self.target)

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Check free volume group space on the nodes, where applicable.

    The check is skipped for disk types in
    C{constants.DTS_NO_FREE_SPACE_CHECK} and whenever any node uses
    exclusive storage.

    """
    template = self.disk.dev_type
    if (template not in constants.DTS_NO_FREE_SPACE_CHECK and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def _CheckIPolicy(self, target_size):
    """Check the disk sizes after growing against the group's ipolicy.

    Violations only produce a warning when C{ignore_ipolicy} is set.

    @raise errors.OpPrereqError: on policy violation without ignore_ipolicy

    """
    cluster = self.cfg.GetClusterInfo()
    group_uuid = list(self.cfg.GetInstanceNodeGroups(self.op.instance_uuid,
                                                     primary_only=True))[0]
    group_info = self.cfg.GetNodeGroup(group_uuid)
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            group_info)

    disks = self.cfg.GetInstanceDisks(self.op.instance_uuid)
    disk_sizes = [disk.size if disk.uuid != self.disk.uuid else target_size
                  for disk in disks]

    # The ipolicy checker below ignores None, so we only give it the disk size
    res = ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, disks)
    if res:
      msg = ("Growing disk %s violates policy: %s" %
             (self.op.disk,
              utils.CommaJoin(res)))
      if self.op.ignore_ipolicy:
        self.LogWarning(msg)
      else:
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    The disk is assembled, the grow is dry-run on all nodes, then performed
    on the backing storage of all nodes and on the logical storage of the
    primary node. The new size is recorded in the configuration, optionally
    the new space is wiped, and optionally the disk is waited to sync.

    @raise errors.OpExecError: if the disk cannot be activated, its current
        size cannot be determined for wiping, or a grow RPC fails

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _, _ = AssembleInstanceDisks(self, self.instance,
                                           disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    inst_nodes = self.cfg.GetInstanceNodes(self.instance.uuid)
    for node_uuid in inst_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      primary_name = self.cfg.GetNodeName(self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   primary_name)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % primary_name)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      # Validate data reported by the node with a real check; an assert
      # would be skipped when running with optimizations enabled
      if old_disk_size < self.disk.size:
        raise errors.OpExecError("Retrieved disk size too small (got %s,"
                                 " should be at least %s)" %
                                 (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in inst_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)
    self.cfg.Update(self.disk, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.WConfdClient().DownGradeLocksLevel(
          locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
      assert inst_disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The actual work is delegated to the L{TLReplaceDisks} tasklet.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    Changing the secondary requires either an iallocator or a new node;
    the other modes accept neither.

    @raise errors.OpPrereqError: on an invalid combination of options

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Set up instance, node and node group locks and create the tasklet.

    """
    self._ExpandAndLockInstance(allow_forthcoming=True)

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.dont_collate_locks[locking.LEVEL_NODEGROUP] = True
    self.dont_collate_locks[locking.LEVEL_NODE] = True
    self.dont_collate_locks[locking.LEVEL_NODE_RES] = True

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Compute node group, node and node resource locks.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks; copy the list (as LUInstanceGrowDisk does) so the
      # two levels do not share one mutable list object
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies that the optimistically acquired node group locks are still
    correct, then runs the regular tasklet prerequisite checks.

    """
    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its nodes are determined in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Lock all nodes of the instance at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Load the locked instance and require its primary node to be online.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Assemble the instance's disks and optionally wait for them to sync.

    @return: the device information list produced by
        L{AssembleInstanceDisks}
    @raise errors.OpExecError: if assembly fails, or if waiting for sync was
        requested and some disks end up degraded

    """
    (assembled, device_list, _) = AssembleInstanceDisks(
      self, self.instance, ignore_size=self.op.ignore_size)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync and not WaitForSync(self, self.instance):
      self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
      raise errors.OpExecError("Some disks of the instance are degraded!")

    return device_list


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shut down the disks of an instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; its nodes are determined in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.needed_locks[locking.LEVEL_NODE] = []

  def DeclareLocks(self, level):
    """Lock all nodes of the instance at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Load the locked instance from the configuration.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shut down the disks.

    With C{force} the instance state is not checked; otherwise the
    state-checking L{_SafeShutdownInstanceDisks} is used.

    """
    shutdown_fn = (ShutdownInstanceDisks if self.op.force
                   else _SafeShutdownInstanceDisks)
    shutdown_fn(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).
  The same kind of test is applied recursively to all child devices.

  The device itself is only queried if C{on_primary} is true or the device
  is assembled on secondaries. Checking of further children stops at the
  first failure.

  @rtype: boolean
  @return: True if the device and its children are consistent

  """
  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    elif ldisk:
      result = rstats.payload.ldisk_status == constants.LDS_OKAY
    else:
      result = not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      # Propagate ldisk so children are tested the same way as the parent
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary,
                                                     ldisk=ldisk)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Check mirror consistency of a disk that is not yet annotated.

  The disk is annotated with the instance's disk parameters and then handed
  to L{_CheckDiskConsistencyInner}, whose result is returned.

  """
  [annotated_dev] = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node_uuid,
                                    on_primary, ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Annotate a disk with its disk parameters and look it up on a node.

  @param lu: the logical unit (or tasklet) providing C{cfg} and C{rpc}
  @param node_uuid: the node to query
  @param dev: the device to find
  @param instance: the instance object the device belongs to
  @return: the result of the C{call_blockdev_find} RPC

  """
  [annotated_dev] = AnnotateDiskParams(instance, [dev], lu.cfg)
  rpc_args = (annotated_dev, instance)
  return lu.rpc.call_blockdev_find(node_uuid, rpc_args)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Store the replacement parameters and reset the runtime state.

    The runtime attributes are filled in by L{CheckPrereq}.

    """
    Tasklet.__init__(self, lu)

    # Parameters as passed by the logical unit
    (self.instance_uuid, self.instance_name) = (instance_uuid, instance_name)
    (self.mode, self.iallocator_name) = (mode, iallocator_name)
    (self.remote_node_uuid, self.disks) = (remote_node_uuid, disks)
    (self.early_release, self.ignore_ipolicy) = (early_release,
                                                 ignore_ipolicy)

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Ask an IAllocator for a new secondary node of the instance.

    @return: the UUID of the node chosen by the allocator
    @raise errors.OpPrereqError: if the allocator run fails or it selects a
        node that is not in the configuration

    """
    request = iallocator.IAReqRelocate(
      inst_uuid=instance_uuid,
      relocate_from_node_uuids=list(relocate_from_node_uuids))
    allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
    allocator.Run(iallocator_name)

    if not allocator.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, allocator.info),
                                 errors.ECODE_NORES)

    chosen_name = allocator.result[0]
    chosen_node = lu.cfg.GetNodeInfoByName(chosen_name)
    if chosen_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 chosen_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, chosen_name)
    return chosen_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Return the faulty disks of this tasklet's instance on a node.

    Thin wrapper around L{FindFaultyInstanceDisks}, called with this
    tasklet's configuration, RPC runner and instance and with its final
    flag set to True.

    """
    cfg, rpc_runner, instance = self.cfg, self.rpc, self.instance
    return FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Tell whether every disk of the instance is present on its nodes.

    Each disk is looked up on each instance node; offline nodes are skipped.
    Any failed or empty lookup on an online node means "not activated".

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = self.cfg.GetInstanceNodes(instance.uuid)
    inst_disks = self.cfg.GetInstanceDisks(instance.uuid)

    for (disk_idx, disk) in enumerate(inst_disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", disk_idx,
                        self.cfg.GetNodeName(node))

        found = _BlockdevFind(self, node, disk, instance)
        if found.offline:
          continue
        if found.fail_msg or not found.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and has exactly one
    secondary node. It then determines the target, other and (when changing
    the secondary) new node according to the replacement mode, validates the
    disk template, instance policy, node states and disk selection, and
    releases the node, node resource and node group locks not needed for the
    replacement.

    @raise errors.OpPrereqError: if the instance layout, the selected nodes
        or the disk selection are not suitable for replacement
    @raise errors.ProgrammerError: on an unhandled replacement mode

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
    if len(secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          # report the node's name rather than its UUID to the user
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", old_node_info.name)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    if (not disks or
        not utils.AllDiskOfType(disks, [constants.DT_DRBD8])):
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    Dispatches to L{_ExecDrbd8Secondary} when a new secondary node was
    chosen, and to L{_ExecDrbd8DiskOnly} otherwise. If the instance's disks
    are not active (and the instance is not forthcoming), they are activated
    before the replacement and shut down again afterwards, even on failure.

    @param feedback_fn: function used to send feedback to the caller
    @return: the value returned by the selected replacement handler

    """
    if __debug__:
      # The prerequisite checks must have left us with exactly the node
      # (and node resource) locks of the involved nodes and the instance lock
      held_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(held_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (held_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))

      held_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(held_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    cfg = self.cfg
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                cfg.GetNodeName(self.instance.primary_node))
    secondaries = cfg.GetInstanceSecondaryNodes(self.instance.uuid)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(cfg.GetNodeNames(secondaries)))

    # Inactive disks of a real instance are brought up for the duration of
    # the replacement; forthcoming instances only have forthcoming disks
    disks_were_inactive = not self.instance.disks_active

    if disks_were_inactive and not self.instance.forthcoming:
      StartInstanceDisks(self.lu, self.instance, True)
      # Pick up the configuration changes made by StartInstanceDisks
      self.instance = cfg.GetInstanceInfo(self.instance.uuid)

    try:
      if self.new_node_uuid is None:
        handler = self._ExecDrbd8DiskOnly
      else:
        # The secondary node is being changed
        handler = self._ExecDrbd8Secondary
      result = handler(feedback_fn)
    finally:
      # Undo the temporary activation, whatever the outcome
      if disks_were_inactive and not self.instance.forthcoming:
        _SafeShutdownInstanceDisks(self.lu, self.instance,
                                   req_states=INSTANCE_NOT_RUNNING)

    self.lu.AssertReleasedLocks(locking.LEVEL_NODE)

    if __debug__:
      # With early release no resource lock may be left; otherwise only
      # locks of the nodes involved in the replacement may remain
      remaining = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      involved = frozenset(self.node_secondary_ip)
      if self.early_release:
        locks_ok = not remaining
      else:
        locks_ok = not (set(remaining) - involved)
      assert locks_ok, \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, remaining, involved))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    """Ensure the cluster volume group is present on every given node.

    @type node_uuids: list of strings
    @param node_uuids: UUIDs of the nodes to check
    @raise errors.OpExecError: if the volume group lists cannot be obtained
        or the cluster volume group is missing on one of the nodes; errors
        reported by an individual node's RPC result are raised through that
        result's C{Raise}

    """
    self.lu.LogInfo("Checking volume groups")

    expected_vg = self.cfg.GetVGName()

    vg_lists = self.rpc.call_vg_list(node_uuids)
    if not vg_lists:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for uuid in node_uuids:
      node_result = vg_lists[uuid]
      node_result.Raise("Error checking node %s" % self.cfg.GetNodeName(uuid))
      if expected_vg in node_result.payload:
        continue
      raise errors.OpExecError("Volume group '%s' not found on node %s" %
                               (expected_vg, self.cfg.GetNodeName(uuid)))

  def _CheckDisksExistence(self, node_uuids):
    """Make sure every disk selected for replacement exists on the nodes.

    @type node_uuids: list of strings
    @param node_uuids: UUIDs of the nodes on which each selected disk is
        looked up
    @raise errors.OpExecError: if a selected disk cannot be found on one of
        the nodes; when the instance's disks do not look activated, the
        message suggests running activate-disks first

    """
    for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance.uuid)):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        lookup = _BlockdevFind(self, node_uuid, dev, self.instance)
        if not lookup.fail_msg and lookup.payload:
          # The device was found on this node
          continue

        reason = lookup.fail_msg or "disk not found"
        if self._CheckDisksActivated(self.instance):
          extra_hint = ""
        else:
          extra_hint = ("\nDisks seem to be not properly activated. Try"
                        " running activate-disks on the instance before"
                        " using replace-disks.")
        raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                 (idx, self.cfg.GetNodeName(node_uuid), reason,
                                  extra_hint))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    """Ensure none of the selected disks is degraded on the given node.

    @type node_uuid: string
    @param node_uuid: UUID of the node whose disks are checked
    @type on_primary: boolean
    @param on_primary: whether C{node_uuid} is the instance's primary node
    @type ldisk: boolean
    @param ldisk: passed through unchanged to L{CheckDiskConsistency}
    @raise errors.OpExecError: on the first selected disk that fails the
        consistency check

    """
    all_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    selected = [(idx, dev) for (idx, dev) in enumerate(all_disks)
                if idx in self.disks]

    for idx, dev in selected:
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      consistent = CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                        on_primary, ldisk=ldisk)
      if not consistent:
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create fresh data and metadata LVs for each selected disk.

    Only used when disks are replaced in place on the primary or secondary
    node (not when the secondary node changes), so the existing DRBD disks
    are not modified here.

    @type node_uuid: string
    @param node_uuid: UUID of the node on which the new LVs are created
    @rtype: dict
    @return: maps each selected disk's C{iv_name} to a tuple
        C{(drbd_disk, copies_of_old_lvs, new_lvs)}
    @raise errors.OpExecError: if creating one of the block devices fails

    """
    result = {}

    annotated = AnnotateDiskParams(self.instance,
                                   self.cfg.GetInstanceDisks(self.instance.uuid),
                                   self.cfg)
    for idx, dev in enumerate(annotated):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      unique_names = _GenerateUniqueNames(
        self.lu, [".disk%d_%s" % (idx, kind) for kind in ("data", "meta")])

      # The new LVs live in the same volume groups as the ones they replace
      (old_data, old_meta) = dev.children
      new_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                              logical_id=(old_data.logical_id[0],
                                          unique_names[0]),
                              params=old_data.params)
      new_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                              size=constants.DRBD_META_SIZE,
                              logical_id=(old_meta.logical_id[0],
                                          unique_names[1]),
                              params=old_meta.params)

      new_lvs = [new_data, new_meta]
      result[dev.iv_name] = (dev, [lv.Copy() for lv in dev.children], new_lvs)
      exclusive = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # force_create=True makes sure the LVs are really created
      for lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, lv, True,
                               GetInstanceInfoText(self.instance), False,
                               exclusive)
        except errors.DeviceCreationError as err:
          raise errors.OpExecError("Can't create block device: %s" %
                                   err.message)

    return result

  def _CheckDevices(self, node_uuid, iv_names):
    """Check that every replaced DRBD device exists and is not degraded.

    @type node_uuid: string
    @param node_uuid: UUID of the node on which the devices are looked up
    @type iv_names: dict
    @param iv_names: maps a disk name to a triple whose first element is the
        DRBD disk object
    @raise errors.OpExecError: if a device is missing or degraded

    """
    for name, (drbd_dev, _, _) in iv_names.iteritems():
      found = _BlockdevFind(self, node_uuid, drbd_dev, self.instance)

      if found.fail_msg or not found.payload:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, found.fail_msg or "disk not found"))

      if found.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    """Remove the replaced (old) LVs from a node.

    A removal failure reported through the RPC result's C{fail_msg} is only
    logged as a warning (with a hint to clean up manually); the remaining
    LVs are still processed.

    @type node_uuid: string
    @param node_uuid: UUID of the node holding the old LVs
    @type iv_names: dict
    @param iv_names: maps a disk name to a triple whose second element is the
        list of old LV disk objects to remove

    """
    for name, (_, stale_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for stale_lv in stale_lvs:
        rpc_result = self.rpc.call_blockdev_remove(node_uuid,
                                                   (stale_lv, self.instance))
        failure = rpc_result.fail_msg
        if not failure:
          continue
        self.lu.LogWarning("Can't remove old LV: %s", failure,
                           hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced-<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    For a forthcoming instance no disk is touched. Only the locks are
    released, as in the regular path. L{Exec} asserts afterwards that no node
    lock is held, and that no node resource lock is held with early release.

    @param feedback_fn: function used to send feedback to the caller

    """
    steps_total = 6

    if self.instance.forthcoming:
      feedback_fn("Instance forthcoming, not touching disks")
      # There is nothing to sync, so early release can be honoured at once
      if self.early_release:
        ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
      ReleaseLocks(self.lu, locking.LEVEL_NODE)
      return

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                     (dev, self.instance),
                                                     (old_lvs, self.instance))
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == unique_id on that node

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.logical_id[0],
                                d.logical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                             (to_ren, self.instance))
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.logical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance),
                                                  (new_lvs, self.instance))
      msg = result.fail_msg
      if msg:
        # Roll back: the new LVs (now carrying the old names) are removed
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               (new_lv, self.instance)).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    # Steps 5 and 6 are "Removing old storage" and "Sync devices", in an
    # order that depends on early_release
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(next(cstep), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _UpdateDisksSecondary(self, iv_names, feedback_fn):
    """Point the given DRBD disks at the new secondary node in the config.

    For every entry, the disk's C{logical_id} is replaced, the disk is saved,
    and its node list is set to the primary node followed by the new
    secondary node. Finally the instance itself is saved.

    @type iv_names: dict
    @param iv_names: values are triples C{(disk, children, new_logical_id)};
        the first element is the disk object and the third its new logical
        ID, while the second element is not used here
    @param feedback_fn: function used to send feedback back to the caller of
        the OpCode

    """
    self.lu.LogInfo("Updating instance configuration")
    for disk, _, logical_id in iv_names.itervalues():
      disk.logical_id = logical_id
      self.cfg.Update(disk, feedback_fn)
      disk_nodes = [self.instance.primary_node, self.new_node_uuid]
      self.cfg.SetDiskNodes(disk.uuid, disk_nodes)

    self.cfg.Update(self.instance, feedback_fn)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    For a forthcoming instance only the configuration is updated. A new DRBD
    minor is reserved on the new node for every disk, per disk UUID as in the
    regular path. The disks are then pointed at the new secondary and the
    locks are released.

    @param feedback_fn: function used to send feedback to the caller

    """
    if self.instance.forthcoming:
      feedback_fn("Instance forthcoming, will only update the configuration")
      inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
      # Reserve the minors per disk UUID, exactly like the regular path
      # below; ReleaseDRBDMinors is keyed by disk UUID as well
      minors = [self.cfg.AllocateDRBDMinor([self.new_node_uuid], disk.uuid)[0]
                for disk in inst_disks]
      logging.debug("Allocated minors %r", minors)
      iv_names = {}
      for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
        (o_node1, _, o_port, o_minor1, o_minor2, o_secret) = \
            dev.logical_id
        if self.instance.primary_node == o_node1:
          p_minor = o_minor1
        else:
          p_minor = o_minor2
        new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                      p_minor, new_minor, o_secret)
        iv_names[idx] = (dev, dev.children, new_net_id)
        logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                      new_net_id)
      self._UpdateDisksSecondary(iv_names, feedback_fn)
      ReleaseLocks(self.lu, locking.LEVEL_NODE)
      # There is nothing to sync, so early release (which is enabled
      # automatically for an offline old secondary) is honoured at once
      if self.early_release:
        ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
      return

    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError as e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = []
    for disk in inst_disks:
      minor = self.cfg.AllocateDRBDMinor([self.new_node_uuid], disk.uuid)
      minors.append(minor[0])
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        for disk in inst_disks:
          self.cfg.ReleaseDRBDMinors(disk.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary

    for idx, dev in enumerate(inst_disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net(
               [pnode], (inst_disks, self.instance))[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      for disk in inst_disks:
        self.cfg.ReleaseDRBDMinors(disk.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self._UpdateDisksSecondary(iv_names, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           (inst_disks, self.instance),
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        raise errors.OpExecError(
          "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
          "info %s to see the status of disks)" %
          (self.cfg.GetNodeName(to_node), msg, self.instance.name))

    # Steps 5 and 6 are "Removing old storage" and "Sync devices", in an
    # order that depends on early_release
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(next(cstep), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(next(cstep), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)


class TemporaryDisk(object):
  """ Creates new temporary bootable disks, and makes sure they are destroyed.

  Is a context manager, and should be used with the ``with`` statement as such.

  The disks are inserted at the front of the instance's disk list (the first
  one at index 0), shifting every other disk of the instance accordingly and
  allowing the instance to be booted with the content of the first temporary
  disk.

  """

  def __init__(self, lu, instance, disks, feedback_fn,
               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
    """ Constructor storing arguments until used later.

    @type lu: L{ganeti.cmdlib.base.LogicalUnit}
    @param lu: The LU within which this disk is created.

    @type instance: L{ganeti.objects.Instance}
    @param instance: The instance to which the disk should be added

    @type disks: list of triples (disk template, disk access mode, int)
    @param disks:
      disk specification, which is a list of triples containing the
      disk template (e.g., L{constants.DT_PLAIN}), the disk access
      mode (i.e., L{constants.DISK_RDONLY} or L{constants.DISK_RDWR}),
      and size in MiB.

    @type feedback_fn: function
    @param feedback_fn: Function used to log progress

    @param shutdown_timeout: timeout passed to the instance shutdown RPC that
      is issued before the disks are created and before they are removed

    """
    self._lu = lu
    self._instance = instance
    self._disks = disks
    self._feedback_fn = feedback_fn
    self._shutdown_timeout = shutdown_timeout
    # Filled in by __enter__ and consumed by __exit__
    self._undoing_info = None
    self._new_disks = []

  def _EnsureInstanceDiskState(self, action="removing"):
    """ Ensures that the instance is down, and its disks inactive.

    All the operations related to the creation and destruction of disks require
    that the instance is down and that the disks are inactive. This function is
    invoked to make it so.

    @type action: string
    @param action: verb describing what is being done with the temporary
      disk ("creating" or "removing"); only used in the error message

    """
    # The instance needs to be down before any of these actions occur
    # Whether it is must be checked manually through a RPC - configuration
    # reflects only the desired state
    self._feedback_fn("Shutting down instance")
    result = self._lu.rpc.call_instance_shutdown(self._instance.primary_node,
                                                 self._instance,
                                                 self._shutdown_timeout,
                                                 self._lu.op.reason)
    result.Raise("Shutdown of instance '%s' while %s temporary disk "
                 "failed" % (self._instance.name, action))

    # Disks need to be deactivated prior to being removed
    # The disks_active configuration entry should match the actual state
    if self._instance.disks_active:
      self._feedback_fn("Deactivating disks")
      ShutdownInstanceDisks(self._lu, self._instance)

  def __enter__(self):
    """ Context manager entry function, creating the disks.

    @rtype: list of L{ganeti.objects.Disk}
    @return: The disk objects created, in the order they were specified.

    """
    self._EnsureInstanceDiskState(action="creating")

    new_disks = []

    # The iv_name of the disk intentionally diverges from Ganeti's standards, as
    # this disk should be very temporary and its presence should be reported.
    # With the special iv_name, gnt-cluster verify detects the disk and warns
    # the user of its presence. Removing the disk restores the instance to its
    # proper state, despite an error that appears when the removal is performed.
    for (disk_template, disk_access, disk_size) in self._disks:
      new_disk = objects.Disk()
      new_disk.dev_type = disk_template
      new_disk.mode = disk_access
      new_disk.uuid = self._lu.cfg.GenerateUniqueID(self._lu.proc.GetECId())
      new_disk.logical_id = (self._lu.cfg.GetVGName(), new_disk.uuid)
      new_disk.params = {}
      new_disk.size = disk_size

      new_disks.append(new_disk)

    self._feedback_fn("Attempting to create temporary disk")

    self._undoing_info = CreateDisks(self._lu, self._instance, disks=new_disks)
    for idx, new_disk in enumerate(new_disks):
      self._lu.cfg.AddInstanceDisk(self._instance.uuid, new_disk, idx=idx)
    self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)

    self._feedback_fn("Temporary disk created")

    self._new_disks = new_disks

    return new_disks

  def __exit__(self, exc_type, _value, _traceback):
    """ Context manager exit function, destroying the disks.

    Exceptions raised inside the ``with`` body are not suppressed.

    """
    if exc_type:
      self._feedback_fn("Exception raised, cleaning up temporary disk")
    else:
      self._feedback_fn("Regular cleanup of temporary disk")

    try:
      # The ``with`` body may have changed the instance in the configuration
      # (for instance by activating its disks); refresh it so the
      # disks_active check sees the current state
      self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)

      self._EnsureInstanceDiskState(action="removing")

      _UndoCreateDisks(self._lu, self._undoing_info, self._instance)

      for disk in self._new_disks:
        self._lu.cfg.RemoveInstanceDisk(self._instance.uuid, disk.uuid)
      self._instance = self._lu.cfg.GetInstanceInfo(self._instance.uuid)

      self._feedback_fn("Temporary disk removed")
    except: # pylint: disable=W0702
      # Catch everything only to tell the user, then propagate unchanged
      self._feedback_fn("Disk cleanup failed; it will have to be removed "
                        "manually")
      raise
