Source code for libcloud.common.kubernetes

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module which contains common Kubernetes related code.
"""

from typing import Optional

import os
import base64
import warnings

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

from libcloud.common.base import JsonResponse, ConnectionUserAndKey
from libcloud.common.base import KeyCertificateConnection, ConnectionKey
from libcloud.common.types import InvalidCredsError

__all__ = [
    'KubernetesException',

    'KubernetesBasicAuthConnection',
    'KubernetesTLSAuthConnection',
    'KubernetesTokenAuthConnection',

    'KubernetesDriverMixin',

    'VALID_RESPONSE_CODES'
]

VALID_RESPONSE_CODES = [httplib.OK, httplib.ACCEPTED, httplib.CREATED,
                        httplib.NO_CONTENT]


[docs]class KubernetesException(Exception): def __init__(self, code, message): self.code = code self.message = message self.args = (code, message) def __str__(self): return "%s %s" % (self.code, self.message) def __repr__(self): return "KubernetesException %s %s" % (self.code, self.message)
class KubernetesResponse(JsonResponse): valid_response_codes = [httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT] def parse_error(self): if self.status == 401: raise InvalidCredsError('Invalid credentials') return self.body def success(self): return self.status in self.valid_response_codes
[docs]class KubernetesTLSAuthConnection(KeyCertificateConnection): responseCls = KubernetesResponse timeout = 60 def __init__(self, key, secure=True, host='localhost', port='6443', key_file=None, cert_file=None, **kwargs): super(KubernetesTLSAuthConnection, self).__init__( key_file=key_file, cert_file=cert_file, secure=secure, host=host, port=port, url=None, proxy_url=None, timeout=None, backoff=None, retry_delay=None) if key_file: keypath = os.path.expanduser(key_file) is_file_path = os.path.exists(keypath) and os.path.isfile(keypath) if not is_file_path: raise InvalidCredsError( 'You need an key PEM file to authenticate ' 'via tls. For more info please visit:' 'https://kubernetes.io/docs/concepts/' 'cluster-administration/certificates/') self.key_file = key_file certpath = os.path.expanduser(cert_file) is_file_path = os.path.exists( certpath) and os.path.isfile(certpath) if not is_file_path: raise InvalidCredsError( 'You need an certificate PEM file to authenticate ' 'via tls. For more info please visit:' 'https://kubernetes.io/docs/concepts/' 'cluster-administration/certificates/' ) self.cert_file = cert_file
[docs] def add_default_headers(self, headers): if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' return headers
[docs]class KubernetesTokenAuthConnection(ConnectionKey): responseCls = KubernetesResponse timeout = 60
[docs] def add_default_headers(self, headers): if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' if self.key: headers['Authorization'] = 'Bearer ' + self.key else: raise ValueError("Please provide a valid token in the key param") return headers
[docs]class KubernetesBasicAuthConnection(ConnectionUserAndKey): responseCls = KubernetesResponse timeout = 60
[docs] def add_default_headers(self, headers): """ Add parameters that are necessary for every request If user and password are specified, include a base http auth header """ if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' if self.user_id and self.key: auth_string = b('%s:%s' % (self.user_id, self.key)) user_b64 = base64.b64encode(auth_string) headers['Authorization'] = 'Basic %s' % (user_b64.decode('utf-8')) return headers
[docs]class KubernetesDriverMixin(object): """ Base driver class to be used with various Kubernetes drivers. NOTE: This base class can be used in different APIs such as container and compute one. """ def __init__(self, key=None, secret=None, secure=False, host='localhost', port=4243, key_file=None, cert_file=None, ca_cert=None, ex_token_bearer_auth=False): """ :param key: API key or username to be used (required) :type key: ``str`` :param secret: Secret password to be used (required) :type secret: ``str`` :param secure: Whether to use HTTPS or HTTP. Note: Some providers only support HTTPS, and it is on by default. :type secure: ``bool`` :param host: Override hostname used for connections. :type host: ``str`` :param port: Override port used for connections. :type port: ``int`` :param key_file: Path to the key file used to authenticate (when using key file auth). :type key_file: ``str`` :param cert_file: Path to the cert file used to authenticate (when using key file auth). :type cert_file: ``str`` :param ex_token_bearer_auth: True to use token bearer auth. :type ex_token_bearer_auth: ``bool`` :return: ``None`` """ if ex_token_bearer_auth: self.connectionCls = KubernetesTokenAuthConnection if not key: msg = 'The token must be a string provided via "key" argument' raise ValueError(msg) secure = True if key_file or cert_file: # Certificate based auth is used if not (key_file and cert_file): raise ValueError("Both key and certificate files are needed") if key_file: self.connectionCls = KubernetesTLSAuthConnection self.key_file = key_file self.cert_file = cert_file secure = True if host and host.startswith('https://'): secure = True host = self._santize_host(host=host) super(KubernetesDriverMixin, self).__init__(key=key, secret=secret, secure=secure, host=host, port=port, key_file=key_file, cert_file=cert_file) if ca_cert: self.connection.connection.ca_cert = ca_cert else: # do not verify SSL certificate warnings.warn("Kubernetes has its own CA, since you didn't supply " "a CA certificate be aware that SSL verification " "will be disabled for this session.") self.connection.connection.ca_cert = False self.connection.secure = secure self.connection.host = host if port is not None: self.connection.port = port def _ex_connection_class_kwargs(self): kwargs = {} if hasattr(self, 'key_file'): kwargs['key_file'] = self.key_file if hasattr(self, 'cert_file'): kwargs['cert_file'] = self.cert_file return kwargs def _santize_host(self, host=None): # type: (Optional[str]) -> Optional[str] """ Sanitize "host" argument any remove any protocol prefix (if specified). """ if not host: return None prefixes = ['http://', 'https://'] for prefix in prefixes: if host.startswith(prefix): host = host.lstrip(prefix) return host