Source code for libcloud.compute.drivers.libvirt_driver

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
import time
import platform
import mimetypes
import subprocess
from os.path import join as pjoin
from collections import defaultdict

from libcloud.utils.py3 import ET, ensure_string
from libcloud.compute.base import Node, NodeState, NodeDriver
from libcloud.compute.types import Provider
from libcloud.utils.networking import is_public_subnet

try:
    import libvirt

    have_libvirt = True
except ImportError:
    have_libvirt = False


[docs]class LibvirtNodeDriver(NodeDriver): """ Libvirt (http://libvirt.org/) node driver. To enable debug mode, set LIBVIR_DEBUG environment variable. """ type = Provider.LIBVIRT name = "Libvirt" website = "http://libvirt.org/" NODE_STATE_MAP = { 0: NodeState.TERMINATED, # no state 1: NodeState.RUNNING, # domain is running 2: NodeState.PENDING, # domain is blocked on resource 3: NodeState.TERMINATED, # domain is paused by user 4: NodeState.TERMINATED, # domain is being shut down 5: NodeState.TERMINATED, # domain is shut off 6: NodeState.UNKNOWN, # domain is crashed 7: NodeState.UNKNOWN, # domain is suspended by guest power management } def __init__(self, uri, key=None, secret=None): """ :param uri: Hypervisor URI (e.g. vbox:///session, qemu:///system, etc.). :type uri: ``str`` :param key: the username for a remote libvirtd server :type key: ``str`` :param secret: the password for a remote libvirtd server :type key: ``str`` """ if not have_libvirt: raise RuntimeError("Libvirt driver requires 'libvirt' Python " + "package") self._uri = uri self._key = key self._secret = secret if uri is not None and "+tcp" in self._uri: if key is None and secret is None: raise RuntimeError( "The remote Libvirt instance requires " + "authentication, please set 'key' and " + "'secret' parameters" ) auth = [ [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE], self._cred_callback, None, ] self.connection = libvirt.openAuth(uri, auth, 0) else: self.connection = libvirt.open(uri) if uri is None: self._uri = self.connection.getInfo() def _cred_callback(self, cred, user_data): """ Callback for the authentication scheme, which will provide username and password for the login. Reference: ( http://bit.ly/1U5yyQg ) :param cred: The credentials requested and the return :type cred: ``list`` :param user_data: Custom data provided to the authentication routine :type user_data: ``list`` :rtype: ``int`` """ for credential in cred: if credential[0] == libvirt.VIR_CRED_AUTHNAME: credential[4] = self._key elif credential[0] == libvirt.VIR_CRED_PASSPHRASE: credential[4] = self._secret return 0
[docs] def list_nodes(self): domains = self.connection.listAllDomains() nodes = self._to_nodes(domains=domains) return nodes
[docs] def reboot_node(self, node): domain = self._get_domain_for_node(node=node) return domain.reboot(flags=0) == 0
[docs] def destroy_node(self, node): domain = self._get_domain_for_node(node=node) return domain.destroy() == 0
[docs] def start_node(self, node): domain = self._get_domain_for_node(node=node) return domain.create() == 0
[docs] def stop_node(self, node): domain = self._get_domain_for_node(node=node) return domain.shutdown() == 0
[docs] def ex_start_node(self, node): # NOTE: This method is here for backward compatibility reasons after # this method was promoted to be part of the standard compute API in # Libcloud v2.7.0 """ Start a stopped node. :param node: Node which should be used :type node: :class:`Node` :rtype: ``bool`` """ return self.start_node(node=node)
[docs] def ex_shutdown_node(self, node): # NOTE: This method is here for backward compatibility reasons after # this method was promoted to be part of the standard compute API in # Libcloud v2.7.0 """ Shutdown a running node. Note: Usually this will result in sending an ACPI event to the node. :param node: Node which should be used :type node: :class:`Node` :rtype: ``bool`` """ return self.stop_node(node=node)
[docs] def ex_suspend_node(self, node): """ Suspend a running node. :param node: Node which should be used :type node: :class:`Node` :rtype: ``bool`` """ domain = self._get_domain_for_node(node=node) return domain.suspend() == 0
[docs] def ex_resume_node(self, node): """ Resume a suspended node. :param node: Node which should be used :type node: :class:`Node` :rtype: ``bool`` """ domain = self._get_domain_for_node(node=node) return domain.resume() == 0
[docs] def ex_get_node_by_uuid(self, uuid): """ Retrieve Node object for a domain with a provided uuid. :param uuid: Uuid of the domain. :type uuid: ``str`` """ domain = self._get_domain_for_uuid(uuid=uuid) node = self._to_node(domain=domain) return node
[docs] def ex_get_node_by_name(self, name): """ Retrieve Node object for a domain with a provided name. :param name: Name of the domain. :type name: ``str`` """ domain = self._get_domain_for_name(name=name) node = self._to_node(domain=domain) return node
[docs] def ex_take_node_screenshot(self, node, directory, screen=0): """ Take a screenshot of a monitoring of a running instance. :param node: Node to take the screenshot of. :type node: :class:`libcloud.compute.base.Node` :param directory: Path where the screenshot will be saved. :type directory: ``str`` :param screen: ID of the monitor to take the screenshot of. :type screen: ``int`` :return: Full path where the screenshot has been saved. :rtype: ``str`` """ if not os.path.exists(directory) or not os.path.isdir(directory): raise ValueError("Invalid value for directory argument") domain = self._get_domain_for_node(node=node) stream = self.connection.newStream() mime_type = domain.screenshot(stream=stream, screen=0) extensions = mimetypes.guess_all_extensions(type=mime_type) if extensions: extension = extensions[0] else: extension = ".png" name = "screenshot-{}{}".format(int(time.time()), extension) file_path = pjoin(directory, name) with open(file_path, "wb") as fp: def write(stream, buf, opaque): fp.write(buf) stream.recvAll(write, None) try: stream.finish() except Exception: # Finish is not supported by all backends pass return file_path
[docs] def ex_get_hypervisor_hostname(self): """ Return a system hostname on which the hypervisor is running. """ hostname = self.connection.getHostname() return hostname
[docs] def ex_get_hypervisor_sysinfo(self): """ Retrieve hypervisor system information. :rtype: ``dict`` """ xml = self.connection.getSysinfo() etree = ET.XML(xml) attributes = ["bios", "system", "processor", "memory_device"] sysinfo = {} for attribute in attributes: element = etree.find(attribute) entries = self._get_entries(element=element) sysinfo[attribute] = entries return sysinfo
def _to_nodes(self, domains): nodes = [self._to_node(domain=domain) for domain in domains] return nodes def _to_node(self, domain): state, max_mem, memory, vcpu_count, used_cpu_time = domain.info() state = self.NODE_STATE_MAP.get(state, NodeState.UNKNOWN) public_ips, private_ips = [], [] ip_addresses = self._get_ip_addresses_for_domain(domain) for ip_address in ip_addresses: if is_public_subnet(ip_address): public_ips.append(ip_address) else: private_ips.append(ip_address) extra = { "uuid": domain.UUIDString(), "os_type": domain.OSType(), "types": self.connection.getType(), "used_memory": memory / 1024, "vcpu_count": vcpu_count, "used_cpu_time": used_cpu_time, } node = Node( id=domain.ID(), name=domain.name(), state=state, public_ips=public_ips, private_ips=private_ips, driver=self, extra=extra, ) node._uuid = domain.UUIDString() # we want to use a custom UUID return node def _get_ip_addresses_for_domain(self, domain): """ Retrieve IP addresses for the provided domain. Note: This functionality is currently only supported on Linux and only works if this code is run on the same machine as the VMs run on. :return: IP addresses for the provided domain. :rtype: ``list`` """ result = [] if platform.system() != "Linux": # Only Linux is supported atm return result if "///" not in self._uri: # Only local libvirtd is supported atm return result mac_addresses = self._get_mac_addresses_for_domain(domain=domain) arp_table = {} try: cmd = ["arp", "-an"] child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, _ = child.communicate() arp_table = self._parse_ip_table_arp(arp_output=stdout) except OSError as e: if e.errno == 2: cmd = ["ip", "neigh"] child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, _ = child.communicate() arp_table = self._parse_ip_table_neigh(ip_output=stdout) for mac_address in mac_addresses: if mac_address in arp_table: ip_addresses = arp_table[mac_address] result.extend(ip_addresses) return result def _get_mac_addresses_for_domain(self, domain): """ Parses network interface MAC addresses from the provided domain. """ xml = domain.XMLDesc() etree = ET.XML(xml) elems = etree.findall("devices/interface[@type='network']/mac") result = [] for elem in elems: mac_address = elem.get("address") result.append(mac_address) return result def _get_domain_for_node(self, node): """ Return libvirt domain object for the provided node. """ domain = self.connection.lookupByUUIDString(node.uuid) return domain def _get_domain_for_uuid(self, uuid): """ Return libvirt domain object for the provided uuid. """ domain = self.connection.lookupByUUIDString(uuid) return domain def _get_domain_for_name(self, name): """ Return libvirt domain object for the provided name. """ domain = self.connection.lookupByName(name) return domain def _get_entries(self, element): """ Parse entries dictionary. :rtype: ``dict`` """ elements = element.findall("entry") result = {} for element in elements: name = element.get("name") value = element.text result[name] = value return result def _parse_ip_table_arp(self, arp_output): """ Sets up the regexp for parsing out IP addresses from the 'arp -an' command and pass it along to the parser function. :return: Dictionary from the parsing function :rtype: ``dict`` """ arp_regex = re.compile(r".*?\((.*?)\) at (.*?)\s+") return self._parse_mac_addr_table(arp_output, arp_regex) def _parse_ip_table_neigh(self, ip_output): """ Sets up the regexp for parsing out IP addresses from the 'ip neighbor' command and pass it along to the parser function. :return: Dictionary from the parsing function :rtype: ``dict`` """ ip_regex = re.compile(r"(.*?)\s+.*lladdr\s+(.*?)\s+") return self._parse_mac_addr_table(ip_output, ip_regex) def _parse_mac_addr_table(self, cmd_output, mac_regex): """ Parse the command output and return a dictionary which maps mac address to an IP address. :return: Dictionary which maps mac address to IP address. :rtype: ``dict`` """ lines = ensure_string(cmd_output).split("\n") arp_table = defaultdict(list) for line in lines: match = mac_regex.match(line) if not match: continue groups = match.groups() ip_address = groups[0] mac_address = groups[1] arp_table[mac_address].append(ip_address) return arp_table