# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import datetime

# Prefer simplejson when it is installed; otherwise fall back to the
# standard-library json module, which exposes the same API.
try:
    import simplejson as json
except ImportError:
    import json

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

from libcloud.common.base import JsonResponse, ConnectionUserAndKey
from libcloud.common.types import InvalidCredsError

from libcloud.container.base import (Container, ContainerDriver,
                                     ContainerImage, ContainerCluster)

from libcloud.container.providers import Provider
from libcloud.container.types import ContainerState

# HTTP status codes that the Kubernetes API returns for successful calls.
VALID_RESPONSE_CODES = [httplib.OK, httplib.ACCEPTED, httplib.CREATED,
                        httplib.NO_CONTENT]

# Prefix shared by every API path requested by this module.
ROOT_URL = '/api/'

class KubernetesResponse(JsonResponse):
    """
    JSON response returned by the Kubernetes API.
    """

    # Status codes that count as a successful request.
    valid_response_codes = [httplib.OK, httplib.ACCEPTED, httplib.CREATED,
                            httplib.NO_CONTENT]

    def parse_error(self):
        """
        Handle a response that was not successful.

        :return: The raw response body for any status other than 401.

        :raises InvalidCredsError: If the server answered with
                                   401 (Unauthorized).
        """
        if self.status == httplib.UNAUTHORIZED:
            raise InvalidCredsError('Invalid credentials')
        return self.body

    def success(self):
        """
        Tell whether the response status is one of the accepted codes.

        :rtype: ``bool``
        """
        return self.status in self.valid_response_codes
class KubernetesException(Exception):
    """
    Error raised by the Kubernetes driver.

    Carries a numeric ``code`` and a human readable ``message``; both are
    also available through ``args`` as ``(code, message)``.
    """

    def __init__(self, code, message):
        """
        :param code: Error code associated with the failure.
        :type  code: ``int``

        :param message: Description of the failure.
        :type  message: ``str``
        """
        # Let the base class populate ``args`` with (code, message).
        super(KubernetesException, self).__init__(code, message)
        self.code = code
        self.message = message

    def __str__(self):
        return "%s %s" % (self.code, self.message)

    def __repr__(self):
        return "KubernetesException %s %s" % (self.code, self.message)
