Source code for libcloud.compute.drivers.voxel

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Voxel VoxCloud driver
"""
import datetime
import hashlib

from libcloud.utils.py3 import b

from libcloud.common.base import XmlResponse, ConnectionUserAndKey
from libcloud.common.types import InvalidCredsError
from libcloud.compute.providers import Provider
from libcloud.compute.types import NodeState
from libcloud.compute.base import Node, NodeDriver
from libcloud.compute.base import NodeSize, NodeImage, NodeLocation

VOXEL_API_HOST = "api.voxel.net"


[docs]class VoxelResponse(XmlResponse): def __init__(self, response, connection): self.parsed = None super(VoxelResponse, self).__init__(response=response, connection=connection)
[docs] def parse_body(self): if not self.body: return None if self.parsed is None: self.parsed = super(VoxelResponse, self).parse_body() return self.parsed
[docs] def parse_error(self): err_list = [] if not self.body: return None if self.parsed is None: self.parsed = super(VoxelResponse, self).parse_body() for err in self.parsed.findall("err"): code = err.get("code") err_list.append("(%s) %s" % (code, err.get("msg"))) # From voxel docs: # 1: Invalid login or password # 9: Permission denied: user lacks access rights for this method if code == "1" or code == "9": # sucks, but only way to detect # bad authentication tokens so far raise InvalidCredsError(err_list[-1]) return "\n".join(err_list)
[docs] def success(self): if not self.parsed: self.parsed = super(VoxelResponse, self).parse_body() stat = self.parsed.get("stat") if stat != "ok": return False return True
[docs]class VoxelConnection(ConnectionUserAndKey): """ Connection class for the Voxel driver """ host = VOXEL_API_HOST responseCls = VoxelResponse
[docs] def add_default_params(self, params): params = dict([(k, v) for k, v in list(params.items()) if v is not None]) params["key"] = self.user_id params["timestamp"] = datetime.datetime.utcnow().isoformat() + "+0000" keys = list(params.keys()) keys.sort() md5 = hashlib.md5() md5.update(b(self.key)) for key in keys: if params[key]: if not params[key] is None: md5.update(b("%s%s" % (key, params[key]))) else: md5.update(b(key)) params["api_sig"] = md5.hexdigest() return params
VOXEL_INSTANCE_TYPES = {} RAM_PER_CPU = 2048 NODE_STATE_MAP = { "IN_PROGRESS": NodeState.PENDING, "QUEUED": NodeState.PENDING, "SUCCEEDED": NodeState.RUNNING, "shutting-down": NodeState.TERMINATED, "terminated": NodeState.TERMINATED, "unknown": NodeState.UNKNOWN, }
[docs]class VoxelNodeDriver(NodeDriver): """ Voxel VoxCLOUD node driver """ connectionCls = VoxelConnection type = Provider.VOXEL name = "Voxel VoxCLOUD" website = "http://www.voxel.net/" def _initialize_instance_types(): # pylint: disable=no-method-argument for cpus in range(1, 14): if cpus == 1: name = "Single CPU" else: name = "%d CPUs" % cpus id = "%dcpu" % cpus ram = cpus * RAM_PER_CPU VOXEL_INSTANCE_TYPES[id] = { "id": id, "name": name, "ram": ram, "disk": None, "bandwidth": None, "price": None, } features = {"create_node": [], "list_sizes": ["variable_disk"]} _initialize_instance_types()
[docs] def list_nodes(self): params = {"method": "voxel.devices.list"} result = self.connection.request("/", params=params).object return self._to_nodes(result)
[docs] def list_sizes(self, location=None): return [ NodeSize(driver=self.connection.driver, **i) for i in list(VOXEL_INSTANCE_TYPES.values()) ]
[docs] def list_images(self, location=None): params = {"method": "voxel.images.list"} result = self.connection.request("/", params=params).object return self._to_images(result)
[docs] def create_node( self, name, size, image, location, ex_privateip=None, ex_publicip=None, ex_rootpass=None, ex_consolepass=None, ex_sshuser=None, ex_sshpass=None, ex_voxel_access=None, ): """Create Voxel Node :keyword name: the name to assign the node (mandatory) :type name: ``str`` :keyword image: distribution to deploy :type image: :class:`NodeImage` :keyword size: the plan size to create (mandatory) Requires size.disk (GB) to be set manually :type size: :class:`NodeSize` :keyword location: which datacenter to create the node in :type location: :class:`NodeLocation` :keyword ex_privateip: Backend IP address to assign to node; must be chosen from the customer's private VLAN assignment. :type ex_privateip: ``str`` :keyword ex_publicip: Public-facing IP address to assign to node; must be chosen from the customer's public VLAN assignment. :type ex_publicip: ``str`` :keyword ex_rootpass: Password for root access; generated if unset. :type ex_rootpass: ``str`` :keyword ex_consolepass: Password for remote console; generated if unset. :type ex_consolepass: ``str`` :keyword ex_sshuser: Username for SSH access :type ex_sshuser: ``str`` :keyword ex_sshpass: Password for SSH access; generated if unset. :type ex_sshpass: ``str`` :keyword ex_voxel_access: Allow access Voxel administrative access. Defaults to False. :type ex_voxel_access: ``bool`` :rtype: :class:`Node` or ``None`` """ # assert that disk > 0 if not size.disk: raise ValueError("size.disk must be non-zero") # convert voxel_access to string boolean if needed if ex_voxel_access is not None: ex_voxel_access = "true" if ex_voxel_access else "false" params = { "method": "voxel.voxcloud.create", "hostname": name, "disk_size": int(size.disk), "facility": location.id, "image_id": image.id, "processing_cores": size.ram / RAM_PER_CPU, "backend_ip": ex_privateip, "frontend_ip": ex_publicip, "admin_password": ex_rootpass, "console_password": ex_consolepass, "ssh_username": ex_sshuser, "ssh_password": ex_sshpass, "voxel_access": ex_voxel_access, } object = self.connection.request("/", params=params).object if self._getstatus(object): return Node( id=object.findtext("device/id"), name=name, state=NODE_STATE_MAP[object.findtext("device/status")], public_ips=ex_publicip, private_ips=ex_privateip, driver=self.connection.driver, ) else: return None
[docs] def reboot_node(self, node): params = { "method": "voxel.devices.power", "device_id": node.id, "power_action": "reboot", } return self._getstatus(self.connection.request("/", params=params).object)
[docs] def destroy_node(self, node): params = {"method": "voxel.voxcloud.delete", "device_id": node.id} return self._getstatus(self.connection.request("/", params=params).object)
[docs] def list_locations(self): params = {"method": "voxel.voxcloud.facilities.list"} result = self.connection.request("/", params=params).object nodes = self._to_locations(result) return nodes
def _getstatus(self, element): status = element.attrib["stat"] return status == "ok" def _to_locations(self, object): return [ NodeLocation( element.attrib["label"], element.findtext("description"), element.findtext("description"), self, ) for element in object.findall("facilities/facility") ] def _to_nodes(self, object): nodes = [] for element in object.findall("devices/device"): if element.findtext("type") == "Virtual Server": try: state = self.NODE_STATE_MAP[element.attrib["status"]] except KeyError: state = NodeState.UNKNOWN public_ip = private_ip = None ipassignments = element.findall("ipassignments/ipassignment") for ip in ipassignments: if ip.attrib["type"] == "frontend": public_ip = ip.text elif ip.attrib["type"] == "backend": private_ip = ip.text nodes.append( Node( id=element.attrib["id"], name=element.attrib["label"], state=state, public_ips=public_ip, private_ips=private_ip, driver=self.connection.driver, ) ) return nodes def _to_images(self, object): images = [] for element in object.findall("images/image"): images.append( NodeImage( id=element.attrib["id"], name=element.attrib["summary"], driver=self.connection.driver, ) ) return images