#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Logical units dealing with the cluster."""
import copy
import itertools
import logging
import operator
import os
import re
import time
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
CheckDiskAccessModeConsistency, GetClientCertDigest, \
AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp, \
  CheckImageValidity, EnsureKvmdOnNodes
import ganeti.masterd.instance
class LUClusterRenewCrypto(NoHooksLU):
"""Renew the cluster's crypto tokens.
"""
_MAX_NUM_RETRIES = 3
REQ_BGL = False
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
}
self.share_locks = ShareAll()
self.share_locks[locking.LEVEL_NODE] = 0
def CheckPrereq(self):
"""Check prerequisites.
    This determines whether the renewal of the SSH keys has to be suppressed
    because the cluster is configured to not modify the SSH setup.
"""
self._ssh_renewal_suppressed = \
not self.cfg.GetClusterInfo().modify_ssh_setup and self.op.ssh_keys
def _RenewNodeSslCertificates(self, feedback_fn):
"""Renews the nodes' SSL certificates.
    Note that most of this operation is done in gnt_cluster.py; this LU only
    takes care of the renewal of the client SSL certificates.
"""
master_uuid = self.cfg.GetMasterNode()
cluster = self.cfg.GetClusterInfo()
logging.debug("Renewing the master's SSL node certificate."
" Master's UUID: %s.", master_uuid)
# mapping node UUIDs to client certificate digests
digest_map = {}
master_digest = utils.GetCertificateDigest(
cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
digest_map[master_uuid] = master_digest
logging.debug("Adding the master's SSL node certificate digest to the"
" configuration. Master's UUID: %s, Digest: %s",
master_uuid, master_digest)
node_errors = {}
nodes = self.cfg.GetAllNodesInfo()
logging.debug("Renewing non-master nodes' node certificates.")
for (node_uuid, node_info) in nodes.items():
if node_info.offline:
logging.info("* Skipping offline node %s", node_info.name)
continue
if node_uuid != master_uuid:
logging.debug("Adding certificate digest of node '%s'.", node_uuid)
last_exception = None
for i in range(self._MAX_NUM_RETRIES):
try:
if node_info.master_candidate:
node_digest = GetClientCertDigest(self, node_uuid)
digest_map[node_uuid] = node_digest
logging.debug("Added the node's certificate to candidate"
" certificate list. Current list: %s.",
str(cluster.candidate_certs))
break
except errors.OpExecError as e:
last_exception = e
logging.error("Could not fetch a non-master node's SSL node"
" certificate at attempt no. %s. The node's UUID"
" is %s, and the error was: %s.",
str(i), node_uuid, e)
else:
if last_exception:
node_errors[node_uuid] = last_exception
if node_errors:
msg = ("Some nodes' SSL client certificates could not be fetched."
" Please make sure those nodes are reachable and rerun"
" the operation. The affected nodes and their errors are:\n")
for uuid, e in node_errors.items():
msg += "Node %s: %s\n" % (uuid, e)
feedback_fn(msg)
self.cfg.SetCandidateCerts(digest_map)
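  # A minimal standalone sketch of the retry pattern used above, relying on
  # Python's for/else (the else branch runs only if the loop was never left
  # via 'break'); 'fetch_digest' and 'record_error' are hypothetical helpers:
  #
  #   last_exception = None
  #   for i in range(3):
  #     try:
  #       digest = fetch_digest(node_uuid)  # assumed to raise OpExecError
  #       break
  #     except errors.OpExecError as e:
  #       last_exception = e
  #   else:
  #     record_error(node_uuid, last_exception)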
def _RenewSshKeys(self):
"""Renew all nodes' SSH keys.
"""
master_uuid = self.cfg.GetMasterNode()
nodes = self.cfg.GetAllNodesInfo()
nodes_uuid_names = [(node_uuid, node_info.name) for (node_uuid, node_info)
in nodes.items() if not node_info.offline]
node_names = [name for (_, name) in nodes_uuid_names]
node_uuids = [uuid for (uuid, _) in nodes_uuid_names]
potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
result = self.rpc.call_node_ssh_keys_renew(
[master_uuid],
node_uuids, node_names,
master_candidate_uuids,
potential_master_candidates)
result[master_uuid].Raise("Could not renew the SSH keys of all nodes")
def Exec(self, feedback_fn):
if self.op.node_certificates:
feedback_fn("Renewing Node SSL certificates")
self._RenewNodeSslCertificates(feedback_fn)
if self.op.ssh_keys and not self._ssh_renewal_suppressed:
feedback_fn("Renewing SSH keys")
self._RenewSshKeys()
elif self._ssh_renewal_suppressed:
feedback_fn("Cannot renew SSH keys if the cluster is configured to not"
" modify the SSH setup.")
class LUClusterActivateMasterIp(NoHooksLU):
"""Activate the master IP on the master node.
"""
def Exec(self, feedback_fn):
"""Activate the master IP.
"""
master_params = self.cfg.GetMasterNetworkParameters()
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_activate_master_ip(master_params.uuid,
master_params, ems)
result.Raise("Could not activate the master IP")
class LUClusterDeactivateMasterIp(NoHooksLU):
"""Deactivate the master IP on the master node.
"""
def Exec(self, feedback_fn):
"""Deactivate the master IP.
"""
master_params = self.cfg.GetMasterNetworkParameters()
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
master_params, ems)
result.Raise("Could not deactivate the master IP")
class LUClusterConfigQuery(NoHooksLU):
"""Return configuration values.
"""
REQ_BGL = False
def CheckArguments(self):
self.cq = ClusterQuery(None, self.op.output_fields, False)
def ExpandNames(self):
self.cq.ExpandNames(self)
def DeclareLocks(self, level):
self.cq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
result = self.cq.OldStyleQuery(self)
assert len(result) == 1
return result[0]
class LUClusterDestroy(LogicalUnit):
"""Logical unit for destroying the cluster.
"""
HPATH = "cluster-destroy"
HTYPE = constants.HTYPE_CLUSTER
# Read by the job queue to detect when the cluster is gone and job files will
# never be available.
# FIXME: This variable should be removed together with the Python job queue.
clusterHasBeenDestroyed = False
def BuildHooksEnv(self):
"""Build hooks env.
"""
return {
"OP_TARGET": self.cfg.GetClusterName(),
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
return ([], [])
def CheckPrereq(self):
"""Check prerequisites.
This checks whether the cluster is empty.
Any errors are signaled by raising errors.OpPrereqError.
"""
master = self.cfg.GetMasterNode()
nodelist = self.cfg.GetNodeList()
if len(nodelist) != 1 or nodelist[0] != master:
raise errors.OpPrereqError("There are still %d node(s) in"
" this cluster." % (len(nodelist) - 1),
errors.ECODE_INVAL)
instancelist = self.cfg.GetInstanceList()
if instancelist:
raise errors.OpPrereqError("There are still %d instance(s) in"
" this cluster." % len(instancelist),
errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Destroys the cluster.
"""
master_params = self.cfg.GetMasterNetworkParameters()
# Run post hooks on master node before it's removed
RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
master_params, ems)
result.Warn("Error disabling the master IP address", self.LogWarning)
self.wconfd.Client().PrepareClusterDestruction(self.wconfdcontext)
# signal to the job queue that the cluster is gone
LUClusterDestroy.clusterHasBeenDestroyed = True
return master_params.uuid
class LUClusterPostInit(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
"""
HPATH = "cluster-init"
HTYPE = constants.HTYPE_CLUSTER
def CheckArguments(self):
self.master_uuid = self.cfg.GetMasterNode()
self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
# TODO: When Issue 584 is solved, and None is properly parsed when used
# as a default value, ndparams.get(.., None) can be changed to
# ndparams[..] to access the values directly
# OpenvSwitch: Warn user if link is missing
if (self.master_ndparams[constants.ND_OVS] and not
self.master_ndparams.get(constants.ND_OVS_LINK, None)):
self.LogInfo("No physical interface for OpenvSwitch was given."
" OpenvSwitch will not have an outside connection. This"
" might not be what you want.")
def BuildHooksEnv(self):
"""Build hooks env.
"""
return {
"OP_TARGET": self.cfg.GetClusterName(),
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
return ([], [self.cfg.GetMasterNode()])
def Exec(self, feedback_fn):
"""Create and configure Open vSwitch
"""
if self.master_ndparams[constants.ND_OVS]:
result = self.rpc.call_node_configure_ovs(
self.master_uuid,
self.master_ndparams[constants.ND_OVS_NAME],
self.master_ndparams.get(constants.ND_OVS_LINK, None))
result.Raise("Could not successully configure Open vSwitch")
return True
class ClusterQuery(QueryBase):
FIELDS = query.CLUSTER_FIELDS
#: Do not sort (there is only one item)
SORT_FIELD = None
def ExpandNames(self, lu):
lu.needed_locks = {}
# The following variables interact with _QueryBase._GetNames
self.wanted = locking.ALL_SET
self.do_locking = self.use_locking
if self.do_locking:
raise errors.OpPrereqError("Can not use locking for cluster queries",
errors.ECODE_INVAL)
def DeclareLocks(self, lu, level):
pass
def _GetQueryData(self, lu):
"""Computes the list of nodes and their attributes.
"""
if query.CQ_CONFIG in self.requested_data:
cluster = lu.cfg.GetClusterInfo()
nodes = lu.cfg.GetAllNodesInfo()
else:
cluster = NotImplemented
nodes = NotImplemented
if query.CQ_QUEUE_DRAINED in self.requested_data:
drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
else:
drain_flag = NotImplemented
if query.CQ_WATCHER_PAUSE in self.requested_data:
master_node_uuid = lu.cfg.GetMasterNode()
result = lu.rpc.call_get_watcher_pause(master_node_uuid)
result.Raise("Can't retrieve watcher pause from master node '%s'" %
lu.cfg.GetMasterNodeName())
watcher_pause = result.payload
else:
watcher_pause = NotImplemented
return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
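# Hedged note on ClusterQuery._GetQueryData above: NotImplemented serves as
# the query layer's "field was not requested" placeholder, deliberately
# distinct from None, which would be interpreted as a real (empty) value.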
class LUClusterQuery(NoHooksLU):
"""Query cluster configuration.
"""
REQ_BGL = False
def ExpandNames(self):
self.needed_locks = {}
def Exec(self, feedback_fn):
"""Return cluster config.
"""
cluster = self.cfg.GetClusterInfo()
os_hvp = {}
# Filter just for enabled hypervisors
for os_name, hv_dict in cluster.os_hvp.items():
os_hvp[os_name] = {}
for hv_name, hv_params in hv_dict.items():
if hv_name in cluster.enabled_hypervisors:
os_hvp[os_name][hv_name] = hv_params
# Convert ip_family to ip_version
primary_ip_version = constants.IP4_VERSION
if cluster.primary_ip_family == netutils.IP6Address.family:
primary_ip_version = constants.IP6_VERSION
result = {
"software_version": constants.RELEASE_VERSION,
"protocol_version": constants.PROTOCOL_VERSION,
"config_version": constants.CONFIG_VERSION,
"os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
"vcs_version": constants.VCS_VERSION,
"architecture": runtime.GetArchInfo(),
"name": cluster.cluster_name,
"master": self.cfg.GetMasterNodeName(),
"default_hypervisor": cluster.primary_hypervisor,
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
for hypervisor_name in cluster.enabled_hypervisors]),
"os_hvp": os_hvp,
"beparams": cluster.beparams,
"osparams": cluster.osparams,
"ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
"diskparams": cluster.diskparams,
"candidate_pool_size": cluster.candidate_pool_size,
"max_running_jobs": cluster.max_running_jobs,
"max_tracked_jobs": cluster.max_tracked_jobs,
"mac_prefix": cluster.mac_prefix,
"master_netdev": cluster.master_netdev,
"master_netmask": cluster.master_netmask,
"use_external_mip_script": cluster.use_external_mip_script,
"volume_group_name": cluster.volume_group_name,
"drbd_usermode_helper": cluster.drbd_usermode_helper,
"file_storage_dir": cluster.file_storage_dir,
"shared_file_storage_dir": cluster.shared_file_storage_dir,
"maintain_node_health": cluster.maintain_node_health,
"ctime": cluster.ctime,
"mtime": cluster.mtime,
"uuid": cluster.uuid,
"tags": list(cluster.GetTags()),
"uid_pool": cluster.uid_pool,
"default_iallocator": cluster.default_iallocator,
"default_iallocator_params": cluster.default_iallocator_params,
"reserved_lvs": cluster.reserved_lvs,
"primary_ip_version": primary_ip_version,
"prealloc_wipe_disks": cluster.prealloc_wipe_disks,
"hidden_os": cluster.hidden_os,
"blacklisted_os": cluster.blacklisted_os,
"enabled_disk_templates": cluster.enabled_disk_templates,
"install_image": cluster.install_image,
"instance_communication_network": cluster.instance_communication_network,
"compression_tools": cluster.compression_tools,
"enabled_user_shutdown": cluster.enabled_user_shutdown,
}
return result
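# A hedged usage sketch: the dictionary returned by LUClusterQuery.Exec is
# what backs "gnt-cluster info"; assuming a luxi connection and the
# ganeti.cli helpers, it can also be obtained programmatically:
#
#   op = opcodes.OpClusterQuery()
#   info = cli.SubmitOpCode(op)
#   print(info["name"], info["master"], info["enabled_hypervisors"])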
class LUClusterRedistConf(NoHooksLU):
"""Force the redistribution of cluster configuration.
This is a very simple LU.
"""
REQ_BGL = False
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
}
self.share_locks = ShareAll()
def Exec(self, feedback_fn):
"""Redistribute the configuration.
"""
self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
RedistributeAncillaryFiles(self)
class LUClusterRename(LogicalUnit):
"""Rename the cluster.
"""
HPATH = "cluster-rename"
HTYPE = constants.HTYPE_CLUSTER
def BuildHooksEnv(self):
"""Build hooks env.
"""
return {
"OP_TARGET": self.cfg.GetClusterName(),
"NEW_NAME": self.op.name,
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
def CheckPrereq(self):
"""Verify that the passed name is a valid one.
"""
hostname = netutils.GetHostname(name=self.op.name,
family=self.cfg.GetPrimaryIPFamily())
new_name = hostname.name
self.ip = new_ip = hostname.ip
old_name = self.cfg.GetClusterName()
old_ip = self.cfg.GetMasterIP()
if new_name == old_name and new_ip == old_ip:
raise errors.OpPrereqError("Neither the name nor the IP address of the"
" cluster has changed",
errors.ECODE_INVAL)
if new_ip != old_ip:
if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("The given cluster IP address (%s) is"
" reachable on the network" %
new_ip, errors.ECODE_NOTUNIQUE)
self.op.name = new_name
def Exec(self, feedback_fn):
"""Rename the cluster.
"""
clustername = self.op.name
new_ip = self.ip
# shutdown the master IP
master_params = self.cfg.GetMasterNetworkParameters()
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
master_params, ems)
result.Raise("Could not disable the master role")
try:
cluster = self.cfg.GetClusterInfo()
cluster.cluster_name = clustername
cluster.master_ip = new_ip
self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
node_list = self.cfg.GetOnlineNodeList()
try:
node_list.remove(master_params.uuid)
except ValueError:
pass
UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
finally:
master_params.ip = new_ip
result = self.rpc.call_node_activate_master_ip(master_params.uuid,
master_params, ems)
result.Warn("Could not re-enable the master role on the master,"
" please restart manually", self.LogWarning)
return clustername
class LUClusterRepairDiskSizes(NoHooksLU):
"""Verifies the cluster disks sizes.
"""
REQ_BGL = False
def ExpandNames(self):
if self.op.instances:
(_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
# Not getting the node allocation lock as only a specific set of
# instances (and their nodes) is going to be acquired
self.needed_locks = {
locking.LEVEL_NODE_RES: [],
locking.LEVEL_INSTANCE: self.wanted_names,
}
self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
else:
self.wanted_names = None
self.needed_locks = {
locking.LEVEL_NODE_RES: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = {
locking.LEVEL_NODE_RES: 1,
locking.LEVEL_INSTANCE: 0,
}
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
self._LockInstancesNodes(primary_only=True, level=level)
def CheckPrereq(self):
"""Check prerequisites.
This only checks the optional instance list against the existing names.
"""
if self.wanted_names is None:
self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
self.wanted_instances = \
map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
def _EnsureChildSizes(self, disk):
"""Ensure children of the disk have the needed disk size.
    This is valid mainly for DRBD8 and fixes an issue where the
    children have a smaller disk size.
@param disk: an L{ganeti.objects.Disk} object
"""
if disk.dev_type == constants.DT_DRBD8:
assert disk.children, "Empty children for DRBD8?"
fchild = disk.children[0]
mismatch = fchild.size < disk.size
if mismatch:
self.LogInfo("Child disk has size %d, parent %d, fixing",
fchild.size, disk.size)
fchild.size = disk.size
# and we recurse on this child only, not on the metadev
return self._EnsureChildSizes(fchild) or mismatch
else:
return False
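  # A worked sketch of _EnsureChildSizes with hypothetical sizes: for a DRBD8
  # disk of 10240 MiB whose data child was recorded at 10176 MiB, the child
  # is bumped to 10240 and True is returned, so the caller knows to persist
  # the change:
  #
  #   data = objects.Disk(dev_type=constants.DT_PLAIN, size=10176)
  #   meta = objects.Disk(dev_type=constants.DT_PLAIN, size=128)
  #   drbd = objects.Disk(dev_type=constants.DT_DRBD8, size=10240,
  #                       children=[data, meta])
  #   self._EnsureChildSizes(drbd)  # -> True; data.size is now 10240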
def Exec(self, feedback_fn):
"""Verify the size of cluster disks.
"""
# TODO: check child disks too
# TODO: check differences in size between primary/secondary nodes
per_node_disks = {}
for instance in self.wanted_instances:
pnode = instance.primary_node
if pnode not in per_node_disks:
per_node_disks[pnode] = []
for idx, disk in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
per_node_disks[pnode].append((instance, idx, disk))
assert not (frozenset(per_node_disks.keys()) -
frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \
"Not owning correct locks"
assert not self.owned_locks(locking.LEVEL_NODE)
es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
per_node_disks.keys())
changed = []
for node_uuid, dskl in per_node_disks.items():
if not dskl:
# no disks on the node
continue
newl = [([v[2].Copy()], v[0]) for v in dskl]
node_name = self.cfg.GetNodeName(node_uuid)
result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
if result.fail_msg:
self.LogWarning("Failure in blockdev_getdimensions call to node"
" %s, ignoring", node_name)
continue
if len(result.payload) != len(dskl):
logging.warning("Invalid result from node %s: len(dksl)=%d,"
" result.payload=%s", node_name, len(dskl),
result.payload)
self.LogWarning("Invalid result from node %s, ignoring node results",
node_name)
continue
for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
if dimensions is None:
self.LogWarning("Disk %d of instance %s did not return size"
" information, ignoring", idx, instance.name)
continue
if not isinstance(dimensions, (tuple, list)):
self.LogWarning("Disk %d of instance %s did not return valid"
" dimension information, ignoring", idx,
instance.name)
continue
(size, spindles) = dimensions
        if not isinstance(size, int):
self.LogWarning("Disk %d of instance %s did not return valid"
" size information, ignoring", idx, instance.name)
continue
        size = size >> 20  # convert the size reported in bytes to MiB
if size != disk.size:
self.LogInfo("Disk %d of instance %s has mismatched size,"
" correcting: recorded %d, actual %d", idx,
instance.name, disk.size, size)
disk.size = size
self.cfg.Update(disk, feedback_fn)
changed.append((instance.name, idx, "size", size))
if es_flags[node_uuid]:
if spindles is None:
self.LogWarning("Disk %d of instance %s did not return valid"
" spindles information, ignoring", idx,
instance.name)
elif disk.spindles is None or disk.spindles != spindles:
self.LogInfo("Disk %d of instance %s has mismatched spindles,"
" correcting: recorded %s, actual %s",
idx, instance.name, disk.spindles, spindles)
disk.spindles = spindles
self.cfg.Update(disk, feedback_fn)
changed.append((instance.name, idx, "spindles", disk.spindles))
if self._EnsureChildSizes(disk):
self.cfg.Update(disk, feedback_fn)
changed.append((instance.name, idx, "size", disk.size))
return changed
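# Hedged note: the "changed" list returned above consists of tuples of the
# form (instance_name, disk_index, property, new_value), e.g.
# ("inst1.example.com", 0, "size", 10240).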
def _ValidateNetmask(cfg, netmask):
"""Checks if a netmask is valid.
@type cfg: L{config.ConfigWriter}
@param cfg: cluster configuration
@type netmask: int
@param netmask: netmask to be verified
@raise errors.OpPrereqError: if the validation fails
"""
ip_family = cfg.GetPrimaryIPFamily()
try:
ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
except errors.ProgrammerError:
raise errors.OpPrereqError("Invalid primary ip family: %s." %
ip_family, errors.ECODE_INVAL)
if not ipcls.ValidateNetmask(netmask):
raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
(netmask), errors.ECODE_INVAL)
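# Hedged examples of _ValidateNetmask on an IPv4 cluster, where the netmask
# is a CIDR prefix length:
#
#   _ValidateNetmask(cfg, 24)  # valid
#   _ValidateNetmask(cfg, 33)  # raises errors.OpPrereqError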
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates,
file_disk_template):
"""Checks whether the given file-based storage directory is acceptable.
Note: This function is public, because it is also used in bootstrap.py.
@type logging_warn_fn: function
@param logging_warn_fn: function which accepts a string and logs it
@type file_storage_dir: string
@param file_storage_dir: the directory to be used for file-based instances
@type enabled_disk_templates: list of string
@param enabled_disk_templates: the list of enabled disk templates
@type file_disk_template: string
@param file_disk_template: the file-based disk template for which the
path should be checked
"""
assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER
))
file_storage_enabled = file_disk_template in enabled_disk_templates
if file_storage_dir is not None:
if file_storage_dir == "":
if file_storage_enabled:
raise errors.OpPrereqError(
"Unsetting the '%s' storage directory while having '%s' storage"
" enabled is not permitted." %
(file_disk_template, file_disk_template),
errors.ECODE_INVAL)
else:
if not file_storage_enabled:
logging_warn_fn(
"Specified a %s storage directory, although %s storage is not"
" enabled." % (file_disk_template, file_disk_template))
else:
raise errors.ProgrammerError("Received %s storage dir with value"
" 'None'." % file_disk_template)
def CheckFileStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates):
"""Checks whether the given file storage directory is acceptable.
@see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
"""
CheckFileBasedStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates,
constants.DT_FILE)
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates):
"""Checks whether the given shared file storage directory is acceptable.
@see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
"""
CheckFileBasedStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates,
constants.DT_SHARED_FILE)
def CheckGlusterStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates):
"""Checks whether the given gluster storage directory is acceptable.
@see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
"""
CheckFileBasedStoragePathVsEnabledDiskTemplates(
logging_warn_fn, file_storage_dir, enabled_disk_templates,
constants.DT_GLUSTER)
def CheckCompressionTools(tools):
"""Check whether the provided compression tools look like executables.
@type tools: list of string
@param tools: The tools provided as opcode input
"""
regex = re.compile('^[-_a-zA-Z0-9]+$')
illegal_tools = [t for t in tools if not regex.match(t)]
if illegal_tools:
raise errors.OpPrereqError(
"The tools '%s' contain illegal characters: only alphanumeric values,"
" dashes, and underscores are allowed" % ", ".join(illegal_tools),
errors.ECODE_INVAL
)
if constants.IEC_GZIP not in tools:
raise errors.OpPrereqError("For compatibility reasons, the %s utility must"
" be present among the compression tools" %
constants.IEC_GZIP, errors.ECODE_INVAL)
if constants.IEC_NONE in tools:
raise errors.OpPrereqError("%s is a reserved value used for no compression,"
" and cannot be used as the name of a tool" %
constants.IEC_NONE, errors.ECODE_INVAL)
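# Hedged examples of CheckCompressionTools (assuming constants.IEC_GZIP is
# "gzip" and constants.IEC_NONE is "none"):
#
#   CheckCompressionTools(["gzip", "lzop"])      # accepted
#   CheckCompressionTools(["lzop"])              # gzip missing -> error
#   CheckCompressionTools(["gzip", "none"])      # reserved name -> error
#   CheckCompressionTools(["gzip", "rm -rf /"])  # illegal characters -> error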
class LUClusterSetParams(LogicalUnit):
"""Change the parameters of the cluster.
"""
HPATH = "cluster-modify"
HTYPE = constants.HTYPE_CLUSTER
REQ_BGL = False
def CheckArguments(self):
"""Check parameters
"""
if self.op.uid_pool:
uidpool.CheckUidPool(self.op.uid_pool)
if self.op.add_uids:
uidpool.CheckUidPool(self.op.add_uids)
if self.op.remove_uids:
uidpool.CheckUidPool(self.op.remove_uids)
if self.op.mac_prefix:
self.op.mac_prefix = \
utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)
if self.op.master_netmask is not None:
_ValidateNetmask(self.cfg, self.op.master_netmask)
if self.op.diskparams:
for dt_params in self.op.diskparams.values():
utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
try:
utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError as err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
if self.op.install_image is not None:
CheckImageValidity(self.op.install_image,
"Install image must be an absolute path or a URL")
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
# FIXME: This opcode changes cluster-wide settings. Is acquiring all
# resource locks the right thing, shouldn't it be the BGL instead?
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
locking.LEVEL_NODEGROUP: locking.ALL_SET,
}
self.share_locks = ShareAll()
def BuildHooksEnv(self):
"""Build hooks env.
"""
return {
"OP_TARGET": self.cfg.GetClusterName(),
"NEW_VG_NAME": self.op.vg_name,
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
mn = self.cfg.GetMasterNode()
return ([mn], [mn])
def _CheckVgName(self, node_uuids, enabled_disk_templates,
new_enabled_disk_templates):
"""Check the consistency of the vg name on all nodes and in case it gets
unset whether there are instances still using it.
"""
lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
new_enabled_disk_templates)
current_vg_name = self.cfg.GetVGName()
if self.op.vg_name == '':
if lvm_is_enabled:
raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
" disk templates are or get enabled.",
errors.ECODE_INVAL)
if self.op.vg_name is None:
if current_vg_name is None and lvm_is_enabled:
raise errors.OpPrereqError("Please specify a volume group when"
" enabling lvm-based disk-templates.",
errors.ECODE_INVAL)
if self.op.vg_name is not None and not self.op.vg_name:
if self.cfg.DisksOfType(constants.DT_PLAIN):
raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
" instances exist", errors.ECODE_INVAL)
if (self.op.vg_name is not None and lvm_is_enabled) or \
(self.cfg.GetVGName() is not None and lvm_gets_enabled):
self._CheckVgNameOnNodes(node_uuids)
def _CheckVgNameOnNodes(self, node_uuids):
"""Check the status of the volume group on each node.
"""
vglist = self.rpc.call_vg_list(node_uuids)
for node_uuid in node_uuids:
msg = vglist[node_uuid].fail_msg
if msg:
# ignoring down node
self.LogWarning("Error while gathering data on node %s"
" (ignoring node): %s",
self.cfg.GetNodeName(node_uuid), msg)
continue
vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
self.op.vg_name,
constants.MIN_VG_SIZE)
if vgstatus:
raise errors.OpPrereqError("Error on node '%s': %s" %
(self.cfg.GetNodeName(node_uuid), vgstatus),
errors.ECODE_ENVIRON)
@staticmethod
def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
old_enabled_disk_templates):
"""Computes three sets of disk templates.
@see: C{_GetDiskTemplateSets} for more details.
"""
enabled_disk_templates = None
new_enabled_disk_templates = []
disabled_disk_templates = []
if op_enabled_disk_templates:
enabled_disk_templates = op_enabled_disk_templates
new_enabled_disk_templates = \
list(set(enabled_disk_templates)
- set(old_enabled_disk_templates))
disabled_disk_templates = \
list(set(old_enabled_disk_templates)
- set(enabled_disk_templates))
else:
enabled_disk_templates = old_enabled_disk_templates
return (enabled_disk_templates, new_enabled_disk_templates,
disabled_disk_templates)
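  # A worked sketch of _GetDiskTemplateSetsInner: with
  # op_enabled_disk_templates=["plain", "drbd"] and
  # old_enabled_disk_templates=["plain", "file"], it returns (up to the
  # ordering imposed by the set operations):
  #
  #   (["plain", "drbd"],  # enabled afterwards
  #    ["drbd"],           # newly enabled
  #    ["file"])           # disabled by the operation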
def _GetDiskTemplateSets(self, cluster):
"""Computes three sets of disk templates.
The three sets are:
- disk templates that will be enabled after this operation (no matter if
they were enabled before or not)
    - disk templates that get enabled by this operation (thus haven't been
      enabled before)
- disk templates that get disabled by this operation
"""
return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
cluster.enabled_disk_templates)
def _CheckIpolicy(self, cluster, enabled_disk_templates):
"""Checks the ipolicy.
@type cluster: C{objects.Cluster}
@param cluster: the cluster's configuration
@type enabled_disk_templates: list of string
@param enabled_disk_templates: list of (possibly newly) enabled disk
templates
"""
# FIXME: write unit tests for this
if self.op.ipolicy:
self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
group_policy=False)
CheckIpolicyVsDiskTemplates(self.new_ipolicy,
enabled_disk_templates)
all_instances = self.cfg.GetAllInstancesInfo().values()
violations = set()
for group in self.cfg.GetAllNodeGroupsInfo().values():
instances = frozenset(
[inst for inst in all_instances
if compat.any(nuuid in group.members
for nuuid in self.cfg.GetInstanceNodes(inst.uuid))])
new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
self.cfg)
if new:
violations.update(new)
if violations:
self.LogWarning("After the ipolicy change the following instances"
" violate them: %s",
utils.CommaJoin(utils.NiceSort(violations)))
else:
CheckIpolicyVsDiskTemplates(cluster.ipolicy,
enabled_disk_templates)
def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
"""Checks whether the set DRBD helper actually exists on the nodes.
@type drbd_helper: string
@param drbd_helper: path of the drbd usermode helper binary
@type node_uuids: list of strings
@param node_uuids: list of node UUIDs to check for the helper
"""
# checks given drbd helper on all nodes
helpers = self.rpc.call_drbd_helper(node_uuids)
for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
if ninfo.offline:
self.LogInfo("Not checking drbd helper on offline node %s",
ninfo.name)
continue
msg = helpers[ninfo.uuid].fail_msg
if msg:
raise errors.OpPrereqError("Error checking drbd helper on node"
" '%s': %s" % (ninfo.name, msg),
errors.ECODE_ENVIRON)
node_helper = helpers[ninfo.uuid].payload
if node_helper != drbd_helper:
raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
(ninfo.name, node_helper),
errors.ECODE_ENVIRON)
def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
"""Check the DRBD usermode helper.
@type node_uuids: list of strings
@param node_uuids: a list of nodes' UUIDs
@type drbd_enabled: boolean
@param drbd_enabled: whether DRBD will be enabled after this operation
(no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
@param drbd_gets_enabled: true if DRBD was disabled before this
operation, but will be enabled afterwards
"""
if self.op.drbd_helper == '':
if drbd_enabled:
raise errors.OpPrereqError("Cannot disable drbd helper while"
" DRBD is enabled.", errors.ECODE_STATE)
if self.cfg.DisksOfType(constants.DT_DRBD8):
raise errors.OpPrereqError("Cannot disable drbd helper while"
" drbd-based instances exist",
errors.ECODE_INVAL)
else:
if self.op.drbd_helper is not None and drbd_enabled:
self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
else:
if drbd_gets_enabled:
current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
if current_drbd_helper is not None:
self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
else:
raise errors.OpPrereqError("Cannot enable DRBD without a"
" DRBD usermode helper set.",
errors.ECODE_STATE)
def _CheckInstancesOfDisabledDiskTemplates(
self, disabled_disk_templates):
"""Check whether we try to disable a disk template that is in use.
@type disabled_disk_templates: list of string
@param disabled_disk_templates: list of disk templates that are going to
be disabled by this operation
"""
for disk_template in disabled_disk_templates:
disks_with_type = self.cfg.DisksOfType(disk_template)
if disks_with_type:
disk_desc = []
for disk in disks_with_type:
instance_uuid = self.cfg.GetInstanceForDisk(disk.uuid)
instance = self.cfg.GetInstanceInfo(instance_uuid)
if instance:
instance_desc = "on " + instance.name
else:
instance_desc = "detached"
disk_desc.append("%s (%s)" % (disk, instance_desc))
raise errors.OpPrereqError(
"Cannot disable disk template '%s', because there is at least one"
" disk using it:\n * %s" % (disk_template, "\n * ".join(disk_desc)),
errors.ECODE_STATE)
if constants.DT_DISKLESS in disabled_disk_templates:
instances = self.cfg.GetAllInstancesInfo()
for inst in instances.values():
if not inst.disks:
raise errors.OpPrereqError(
"Cannot disable disk template 'diskless', because there is at"
" least one instance using it:\n * %s" % inst.name,
errors.ECODE_STATE)
@staticmethod
def _CheckInstanceCommunicationNetwork(network, warning_fn):
"""Check whether an existing network is configured for instance
communication.
    Checks whether an existing network is configured with the
    parameters that are advisable for instance communication, and
    otherwise issues security warnings.
@type network: L{ganeti.objects.Network}
@param network: L{ganeti.objects.Network} object whose
configuration is being checked
@type warning_fn: function
@param warning_fn: function used to print warnings
@rtype: None
@return: None
"""
def _MaybeWarn(err, val, default):
if val != default:
warning_fn("Supplied instance communication network '%s' %s '%s',"
" this might pose a security risk (default is '%s').",
network.name, err, val, default)
if network.network is None:
raise errors.OpPrereqError("Supplied instance communication network '%s'"
" must have an IPv4 network address.",
network.name)
_MaybeWarn("has an IPv4 gateway", network.gateway, None)
_MaybeWarn("has a non-standard IPv4 network address", network.network,
constants.INSTANCE_COMMUNICATION_NETWORK4)
_MaybeWarn("has an IPv6 gateway", network.gateway6, None)
_MaybeWarn("has a non-standard IPv6 network address", network.network6,
constants.INSTANCE_COMMUNICATION_NETWORK6)
_MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
def CheckPrereq(self):
"""Check prerequisites.
This checks whether the given params don't conflict and
if the given volume group is valid.
"""
node_uuids = self.owned_locks(locking.LEVEL_NODE)
self.cluster = cluster = self.cfg.GetClusterInfo()
vm_capable_node_uuids = [node.uuid
for node in self.cfg.GetAllNodesInfo().values()
if node.uuid in node_uuids and node.vm_capable]
(enabled_disk_templates, new_enabled_disk_templates,
disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
new_enabled_disk_templates)
if self.op.file_storage_dir is not None:
CheckFileStoragePathVsEnabledDiskTemplates(
self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
if self.op.shared_file_storage_dir is not None:
CheckSharedFileStoragePathVsEnabledDiskTemplates(
self.LogWarning, self.op.shared_file_storage_dir,
enabled_disk_templates)
drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
self._CheckDrbdHelper(vm_capable_node_uuids,
drbd_enabled, drbd_gets_enabled)
# validate params changes
if self.op.beparams:
objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
# TODO: we need a more general way to handle resetting
# cluster-level parameters to default values
if self.new_ndparams["oob_program"] == "":
self.new_ndparams["oob_program"] = \
constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
if self.op.hv_state:
new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
self.cluster.hv_state_static)
self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
for hv, values in new_hv_state.items())
if self.op.disk_state:
new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
self.cluster.disk_state_static)
self.new_disk_state = \
dict((storage, dict((name, cluster.SimpleFillDiskState(values))
for name, values in svalues.items()))
for storage, svalues in new_disk_state.items())
self._CheckIpolicy(cluster, enabled_disk_templates)
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
objects.NIC.CheckParameterSyntax(self.new_nicparams)
nic_errors = []
# check all instances for consistency
for instance in self.cfg.GetAllInstancesInfo().values():
for nic_idx, nic in enumerate(instance.nics):
params_copy = copy.deepcopy(nic.nicparams)
params_filled = objects.FillDict(self.new_nicparams, params_copy)
# check parameter syntax
try:
objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError as err:
nic_errors.append("Instance %s, nic/%d: %s" %
(instance.name, nic_idx, err))
# if we're moving instances to routed, check that they have an ip
target_mode = params_filled[constants.NIC_MODE]
if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
" address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
"\n".join(nic_errors), errors.ECODE_INVAL)
# hypervisor list/parameters
self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if self.op.hvparams:
for hv_name, hv_dict in self.op.hvparams.items():
if hv_name not in self.new_hvparams:
self.new_hvparams[hv_name] = hv_dict
else:
self.new_hvparams[hv_name].update(hv_dict)
# disk template parameters
self.new_diskparams = objects.FillDict(cluster.diskparams, {})
if self.op.diskparams:
for dt_name, dt_params in self.op.diskparams.items():
if dt_name not in self.new_diskparams:
self.new_diskparams[dt_name] = dt_params
else:
self.new_diskparams[dt_name].update(dt_params)
CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
# os hypervisor parameters
self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
if self.op.os_hvp:
for os_name, hvs in self.op.os_hvp.items():
if os_name not in self.new_os_hvp:
self.new_os_hvp[os_name] = hvs
else:
for hv_name, hv_dict in hvs.items():
if hv_dict is None:
# Delete if it exists
self.new_os_hvp[os_name].pop(hv_name, None)
elif hv_name not in self.new_os_hvp[os_name]:
self.new_os_hvp[os_name][hv_name] = hv_dict
else:
self.new_os_hvp[os_name][hv_name].update(hv_dict)
# os parameters
self._BuildOSParams(cluster)
# changes to the hypervisor list
if self.op.enabled_hypervisors is not None:
for hv in self.op.enabled_hypervisors:
# if the hypervisor doesn't already exist in the cluster
# hvparams, we initialize it to empty, and then (in both
# cases) we make sure to fill the defaults, as we might not
# have a complete defaults list if the hypervisor wasn't
# enabled before
if hv not in new_hvp:
new_hvp[hv] = {}
new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
if self.op.hvparams or self.op.enabled_hypervisors is not None:
# either the enabled list has changed, or the parameters have, validate
for hv_name, hv_params in self.new_hvparams.items():
if ((self.op.hvparams and hv_name in self.op.hvparams) or
(self.op.enabled_hypervisors and
hv_name in self.op.enabled_hypervisors)):
# either this is a new hypervisor, or its parameters have changed
hv_class = hypervisor.GetHypervisorClass(hv_name)
utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
hv_class.CheckParameterSyntax(hv_params)
CheckHVParams(self, node_uuids, hv_name, hv_params)
if self.op.os_hvp:
# no need to check any newly-enabled hypervisors, since the
# defaults have already been checked in the above code-block
for os_name, os_hvp in self.new_os_hvp.items():
for hv_name, hv_params in os_hvp.items():
utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
# we need to fill in the new os_hvp on top of the actual hv_p
cluster_defaults = self.new_hvparams.get(hv_name, {})
new_osp = objects.FillDict(cluster_defaults, hv_params)
hv_class = hypervisor.GetHypervisorClass(hv_name)
hv_class.CheckParameterSyntax(new_osp)
CheckHVParams(self, node_uuids, hv_name, new_osp)
if self.op.default_iallocator:
alloc_script = utils.FindFile(self.op.default_iallocator,
constants.IALLOCATOR_SEARCH_PATH,
os.path.isfile)
if alloc_script is None:
raise errors.OpPrereqError("Invalid default iallocator script '%s'"
" specified" % self.op.default_iallocator,
errors.ECODE_INVAL)
if self.op.instance_communication_network:
network_name = self.op.instance_communication_network
try:
network_uuid = self.cfg.LookupNetwork(network_name)
except errors.OpPrereqError:
network_uuid = None
if network_uuid is not None:
network = self.cfg.GetNetwork(network_uuid)
self._CheckInstanceCommunicationNetwork(network, self.LogWarning)
if self.op.compression_tools:
CheckCompressionTools(self.op.compression_tools)
def _BuildOSParams(self, cluster):
"Calculate the new OS parameters for this operation."
def _GetNewParams(source, new_params):
"Wrapper around GetUpdatedParams."
if new_params is None:
return source
result = objects.FillDict(source, {}) # deep copy of source
for os_name in new_params:
result[os_name] = GetUpdatedParams(result.get(os_name, {}),
new_params[os_name],
use_none=True)
if not result[os_name]:
del result[os_name] # we removed all parameters
return result
self.new_osp = _GetNewParams(cluster.osparams,
self.op.osparams)
self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
self.op.osparams_private_cluster)
# Remove os validity check
changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
for os_name in changed_oses:
os_params = cluster.SimpleFillOS(
os_name,
self.new_osp.get(os_name, {}),
os_params_private=self.new_osp_private.get(os_name, {})
)
# check the parameter validity (remote check)
CheckOSParams(self, False, [self.cfg.GetMasterNode()],
os_name, os_params, False)
def _SetVgName(self, feedback_fn):
"""Determines and sets the new volume group name.
"""
if self.op.vg_name is not None:
new_volume = self.op.vg_name
if not new_volume:
new_volume = None
if new_volume != self.cfg.GetVGName():
self.cfg.SetVGName(new_volume)
else:
feedback_fn("Cluster LVM configuration already in desired"
" state, not changing")
def _SetFileStorageDir(self, feedback_fn):
"""Set the file storage directory.
"""
if self.op.file_storage_dir is not None:
if self.cluster.file_storage_dir == self.op.file_storage_dir:
feedback_fn("Global file storage dir already set to value '%s'"
% self.cluster.file_storage_dir)
else:
self.cluster.file_storage_dir = self.op.file_storage_dir
def _SetSharedFileStorageDir(self, feedback_fn):
"""Set the shared file storage directory.
"""
if self.op.shared_file_storage_dir is not None:
if self.cluster.shared_file_storage_dir == \
self.op.shared_file_storage_dir:
feedback_fn("Global shared file storage dir already set to value '%s'"
% self.cluster.shared_file_storage_dir)
else:
self.cluster.shared_file_storage_dir = self.op.shared_file_storage_dir
def _SetDrbdHelper(self, feedback_fn):
"""Set the DRBD usermode helper.
"""
if self.op.drbd_helper is not None:
      if constants.DT_DRBD8 not in self.cluster.enabled_disk_templates:
feedback_fn("Note that you specified a drbd user helper, but did not"
" enable the drbd disk template.")
new_helper = self.op.drbd_helper
if not new_helper:
new_helper = None
if new_helper != self.cfg.GetDRBDHelper():
self.cfg.SetDRBDHelper(new_helper)
else:
feedback_fn("Cluster DRBD helper already in desired state,"
" not changing")
@staticmethod
def _EnsureInstanceCommunicationNetwork(cfg, network_name):
"""Ensure that the instance communication network exists and is
connected to all groups.
    The instance communication network given by L{network_name} is
    created, if necessary, via the opcode 'OpNetworkAdd'. Also, the
instance communication network is connected to all existing node
groups, if necessary, via the opcode 'OpNetworkConnect'.
@type cfg: L{config.ConfigWriter}
@param cfg: cluster configuration
@type network_name: string
@param network_name: instance communication network name
@rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance
             communication network needs to be created or connected
             to a group, otherwise L{None}
"""
jobs = []
try:
network_uuid = cfg.LookupNetwork(network_name)
network_exists = True
except errors.OpPrereqError:
network_exists = False
if not network_exists:
jobs.append(AddInstanceCommunicationNetworkOp(network_name))
for group_uuid in cfg.GetNodeGroupList():
group = cfg.GetNodeGroup(group_uuid)
if network_exists:
network_connected = network_uuid in group.networks
else:
# The network was created asynchronously by the previous
# opcode and, therefore, we don't have access to its
# network_uuid. As a result, we assume that the network is
# not connected to any group yet.
network_connected = False
if not network_connected:
op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
jobs.append(op)
if jobs:
return ResultWithJobs([jobs])
else:
return None
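  # A hedged sketch of the result: on a cluster with two node groups and no
  # instance communication network yet, the method queues one OpNetworkAdd
  # followed by two OpNetworkConnect opcodes as a single job, roughly:
  #
  #   ResultWithJobs([[add_op, connect_op_group1, connect_op_group2]])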
@staticmethod
def _ModifyInstanceCommunicationNetwork(cfg, network_name, feedback_fn):
"""Update the instance communication network stored in the cluster
configuration.
Compares the user-supplied instance communication network against
the one stored in the Ganeti cluster configuration. If there is a
change, the instance communication network may be possibly created
and connected to all groups (see
L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).
@type cfg: L{config.ConfigWriter}
@param cfg: cluster configuration
@type network_name: string
@param network_name: instance communication network name
@type feedback_fn: function
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}
@rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
@return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}
"""
config_network_name = cfg.GetInstanceCommunicationNetwork()
if network_name == config_network_name:
feedback_fn("Instance communication network already is '%s', nothing to"
" do." % network_name)
else:
try:
cfg.LookupNetwork(config_network_name)
feedback_fn("Previous instance communication network '%s'"
" should be removed manually." % config_network_name)
except errors.OpPrereqError:
pass
if network_name:
feedback_fn("Changing instance communication network to '%s', only new"
" instances will be affected."
% network_name)
else:
feedback_fn("Disabling instance communication network, only new"
" instances will be affected.")
cfg.SetInstanceCommunicationNetwork(network_name)
if network_name:
return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
cfg,
network_name)
else:
return None
def Exec(self, feedback_fn):
"""Change the parameters of the cluster.
"""
# re-read the fresh configuration
self.cluster = self.cfg.GetClusterInfo()
if self.op.enabled_disk_templates:
self.cluster.enabled_disk_templates = \
list(self.op.enabled_disk_templates)
# save the changes
self.cfg.Update(self.cluster, feedback_fn)
self._SetVgName(feedback_fn)
self.cluster = self.cfg.GetClusterInfo()
self._SetFileStorageDir(feedback_fn)
self._SetSharedFileStorageDir(feedback_fn)
self.cfg.Update(self.cluster, feedback_fn)
self._SetDrbdHelper(feedback_fn)
# re-read the fresh configuration again
self.cluster = self.cfg.GetClusterInfo()
ensure_kvmd = False
active = constants.DATA_COLLECTOR_STATE_ACTIVE
if self.op.enabled_data_collectors is not None:
for name, val in self.op.enabled_data_collectors.items():
self.cluster.data_collectors[name][active] = val
if self.op.data_collector_interval:
internal = constants.DATA_COLLECTOR_PARAMETER_INTERVAL
for name, val in self.op.data_collector_interval.items():
self.cluster.data_collectors[name][internal] = int(val)
if self.op.hvparams:
self.cluster.hvparams = self.new_hvparams
if self.op.os_hvp:
self.cluster.os_hvp = self.new_os_hvp
if self.op.enabled_hypervisors is not None:
self.cluster.hvparams = self.new_hvparams
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
ensure_kvmd = True
if self.op.beparams:
self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
if self.op.nicparams:
self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
if self.op.ipolicy:
self.cluster.ipolicy = self.new_ipolicy
if self.op.osparams:
self.cluster.osparams = self.new_osp
if self.op.osparams_private_cluster:
self.cluster.osparams_private_cluster = self.new_osp_private
if self.op.ndparams:
self.cluster.ndparams = self.new_ndparams
if self.op.diskparams:
self.cluster.diskparams = self.new_diskparams
if self.op.hv_state:
self.cluster.hv_state_static = self.new_hv_state
if self.op.disk_state:
self.cluster.disk_state_static = self.new_disk_state
if self.op.candidate_pool_size is not None:
self.cluster.candidate_pool_size = self.op.candidate_pool_size
# we need to update the pool size here, otherwise the save will fail
AdjustCandidatePool(self, [])
if self.op.max_running_jobs is not None:
self.cluster.max_running_jobs = self.op.max_running_jobs
if self.op.max_tracked_jobs is not None:
self.cluster.max_tracked_jobs = self.op.max_tracked_jobs
if self.op.maintain_node_health is not None:
self.cluster.maintain_node_health = self.op.maintain_node_health
if self.op.modify_etc_hosts is not None:
self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
if self.op.prealloc_wipe_disks is not None:
self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
if self.op.add_uids is not None:
uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
if self.op.remove_uids is not None:
uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
if self.op.uid_pool is not None:
self.cluster.uid_pool = self.op.uid_pool
if self.op.default_iallocator is not None:
self.cluster.default_iallocator = self.op.default_iallocator
if self.op.default_iallocator_params is not None:
self.cluster.default_iallocator_params = self.op.default_iallocator_params
if self.op.reserved_lvs is not None:
self.cluster.reserved_lvs = self.op.reserved_lvs
if self.op.use_external_mip_script is not None:
self.cluster.use_external_mip_script = self.op.use_external_mip_script
if self.op.enabled_user_shutdown is not None and \
self.cluster.enabled_user_shutdown != self.op.enabled_user_shutdown:
self.cluster.enabled_user_shutdown = self.op.enabled_user_shutdown
ensure_kvmd = True
def helper_os(aname, mods, desc):
desc += " OS list"
lst = getattr(self.cluster, aname)
for key, val in mods:
if key == constants.DDM_ADD:
if val in lst:
feedback_fn("OS %s already in %s, ignoring" % (val, desc))
else:
lst.append(val)
elif key == constants.DDM_REMOVE:
if val in lst:
lst.remove(val)
else:
feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
else:
raise errors.ProgrammerError("Invalid modification '%s'" % key)
if self.op.hidden_os:
helper_os("hidden_os", self.op.hidden_os, "hidden")
if self.op.blacklisted_os:
helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
if self.op.mac_prefix:
self.cluster.mac_prefix = self.op.mac_prefix
if self.op.master_netdev:
master_params = self.cfg.GetMasterNetworkParameters()
ems = self.cfg.GetUseExternalMipScript()
feedback_fn("Shutting down master ip on the current netdev (%s)" %
self.cluster.master_netdev)
result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
master_params, ems)
if not self.op.force:
result.Raise("Could not disable the master ip")
else:
if result.fail_msg:
msg = ("Could not disable the master ip (continuing anyway): %s" %
result.fail_msg)
feedback_fn(msg)
feedback_fn("Changing master_netdev from %s to %s" %
(master_params.netdev, self.op.master_netdev))
self.cluster.master_netdev = self.op.master_netdev
if self.op.master_netmask:
master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
result = self.rpc.call_node_change_master_netmask(
master_params.uuid, master_params.netmask,
self.op.master_netmask, master_params.ip,
master_params.netdev)
result.Warn("Could not change the master IP netmask", feedback_fn)
self.cluster.master_netmask = self.op.master_netmask
if self.op.install_image:
self.cluster.install_image = self.op.install_image
if self.op.zeroing_image is not None:
CheckImageValidity(self.op.zeroing_image,
"Zeroing image must be an absolute path or a URL")
self.cluster.zeroing_image = self.op.zeroing_image
self.cfg.Update(self.cluster, feedback_fn)
if self.op.master_netdev:
master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Starting the master ip on the new master netdev (%s)" %
self.op.master_netdev)
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_activate_master_ip(master_params.uuid,
master_params, ems)
result.Warn("Could not re-enable the master ip on the master,"
" please restart manually", self.LogWarning)
# Even though 'self.op.enabled_user_shutdown' is being tested
# above, the RPCs can only be done after 'self.cfg.Update' because
# this will update the cluster object and sync 'Ssconf', and kvmd
# uses 'Ssconf'.
if ensure_kvmd:
EnsureKvmdOnNodes(self, feedback_fn)
if self.op.compression_tools is not None:
self.cfg.SetCompressionTools(self.op.compression_tools)
network_name = self.op.instance_communication_network
if network_name is not None:
return self._ModifyInstanceCommunicationNetwork(self.cfg,
network_name, feedback_fn)
else:
return None
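# A hedged usage sketch: LUClusterSetParams is driven by OpClusterSetParams;
# for example, enabling the DRBD disk template cluster-wide might look like
# this (assuming a luxi connection and that /bin/true is an acceptable
# usermode helper for the environment):
#
#   op = opcodes.OpClusterSetParams(
#       enabled_disk_templates=["plain", "drbd"],
#       drbd_helper="/bin/true")
#   cli.SubmitOpCode(op)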