class KubernetesConnection(ConnectionUserAndKey):
    """
    Connection to a Kubernetes API server using optional HTTP basic auth.
    """

    responseCls = KubernetesResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add parameters that are necessary for every request.

        Every request is sent as JSON. If both user and password are
        specified, an HTTP basic ``Authorization`` header is included too.

        :param headers: Headers collected so far; modified in place.
        :type  headers: ``dict``

        :return: The same ``headers`` dictionary.
        :rtype: ``dict``
        """
        headers['Content-Type'] = 'application/json'
        if self.key and self.secret:
            # b64encode returns bytes, so decode before building the header.
            user_b64 = base64.b64encode(b('%s:%s' % (self.key, self.secret)))
            headers['Authorization'] = 'Basic %s' % (user_b64.decode('utf-8'))
        return headers
class KubernetesPod(object):
    """
    A Kubernetes pod.
    """

    def __init__(self, name, containers, namespace):
        """
        :param name: Name of the pod.
        :type  name: ``str``

        :param containers: Containers that belong to the pod.
        :type  containers: ``list``

        :param namespace: Namespace the pod lives in.
        :type  namespace: ``str``
        """
        self.name = name
        self.containers = containers
        self.namespace = namespace

    def __repr__(self):
        return '<KubernetesPod: name=%s, namespace=%s>' % (self.name,
                                                           self.namespace)
class KubernetesContainerDriver(ContainerDriver):
    """
    Container driver that talks to a Kubernetes API server.

    Namespaces are exposed as clusters. Every container is reported with
    the pod and namespace it belongs to in its ``extra`` dictionary.
    """

    type = Provider.KUBERNETES
    name = 'Kubernetes'
    # NOTE(review): empty in this revision; presumably meant to hold the
    # project URL -- confirm before changing.
    website = ''
    connectionCls = KubernetesConnection
    supports_clusters = True

    def __init__(self, key=None, secret=None, secure=False, host='localhost',
                 port=4243):
        """
        :param    key: API key or username to be used (required)
        :type     key: ``str``

        :param    secret: Secret password to be used (required)
        :type     secret: ``str``

        :param    secure: Whether to use HTTPS or HTTP. Note: Some providers
                only support HTTPS, and it is on by default.
        :type     secure: ``bool``

        :param    host: Override hostname used for connections. A leading
                ``http://`` or ``https://`` is removed; ``https://`` also
                turns ``secure`` on.
        :type     host: ``str``

        :param    port: Override port used for connections.
        :type     port: ``int``

        :return: ``None``
        """
        super(KubernetesContainerDriver, self).__init__(key=key,
                                                        secret=secret,
                                                        secure=secure,
                                                        host=host,
                                                        port=port)
        if host is not None:
            if host.startswith('https://'):
                secure = True

            # Remove the scheme by slicing. str.strip() treats its argument
            # as a set of characters and would also eat leading or trailing
            # characters of the host name itself (e.g. 'h', 't', 'p').
            for prefix in ('http://', 'https://'):
                if host.startswith(prefix):
                    host = host[len(prefix):]
                    break

        self.connection.host = host
        self.connection.port = port
        self.connection.secure = secure
        self.connection.key = key
        self.connection.secret = secret

    def list_containers(self, image=None, all=True):
        """
        List the deployed container images

        :param image: Filter to containers with a certain image.
                      Currently not applied by this driver.
        :type  image: :class:`libcloud.container.base.ContainerImage`

        :param all: Show all container (including stopped ones).
                    Currently not applied by this driver.
        :type  all: ``bool``

        :rtype: ``list`` of :class:`libcloud.container.base.Container`

        :raises KubernetesException: If the connection fails with
                                     errno 111.
        """
        result = self._request_listing("v1/pods")
        containers = []
        for value in result['items']:
            containers.extend(self._to_pod(value).containers)
        return containers

    def get_container(self, id):
        """
        Get a container by ID

        :param id: The ID of the container to get
        :type  id: ``str``

        :rtype: :class:`libcloud.container.base.Container`

        :raises IndexError: If no container has the given ID.
        """
        for container in self.list_containers():
            if container.id == id:
                return container
        # Same exception type as indexing an empty match list, but with a
        # message that names the missing container.
        raise IndexError('No container with id %s' % id)

    def list_clusters(self):
        """
        Get a list of namespaces that pods can be deployed into

        :rtype: ``list`` of :class:`libcloud.container.base.ContainerCluster`

        :raises KubernetesException: If the connection fails with
                                     errno 111.
        """
        result = self._request_listing("v1/namespaces/")
        return [self._to_cluster(value) for value in result['items']]

    def get_cluster(self, id):
        """
        Get a cluster by ID

        :param id: The ID of the cluster to get
        :type  id: ``str``

        :rtype: :class:`libcloud.container.base.ContainerCluster`
        """
        result = self.connection.request(ROOT_URL + "v1/namespaces/%s" %
                                         id).object
        return self._to_cluster(result)

    def destroy_cluster(self, cluster):
        """
        Delete a cluster (namespace)

        :param cluster: The cluster to delete
        :type  cluster: :class:`libcloud.container.base.ContainerCluster`

        :return: ``True`` once the request has completed; request errors
                 surface as exceptions.
        :rtype: ``bool``
        """
        self.connection.request(ROOT_URL + "v1/namespaces/%s" %
                                cluster.id, method='DELETE').object
        return True

    def create_cluster(self, name, location=None):
        """
        Create a container cluster (a namespace)

        :param  name: The name of the cluster
        :type   name: ``str``

        :param  location: The location to create the cluster in.
                          Not used by this driver.
        :type   location: :class:`.ClusterLocation`

        :rtype: :class:`.ContainerCluster`
        """
        request = {
            'metadata': {
                'name': name
            }
        }
        result = self.connection.request(ROOT_URL + "v1/namespaces",
                                         method='POST',
                                         data=json.dumps(request)).object
        return self._to_cluster(result)

    def deploy_container(self, name, image, cluster=None,
                         parameters=None, start=True):
        """
        Deploy an installed container image.
        In kubernetes this deploys a single container Pod.

        :param name: The name of the new container
        :type  name: ``str``

        :param image: The container image to deploy
        :type  image: :class:`.ContainerImage`

        :param cluster: The cluster to deploy to, None is default
        :type  cluster: :class:`.ContainerCluster`

        :param parameters: Container Image parameters.
                           Not used by this driver.
        :type  parameters: ``str``

        :param start: Start the container on deployment.
                      Not used by this driver.
        :type  start: ``bool``

        :return: The single container of the newly created pod. Its ``id``
                 and image ``id`` are ``None`` when the API response does
                 not report container statuses yet.
        :rtype: :class:`.Container`
        """
        namespace = 'default' if cluster is None else cluster.id
        request = {
            "metadata": {
                "name": name
            },
            "spec": {
                "containers": [
                    {
                        "name": name,
                        "image": image.name
                    }
                ]
            }
        }
        result = self.connection.request(
            ROOT_URL + "v1/namespaces/%s/pods" % namespace,
            method='POST',
            data=json.dumps(request)).object
        # The response describes the created pod, not a namespace, so it
        # is converted with _to_pod; the pod holds exactly the one
        # container requested above.
        return self._to_pod(result).containers[0]

    def destroy_container(self, container):
        """
        Destroy a deployed container. Because the containers are single
        container pods, this will delete the pod.

        :param container: The container to destroy
        :type  container: :class:`.Container`

        :rtype: ``bool``
        """
        return self.ex_destroy_pod(container.extra['namespace'],
                                   container.extra['pod'])

    def ex_list_pods(self):
        """
        List available Pods

        :rtype: ``list`` of :class:`.KubernetesPod`
        """
        result = self.connection.request(ROOT_URL + "v1/pods").object
        return [self._to_pod(value) for value in result['items']]

    def ex_destroy_pod(self, namespace, pod_name):
        """
        Delete a pod and the containers within it.

        :param namespace: Namespace the pod lives in
        :type  namespace: ``str``

        :param pod_name: Name of the pod to delete
        :type  pod_name: ``str``

        :return: ``True`` once the request has completed.
        :rtype: ``bool``
        """
        self.connection.request(
            ROOT_URL + "v1/namespaces/%s/pods/%s" % (
                namespace, pod_name),
            method='DELETE').object
        return True

    def _request_listing(self, path):
        """
        GET ``ROOT_URL + path`` and return the decoded response object.

        A failure carrying errno 111 (ECONNREFUSED on Linux) is turned into
        a :class:`KubernetesException` with a hint about host and port;
        every other exception is re-raised unchanged.
        """
        try:
            return self.connection.request(ROOT_URL + path).object
        except Exception as exc:
            code = getattr(exc, 'errno', None)
            if code == 111:
                raise KubernetesException(
                    code,
                    'Make sure kube host is accessible '
                    'and the API port is correct')
            raise

    def _to_pod(self, data):
        """
        Convert an API response to a Pod object
        """
        # The response contains the status of the containers in a separate
        # field. It is read defensively so that a pod without a
        # 'containerStatuses' entry does not abort the conversion.
        statuses = (data.get('status') or {}).get('containerStatuses') or []
        status_by_name = {}
        for status in statuses:
            # Keep the first status seen for a given container name.
            status_by_name.setdefault(status['name'], status)

        containers = [
            self._to_container(spec,
                               status_by_name.get(spec['name'], {}),
                               data)
            for spec in data['spec']['containers']
        ]
        return KubernetesPod(
            name=data['metadata']['name'],
            namespace=data['metadata']['namespace'],
            containers=containers)

    def _to_container(self, data, container_status, pod_data):
        """
        Convert container in Container instances

        ``container_status`` may be an empty dict when no status is
        available; the container and image IDs are then ``None``.
        """
        return Container(
            id=container_status.get('containerID'),
            name=data['name'],
            image=ContainerImage(
                id=container_status.get('imageID'),
                name=data['image'],
                path=None,
                version=None,
                driver=self.connection.driver),
            ip_addresses=None,
            # NOTE(review): the state is always reported as RUNNING,
            # regardless of the container status -- confirm this is wanted.
            state=ContainerState.RUNNING,
            driver=self.connection.driver,
            extra={
                'pod': pod_data['metadata']['name'],
                'namespace': pod_data['metadata']['namespace']
            })

    def _to_cluster(self, data):
        """
        Convert namespace to a cluster
        """
        metadata = data['metadata']
        status = data['status']
        return ContainerCluster(
            id=metadata['name'],
            name=metadata['name'],
            driver=self.connection.driver,
            extra={'phase': status['phase']})
def ts_to_str(timestamp):
    """
    Return a timestamp as a nicely formatted datetime string.

    :param timestamp: Seconds since the epoch, as accepted by
                      ``datetime.datetime.fromtimestamp``.
    :type  timestamp: ``int`` or ``float``

    :return: The local time formatted as ``DD/MM/YYYY HH:MM`` followed by
             a space and the ``%Z`` expansion.
    :rtype: ``str``
    """
    # fromtimestamp() without a tz argument returns a naive local-time
    # datetime, for which %Z expands to an empty string.
    date = datetime.datetime.fromtimestamp(timestamp)
    return date.strftime("%d/%m/%Y %H:%M %Z")