| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Logical unit for creating a single instance.""" |
| |
| import OpenSSL |
| import logging |
| import os |
| |
| |
| from ganeti import compat |
| from ganeti import constants |
| from ganeti import errors |
| from ganeti import hypervisor |
| from ganeti import locking |
| from ganeti.masterd import iallocator |
| from ganeti import masterd |
| from ganeti import netutils |
| from ganeti import objects |
| from ganeti import pathutils |
| from ganeti import utils |
| from ganeti import serializer |
| |
| from ganeti.cmdlib.base import LogicalUnit |
| |
| from ganeti.cmdlib.common import \ |
| CheckNodeOnline, \ |
| CheckParamsNotGlobal, \ |
| IsExclusiveStorageEnabledNode, CheckHVParams, CheckOSParams, \ |
| ExpandNodeUuidAndName, \ |
| IsValidDiskAccessModeCombination, \ |
| CheckDiskTemplateEnabled, CheckIAllocatorOrNode, CheckOSImage |
| from ganeti.cmdlib.instance_helpervm import RunWithHelperVM |
| from ganeti.cmdlib.instance_storage import CalculateFileStorageDir, \ |
| CheckNodesFreeDiskPerVG, CheckRADOSFreeSpace, CheckSpindlesExclusiveStorage, \ |
| ComputeDiskSizePerVG, CreateDisks, \ |
| GenerateDiskTemplate, CommitDisks, \ |
| WaitForSync, ComputeDisks, \ |
| ImageDisks, WipeDisks |
| from ganeti.cmdlib.instance_utils import \ |
| CheckNodeNotDrained, CopyLockList, \ |
| ReleaseLocks, CheckNodeVmCapable, \ |
| RemoveDisks, CheckNodeFreeMemory, \ |
| UpdateMetadata, CheckForConflictingIp, \ |
| ComputeInstanceCommunicationNIC, \ |
| ComputeIPolicyInstanceSpecViolation, \ |
| CheckHostnameSane, CheckOpportunisticLocking, \ |
| ComputeFullBeParams, ComputeNics, GetClusterDomainSecret, \ |
| CheckInstanceExistence, CreateInstanceAllocRequest, BuildInstanceHookEnv, \ |
| NICListToTuple, CheckNicsBridgesExist, CheckCompressionTool |
| import ganeti.masterd.instance |
| |
| |
| class LUInstanceCreate(LogicalUnit): |
| """Create an instance. |
| |
| """ |
| HPATH = "instance-add" |
| HTYPE = constants.HTYPE_INSTANCE |
| REQ_BGL = False |
| |
| def _CheckDiskTemplateValid(self): |
| """Checks validity of disk template. |
| |
| """ |
| cluster = self.cfg.GetClusterInfo() |
| if self.op.disk_template is None: |
| # FIXME: It would be better to take the default disk template from the |
| # ipolicy, but for the ipolicy we need the primary node, which we get from |
| # the iallocator, which wants the disk template as input. To solve this |
| # chicken-and-egg problem, it should be possible to specify just a node |
| # group from the iallocator and take the ipolicy from that. |
| self.op.disk_template = cluster.enabled_disk_templates[0] |
| CheckDiskTemplateEnabled(cluster, self.op.disk_template) |
| |
| def _CheckDiskArguments(self): |
| """Checks validity of disk-related arguments. |
| |
| """ |
| # check that disk's names are unique and valid |
| utils.ValidateDeviceNames("disk", self.op.disks) |
| |
| self._CheckDiskTemplateValid() |
| |
| # check disks. parameter names and consistent adopt/no-adopt strategy |
| has_adopt = has_no_adopt = False |
| for disk in self.op.disks: |
| if self.op.disk_template != constants.DT_EXT: |
| utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) |
| if constants.IDISK_ADOPT in disk: |
| has_adopt = True |
| else: |
| has_no_adopt = True |
| if has_adopt and has_no_adopt: |
| raise errors.OpPrereqError("Either all disks are adopted or none is", |
| errors.ECODE_INVAL) |
| if has_adopt: |
| if self.op.disk_template not in constants.DTS_MAY_ADOPT: |
| raise errors.OpPrereqError("Disk adoption is not supported for the" |
| " '%s' disk template" % |
| self.op.disk_template, |
| errors.ECODE_INVAL) |
| if self.op.iallocator is not None: |
| raise errors.OpPrereqError("Disk adoption not allowed with an" |
| " iallocator script", errors.ECODE_INVAL) |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| raise errors.OpPrereqError("Disk adoption not allowed for" |
| " instance import", errors.ECODE_INVAL) |
| else: |
| if self.op.disk_template in constants.DTS_MUST_ADOPT: |
| raise errors.OpPrereqError("Disk template %s requires disk adoption," |
| " but no 'adopt' parameter given" % |
| self.op.disk_template, |
| errors.ECODE_INVAL) |
| |
| self.adopt_disks = has_adopt |
| |
| def _CheckVLANArguments(self): |
| """ Check validity of VLANs if given |
| |
| """ |
| for nic in self.op.nics: |
| vlan = nic.get(constants.INIC_VLAN, None) |
| if vlan: |
| if vlan[0] == ".": |
| # vlan starting with dot means single untagged vlan, |
| # might be followed by trunk (:) |
| if not vlan[1:].isdigit(): |
| vlanlist = vlan[1:].split(':') |
| for vl in vlanlist: |
| if not vl.isdigit(): |
| raise errors.OpPrereqError("Specified VLAN parameter is " |
| "invalid : %s" % vlan, |
| errors.ECODE_INVAL) |
| elif vlan[0] == ":": |
| # Trunk - tagged only |
| vlanlist = vlan[1:].split(':') |
| for vl in vlanlist: |
| if not vl.isdigit(): |
| raise errors.OpPrereqError("Specified VLAN parameter is invalid" |
| " : %s" % vlan, errors.ECODE_INVAL) |
| elif vlan.isdigit(): |
| # This is the simplest case. No dots, only single digit |
| # -> Create untagged access port, dot needs to be added |
| nic[constants.INIC_VLAN] = "." + vlan |
| else: |
| raise errors.OpPrereqError("Specified VLAN parameter is invalid" |
| " : %s" % vlan, errors.ECODE_INVAL) |
| |
| def CheckArguments(self): |
| """Check arguments. |
| |
| """ |
| if self.op.forthcoming and self.op.commit: |
| raise errors.OpPrereqError("Forthcoming generation and commiting are" |
| " mutually exclusive", errors.ECODE_INVAL) |
| |
| # do not require name_check to ease forward/backward compatibility |
| # for tools |
| if self.op.no_install and self.op.start: |
| self.LogInfo("No-installation mode selected, disabling startup") |
| self.op.start = False |
| # validate/normalize the instance name |
| self.op.instance_name = \ |
| netutils.Hostname.GetNormalizedName(self.op.instance_name) |
| |
| if self.op.ip_check and not self.op.name_check: |
| # TODO: make the ip check more flexible and not depend on the name check |
| raise errors.OpPrereqError("Cannot do IP address check without a name" |
| " check", errors.ECODE_INVAL) |
| |
| # instance name verification |
| if self.op.name_check: |
| self.hostname = CheckHostnameSane(self, self.op.instance_name) |
| self.op.instance_name = self.hostname.name |
| # used in CheckPrereq for ip ping check |
| self.check_ip = self.hostname.ip |
| else: |
| self.check_ip = None |
| |
| # add NIC for instance communication |
| if self.op.instance_communication: |
| nic_name = ComputeInstanceCommunicationNIC(self.op.instance_name) |
| |
| for nic in self.op.nics: |
| if nic.get(constants.INIC_NAME, None) == nic_name: |
| break |
| else: |
| self.op.nics.append({constants.INIC_NAME: nic_name, |
| constants.INIC_MAC: constants.VALUE_GENERATE, |
| constants.INIC_IP: constants.NIC_IP_POOL, |
| constants.INIC_NETWORK: |
| self.cfg.GetInstanceCommunicationNetwork()}) |
| |
| # timeouts for unsafe OS installs |
| if self.op.helper_startup_timeout is None: |
| self.op.helper_startup_timeout = constants.HELPER_VM_STARTUP |
| |
| if self.op.helper_shutdown_timeout is None: |
| self.op.helper_shutdown_timeout = constants.HELPER_VM_SHUTDOWN |
| |
| # check nics' parameter names |
| for nic in self.op.nics: |
| utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES) |
| # check that NIC's parameters names are unique and valid |
| utils.ValidateDeviceNames("NIC", self.op.nics) |
| |
| self._CheckVLANArguments() |
| |
| self._CheckDiskArguments() |
| assert self.op.disk_template is not None |
| |
| # file storage checks |
| if (self.op.file_driver and |
| not self.op.file_driver in constants.FILE_DRIVER): |
| raise errors.OpPrereqError("Invalid file driver name '%s'" % |
| self.op.file_driver, errors.ECODE_INVAL) |
| |
| # set default file_driver if unset and required |
| if (not self.op.file_driver and |
| self.op.disk_template in constants.DTS_FILEBASED): |
| self.op.file_driver = constants.FD_DEFAULT |
| |
| ### Node/iallocator related checks |
| CheckIAllocatorOrNode(self, "iallocator", "pnode") |
| |
| if self.op.pnode is not None: |
| if self.op.disk_template in constants.DTS_INT_MIRROR: |
| if self.op.snode is None: |
| raise errors.OpPrereqError("The networked disk templates need" |
| " a mirror node", errors.ECODE_INVAL) |
| elif self.op.snode: |
| self.LogWarning("Secondary node will be ignored on non-mirrored disk" |
| " template") |
| self.op.snode = None |
| |
| CheckOpportunisticLocking(self.op) |
| |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| # On import force_variant must be True, because if we forced it at |
| # initial install, our only chance when importing it back is that it |
| # works again! |
| self.op.force_variant = True |
| |
| if self.op.no_install: |
| self.LogInfo("No-installation mode has no effect during import") |
| |
| if objects.GetOSImage(self.op.osparams): |
| self.LogInfo("OS image has no effect during import") |
| elif self.op.mode == constants.INSTANCE_CREATE: |
| os_image = CheckOSImage(self.op) |
| |
| if self.op.os_type is None and os_image is None: |
| raise errors.OpPrereqError("No guest OS or OS image specified", |
| errors.ECODE_INVAL) |
| |
| if self.op.os_type is not None \ |
| and self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os: |
| raise errors.OpPrereqError("Guest OS '%s' is not allowed for" |
| " installation" % self.op.os_type, |
| errors.ECODE_STATE) |
| elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: |
| if objects.GetOSImage(self.op.osparams): |
| self.LogInfo("OS image has no effect during import") |
| |
| self._cds = GetClusterDomainSecret() |
| |
| # Check handshake to ensure both clusters have the same domain secret |
| src_handshake = self.op.source_handshake |
| if not src_handshake: |
| raise errors.OpPrereqError("Missing source handshake", |
| errors.ECODE_INVAL) |
| |
| errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds, |
| src_handshake) |
| if errmsg: |
| raise errors.OpPrereqError("Invalid handshake: %s" % errmsg, |
| errors.ECODE_INVAL) |
| |
| # Load and check source CA |
| self.source_x509_ca_pem = self.op.source_x509_ca |
| if not self.source_x509_ca_pem: |
| raise errors.OpPrereqError("Missing source X509 CA", |
| errors.ECODE_INVAL) |
| |
| try: |
| (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem, |
| self._cds) |
| except OpenSSL.crypto.Error, err: |
| raise errors.OpPrereqError("Unable to load source X509 CA (%s)" % |
| (err, ), errors.ECODE_INVAL) |
| |
| (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) |
| if errcode is not None: |
| raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ), |
| errors.ECODE_INVAL) |
| |
| self.source_x509_ca = cert |
| |
| src_instance_name = self.op.source_instance_name |
| if not src_instance_name: |
| raise errors.OpPrereqError("Missing source instance name", |
| errors.ECODE_INVAL) |
| |
| self.source_instance_name = \ |
| netutils.GetHostname(name=src_instance_name).name |
| |
| else: |
| raise errors.OpPrereqError("Invalid instance creation mode %r" % |
| self.op.mode, errors.ECODE_INVAL) |
| |
| def ExpandNames(self): |
| """ExpandNames for CreateInstance. |
| |
| Figure out the right locks for instance creation. |
| |
| """ |
| self.needed_locks = {} |
| |
| if self.op.commit: |
| (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) |
| if name is None: |
| raise errors.OpPrereqError("Instance %s unknown" % |
| self.op.instance_name, |
| errors.ECODE_INVAL) |
| self.op.instance_name = name |
| if not self.cfg.GetInstanceInfo(uuid).forthcoming: |
| raise errors.OpPrereqError("Instance %s (with uuid %s) not forthcoming" |
| " but --commit was passed." % (name, uuid), |
| errors.ECODE_STATE) |
| logging.debug("Verified that instance %s with uuid %s is forthcoming", |
| name, uuid) |
| else: |
| # this is just a preventive check, but someone might still add this |
| # instance in the meantime; we check again in CheckPrereq |
| CheckInstanceExistence(self, self.op.instance_name) |
| |
| self.add_locks[locking.LEVEL_INSTANCE] = self.op.instance_name |
| |
| if self.op.commit: |
| (uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) |
| self.needed_locks[locking.LEVEL_NODE] = self.cfg.GetInstanceNodes(uuid) |
| logging.debug("Forthcoming instance %s resides on %s", uuid, |
| self.needed_locks[locking.LEVEL_NODE]) |
| elif self.op.iallocator: |
| # TODO: Find a solution to not lock all nodes in the cluster, e.g. by |
| # specifying a group on instance creation and then selecting nodes from |
| # that group |
| self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET |
| |
| if self.op.opportunistic_locking: |
| self.opportunistic_locks[locking.LEVEL_NODE] = True |
| self.opportunistic_locks[locking.LEVEL_NODE_RES] = True |
| if self.op.disk_template == constants.DT_DRBD8: |
| self.opportunistic_locks_count[locking.LEVEL_NODE] = 2 |
| self.opportunistic_locks_count[locking.LEVEL_NODE_RES] = 2 |
| else: |
| (self.op.pnode_uuid, self.op.pnode) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.pnode_uuid, self.op.pnode) |
| nodelist = [self.op.pnode_uuid] |
| if self.op.snode is not None: |
| (self.op.snode_uuid, self.op.snode) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.snode_uuid, self.op.snode) |
| nodelist.append(self.op.snode_uuid) |
| self.needed_locks[locking.LEVEL_NODE] = nodelist |
| |
| # in case of import lock the source node too |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| src_node = self.op.src_node |
| src_path = self.op.src_path |
| |
| if src_path is None: |
| self.op.src_path = src_path = self.op.instance_name |
| |
| if src_node is None: |
| self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET |
| self.op.src_node = None |
| if os.path.isabs(src_path): |
| raise errors.OpPrereqError("Importing an instance from a path" |
| " requires a source node option", |
| errors.ECODE_INVAL) |
| else: |
| (self.op.src_node_uuid, self.op.src_node) = (_, src_node) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.src_node_uuid, src_node) |
| if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: |
| self.needed_locks[locking.LEVEL_NODE].append(self.op.src_node_uuid) |
| if not os.path.isabs(src_path): |
| self.op.src_path = \ |
| utils.PathJoin(pathutils.EXPORT_DIR, src_path) |
| |
| self.needed_locks[locking.LEVEL_NODE_RES] = \ |
| CopyLockList(self.needed_locks[locking.LEVEL_NODE]) |
| |
| # Optimistically acquire shared group locks (we're reading the |
| # configuration). We can't just call GetInstanceNodeGroups, because the |
| # instance doesn't exist yet. Therefore we lock all node groups of all |
| # nodes we have. |
| if self.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET: |
| # In the case we lock all nodes for opportunistic allocation, we have no |
| # choice than to lock all groups, because they're allocated before nodes. |
| # This is sad, but true. At least we release all those we don't need in |
| # CheckPrereq later. |
| self.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET |
| else: |
| self.needed_locks[locking.LEVEL_NODEGROUP] = \ |
| list(self.cfg.GetNodeGroupsFromNodes( |
| self.needed_locks[locking.LEVEL_NODE])) |
| self.share_locks[locking.LEVEL_NODEGROUP] = 1 |
| |
| def DeclareLocks(self, level): |
| if level == locking.LEVEL_NODE_RES: |
| if self.op.opportunistic_locking: |
| self.needed_locks[locking.LEVEL_NODE_RES] = \ |
| CopyLockList(list(self.owned_locks(locking.LEVEL_NODE))) |
| |
| def _RunAllocator(self): |
| """Run the allocator based on input opcode. |
| |
| """ |
| if self.op.opportunistic_locking: |
| # Only consider nodes for which a lock is held |
| node_name_whitelist = self.cfg.GetNodeNames( |
| set(self.owned_locks(locking.LEVEL_NODE)) & |
| set(self.owned_locks(locking.LEVEL_NODE_RES))) |
| logging.debug("Trying to allocate on nodes %s", node_name_whitelist) |
| else: |
| node_name_whitelist = None |
| |
| req = CreateInstanceAllocRequest(self.op, self.disks, |
| self.nics, self.be_full, |
| node_name_whitelist) |
| ial = iallocator.IAllocator(self.cfg, self.rpc, req) |
| |
| ial.Run(self.op.iallocator) |
| |
| if not ial.success: |
| # When opportunistic locks are used only a temporary failure is generated |
| if self.op.opportunistic_locking: |
| ecode = errors.ECODE_TEMP_NORES |
| self.LogInfo("IAllocator '%s' failed on opportunistically acquired" |
| " nodes: %s", self.op.iallocator, ial.info) |
| else: |
| ecode = errors.ECODE_NORES |
| |
| raise errors.OpPrereqError("Can't compute nodes using" |
| " iallocator '%s': %s" % |
| (self.op.iallocator, ial.info), |
| ecode) |
| |
| (self.op.pnode_uuid, self.op.pnode) = \ |
| ExpandNodeUuidAndName(self.cfg, None, ial.result[0]) |
| self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", |
| self.op.instance_name, self.op.iallocator, |
| utils.CommaJoin(ial.result)) |
| |
| assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator" |
| |
| if req.RequiredNodes() == 2: |
| (self.op.snode_uuid, self.op.snode) = \ |
| ExpandNodeUuidAndName(self.cfg, None, ial.result[1]) |
| |
| def BuildHooksEnv(self): |
| """Build hooks env. |
| |
| This runs on master, primary and secondary nodes of the instance. |
| |
| """ |
| env = { |
| "ADD_MODE": self.op.mode, |
| } |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| env["SRC_NODE"] = self.op.src_node |
| env["SRC_PATH"] = self.op.src_path |
| env["SRC_IMAGES"] = self.src_images |
| |
| env.update(BuildInstanceHookEnv( |
| name=self.op.instance_name, |
| primary_node_name=self.op.pnode, |
| secondary_node_names=self.cfg.GetNodeNames(self.secondaries), |
| status=self.op.start, |
| os_type=self.op.os_type, |
| minmem=self.be_full[constants.BE_MINMEM], |
| maxmem=self.be_full[constants.BE_MAXMEM], |
| vcpus=self.be_full[constants.BE_VCPUS], |
| nics=NICListToTuple(self, self.nics), |
| disk_template=self.op.disk_template, |
| # Note that self.disks here is not a list with objects.Disk |
| # but with dicts as returned by ComputeDisks. |
| disks=self.disks, |
| bep=self.be_full, |
| hvp=self.hv_full, |
| hypervisor_name=self.op.hypervisor, |
| tags=self.op.tags, |
| )) |
| |
| return env |
| |
| def BuildHooksNodes(self): |
| """Build hooks nodes. |
| |
| """ |
| nl = [self.cfg.GetMasterNode(), self.op.pnode_uuid] + self.secondaries |
| return nl, nl |
| |
| def _ReadExportInfo(self): |
| """Reads the export information from disk. |
| |
| It will override the opcode source node and path with the actual |
| information, if these two were not specified before. |
| |
| @return: the export information |
| |
| """ |
| assert self.op.mode == constants.INSTANCE_IMPORT |
| |
| if self.op.src_node_uuid is None: |
| locked_nodes = self.owned_locks(locking.LEVEL_NODE) |
| exp_list = self.rpc.call_export_list(locked_nodes) |
| found = False |
| for node_uuid in exp_list: |
| if exp_list[node_uuid].fail_msg: |
| continue |
| if self.op.src_path in exp_list[node_uuid].payload: |
| found = True |
| self.op.src_node = self.cfg.GetNodeInfo(node_uuid).name |
| self.op.src_node_uuid = node_uuid |
| self.op.src_path = utils.PathJoin(pathutils.EXPORT_DIR, |
| self.op.src_path) |
| break |
| if not found: |
| raise errors.OpPrereqError("No export found for relative path %s" % |
| self.op.src_path, errors.ECODE_INVAL) |
| |
| CheckNodeOnline(self, self.op.src_node_uuid) |
| result = self.rpc.call_export_info(self.op.src_node_uuid, self.op.src_path) |
| result.Raise("No export or invalid export found in dir %s" % |
| self.op.src_path) |
| |
| export_info = objects.SerializableConfigParser.Loads(str(result.payload)) |
| if not export_info.has_section(constants.INISECT_EXP): |
| raise errors.ProgrammerError("Corrupted export config", |
| errors.ECODE_ENVIRON) |
| |
| ei_version = export_info.get(constants.INISECT_EXP, "version") |
| if int(ei_version) != constants.EXPORT_VERSION: |
| raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % |
| (ei_version, constants.EXPORT_VERSION), |
| errors.ECODE_ENVIRON) |
| return export_info |
| |
| def _ReadExportParams(self, einfo): |
| """Use export parameters as defaults. |
| |
| In case the opcode doesn't specify (as in override) some instance |
| parameters, then try to use them from the export information, if |
| that declares them. |
| |
| """ |
| self.op.os_type = einfo.get(constants.INISECT_EXP, "os") |
| |
| if not self.op.disks: |
| disks = [] |
| # TODO: import the disk iv_name too |
| for idx in range(constants.MAX_DISKS): |
| if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx): |
| disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) |
| disk_name = einfo.get(constants.INISECT_INS, "disk%d_name" % idx) |
| disk = { |
| constants.IDISK_SIZE: disk_sz, |
| constants.IDISK_NAME: disk_name |
| } |
| disks.append(disk) |
| self.op.disks = disks |
| if not disks and self.op.disk_template != constants.DT_DISKLESS: |
| raise errors.OpPrereqError("No disk info specified and the export" |
| " is missing the disk information", |
| errors.ECODE_INVAL) |
| |
| if not self.op.nics: |
| nics = [] |
| for idx in range(constants.MAX_NICS): |
| if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx): |
| ndict = {} |
| for name in [constants.INIC_IP, |
| constants.INIC_MAC, constants.INIC_NAME]: |
| nic_param_name = "nic%d_%s" % (idx, name) |
| if einfo.has_option(constants.INISECT_INS, nic_param_name): |
| v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) |
| ndict[name] = v |
| network = einfo.get(constants.INISECT_INS, |
| "nic%d_%s" % (idx, constants.INIC_NETWORK)) |
| # in case network is given link and mode are inherited |
| # from nodegroup's netparams and thus should not be passed here |
| if network: |
| ndict[constants.INIC_NETWORK] = network |
| else: |
| for name in list(constants.NICS_PARAMETERS): |
| v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) |
| ndict[name] = v |
| nics.append(ndict) |
| else: |
| break |
| self.op.nics = nics |
| |
| if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): |
| self.op.tags = einfo.get(constants.INISECT_INS, "tags").split() |
| |
| if (self.op.hypervisor is None and |
| einfo.has_option(constants.INISECT_INS, "hypervisor")): |
| self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor") |
| |
| if einfo.has_section(constants.INISECT_HYP): |
| # use the export parameters but do not override the ones |
| # specified by the user |
| for name, value in einfo.items(constants.INISECT_HYP): |
| if name not in self.op.hvparams: |
| self.op.hvparams[name] = value |
| |
| if einfo.has_section(constants.INISECT_BEP): |
| # use the parameters, without overriding |
| for name, value in einfo.items(constants.INISECT_BEP): |
| if name not in self.op.beparams: |
| self.op.beparams[name] = value |
| # Compatibility for the old "memory" be param |
| if name == constants.BE_MEMORY: |
| if constants.BE_MAXMEM not in self.op.beparams: |
| self.op.beparams[constants.BE_MAXMEM] = value |
| if constants.BE_MINMEM not in self.op.beparams: |
| self.op.beparams[constants.BE_MINMEM] = value |
| else: |
| # try to read the parameters old style, from the main section |
| for name in constants.BES_PARAMETERS: |
| if (name not in self.op.beparams and |
| einfo.has_option(constants.INISECT_INS, name)): |
| self.op.beparams[name] = einfo.get(constants.INISECT_INS, name) |
| |
| if einfo.has_section(constants.INISECT_OSP): |
| # use the parameters, without overriding |
| for name, value in einfo.items(constants.INISECT_OSP): |
| if name not in self.op.osparams: |
| self.op.osparams[name] = value |
| |
| if einfo.has_section(constants.INISECT_OSP_PRIVATE): |
| # use the parameters, without overriding |
| for name, value in einfo.items(constants.INISECT_OSP_PRIVATE): |
| if name not in self.op.osparams_private: |
| self.op.osparams_private[name] = serializer.Private(value, descr=name) |
| |
| def _RevertToDefaults(self, cluster): |
| """Revert the instance parameters to the default values. |
| |
| """ |
| # hvparams |
| hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {}) |
| for name in self.op.hvparams.keys(): |
| if name in hv_defs and hv_defs[name] == self.op.hvparams[name]: |
| del self.op.hvparams[name] |
| # beparams |
| be_defs = cluster.SimpleFillBE({}) |
| for name in self.op.beparams.keys(): |
| if name in be_defs and be_defs[name] == self.op.beparams[name]: |
| del self.op.beparams[name] |
| # nic params |
| nic_defs = cluster.SimpleFillNIC({}) |
| for nic in self.op.nics: |
| for name in constants.NICS_PARAMETERS: |
| if name in nic and name in nic_defs and nic[name] == nic_defs[name]: |
| del nic[name] |
| # osparams |
| os_defs = cluster.SimpleFillOS(self.op.os_type, {}) |
| for name in self.op.osparams.keys(): |
| if name in os_defs and os_defs[name] == self.op.osparams[name]: |
| del self.op.osparams[name] |
| |
| os_defs_ = cluster.SimpleFillOS(self.op.os_type, {}, |
| os_params_private={}) |
| for name in self.op.osparams_private.keys(): |
| if name in os_defs_ and os_defs_[name] == self.op.osparams_private[name]: |
| del self.op.osparams_private[name] |
| |
| def _GetNodesFromForthcomingInstance(self): |
| """Set nodes as in the forthcoming instance |
| |
| """ |
| (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) |
| inst = self.cfg.GetInstanceInfo(uuid) |
| self.op.pnode_uuid = inst.primary_node |
| self.op.pnode = self.cfg.GetNodeName(inst.primary_node) |
| sec_nodes = self.cfg.GetInstanceSecondaryNodes(uuid) |
| node_names = [self.op.pnode] |
| if sec_nodes: |
| self.op.snode_uuid = sec_nodes[0] |
| self.op.snode = self.cfg.GetNodeName(sec_nodes[0]) |
| node_names.append(self.op.snode) |
| self.LogInfo("Nodes of instance %s: %s", name, node_names) |
| |
| def CheckPrereq(self): # pylint: disable=R0914 |
| """Check prerequisites. |
| |
| """ |
| owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) |
| |
| if self.op.commit: |
| # Check that the instance is still on the cluster, forthcoming, and |
| # still resides on the nodes we acquired. |
| (uuid, name) = self.cfg.ExpandInstanceName(self.op.instance_name) |
| if uuid is None: |
| raise errors.OpPrereqError("Instance %s disappeared from the cluster" |
| " while waiting for locks" |
| % (self.op.instance_name,), |
| errors.ECODE_STATE) |
| if not self.cfg.GetInstanceInfo(uuid).forthcoming: |
| raise errors.OpPrereqError("Instance %s (with uuid %s) is no longer" |
| " forthcoming" % (name, uuid), |
| errors.ECODE_STATE) |
| required_nodes = self.cfg.GetInstanceNodes(uuid) |
| if not owned_nodes.issuperset(required_nodes): |
| raise errors.OpPrereqError("Forthcoming instance %s nodes changed" |
| " since locks were acquired; retry the" |
| " operation" % self.op.instance_name, |
| errors.ECODE_STATE) |
| else: |
| CheckInstanceExistence(self, self.op.instance_name) |
| |
| # Check that the optimistically acquired groups are correct wrt the |
| # acquired nodes |
| owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) |
| cur_groups = list(self.cfg.GetNodeGroupsFromNodes(owned_nodes)) |
| if not owned_groups.issuperset(cur_groups): |
| raise errors.OpPrereqError("New instance %s's node groups changed since" |
| " locks were acquired, current groups are" |
| " are '%s', owning groups '%s'; retry the" |
| " operation" % |
| (self.op.instance_name, |
| utils.CommaJoin(cur_groups), |
| utils.CommaJoin(owned_groups)), |
| errors.ECODE_STATE) |
| |
| self.instance_file_storage_dir = CalculateFileStorageDir( |
| self.op.disk_template, self.cfg, self.op.instance_name, |
| self.op.file_storage_dir) |
| |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| export_info = self._ReadExportInfo() |
| self._ReadExportParams(export_info) |
| self._old_instance_name = export_info.get(constants.INISECT_INS, "name") |
| else: |
| self._old_instance_name = None |
| |
| if (not self.cfg.GetVGName() and |
| self.op.disk_template not in constants.DTS_NOT_LVM): |
| raise errors.OpPrereqError("Cluster does not support lvm-based" |
| " instances", errors.ECODE_STATE) |
| |
| if (self.op.hypervisor is None or |
| self.op.hypervisor == constants.VALUE_AUTO): |
| self.op.hypervisor = self.cfg.GetHypervisorType() |
| |
| cluster = self.cfg.GetClusterInfo() |
| enabled_hvs = cluster.enabled_hypervisors |
| if self.op.hypervisor not in enabled_hvs: |
| raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" |
| " cluster (%s)" % |
| (self.op.hypervisor, ",".join(enabled_hvs)), |
| errors.ECODE_STATE) |
| |
| # Check tag validity |
| for tag in self.op.tags: |
| objects.TaggableObject.ValidateTag(tag) |
| |
| # check hypervisor parameter syntax (locally) |
| utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) |
| filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, |
| self.op.hvparams) |
| hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor) |
| hv_type.CheckParameterSyntax(filled_hvp) |
| self.hv_full = filled_hvp |
| # check that we don't specify global parameters on an instance |
| CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor", |
| "instance", "cluster") |
| |
| # fill and remember the beparams dict |
| self.be_full = ComputeFullBeParams(self.op, cluster) |
| |
| # build os parameters |
| if self.op.osparams_private is None: |
| self.op.osparams_private = serializer.PrivateDict() |
| if self.op.osparams_secret is None: |
| self.op.osparams_secret = serializer.PrivateDict() |
| |
| self.os_full = cluster.SimpleFillOS( |
| self.op.os_type, |
| self.op.osparams, |
| os_params_private=self.op.osparams_private, |
| os_params_secret=self.op.osparams_secret |
| ) |
| |
| # now that hvp/bep are in final format, let's reset to defaults, |
| # if told to do so |
| if self.op.identify_defaults: |
| self._RevertToDefaults(cluster) |
| |
| # NIC buildup |
| self.nics = ComputeNics(self.op, cluster, self.check_ip, self.cfg, |
| self.proc.GetECId()) |
| |
| # disk checks/pre-build |
| default_vg = self.cfg.GetVGName() |
| self.disks = ComputeDisks(self.op.disks, self.op.disk_template, default_vg) |
| |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| disk_images = [] |
| for idx in range(len(self.disks)): |
| option = "disk%d_dump" % idx |
| if export_info.has_option(constants.INISECT_INS, option): |
| # FIXME: are the old os-es, disk sizes, etc. useful? |
| export_name = export_info.get(constants.INISECT_INS, option) |
| image = utils.PathJoin(self.op.src_path, export_name) |
| disk_images.append(image) |
| else: |
| disk_images.append(False) |
| |
| self.src_images = disk_images |
| |
| if self.op.instance_name == self._old_instance_name: |
| for idx, nic in enumerate(self.nics): |
| if nic.mac == constants.VALUE_AUTO: |
| nic_mac_ini = "nic%d_mac" % idx |
| nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) |
| |
| # ENDIF: self.op.mode == constants.INSTANCE_IMPORT |
| |
| # ip ping checks (we use the same ip that was resolved in ExpandNames) |
| if self.op.ip_check: |
| if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): |
| raise errors.OpPrereqError("IP %s of instance %s already in use" % |
| (self.check_ip, self.op.instance_name), |
| errors.ECODE_NOTUNIQUE) |
| |
| #### mac address generation |
| # By generating here the mac address both the allocator and the hooks get |
| # the real final mac address rather than the 'auto' or 'generate' value. |
| # There is a race condition between the generation and the instance object |
| # creation, which means that we know the mac is valid now, but we're not |
| # sure it will be when we actually add the instance. If things go bad |
| # adding the instance will abort because of a duplicate mac, and the |
| # creation job will fail. |
| for nic in self.nics: |
| if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): |
| nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) |
| |
| #### allocator run |
| |
| if self.op.iallocator is not None: |
| if self.op.commit: |
| self._GetNodesFromForthcomingInstance() |
| else: |
| self._RunAllocator() |
| |
| # Release all unneeded node locks |
| keep_locks = filter(None, [self.op.pnode_uuid, self.op.snode_uuid, |
| self.op.src_node_uuid]) |
| ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) |
| ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) |
| # Release all unneeded group locks |
| ReleaseLocks(self, locking.LEVEL_NODEGROUP, |
| keep=self.cfg.GetNodeGroupsFromNodes(keep_locks)) |
| |
| assert (self.owned_locks(locking.LEVEL_NODE) == |
| self.owned_locks(locking.LEVEL_NODE_RES)), \ |
| ("Node locks differ from node resource locks (%s vs %s)" |
| % (self.owned_locks(locking.LEVEL_NODE), |
| self.owned_locks(locking.LEVEL_NODE_RES))) |
| |
| #### node related checks |
| |
| # check primary node |
| self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode_uuid) |
| assert self.pnode is not None, \ |
| "Cannot retrieve locked node %s" % self.op.pnode_uuid |
| if pnode.offline: |
| raise errors.OpPrereqError("Cannot use offline primary node '%s'" % |
| pnode.name, errors.ECODE_STATE) |
| if pnode.drained: |
| raise errors.OpPrereqError("Cannot use drained primary node '%s'" % |
| pnode.name, errors.ECODE_STATE) |
| if not pnode.vm_capable: |
| raise errors.OpPrereqError("Cannot use non-vm_capable primary node" |
| " '%s'" % pnode.name, errors.ECODE_STATE) |
| |
| self.secondaries = [] |
| |
| # Fill in any IPs from IP pools. This must happen here, because we need to |
| # know the nic's primary node, as specified by the iallocator |
| for idx, nic in enumerate(self.nics): |
| net_uuid = nic.network |
| if net_uuid is not None: |
| nobj = self.cfg.GetNetwork(net_uuid) |
| netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.uuid) |
| if netparams is None: |
| raise errors.OpPrereqError("No netparams found for network" |
| " %s. Probably not connected to" |
| " node's %s nodegroup" % |
| (nobj.name, self.pnode.name), |
| errors.ECODE_INVAL) |
| self.LogInfo("NIC/%d inherits netparams %s" % |
| (idx, netparams.values())) |
| nic.nicparams = dict(netparams) |
| if nic.ip is not None: |
| if nic.ip.lower() == constants.NIC_IP_POOL: |
| try: |
| nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId()) |
| except errors.ReservationError: |
| raise errors.OpPrereqError("Unable to get a free IP for NIC %d" |
| " from the address pool" % idx, |
| errors.ECODE_STATE) |
| self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name) |
| else: |
| try: |
| self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId(), |
| check=self.op.conflicts_check) |
| except errors.ReservationError: |
| raise errors.OpPrereqError("IP address %s already in use" |
| " or does not belong to network %s" % |
| (nic.ip, nobj.name), |
| errors.ECODE_NOTUNIQUE) |
| |
| # net is None, ip None or given |
| elif self.op.conflicts_check: |
| CheckForConflictingIp(self, nic.ip, self.pnode.uuid) |
| |
| # mirror node verification |
| if self.op.disk_template in constants.DTS_INT_MIRROR: |
| if self.op.snode_uuid == pnode.uuid: |
| raise errors.OpPrereqError("The secondary node cannot be the" |
| " primary node", errors.ECODE_INVAL) |
| CheckNodeOnline(self, self.op.snode_uuid) |
| CheckNodeNotDrained(self, self.op.snode_uuid) |
| CheckNodeVmCapable(self, self.op.snode_uuid) |
| self.secondaries.append(self.op.snode_uuid) |
| |
| snode = self.cfg.GetNodeInfo(self.op.snode_uuid) |
| if pnode.group != snode.group: |
| self.LogWarning("The primary and secondary nodes are in two" |
| " different node groups; the disk parameters" |
| " from the first disk's node group will be" |
| " used") |
| |
| nodes = [pnode] |
| if self.op.disk_template in constants.DTS_INT_MIRROR: |
| nodes.append(snode) |
| has_es = lambda n: IsExclusiveStorageEnabledNode(self.cfg, n) |
| excl_stor = compat.any(map(has_es, nodes)) |
| if excl_stor and not self.op.disk_template in constants.DTS_EXCL_STORAGE: |
| raise errors.OpPrereqError("Disk template %s not supported with" |
| " exclusive storage" % self.op.disk_template, |
| errors.ECODE_STATE) |
| for disk in self.disks: |
| CheckSpindlesExclusiveStorage(disk, excl_stor, True) |
| |
| node_uuids = [pnode.uuid] + self.secondaries |
| |
| if not self.adopt_disks: |
| if self.op.disk_template == constants.DT_RBD: |
| # _CheckRADOSFreeSpace() is just a placeholder. |
| # Any function that checks prerequisites can be placed here. |
| # Check if there is enough space on the RADOS cluster. |
| CheckRADOSFreeSpace() |
| elif self.op.disk_template == constants.DT_EXT: |
| # FIXME: Function that checks prereqs if needed |
| pass |
| elif self.op.disk_template in constants.DTS_LVM: |
| # Check lv size requirements, if not adopting |
| req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks) |
| CheckNodesFreeDiskPerVG(self, node_uuids, req_sizes) |
| else: |
| # FIXME: add checks for other, non-adopting, non-lvm disk templates |
| pass |
| |
| elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data |
| all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], |
| disk[constants.IDISK_ADOPT]) |
| for disk in self.disks]) |
| if len(all_lvs) != len(self.disks): |
| raise errors.OpPrereqError("Duplicate volume names given for adoption", |
| errors.ECODE_INVAL) |
| for lv_name in all_lvs: |
| try: |
| # FIXME: lv_name here is "vg/lv" need to ensure that other calls |
| # to ReserveLV uses the same syntax |
| self.cfg.ReserveLV(lv_name, self.proc.GetECId()) |
| except errors.ReservationError: |
| raise errors.OpPrereqError("LV named %s used by another instance" % |
| lv_name, errors.ECODE_NOTUNIQUE) |
| |
| vg_names = self.rpc.call_vg_list([pnode.uuid])[pnode.uuid] |
| vg_names.Raise("Cannot get VG information from node %s" % pnode.name, |
| prereq=True) |
| |
| node_lvs = self.rpc.call_lv_list([pnode.uuid], |
| vg_names.payload.keys())[pnode.uuid] |
| node_lvs.Raise("Cannot get LV information from node %s" % pnode.name, |
| prereq=True) |
| node_lvs = node_lvs.payload |
| |
| delta = all_lvs.difference(node_lvs.keys()) |
| if delta: |
| raise errors.OpPrereqError("Missing logical volume(s): %s" % |
| utils.CommaJoin(delta), |
| errors.ECODE_INVAL) |
| online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]] |
| if online_lvs: |
| raise errors.OpPrereqError("Online logical volumes found, cannot" |
| " adopt: %s" % utils.CommaJoin(online_lvs), |
| errors.ECODE_STATE) |
| # update the size of disk based on what is found |
| for dsk in self.disks: |
| dsk[constants.IDISK_SIZE] = \ |
| int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG], |
| dsk[constants.IDISK_ADOPT])][0])) |
| |
| elif self.op.disk_template == constants.DT_BLOCK: |
| # Normalize and de-duplicate device paths |
| all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT]) |
| for disk in self.disks]) |
| if len(all_disks) != len(self.disks): |
| raise errors.OpPrereqError("Duplicate disk names given for adoption", |
| errors.ECODE_INVAL) |
| baddisks = [d for d in all_disks |
| if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)] |
| if baddisks: |
| raise errors.OpPrereqError("Device node(s) %s lie outside %s and" |
| " cannot be adopted" % |
| (utils.CommaJoin(baddisks), |
| constants.ADOPTABLE_BLOCKDEV_ROOT), |
| errors.ECODE_INVAL) |
| |
| node_disks = self.rpc.call_bdev_sizes([pnode.uuid], |
| list(all_disks))[pnode.uuid] |
| node_disks.Raise("Cannot get block device information from node %s" % |
| pnode.name, prereq=True) |
| node_disks = node_disks.payload |
| delta = all_disks.difference(node_disks.keys()) |
| if delta: |
| raise errors.OpPrereqError("Missing block device(s): %s" % |
| utils.CommaJoin(delta), |
| errors.ECODE_INVAL) |
| for dsk in self.disks: |
| dsk[constants.IDISK_SIZE] = \ |
| int(float(node_disks[dsk[constants.IDISK_ADOPT]])) |
| |
| # Check disk access param to be compatible with specified hypervisor |
| node_info = self.cfg.GetNodeInfo(self.op.pnode_uuid) |
| node_group = self.cfg.GetNodeGroup(node_info.group) |
| group_disk_params = self.cfg.GetGroupDiskParams(node_group) |
| group_access_type = group_disk_params[self.op.disk_template].get( |
| constants.RBD_ACCESS, constants.DISK_KERNELSPACE |
| ) |
| for dsk in self.disks: |
| access_type = dsk.get(constants.IDISK_ACCESS, group_access_type) |
| if not IsValidDiskAccessModeCombination(self.op.hypervisor, |
| self.op.disk_template, |
| access_type): |
| raise errors.OpPrereqError("Selected hypervisor (%s) cannot be" |
| " used with %s disk access param" % |
| (self.op.hypervisor, access_type), |
| errors.ECODE_STATE) |
| |
| # Verify instance specs |
| spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) |
| ispec = { |
| constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), |
| constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), |
| constants.ISPEC_DISK_COUNT: len(self.disks), |
| constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE] |
| for disk in self.disks], |
| constants.ISPEC_NIC_COUNT: len(self.nics), |
| constants.ISPEC_SPINDLE_USE: spindle_use, |
| } |
| |
| group_info = self.cfg.GetNodeGroup(pnode.group) |
| ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) |
| disk_types = [self.op.disk_template] * len(self.disks) |
| res = ComputeIPolicyInstanceSpecViolation(ipolicy, ispec, disk_types) |
| if not self.op.ignore_ipolicy and res: |
| msg = ("Instance allocation to group %s (%s) violates policy: %s" % |
| (pnode.group, group_info.name, utils.CommaJoin(res))) |
| raise errors.OpPrereqError(msg, errors.ECODE_INVAL) |
| |
| CheckHVParams(self, node_uuids, self.op.hypervisor, self.op.hvparams) |
| |
| CheckOSParams(self, True, node_uuids, self.op.os_type, self.os_full, |
| self.op.force_variant) |
| |
| CheckNicsBridgesExist(self, self.nics, self.pnode.uuid) |
| |
| CheckCompressionTool(self, self.op.compress) |
| |
| #TODO: _CheckExtParams (remotely) |
| # Check parameters for extstorage |
| |
| # memory check on primary node |
| #TODO(dynmem): use MINMEM for checking |
| if self.op.start: |
| hvfull = objects.FillDict(cluster.hvparams.get(self.op.hypervisor, {}), |
| self.op.hvparams) |
| CheckNodeFreeMemory(self, self.pnode.uuid, |
| "creating instance %s" % self.op.instance_name, |
| self.be_full[constants.BE_MAXMEM], |
| self.op.hypervisor, hvfull) |
| |
| self.dry_run_result = list(node_uuids) |
| |
| def _RemoveDegradedDisks(self, feedback_fn, disk_abort, instance): |
| """Removes degraded disks and instance. |
| |
| It optionally checks whether disks are degraded. If the disks are |
| degraded, they are removed and the instance is also removed from |
| the configuration. |
| |
| If L{disk_abort} is True, then the disks are considered degraded |
| and removed, and the instance is removed from the configuration. |
| |
| If L{disk_abort} is False, then it first checks whether disks are |
| degraded and, if so, it removes the disks and the instance is |
| removed from the configuration. |
| |
| @type feedback_fn: callable |
| @param feedback_fn: function used send feedback back to the caller |
| |
| @type disk_abort: boolean |
| @param disk_abort: |
| True if disks are degraded, False to first check if disks are |
| degraded |
| @type instance: L{objects.Instance} |
| @param instance: instance containing the disks to check |
| |
| @rtype: NoneType |
| @return: None |
| @raise errors.OpPrereqError: if disks are degraded |
| |
| """ |
| disk_info = self.cfg.GetInstanceDisks(instance.uuid) |
| if disk_abort: |
| pass |
| elif self.op.wait_for_sync: |
| disk_abort = not WaitForSync(self, instance) |
| elif utils.AnyDiskOfType(disk_info, constants.DTS_INT_MIRROR): |
| # make sure the disks are not degraded (still sync-ing is ok) |
| feedback_fn("* checking mirrors status") |
| disk_abort = not WaitForSync(self, instance, oneshot=True) |
| else: |
| disk_abort = False |
| |
| if disk_abort: |
| RemoveDisks(self, instance) |
| for disk_uuid in instance.disks: |
| self.cfg.RemoveInstanceDisk(instance.uuid, disk_uuid) |
| self.cfg.RemoveInstance(instance.uuid) |
| raise errors.OpExecError("There are some degraded disks for" |
| " this instance") |
| |
| def RunOsScripts(self, feedback_fn, iobj): |
| """Run OS scripts |
| |
| If necessary, disks are paused. It handles instance create, |
| import, and remote import. |
| |
| @type feedback_fn: callable |
| @param feedback_fn: function used send feedback back to the caller |
| |
| @type iobj: L{objects.Instance} |
| @param iobj: instance object |
| |
| """ |
| if iobj.disks and not self.adopt_disks: |
| disks = self.cfg.GetInstanceDisks(iobj.uuid) |
| if self.op.mode == constants.INSTANCE_CREATE: |
| os_image = objects.GetOSImage(self.op.osparams) |
| |
| if os_image is None and not self.op.no_install: |
| pause_sync = (not self.op.wait_for_sync and |
| utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)) |
| if pause_sync: |
| feedback_fn("* pausing disk sync to install instance OS") |
| result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, |
| (disks, iobj), |
| True) |
| for idx, success in enumerate(result.payload): |
| if not success: |
| logging.warn("pause-sync of instance %s for disk %d failed", |
| self.op.instance_name, idx) |
| |
| feedback_fn("* running the instance OS create scripts...") |
| # FIXME: pass debug option from opcode to backend |
| os_add_result = \ |
| self.rpc.call_instance_os_add(self.pnode.uuid, |
| (iobj, self.op.osparams_secret), |
| False, |
| self.op.debug_level) |
| if pause_sync: |
| feedback_fn("* resuming disk sync") |
| result = self.rpc.call_blockdev_pause_resume_sync(self.pnode.uuid, |
| (disks, iobj), |
| False) |
| for idx, success in enumerate(result.payload): |
| if not success: |
| logging.warn("resume-sync of instance %s for disk %d failed", |
| self.op.instance_name, idx) |
| |
| os_add_result.Raise("Could not add os for instance %s" |
| " on node %s" % (self.op.instance_name, |
| self.pnode.name)) |
| |
| else: |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| feedback_fn("* running the instance OS import scripts...") |
| |
| transfers = [] |
| |
| for idx, image in enumerate(self.src_images): |
| if not image: |
| continue |
| |
| if iobj.os: |
| dst_io = constants.IEIO_SCRIPT |
| dst_ioargs = ((disks[idx], iobj), idx) |
| else: |
| dst_io = constants.IEIO_RAW_DISK |
| dst_ioargs = (disks[idx], iobj) |
| |
| # FIXME: pass debug option from opcode to backend |
| dt = masterd.instance.DiskTransfer("disk/%s" % idx, |
| constants.IEIO_FILE, (image, ), |
| dst_io, dst_ioargs, |
| None) |
| transfers.append(dt) |
| |
| import_result = \ |
| masterd.instance.TransferInstanceData(self, feedback_fn, |
| self.op.src_node_uuid, |
| self.pnode.uuid, |
| self.pnode.secondary_ip, |
| self.op.compress, |
| iobj, transfers) |
| if not compat.all(import_result): |
| self.LogWarning("Some disks for instance %s on node %s were not" |
| " imported successfully" % (self.op.instance_name, |
| self.pnode.name)) |
| |
| rename_from = self._old_instance_name |
| |
| elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: |
| feedback_fn("* preparing remote import...") |
| # The source cluster will stop the instance before attempting to make |
| # a connection. In some cases stopping an instance can take a long |
| # time, hence the shutdown timeout is added to the connection |
| # timeout. |
| connect_timeout = (constants.RIE_CONNECT_TIMEOUT + |
| self.op.source_shutdown_timeout) |
| timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) |
| |
| assert iobj.primary_node == self.pnode.uuid |
| disk_results = \ |
| masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, |
| self.source_x509_ca, |
| self._cds, self.op.compress, timeouts) |
| if not compat.all(disk_results): |
| # TODO: Should the instance still be started, even if some disks |
| # failed to import (valid for local imports, too)? |
| self.LogWarning("Some disks for instance %s on node %s were not" |
| " imported successfully" % (self.op.instance_name, |
| self.pnode.name)) |
| |
| rename_from = self.source_instance_name |
| |
| else: |
| # also checked in the prereq part |
| raise errors.ProgrammerError("Unknown OS initialization mode '%s'" |
| % self.op.mode) |
| |
| assert iobj.name == self.op.instance_name |
| |
| # Run rename script on newly imported instance |
| if iobj.os: |
| feedback_fn("Running rename script for %s" % self.op.instance_name) |
| result = self.rpc.call_instance_run_rename(self.pnode.uuid, iobj, |
| rename_from, |
| self.op.debug_level) |
| result.Warn("Failed to run rename script for %s on node %s" % |
| (self.op.instance_name, self.pnode.name), self.LogWarning) |
| |
| def GetOsInstallPackageEnvironment(self, instance, script): |
| """Returns the OS scripts environment for the helper VM |
| |
| @type instance: L{objects.Instance} |
| @param instance: instance for which the OS scripts are run |
| |
| @type script: string |
| @param script: script to run (e.g., |
| constants.OS_SCRIPT_CREATE_UNTRUSTED) |
| |
| @rtype: dict of string to string |
| @return: OS scripts environment for the helper VM |
| |
| """ |
| env = {"OS_SCRIPT": script} |
| |
| # We pass only the instance's disks, not the helper VM's disks. |
| if instance.hypervisor == constants.HT_KVM: |
| prefix = "/dev/vd" |
| elif instance.hypervisor in [constants.HT_XEN_PVM, constants.HT_XEN_HVM]: |
| prefix = "/dev/xvd" |
| else: |
| raise errors.OpExecError("Cannot run OS scripts in a virtualized" |
| " environment for hypervisor '%s'" |
| % instance.hypervisor) |
| |
| num_disks = len(self.cfg.GetInstanceDisks(instance.uuid)) |
| |
| for idx, disk_label in enumerate(utils.GetDiskLabels(prefix, num_disks + 1, |
| start=1)): |
| env["DISK_%d_PATH" % idx] = disk_label |
| |
| return env |
| |
| def UpdateInstanceOsInstallPackage(self, feedback_fn, instance, override_env): |
| """Updates the OS parameter 'os-install-package' for an instance. |
| |
| The OS install package is an archive containing an OS definition |
| and a file containing the environment variables needed to run the |
| OS scripts. |
| |
| The OS install package is served by the metadata daemon to the |
| instances, so the OS scripts can run inside the virtualized |
| environment. |
| |
| @type feedback_fn: callable |
| @param feedback_fn: function used send feedback back to the caller |
| |
| @type instance: L{objects.Instance} |
| @param instance: instance for which the OS parameter |
| 'os-install-package' is updated |
| |
| @type override_env: dict of string to string |
| @param override_env: if supplied, it overrides the environment of |
| the export OS scripts archive |
| |
| """ |
| if "os-install-package" in instance.osparams: |
| feedback_fn("Using OS install package '%s'" % |
| instance.osparams["os-install-package"]) |
| else: |
| result = self.rpc.call_os_export(instance.primary_node, instance, |
| override_env) |
| result.Raise("Could not export OS '%s'" % instance.os) |
| instance.osparams["os-install-package"] = result.payload |
| |
| feedback_fn("Created OS install package '%s'" % result.payload) |
| |
| def RunOsScriptsVirtualized(self, feedback_fn, instance): |
| """Runs the OS scripts inside a safe virtualized environment. |
| |
| The virtualized environment reuses the instance and temporarily |
| creates a disk onto which the image of the helper VM is dumped. |
| The temporary disk is used to boot the helper VM. The OS scripts |
| are passed to the helper VM through the metadata daemon and the OS |
| install package. |
| |
| @type feedback_fn: callable |
| @param feedback_fn: function used send feedback back to the caller |
| |
| @type instance: L{objects.Instance} |
| @param instance: instance for which the OS scripts must be run |
| inside the virtualized environment |
| |
| """ |
| install_image = self.cfg.GetInstallImage() |
| |
| if not install_image: |
| raise errors.OpExecError("Cannot create install instance because an" |
| " install image has not been specified") |
| |
| env = self.GetOsInstallPackageEnvironment( |
| instance, |
| constants.OS_SCRIPT_CREATE_UNTRUSTED) |
| self.UpdateInstanceOsInstallPackage(feedback_fn, instance, env) |
| UpdateMetadata(feedback_fn, self.rpc, instance, |
| osparams_private=self.op.osparams_private, |
| osparams_secret=self.op.osparams_secret) |
| |
| RunWithHelperVM(self, instance, install_image, |
| self.op.helper_startup_timeout, |
| self.op.helper_shutdown_timeout, |
| log_prefix="Running OS create script", |
| feedback_fn=feedback_fn) |
| |
| def Exec(self, feedback_fn): |
| """Create and add the instance to the cluster. |
| |
| """ |
| assert not (self.owned_locks(locking.LEVEL_NODE_RES) - |
| self.owned_locks(locking.LEVEL_NODE)), \ |
| "Node locks differ from node resource locks" |
| |
| ht_kind = self.op.hypervisor |
| if ht_kind in constants.HTS_REQ_PORT: |
| network_port = self.cfg.AllocatePort() |
| else: |
| network_port = None |
| |
| if self.op.commit: |
| (instance_uuid, _) = self.cfg.ExpandInstanceName(self.op.instance_name) |
| else: |
| instance_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) |
| |
| # This is ugly but we got a chicken-egg problem here |
| # We can only take the group disk parameters, as the instance |
| # has no disks yet (we are generating them right here). |
| nodegroup = self.cfg.GetNodeGroup(self.pnode.group) |
| |
| if self.op.commit: |
| disks = self.cfg.GetInstanceDisks(instance_uuid) |
| CommitDisks(disks) |
| else: |
| disks = GenerateDiskTemplate(self, |
| self.op.disk_template, |
| instance_uuid, self.pnode.uuid, |
| self.secondaries, |
| self.disks, |
| self.instance_file_storage_dir, |
| self.op.file_driver, |
| 0, |
| feedback_fn, |
| self.cfg.GetGroupDiskParams(nodegroup), |
| forthcoming=self.op.forthcoming) |
| |
| if self.op.os_type is None: |
| os_type = "" |
| else: |
| os_type = self.op.os_type |
| |
| iobj = objects.Instance(name=self.op.instance_name, |
| uuid=instance_uuid, |
| os=os_type, |
| primary_node=self.pnode.uuid, |
| nics=self.nics, disks=[], |
| disk_template=self.op.disk_template, |
| disks_active=False, |
| admin_state=constants.ADMINST_DOWN, |
| admin_state_source=constants.ADMIN_SOURCE, |
| network_port=network_port, |
| beparams=self.op.beparams, |
| hvparams=self.op.hvparams, |
| hypervisor=self.op.hypervisor, |
| osparams=self.op.osparams, |
| osparams_private=self.op.osparams_private, |
| forthcoming=self.op.forthcoming, |
| ) |
| |
| if self.op.tags: |
| for tag in self.op.tags: |
| iobj.AddTag(tag) |
| |
| if self.adopt_disks: |
| if self.op.disk_template == constants.DT_PLAIN: |
| # rename LVs to the newly-generated names; we need to construct |
| # 'fake' LV disks with the old data, plus the new unique_id |
| tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] |
| rename_to = [] |
| for t_dsk, a_dsk in zip(tmp_disks, self.disks): |
| rename_to.append(t_dsk.logical_id) |
| t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) |
| result = self.rpc.call_blockdev_rename(self.pnode.uuid, |
| zip(tmp_disks, rename_to)) |
| result.Raise("Failed to rename adoped LVs") |
| elif self.op.forthcoming: |
| feedback_fn("Instance is forthcoming, not creating disks") |
| else: |
| feedback_fn("* creating instance disks...") |
| try: |
| CreateDisks(self, iobj, disks=disks) |
| except errors.OpExecError: |
| self.LogWarning("Device creation failed") |
| for disk in disks: |
| self.cfg.ReleaseDRBDMinors(disk.uuid) |
| raise |
| |
| feedback_fn("adding instance %s to cluster config" % self.op.instance_name) |
| self.cfg.AddInstance(iobj, self.proc.GetECId(), replace=self.op.commit) |
| |
| feedback_fn("adding disks to cluster config") |
| for disk in disks: |
| self.cfg.AddInstanceDisk(iobj.uuid, disk, replace=self.op.commit) |
| |
| if self.op.forthcoming: |
| feedback_fn("Instance is forthcoming; not creating the actual instance") |
| return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid))) |
| |
| # re-read the instance from the configuration |
| iobj = self.cfg.GetInstanceInfo(iobj.uuid) |
| |
| if self.op.mode == constants.INSTANCE_IMPORT: |
| # Release unused nodes |
| ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node_uuid]) |
| else: |
| # Release all nodes |
| ReleaseLocks(self, locking.LEVEL_NODE) |
| |
| # Wipe disks |
| disk_abort = False |
| if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: |
| feedback_fn("* wiping instance disks...") |
| try: |
| WipeDisks(self, iobj) |
| except errors.OpExecError, err: |
| logging.exception("Wiping disks failed") |
| self.LogWarning("Wiping instance disks failed (%s)", err) |
| disk_abort = True |
| |
| self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) |
| |
| # Image disks |
| os_image = objects.GetOSImage(iobj.osparams) |
| disk_abort = False |
| |
| if not self.adopt_disks and os_image is not None: |
| feedback_fn("* imaging instance disks...") |
| try: |
| ImageDisks(self, iobj, os_image) |
| except errors.OpExecError, err: |
| logging.exception("Imaging disks failed") |
| self.LogWarning("Imaging instance disks failed (%s)", err) |
| disk_abort = True |
| |
| self._RemoveDegradedDisks(feedback_fn, disk_abort, iobj) |
| |
| # instance disks are now active |
| iobj.disks_active = True |
| |
| # Release all node resource locks |
| ReleaseLocks(self, locking.LEVEL_NODE_RES) |
| |
| if iobj.os: |
| result = self.rpc.call_os_diagnose([iobj.primary_node])[iobj.primary_node] |
| result.Raise("Failed to get OS '%s'" % iobj.os) |
| |
| trusted = None |
| |
| for (name, _, _, _, _, _, _, os_trusted) in result.payload: |
| if name == objects.OS.GetName(iobj.os): |
| trusted = os_trusted |
| break |
| |
| if trusted is None: |
| raise errors.OpPrereqError("OS '%s' is not available in node '%s'" % |
| (iobj.os, iobj.primary_node)) |
| elif trusted: |
| self.RunOsScripts(feedback_fn, iobj) |
| else: |
| self.RunOsScriptsVirtualized(feedback_fn, iobj) |
| # Instance is modified by 'RunOsScriptsVirtualized', |
| # therefore, it must be retrieved once again from the |
| # configuration, otherwise there will be a config object |
| # version mismatch. |
| iobj = self.cfg.GetInstanceInfo(iobj.uuid) |
| |
| # Update instance metadata so that it can be reached from the |
| # metadata service. |
| UpdateMetadata(feedback_fn, self.rpc, iobj, |
| osparams_private=self.op.osparams_private, |
| osparams_secret=self.op.osparams_secret) |
| |
| assert not self.owned_locks(locking.LEVEL_NODE_RES) |
| |
| if self.op.start: |
| iobj.admin_state = constants.ADMINST_UP |
| self.cfg.Update(iobj, feedback_fn) |
| logging.info("Starting instance %s on node %s", self.op.instance_name, |
| self.pnode.name) |
| feedback_fn("* starting instance...") |
| result = self.rpc.call_instance_start(self.pnode.uuid, (iobj, None, None), |
| False, self.op.reason) |
| result.Raise("Could not start instance") |
| |
| return self.cfg.GetNodeNames(list(self.cfg.GetInstanceNodes(iobj.uuid))) |
| |
| def PrepareRetry(self, feedback_fn): |
| # A temporary lack of resources can only happen if opportunistic locking |
| # is used. |
| assert self.op.opportunistic_locking |
| |
| logging.info("Opportunistic locking did not suceed, falling back to" |
| " full lock allocation") |
| feedback_fn("* falling back to full lock allocation") |
| self.op.opportunistic_locking = False |