Source code for libcloud.compute.drivers.equinixmetal

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Equinix Metal Driver
"""

try:  # Try to use asyncio to perform requests in parallel across projects
    import asyncio
except ImportError:  # If not available will do things serially
    asyncio = None

import json
import datetime

from libcloud.utils.py3 import httplib
from libcloud.common.base import JsonResponse, ConnectionKey
from libcloud.compute.base import (
    Node,
    KeyPair,
    NodeSize,
    NodeImage,
    NodeDriver,
    NodeLocation,
    StorageVolume,
    VolumeSnapshot,
)
from libcloud.compute.types import Provider, NodeState, InvalidCredsError

EQUINIXMETAL_ENDPOINT = "api.equinix.com"

# True to use async io if available (aka running under Python 3)
USE_ASYNC_IO_IF_AVAILABLE = True


[docs]def use_asyncio(): return asyncio is not None and USE_ASYNC_IO_IF_AVAILABLE
[docs]class EquinixMetalResponse(JsonResponse): valid_response_codes = [ httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT, ]
[docs] def parse_error(self): if self.status == httplib.UNAUTHORIZED: body = self.parse_body() raise InvalidCredsError(body.get("error")) else: body = self.parse_body() if "message" in body: error = "{} (code: {})".format(body.get("message"), self.status) elif "errors" in body: error = body.get("errors") else: error = body raise Exception(error)
[docs] def success(self): return self.status in self.valid_response_codes
[docs]class EquinixMetalConnection(ConnectionKey): """ Connection class for the Equinix Metal driver. """ host = EQUINIXMETAL_ENDPOINT responseCls = EquinixMetalResponse
[docs] def add_default_headers(self, headers): """ Add headers that are necessary for every request """ headers["Content-Type"] = "application/json" headers["X-Auth-Token"] = self.key headers[ "X-Consumer-Token" ] = "kcrhMn7hwG8Ceo2hAhGFa2qpxLBvVHxEjS9ue8iqmsNkeeB2iQgMq4dNc1893pYu" return headers
[docs]class EquinixMetalNodeDriver(NodeDriver): """ Equinix Metal NodeDriver """ connectionCls = EquinixMetalConnection type = Provider.EQUINIXMETAL name = "EquinixMetal" website = "https://metal.equinix.com/" NODE_STATE_MAP = { "queued": NodeState.PENDING, "provisioning": NodeState.PENDING, "rebuilding": NodeState.PENDING, "powering_on": NodeState.REBOOTING, "powering_off": NodeState.REBOOTING, "rebooting": NodeState.REBOOTING, "inactive": NodeState.STOPPED, "deleted": NodeState.TERMINATED, "deprovisioning": NodeState.TERMINATED, "failed": NodeState.ERROR, "active": NodeState.RUNNING, } def __init__(self, key, project=None): """ Initialize a NodeDriver for Equinix Metal using the API token and optionally the project (name or id). If project name is specified we validate it lazily and populate self.project_id during the first access of self.projects variable """ super().__init__(key=key) self.project_name = project self.project_id = None # Lazily populated on first access to self.project self._project = project # Variable which indicates if self._projects has been populated yet and # has been called self._project validated self._projects_populated = False self._projects = None @property def projects(self): """ Lazily retrieve projects and set self.project_id variable on initial access to self.projects variable. """ if not self._projects_populated: # NOTE: Each EquinixMetal account needs at least one project, # but to be on the safe side and avoid infinite loop # in case there are no projects on the account, we don't use # a more robust way to determine # if project list has been populated yet self._projects = self.ex_list_projects() self._projects_populated = True # If project name is specified, verify it's valid and populate # self.project_id if self._project: for project_obj in self._projects: if self._project in [project_obj.name, project_obj.id]: self.project_id = project_obj.id break if not self.project_id: # Invalid project name self.project_name = None return self._projects
[docs] def ex_list_projects(self): projects = [] data = self.connection.request("/metal/v1/projects").object projects = data.get("projects") if projects: projects = [Project(project) for project in projects] return projects
[docs] def list_nodes(self, ex_project_id=None): if ex_project_id: return self.ex_list_nodes_for_project(ex_project_id=ex_project_id) # if project has been specified during driver initialization, then # return nodes for this project only if self.project_id: return self.ex_list_nodes_for_project(ex_project_id=self.project_id) # In case of Python2 perform requests serially if not use_asyncio(): nodes = [] for project in self.projects: nodes.extend(self.ex_list_nodes_for_project(ex_project_id=project.id)) return nodes # In case of Python3 use asyncio to perform requests in parallel return self.list_resources_async("nodes")
[docs] def list_resources_async(self, resource_type): # The _list_nodes function is defined dynamically using exec in # order to prevent a SyntaxError in Python2 due to "yield from". # This cruft can be removed once Python2 support is no longer # required. assert resource_type in ["nodes", "volumes"] glob = globals() loc = locals() exec( """ import asyncio @asyncio.coroutine def _list_async(driver): projects = [project.id for project in driver.projects] loop = asyncio.get_event_loop() futures = [ loop.run_in_executor(None, driver.ex_list_%s_for_project, p) for p in projects ] retval = [] for future in futures: result = yield from future retval.extend(result) return retval""" % resource_type, glob, loc, ) try: loop = asyncio.get_event_loop() except RuntimeError: asyncio.set_event_loop(asyncio.new_event_loop()) loop = asyncio.get_event_loop() return loop.run_until_complete(loc["_list_async"](loc["self"]))
[docs] def ex_list_nodes_for_project(self, ex_project_id, include="plan", page=1, per_page=1000): params = {"include": include, "page": page, "per_page": per_page} data = self.connection.request( "/metal/v1/projects/%s/devices" % (ex_project_id), params=params ).object["devices"] return list(map(self._to_node, data))
[docs] def list_locations(self): data = self.connection.request("/metal/v1/facilities").object["facilities"] return list(map(self._to_location, data))
[docs] def list_images(self): data = self.connection.request("/metal/v1/operating-systems").object["operating_systems"] return list(map(self._to_image, data))
[docs] def list_sizes(self, ex_project_id=None): project_id = ( ex_project_id or self.project_id or (len(self.projects) and self.projects[0].id) ) if project_id: data = self.connection.request("/metal/v1/projects/%s/plans" % project_id).object[ "plans" ] else: # This only works with personal tokens data = self.connection.request("/metal/v1/plans").object["plans"] return [self._to_size(size) for size in data if size.get("line") == "baremetal"]
[docs] def create_node( self, name, size, image, location, ex_project_id=None, ip_addresses=[], cloud_init=None, disk=None, disk_size=0, **kwargs, ): """ Create a node. :return: The newly created node. :rtype: :class:`Node` """ # if project has been specified on initialization of driver, then # create on this project if self.project_id: ex_project_id = self.project_id else: if not ex_project_id: raise Exception("ex_project_id needs to be specified") facility = location.extra["code"] params = { "hostname": name, "plan": size.id, "operating_system": image.id, "facility": facility, "include": "plan", "billing_cycle": "hourly", } if ip_addresses: params["ip_addresses"] = ip_addresses params.update(kwargs) if cloud_init: params["userdata"] = cloud_init data = self.connection.request( "/metal/v1/projects/%s/devices" % (ex_project_id), data=json.dumps(params), method="POST", ) status = data.object.get("status", "OK") if status == "ERROR": message = data.object.get("message", None) error_message = data.object.get("error_message", message) raise ValueError("Failed to create node: %s" % (error_message)) node = self._to_node(data=data.object) if disk: self.attach_volume(node, disk) if disk_size: volume = self.create_volume(size=disk_size, location=location) self.attach_volume(node, volume) return node
[docs] def reboot_node(self, node): params = {"type": "reboot"} res = self.connection.request( "/metal/v1/devices/%s/actions" % (node.id), params=params, method="POST" ) return res.status == httplib.OK
[docs] def start_node(self, node): params = {"type": "power_on"} res = self.connection.request( "/metal/v1/devices/%s/actions" % (node.id), params=params, method="POST" ) return res.status == httplib.OK
[docs] def stop_node(self, node): params = {"type": "power_off"} res = self.connection.request( "/metal/v1/devices/%s/actions" % (node.id), params=params, method="POST" ) return res.status == httplib.OK
[docs] def destroy_node(self, node): res = self.connection.request("/metal/v1/devices/%s" % (node.id), method="DELETE") return res.status == httplib.OK
[docs] def ex_start_node(self, node): # NOTE: This method is here for backward compatibility reasons after # this method was promoted to be part of the standard compute API in # Libcloud v2.7.0 return self.start_node(node=node)
[docs] def ex_stop_node(self, node): # NOTE: This method is here for backward compatibility reasons after # this method was promoted to be part of the standard compute API in # Libcloud v2.7.0 return self.stop_node(node=node)
[docs] def ex_reinstall_node(self, node): params = {"type": "reinstall"} res = self.connection.request( "/metal/v1/devices/%s/actions" % (node.id), params=params, method="POST" ) return res.status == httplib.OK
[docs] def ex_rescue_node(self, node): params = {"type": "rescue"} res = self.connection.request( "/metal/v1/devices/%s/actions" % (node.id), params=params, method="POST" ) return res.status == httplib.OK
[docs] def ex_update_node(self, node, **kwargs): path = "/metal/v1/devices/%s" % node.id res = self.connection.request(path, params=kwargs, method="PUT") return res.status == httplib.OK
[docs] def ex_get_node_bandwidth(self, node, from_time, until_time): path = "/metal/v1/devices/%s/bandwidth" % node.id params = {"from": from_time, "until": until_time} return self.connection.request(path, params=params).object
[docs] def ex_list_ip_assignments_for_node(self, node, include=""): path = "/metal/v1/devices/%s/ips" % node.id params = {"include": include} return self.connection.request(path, params=params).object
[docs] def list_key_pairs(self): """ List all the available SSH keys. :return: Available SSH keys. :rtype: ``list`` of :class:`.KeyPair` objects """ data = self.connection.request("/metal/v1/ssh-keys").object["ssh_keys"] return list(map(self._to_key_pairs, data))
[docs] def create_key_pair(self, name, public_key): """ Create a new SSH key. :param name: Key name (required) :type name: ``str`` :param public_key: Valid public key string (required) :type public_key: ``str`` """ params = {"label": name, "key": public_key} data = self.connection.request("/metal/v1/ssh-keys", method="POST", params=params).object return self._to_key_pairs(data)
[docs] def delete_key_pair(self, key): """ Delete an existing SSH key. :param key: SSH key (required) :type key: :class:`KeyPair` """ key_id = key.name res = self.connection.request("/metal/v1/ssh-keys/%s" % (key_id), method="DELETE") return res.status == httplib.NO_CONTENT
def _to_node(self, data): extra = {} extra_keys = [ "created_at", "updated_at", "userdata", "billing_cycle", "locked", "iqn", "locked", "project", "description", ] if "state" in data: state = self.NODE_STATE_MAP.get(data["state"], NodeState.UNKNOWN) else: state = NodeState.UNKNOWN if "ip_addresses" in data and data["ip_addresses"] is not None: ips = self._parse_ips(data["ip_addresses"]) if "operating_system" in data and data["operating_system"] is not None: image = self._to_image(data["operating_system"]) extra["operating_system"] = data["operating_system"].get("name") else: image = None if "plan" in data and data["plan"] is not None: size = self._to_size(data["plan"]) extra["plan"] = data["plan"].get("slug") else: size = None if "facility" in data: extra["facility"] = data["facility"] for key in extra_keys: if key in data: extra[key] = data[key] node = Node( id=data["id"], name=data["hostname"], state=state, public_ips=ips["public"], private_ips=ips["private"], size=size, image=image, extra=extra, driver=self, ) return node def _to_image(self, data): extra = { "distro": data["distro"], "version": data["version"], "supported_sizes": data.get("provisionable_on", []), } return NodeImage(id=data["slug"], name=data["name"], extra=extra, driver=self) def _to_location(self, data): extra = data return NodeLocation( id=data["id"], name=data["name"], country=None, driver=self, extra=extra ) def _to_size(self, data): try: cpus = data["specs"]["cpus"][0].get("count") except KeyError: cpus = None regions = [ region.get("href").replace("/metal/v1/facilities/", "") for region in data.get("available_in", []) ] extra = { "description": data["description"], "line": data["line"], "cpus": cpus, "regions": regions, } try: ram = int(data["specs"]["memory"]["total"].replace("GB", "")) * 1024 # noqa except KeyError: ram = None disk = None if data["specs"].get("drives", ""): disk = 0 for disks in data["specs"]["drives"]: disk_size = disks["size"].replace("GB", "") if "TB" in disk_size: disk_size = float(disks["size"].replace("TB", "")) * 1000 disk += disks["count"] * int(disk_size) name = "{} - {} RAM".format(data.get("name"), ram) price = data["pricing"].get("hour") return NodeSize( id=data["slug"], name=name, ram=ram, disk=disk, bandwidth=0, price=price, extra=extra, driver=self, ) def _to_key_pairs(self, data): extra = { "label": data["label"], "created_at": data["created_at"], "updated_at": data["updated_at"], } return KeyPair( name=data["id"], fingerprint=data["fingerprint"], public_key=data["key"], private_key=None, driver=self, extra=extra, ) def _parse_ips(self, data): public_ips = [] private_ips = [] for address in data: if "address" in address and address["address"] is not None: if "public" in address and address["public"] is True: public_ips.append(address["address"]) else: private_ips.append(address["address"]) return {"public": public_ips, "private": private_ips}
[docs] def ex_get_bgp_config_for_project(self, ex_project_id): path = "/metal/v1/projects/%s/bgp-config" % ex_project_id return self.connection.request(path).object
[docs] def ex_get_bgp_config(self, ex_project_id=None): if ex_project_id: projects = [ex_project_id] elif self.project_id: projects = [self.project_id] else: projects = [p.id for p in self.projects] retval = [] for p in projects: config = self.ex_get_bgp_config_for_project(p) if config: retval.append(config) return retval
[docs] def ex_get_bgp_session(self, session_uuid): path = "/metal/v1/bgp/sessions/%s" % session_uuid return self.connection.request(path).object
[docs] def ex_list_bgp_sessions_for_node(self, node): path = "/metal/v1/devices/%s/bgp/sessions" % node.id return self.connection.request(path).object
[docs] def ex_list_bgp_sessions_for_project(self, ex_project_id): path = "/metal/v1/projects/%s/bgp/sessions" % ex_project_id return self.connection.request(path).object
[docs] def ex_list_bgp_sessions(self, ex_project_id=None): if ex_project_id: projects = [ex_project_id] elif self.project_id: projects = [self.project_id] else: projects = [p.id for p in self.projects] retval = [] for p in projects: retval.extend(self.ex_list_bgp_sessions_for_project(p)["bgp_sessions"]) return retval
[docs] def ex_create_bgp_session(self, node, address_family="ipv4"): path = "/metal/v1/devices/%s/bgp/sessions" % node.id params = {"address_family": address_family} res = self.connection.request(path, params=params, method="POST") return res.object
[docs] def ex_delete_bgp_session(self, session_uuid): path = "/metal/v1/bgp/sessions/%s" % session_uuid res = self.connection.request(path, method="DELETE") return res.status == httplib.OK # or res.status == httplib.NO_CONTENT
[docs] def ex_list_events_for_node(self, node, include=None, page=1, per_page=10): path = "/metal/v1/devices/%s/events" % node.id params = {"include": include, "page": page, "per_page": per_page} return self.connection.request(path, params=params).object
[docs] def ex_list_events_for_project(self, project, include=None, page=1, per_page=10): path = "/metal/v1/projects/%s/events" % project.id params = {"include": include, "page": page, "per_page": per_page} return self.connection.request(path, params=params).object
[docs] def ex_describe_all_addresses(self, ex_project_id=None, only_associated=False): if ex_project_id: projects = [ex_project_id] elif self.project_id: projects = [self.project_id] else: projects = [p.id for p in self.projects] retval = [] for project in projects: retval.extend(self.ex_describe_all_addresses_for_project(project, only_associated)) return retval
[docs] def ex_describe_all_addresses_for_project( self, ex_project_id, include=None, only_associated=False ): """ Returns all the reserved IP addresses for this project optionally, returns only addresses associated with nodes. :param only_associated: If true, return only the addresses that are associated with an instance. :type only_associated: ``bool`` :return: List of IP addresses. :rtype: ``list`` of :class:`dict` """ path = "/metal/v1/projects/%s/ips" % ex_project_id params = { "include": include, } ip_addresses = self.connection.request(path, params=params).object result = [ a for a in ip_addresses.get("ip_addresses", []) if not only_associated or len(a.get("assignments", [])) > 0 ] return result
[docs] def ex_describe_address(self, ex_address_id, include=None): path = "/metal/v1/ips/%s" % ex_address_id params = { "include": include, } result = self.connection.request(path, params=params).object return result
[docs] def ex_request_address_reservation( self, ex_project_id, location_id=None, address_family="global_ipv4", quantity=1, comments="", customdata="", ): path = "/metal/v1/projects/%s/ips" % ex_project_id params = { "type": address_family, "quantity": quantity, } if location_id: params["facility"] = location_id if comments: params["comments"] = comments if customdata: params["customdata"] = customdata result = self.connection.request(path, params=params, method="POST").object return result
[docs] def ex_associate_address_with_node(self, node, address, manageable=False, customdata=""): path = "/metal/v1/devices/%s/ips" % node.id params = { "address": address, "manageable": manageable, "customdata": customdata, } result = self.connection.request(path, params=params, method="POST").object return result
[docs] def ex_disassociate_address(self, address_uuid, include=None): path = "/metal/v1/ips/%s" % address_uuid params = {} if include: params["include"] = include result = self.connection.request(path, params=params, method="DELETE").object return result
[docs] def list_volumes(self, ex_project_id=None): if ex_project_id: return self.ex_list_volumes_for_project(ex_project_id=ex_project_id) # if project has been specified during driver initialization, then # return nodes for this project only if self.project_id: return self.ex_list_volumes_for_project(ex_project_id=self.project_id) # In case of Python2 perform requests serially if not use_asyncio(): nodes = [] for project in self.projects: nodes.extend(self.ex_list_volumes_for_project(ex_project_id=project.id)) return nodes # In case of Python3 use asyncio to perform requests in parallel return self.list_resources_async("volumes")
[docs] def ex_list_volumes_for_project(self, ex_project_id, include="plan", page=1, per_page=1000): params = {"include": include, "page": page, "per_page": per_page} data = self.connection.request( "/metal/v1/projects/%s/storage" % (ex_project_id), params=params ).object["volumes"] return list(map(self._to_volume, data))
def _to_volume(self, data): return StorageVolume( id=data["id"], name=data["name"], size=data["size"], driver=self, extra=data )
[docs] def create_volume( self, size, location, plan="storage_1", description="", ex_project_id=None, locked=False, billing_cycle=None, customdata="", snapshot_policies=None, **kwargs, ): """ Create a new volume. :param size: Size of volume in gigabytes (required) :type size: ``int`` :param location: Which data center to create a volume in. If empty, undefined behavior will be selected. (optional) :type location: :class:`.NodeLocation` :return: The newly created volume. :rtype: :class:`StorageVolume` """ path = "/metal/v1/projects/%s/storage" % (ex_project_id or self.projects[0].id) try: facility = location.extra["code"] except AttributeError: facility = location params = {"facility": facility, "plan": plan, "size": size, "locked": locked} params.update(kwargs) if description: params["description"] = description if customdata: params["customdata"] = customdata if billing_cycle: params["billing_cycle"] = billing_cycle if snapshot_policies: params["snapshot_policies"] = snapshot_policies data = self.connection.request(path, params=params, method="POST").object return self._to_volume(data)
[docs] def destroy_volume(self, volume): """ Destroys a storage volume. :param volume: Volume to be destroyed :type volume: :class:`StorageVolume` :rtype: ``bool`` """ path = "/metal/v1/storage/%s" % volume.id res = self.connection.request(path, method="DELETE") return res.status == httplib.NO_CONTENT
[docs] def attach_volume(self, node, volume): """ Attaches volume to node. :param node: Node to attach volume to. :type node: :class:`.Node` :param volume: Volume to attach. :type volume: :class:`.StorageVolume` :rytpe: ``bool`` """ path = "/metal/v1/storage/%s/attachments" % volume.id params = {"device_id": node.id} res = self.connection.request(path, params=params, method="POST") return res.status == httplib.OK
[docs] def detach_volume(self, volume, ex_node=None, ex_attachment_id=""): """ Detaches a volume from a node. :param volume: Volume to be detached :type volume: :class:`.StorageVolume` :param ex_attachment_id: Attachment id to be detached, if empty detach all attachments :type name: ``str`` :rtype: ``bool`` """ path = "/metal/v1/storage/%s/attachments" % volume.id attachments = volume.extra["attachments"] assert len(attachments) > 0, "Volume is not attached to any node" success = True result = None for attachment in attachments: if not ex_attachment_id or ex_attachment_id in attachment["href"]: attachment_id = attachment["href"].split("/")[-1] if ex_node: node_id = self.ex_describe_attachment(attachment_id)["device"]["href"].split( "/" )[-1] if node_id != ex_node.id: continue path = "/metal/v1/storage/attachments/%s" % (ex_attachment_id or attachment_id) result = self.connection.request(path, method="DELETE") success = success and result.status == httplib.NO_CONTENT return result and success
[docs] def create_volume_snapshot(self, volume, name=""): """ Create a new volume snapshot. :param volume: Volume to create a snapshot for :type volume: class:`StorageVolume` :return: The newly created volume snapshot. :rtype: :class:`VolumeSnapshot` """ path = "/metal/v1/storage/%s/snapshots" % volume.id res = self.connection.request(path, method="POST") assert res.status == httplib.ACCEPTED return volume.list_snapshots()[-1]
[docs] def destroy_volume_snapshot(self, snapshot): """ Delete a volume snapshot :param snapshot: volume snapshot to delete :type snapshot: class:`VolumeSnapshot` :rtype: ``bool`` """ volume_id = snapshot.extra["volume"]["href"].split("/")[-1] path = "/metal/v1/storage/{}/snapshots/{}".format(volume_id, snapshot.id) res = self.connection.request(path, method="DELETE") return res.status == httplib.NO_CONTENT
[docs] def list_volume_snapshots(self, volume, include=""): """ List snapshots for a volume. :param volume: Volume to list snapshots for :type volume: class:`StorageVolume` :return: List of volume snapshots. :rtype: ``list`` of :class: `VolumeSnapshot` """ path = "/metal/v1/storage/%s/snapshots" % volume.id params = {} if include: params["include"] = include data = self.connection.request(path, params=params).object["snapshots"] return list(map(self._to_volume_snapshot, data))
def _to_volume_snapshot(self, data): created = datetime.datetime.strptime(data["created_at"], "%Y-%m-%dT%H:%M:%S") return VolumeSnapshot( id=data["id"], name=data["id"], created=created, state=data["status"], driver=self, extra=data, )
[docs] def ex_modify_volume( self, volume, description=None, size=None, locked=None, billing_cycle=None, customdata=None, ): path = "/metal/v1/storage/%s" % volume.id params = {} if description: params["description"] = description if size: params["size"] = size if locked is not None: params["locked"] = locked if billing_cycle: params["billing_cycle"] = billing_cycle res = self.connection.request(path, params=params, method="PUT") return self._to_volume(res.object)
[docs] def ex_restore_volume(self, snapshot): volume_id = snapshot.extra["volume"]["href"].split("/")[-1] ts = snapshot.extra["timestamp"] path = "/metal/v1/storage/{}/restore?restore_point={}".format(volume_id, ts) res = self.connection.request(path, method="POST") return res.status == httplib.NO_CONTENT
[docs] def ex_clone_volume(self, volume, snapshot=None): path = "/metal/v1/storage/%s/clone" % volume.id if snapshot: path += "?snapshot_timestamp=%s" % snapshot.extra["timestamp"] res = self.connection.request(path, method="POST") return res.status == httplib.NO_CONTENT
[docs] def ex_describe_volume(self, volume_id): path = "/metal/v1/storage/%s" % volume_id data = self.connection.request(path).object return self._to_volume(data)
[docs] def ex_describe_attachment(self, attachment_id): path = "/metal/v1/storage/attachments/%s" % attachment_id data = self.connection.request(path).object return data
[docs]class Project: def __init__(self, project): self.id = project.get("id") self.name = project.get("name") self.extra = {} self.extra["max_devices"] = project.get("max_devices") self.extra["payment_method"] = project.get("payment_method") self.extra["created_at"] = project.get("created_at") self.extra["credit_amount"] = project.get("credit_amount") self.extra["devices"] = project.get("devices") self.extra["invitations"] = project.get("invitations") self.extra["memberships"] = project.get("memberships") self.extra["href"] = project.get("href") self.extra["members"] = project.get("members") self.extra["ssh_keys"] = project.get("ssh_keys") def __repr__(self): return ("<Project: id=%s, name=%s>") % (self.id, self.name)