Source code for libcloud.container.drivers.kubernetes

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import hashlib
import datetime
from typing import Any, Dict, List, Union, Optional
from collections import OrderedDict

from libcloud.compute.base import Node, NodeSize, NodeImage
from libcloud.compute.types import NodeState
from libcloud.container.base import Container, ContainerImage, ContainerDriver, ContainerCluster
from libcloud.container.types import ContainerState
from libcloud.common.exceptions import BaseHTTPError
from libcloud.common.kubernetes import (
    KubernetesException,
    KubernetesDriverMixin,
    KubernetesBasicAuthConnection,
)
from libcloud.container.providers import Provider

__all__ = [
    "KubernetesContainerDriver",
    "to_n_bytes",
    "to_memory_str",
    "to_cpu_str",
    "to_n_cpus",
]


ROOT_URL = "/api/"


K8S_UNIT_MAP = OrderedDict(
    {
        "K": 1000,
        "Ki": 1024,
        "M": 1000 * 1000,
        "Mi": 1024 * 1024,
        "G": 1000 * 1000 * 1000,
        "Gi": 1024 * 1024 * 1024,
    }
)


[docs]def to_n_bytes(memory_str: str) -> int: """Convert memory string to number of bytes (e.g. '1234Mi'-> 1293942784) """ if memory_str.startswith("0"): return 0 if memory_str.isnumeric(): return int(memory_str) for unit, multiplier in K8S_UNIT_MAP.items(): if memory_str.endswith(unit): return int(memory_str.strip(unit)) * multiplier
[docs]def to_memory_str(n_bytes: int, unit: Optional[str] = None) -> str: """Convert number of bytes to k8s memory string (e.g. 1293942784 -> '1234Mi') """ if n_bytes == 0: return "0K" n_bytes = int(n_bytes) memory_str = None if unit is None: for unit, multiplier in reversed(K8S_UNIT_MAP.items()): converted_n_bytes_float = n_bytes / multiplier converted_n_bytes = n_bytes // multiplier memory_str = f"{converted_n_bytes}{unit}" if converted_n_bytes_float % 1 == 0: break elif K8S_UNIT_MAP.get(unit): memory_str = f"{n_bytes // K8S_UNIT_MAP[unit]}{unit}" return memory_str
[docs]def to_cpu_str(n_cpus: Union[int, float]) -> str: """Convert number of cpus to cpu string (e.g. 0.5 -> '500m') """ if n_cpus == 0: return "0" millicores = n_cpus * 1000 if millicores % 1 == 0: return f"{int(millicores)}m" microcores = n_cpus * 1000000 if microcores % 1 == 0: return f"{int(microcores)}u" nanocores = n_cpus * 1000000000 return f"{int(nanocores)}n"
[docs]def to_n_cpus(cpu_str: str) -> Union[int, float]: """Convert cpu string to number of cpus (e.g. '500m' -> 0.5, '2000000000n' -> 2) """ if cpu_str.endswith("n"): return int(cpu_str.strip("n")) / 1000000000 elif cpu_str.endswith("u"): return int(cpu_str.strip("u")) / 1000000 elif cpu_str.endswith("m"): return int(cpu_str.strip("m")) / 1000 elif cpu_str.isnumeric(): return int(cpu_str) else: return 0
def sum_resources(*resource_dicts): total_cpu = 0 total_memory = 0 for rd in resource_dicts: total_cpu += to_n_cpus(rd.get("cpu", "0m")) total_memory += to_n_bytes(rd.get("memory", "0K")) return {"cpu": to_cpu_str(total_cpu), "memory": to_memory_str(total_memory)} class KubernetesDeployment: def __init__( self, id: str, name: str, namespace: str, created_at: str, replicas: int, selector: Dict[str, Any], extra: Optional[Dict[str, Any]] = None, ): self.id = id self.name = name self.namespace = namespace self.created_at = created_at self.replicas = replicas self.selector = selector self.extra = extra or {} def __repr__(self): return "<KubernetesDeployment name={} namespace={} replicas={}>".format( self.name, self.namespace, self.replicas, ) class KubernetesPod: def __init__( self, id: str, name: str, containers: List[Container], namespace: str, state: str, ip_addresses: List[str], created_at: datetime.datetime, node_name: str, extra: Dict[str, Any], ): """ A Kubernetes pod """ self.id = id self.name = name self.containers = containers self.namespace = namespace self.state = state self.ip_addresses = ip_addresses self.created_at = created_at self.node_name = node_name self.extra = extra def __repr__(self): return "<KubernetesPod name={} namespace={} state={}>".format( self.name, self.namespace, self.state, ) class KubernetesNamespace(ContainerCluster): """ A Kubernetes namespace """ def __repr__(self): return "<KubernetesNamespace name={}>".format(self.name)
[docs]class KubernetesContainerDriver(KubernetesDriverMixin, ContainerDriver): type = Provider.KUBERNETES name = "Kubernetes" website = "http://kubernetes.io" connectionCls = KubernetesBasicAuthConnection supports_clusters = True
[docs] def list_containers(self, image=None, all=True) -> List[Container]: """ List the deployed container images :param image: Filter to containers with a certain image(unused) :type image: :class:`libcloud.container.base.ContainerImage` :param all: Show all container (unused) :type all: ``bool`` :rtype: ``list`` of :class:`libcloud.container.base.Container` """ try: result = self.connection.request(ROOT_URL + "v1/pods").object except Exception as exc: errno = getattr(exc, "errno", None) if errno == 111: raise KubernetesException( errno, "Make sure kube host is accessible" "and the API port is correct", ) raise pods = [self._to_pod(value) for value in result["items"]] containers = [] for pod in pods: containers.extend(pod.containers) return containers
[docs] def get_container(self, id: str) -> Container: """ Get a container by ID :param id: The ID of the container to get :type id: ``str`` :rtype: :class:`libcloud.container.base.Container` """ containers = self.list_containers() match = [container for container in containers if container.id == id] return match[0]
[docs] def list_namespaces(self) -> List[KubernetesNamespace]: """ Get a list of namespaces that pods can be deployed into :rtype: ``list`` of :class:`.KubernetesNamespace` """ try: result = self.connection.request(ROOT_URL + "v1/namespaces/").object except Exception as exc: errno = getattr(exc, "errno", None) if errno == 111: raise KubernetesException( errno, "Make sure kube host is accessible" "and the API port is correct", ) raise namespaces = [self._to_namespace(value) for value in result["items"]] return namespaces
[docs] def get_namespace(self, id: str) -> KubernetesNamespace: """ Get a namespace by ID :param id: The ID of the namespace to get :type id: ``str`` :rtype: :class:`.KubernetesNamespace` """ result = self.connection.request(ROOT_URL + "v1/namespaces/%s" % id).object return self._to_namespace(result)
[docs] def delete_namespace(self, namespace: KubernetesNamespace) -> bool: """ Delete a namespace :return: ``True`` if the destroy was successful, otherwise ``False``. :rtype: ``bool`` """ self.connection.request( ROOT_URL + "v1/namespaces/%s" % namespace.id, method="DELETE" ).object return True
[docs] def create_namespace(self, name: str) -> KubernetesNamespace: """ Create a namespace :param name: The name of the namespace :type name: ``str`` :rtype: :class:`.KubernetesNamespace` """ request = {"metadata": {"name": name}} result = self.connection.request( ROOT_URL + "v1/namespaces", method="POST", data=json.dumps(request) ).object return self._to_namespace(result)
[docs] def deploy_container( self, name: str, image: ContainerImage, namespace: KubernetesNamespace = None, parameters: Optional[str] = None, start: Optional[bool] = True, ): """ Deploy an installed container image. In kubernetes this deploys a single container Pod. https://cloud.google.com/container-engine/docs/pods/single-container :param name: The name of the new container :type name: ``str`` :param image: The container image to deploy :type image: :class:`.ContainerImage` :param namespace: The namespace to deploy to, None is default :type namespace: :class:`.KubernetesNamespace` :param parameters: Container Image parameters(unused) :type parameters: ``str`` :param start: Start the container on deployment(unused) :type start: ``bool`` :rtype: :class:`.Container` """ if namespace is None: namespace = "default" else: namespace = namespace.id request = { "metadata": {"name": name}, "spec": {"containers": [{"name": name, "image": image.name}]}, } result = self.connection.request( ROOT_URL + "v1/namespaces/%s/pods" % namespace, method="POST", data=json.dumps(request), ).object return self._to_namespace(result)
[docs] def destroy_container(self, container: Container) -> bool: """ Destroy a deployed container. Because the containers are single container pods, this will delete the pod. :param container: The container to destroy :type container: :class:`.Container` :rtype: ``bool`` """ return self.ex_destroy_pod(container.extra["namespace"], container.extra["pod"])
[docs] def ex_list_pods(self, fetch_metrics: bool = False) -> List[KubernetesPod]: """ List available Pods :param fetch_metrics: Fetch metrics for pods :type fetch_metrics: ``bool`` :rtype: ``list`` of :class:`.KubernetesPod` """ result = self.connection.request(ROOT_URL + "v1/pods").object metrics = None if fetch_metrics: try: metrics = { ( metric["metadata"]["name"], metric["metadata"]["namespace"], ): metric["containers"] for metric in self.ex_list_pods_metrics() } except BaseHTTPError: # Metrics Server may not be installed pass return [self._to_pod(value, metrics=metrics) for value in result["items"]]
[docs] def ex_destroy_pod(self, namespace: str, pod_name: str) -> bool: """ Delete a pod and the containers within it. :param namespace: The pod's namespace :type namespace: ``str`` :param pod_name: Name of the pod to destroy :type pod_name: ``str`` :rtype: ``bool`` """ self.connection.request( ROOT_URL + "v1/namespaces/{}/pods/{}".format(namespace, pod_name), method="DELETE", ).object return True
[docs] def ex_list_nodes(self) -> List[Node]: """ List available Nodes :rtype: ``list`` of :class:`.Node` """ result = self.connection.request(ROOT_URL + "v1/nodes").object return [self._to_node(node) for node in result["items"]]
[docs] def ex_destroy_node(self, node_name: str) -> bool: """ Destroy a node. :param node_name: Name of the node to destroy :type node_name: ``str`` :rtype: ``bool`` """ self.connection.request(ROOT_URL + f"v1/nodes/{node_name}", method="DELETE").object return True
[docs] def ex_get_version(self) -> str: """Get Kubernetes version :rtype: ``str`` """ return self.connection.request("/version").object["gitVersion"]
[docs] def ex_list_nodes_metrics(self) -> List[Dict[str, Any]]: """Get nodes metrics from Kubernetes Metrics Server :rtype: ``list`` of ``dict`` """ return self.connection.request("/apis/metrics.k8s.io/v1beta1/nodes").object["items"]
[docs] def ex_list_pods_metrics(self) -> List[Dict[str, Any]]: """Get pods metrics from Kubernetes Metrics Server :rtype: ``list`` of ``dict`` """ return self.connection.request("/apis/metrics.k8s.io/v1beta1/pods").object["items"]
[docs] def ex_list_services(self) -> List[Dict[str, Any]]: """Get cluster services :rtype: ``list`` of ``dict`` """ return self.connection.request(ROOT_URL + "v1/services").object["items"]
[docs] def ex_list_deployments(self) -> List[KubernetesDeployment]: """Get cluster deployments :rtype: ``list`` of :class:`.KubernetesDeployment` """ items = self.connection.request("/apis/apps/v1/deployments").object["items"] return [self._to_deployment(item) for item in items]
def _to_deployment(self, data): id_ = data["metadata"]["uid"] name = data["metadata"]["name"] namespace = data["metadata"]["namespace"] created_at = data["metadata"]["creationTimestamp"] replicas = data["spec"]["replicas"] selector = data["spec"]["selector"] extra = { "labels": data["metadata"]["labels"], "strategy": data["spec"]["strategy"]["type"], "total_replicas": data["status"]["replicas"], "updated_replicas": data["status"]["updatedReplicas"], "ready_replicas": data["status"]["readyReplicas"], "available_replicas": data["status"]["availableReplicas"], "conditions": data["status"]["conditions"], } return KubernetesDeployment( id=id_, name=name, namespace=namespace, created_at=created_at, replicas=replicas, selector=selector, extra=extra, ) def _to_node(self, data): """ Convert an API node data object to a `Node` object """ ID = data["metadata"]["uid"] name = data["metadata"]["name"] driver = self.connection.driver memory = data["status"].get("capacity", {}).get("memory", "0K") cpu = data["status"].get("capacity", {}).get("cpu", "1") if isinstance(cpu, str) and not cpu.isnumeric(): cpu = to_n_cpus(cpu) image_name = data["status"]["nodeInfo"].get("osImage") image = NodeImage(image_name, image_name, driver) size_name = f"{cpu} vCPUs, {memory} Ram" size_id = hashlib.md5(size_name.encode("utf-8")).hexdigest() extra_size = {"cpus": cpu} size = NodeSize( id=size_id, name=size_name, ram=memory, disk=0, bandwidth=0, price=0, driver=driver, extra=extra_size, ) extra = {"memory": memory, "cpu": cpu} extra["os"] = data["status"]["nodeInfo"].get("operatingSystem") extra["kubeletVersion"] = data["status"]["nodeInfo"]["kubeletVersion"] extra["provider_id"] = data.get("spec", {}).get("providerID") for condition in data["status"]["conditions"]: if condition["type"] == "Ready" and condition["status"] == "True": state = NodeState.RUNNING break else: state = NodeState.UNKNOWN public_ips, private_ips = [], [] for address in data["status"]["addresses"]: if address["type"] == "InternalIP": private_ips.append(address["address"]) elif address["type"] == "ExternalIP": public_ips.append(address["address"]) created_at = datetime.datetime.strptime( data["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ" ) return Node( id=ID, name=name, state=state, public_ips=public_ips, private_ips=private_ips, driver=driver, image=image, size=size, extra=extra, created_at=created_at, ) def _to_pod(self, data, metrics=None): """ Convert an API response to a Pod object """ id_ = data["metadata"]["uid"] name = data["metadata"]["name"] namespace = data["metadata"]["namespace"] state = data["status"]["phase"].lower() node_name = data["spec"].get("nodeName") container_statuses = data["status"].get("containerStatuses", {}) containers = [] extra = {"resources": {}} if metrics: try: extra["metrics"] = metrics[name, namespace] except KeyError: pass # response contains the status of the containers in a separate field for container in data["spec"]["containers"]: if container_statuses: spec = list(filter(lambda i: i["name"] == container["name"], container_statuses))[0] else: spec = container_statuses container_obj = self._to_container(container, spec, data) # Calculate new resources resources = extra["resources"] container_resources = container_obj.extra.get("resources", {}) resources["limits"] = sum_resources( resources.get("limits", {}), container_resources.get("limits", {}) ) extra["resources"]["requests"] = sum_resources( resources.get("requests", {}), container_resources.get("requests", {}) ) containers.append(container_obj) ip_addresses = [ip_dict["ip"] for ip_dict in data["status"].get("podIPs", [])] created_at = datetime.datetime.strptime( data["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ" ) return KubernetesPod( id=id_, name=name, namespace=namespace, state=state, ip_addresses=ip_addresses, containers=containers, created_at=created_at, node_name=node_name, extra=extra, ) def _to_container(self, data, container_status, pod_data): """ Convert container in Container instances """ state = container_status.get("state") created_at = None if state: started_at = list(state.values())[0].get("startedAt") if started_at: created_at = datetime.datetime.strptime(started_at, "%Y-%m-%dT%H:%M:%SZ") extra = { "pod": pod_data["metadata"]["name"], "namespace": pod_data["metadata"]["namespace"], } resources = data.get("resources") if resources: extra["resources"] = resources return Container( id=container_status.get("containerID") or data["name"], name=data["name"], image=ContainerImage( id=container_status.get("imageID") or data["image"], name=data["image"], path=None, version=None, driver=self.connection.driver, ), ip_addresses=None, state=(ContainerState.RUNNING if container_status else ContainerState.UNKNOWN), driver=self.connection.driver, created_at=created_at, extra=extra, ) def _to_namespace(self, data): """ Convert an API node data object to a `KubernetesNamespace` object """ return KubernetesNamespace( id=data["metadata"]["name"], name=data["metadata"]["name"], driver=self.connection.driver, extra={"phase": data["status"]["phase"]}, )
def ts_to_str(timestamp): """ Return a timestamp as a nicely formatted datetime string. """ date = datetime.datetime.fromtimestamp(timestamp) date_string = date.strftime("%d/%m/%Y %H:%M %Z") return date_string