Source code for libcloud.container.drivers.ecs

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

try:
    import simplejson as json
except ImportError:
    import json

from libcloud.container.base import (ContainerDriver, Container,
                                     ContainerCluster, ContainerImage)
from libcloud.container.types import ContainerState
from libcloud.container.utils.docker import RegistryClient
from libcloud.common.aws import SignedAWSConnection, AWSJsonResponse

__all__ = [
    'ElasticContainerDriver'
]


ECS_VERSION = '2014-11-13'
ECR_VERSION = '2015-09-21'
ECS_HOST = 'ecs.%s.amazonaws.com'
ECR_HOST = 'ecr.%s.amazonaws.com'
ROOT = '/'
ECS_TARGET_BASE = 'AmazonEC2ContainerServiceV%s' % \
                  (ECS_VERSION.replace('-', ''))
ECR_TARGET_BASE = 'AmazonEC2ContainerRegistry_V%s' % \
                  (ECR_VERSION.replace('-', ''))


class ECSJsonConnection(SignedAWSConnection):
    version = ECS_VERSION
    host = ECS_HOST
    responseCls = AWSJsonResponse
    service_name = 'ecs'


class ECRJsonConnection(SignedAWSConnection):
    version = ECR_VERSION
    host = ECR_HOST
    responseCls = AWSJsonResponse
    service_name = 'ecr'


[docs]class ElasticContainerDriver(ContainerDriver): name = 'Amazon Elastic Container Service' website = 'https://aws.amazon.com/ecs/details/' ecr_repository_host = '%s.dkr.ecr.%s.amazonaws.com' connectionCls = ECSJsonConnection ecrConnectionClass = ECRJsonConnection supports_clusters = False status_map = { 'RUNNING': ContainerState.RUNNING } def __init__(self, access_id, secret, region): super(ElasticContainerDriver, self).__init__(access_id, secret) self.region = region self.region_name = region self.connection.host = ECS_HOST % (region) # Setup another connection class for ECR conn_kwargs = self._ex_connection_class_kwargs() self.ecr_connection = self.ecrConnectionClass( access_id, secret, **conn_kwargs) self.ecr_connection.host = ECR_HOST % (region) self.ecr_connection.driver = self self.ecr_connection.connect() def _ex_connection_class_kwargs(self): return {'signature_version': '4'}
[docs] def list_images(self, ex_repository_name): """ List the images in an ECR repository :param ex_repository_name: The name of the repository to check defaults to the default repository. :type ex_repository_name: ``str`` :return: a list of images :rtype: ``list`` of :class:`libcloud.container.base.ContainerImage` """ request = {} request['repositoryName'] = ex_repository_name list_response = self.ecr_connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_ecr_headers('ListImages') ).object repository_id = self.ex_get_repository_id(ex_repository_name) host = self._get_ecr_host(repository_id) return self._to_images(list_response['imageIds'], host, ex_repository_name)
[docs] def list_clusters(self): """ Get a list of potential locations to deploy clusters into :param location: The location to search in :type location: :class:`libcloud.container.base.ClusterLocation` :rtype: ``list`` of :class:`libcloud.container.base.ContainerCluster` """ listdata = self.connection.request( ROOT, method='POST', data=json.dumps({}), headers=self._get_headers('ListClusters') ).object request = {'clusters': listdata['clusterArns']} data = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('DescribeClusters') ).object return self._to_clusters(data)
[docs] def create_cluster(self, name, location=None): """ Create a container cluster :param name: The name of the cluster :type name: ``str`` :param location: The location to create the cluster in :type location: :class:`libcloud.container.base.ClusterLocation` :rtype: :class:`libcloud.container.base.ContainerCluster` """ request = {'clusterName': name} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('CreateCluster') ).object return self._to_cluster(response['cluster'])
[docs] def destroy_cluster(self, cluster): """ Delete a cluster :return: ``True`` if the destroy was successful, otherwise ``False``. :rtype: ``bool`` """ request = {'cluster': cluster.id} data = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('DeleteCluster') ).object return data['cluster']['status'] == 'INACTIVE'
[docs] def list_containers(self, image=None, cluster=None): """ List the deployed container images :param image: Filter to containers with a certain image :type image: :class:`libcloud.container.base.ContainerImage` :param cluster: Filter to containers in a cluster :type cluster: :class:`libcloud.container.base.ContainerCluster` :rtype: ``list`` of :class:`libcloud.container.base.Container` """ request = {'cluster': 'default'} if cluster is not None: request['cluster'] = cluster.id if image is not None: request['family'] = image.name list_response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('ListTasks') ).object if len(list_response['taskArns']) == 0: return [] containers = self.ex_list_containers_for_task( list_response['taskArns']) return containers
[docs] def deploy_container(self, name, image, cluster=None, parameters=None, start=True, ex_cpu=10, ex_memory=500, ex_container_port=None, ex_host_port=None): """ Creates a task definition from a container image that can be run in a cluster. :param name: The name of the new container :type name: ``str`` :param image: The container image to deploy :type image: :class:`libcloud.container.base.ContainerImage` :param cluster: The cluster to deploy to, None is default :type cluster: :class:`libcloud.container.base.ContainerCluster` :param parameters: Container Image parameters :type parameters: ``str`` :param start: Start the container on deployment :type start: ``bool`` :rtype: :class:`libcloud.container.base.Container` """ data = {} if ex_container_port is None and ex_host_port is None: port_maps = [] else: port_maps = [ { "containerPort": ex_container_port, "hostPort": ex_host_port } ] data['containerDefinitions'] = [ { "mountPoints": [], "name": name, "image": image.name, "cpu": ex_cpu, "environment": [], "memory": ex_memory, "portMappings": port_maps, "essential": True, "volumesFrom": [] } ] data['family'] = name response = self.connection.request( ROOT, method='POST', data=json.dumps(data), headers=self._get_headers('RegisterTaskDefinition') ).object if start: return self.ex_start_task( response['taskDefinition']['taskDefinitionArn'])[0] else: return Container( id=None, name=name, image=image, state=ContainerState.RUNNING, ip_addresses=[], extra={ 'taskDefinitionArn': response['taskDefinition']['taskDefinitionArn'] }, driver=self.connection.driver )
[docs] def get_container(self, id): """ Get a container by ID :param id: The ID of the container to get :type id: ``str`` :rtype: :class:`libcloud.container.base.Container` """ containers = self.ex_list_containers_for_task([id]) return containers[0]
[docs] def start_container(self, container, count=1): """ Start a deployed task :param container: The container to start :type container: :class:`libcloud.container.base.Container` :param count: Number of containers to start :type count: ``int`` :rtype: :class:`libcloud.container.base.Container` """ return self.ex_start_task(container.extra['taskDefinitionArn'], count)
[docs] def stop_container(self, container): """ Stop a deployed container :param container: The container to stop :type container: :class:`libcloud.container.base.Container` :rtype: :class:`libcloud.container.base.Container` """ request = {'task': container.extra['taskArn']} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('StopTask') ).object containers = [] containers.extend(self._to_containers( response['task'], container.extra['taskDefinitionArn'])) return containers
[docs] def restart_container(self, container): """ Restart a deployed container :param container: The container to restart :type container: :class:`libcloud.container.base.Container` :rtype: :class:`libcloud.container.base.Container` """ self.stop_container(container) return self.start_container(container)
[docs] def destroy_container(self, container): """ Destroy a deployed container :param container: The container to destroy :type container: :class:`libcloud.container.base.Container` :rtype: :class:`libcloud.container.base.Container` """ return self.stop_container(container)
[docs] def ex_start_task(self, task_arn, count=1): """ Run a task definition and get the containers :param task_arn: The task ARN to Run :type task_arn: ``str`` :param count: The number of containers to start :type count: ``int`` :rtype: ``list`` of :class:`libcloud.container.base.Container` """ request = None request = {'count': count, 'taskDefinition': task_arn} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('RunTask') ).object containers = [] for task in response['tasks']: containers.extend(self._to_containers(task, task_arn)) return containers
[docs] def ex_list_containers_for_task(self, task_arns): """ Get a list of containers by ID collection (ARN) :param task_arns: The list of ARNs :type task_arns: ``list`` of ``str`` :rtype: ``list`` of :class:`libcloud.container.base.Container` """ describe_request = {'tasks': task_arns} descripe_response = self.connection.request( ROOT, method='POST', data=json.dumps(describe_request), headers=self._get_headers('DescribeTasks') ).object containers = [] for task in descripe_response['tasks']: containers.extend(self._to_containers( task, task['taskDefinitionArn'])) return containers
[docs] def ex_create_service(self, name, cluster, task_definition, desired_count=1): """ Runs and maintains a desired number of tasks from a specified task definition. If the number of tasks running in a service drops below desired_count, Amazon ECS spawns another instantiation of the task in the specified cluster. :param name: the name of the service :type name: ``str`` :param cluster: The cluster to run the service on :type cluster: :class:`libcloud.container.base.ContainerCluster` :param task_definition: The task definition name or ARN for the service :type task_definition: ``str`` :param desired_count: The desired number of tasks to be running at any one time :type desired_count: ``int`` :rtype: ``object`` The service object """ request = { 'serviceName': name, 'taskDefinition': task_definition, 'desiredCount': desired_count, 'cluster': cluster.id} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('CreateService') ).object return response['service']
[docs] def ex_list_service_arns(self, cluster=None): """ List the services :param cluster: The cluster hosting the services :type cluster: :class:`libcloud.container.base.ContainerCluster` :rtype: ``list`` of ``str`` """ request = {} if cluster is not None: request['cluster'] = cluster.id response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('ListServices') ).object return response['serviceArns']
[docs] def ex_describe_service(self, service_arn): """ Get the details of a service :param cluster: The hosting cluster :type cluster: :class:`libcloud.container.base.ContainerCluster` :param service_arn: The service ARN to describe :type service_arn: ``str`` :return: The service object :rtype: ``object`` """ request = {'services': [service_arn]} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('DescribeServices') ).object return response['services'][0]
[docs] def ex_destroy_service(self, service_arn): """ Deletes a service :param cluster: The target cluster :type cluster: :class:`libcloud.container.base.ContainerCluster` :param service_arn: The service ARN to destroy :type service_arn: ``str`` """ request = { 'service': service_arn} response = self.connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_headers('DeleteService') ).object return response['service']
[docs] def ex_get_registry_client(self, repository_name): """ Get a client for an ECR repository :param repository_name: The unique name of the repository :type repository_name: ``str`` :return: a docker registry API client :rtype: :class:`libcloud.container.utils.docker.RegistryClient` """ repository_id = self.ex_get_repository_id(repository_name) token = self.ex_get_repository_token(repository_id) host = self._get_ecr_host(repository_id) return RegistryClient( host=host, username='AWS', password=token )
[docs] def ex_get_repository_token(self, repository_id): """ Get the authorization token (12 hour expiry) for a repository :param repository_id: The ID of the repository :type repository_id: ``str`` :return: A token for login :rtype: ``str`` """ request = {'RegistryIds': [repository_id]} response = self.ecr_connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_ecr_headers('GetAuthorizationToken') ).object return response['authorizationData'][0]['authorizationToken']
[docs] def ex_get_repository_id(self, repository_name): """ Get the ID of a repository :param repository_name: The unique name of the repository :type repository_name: ``str`` :return: The repository ID :rtype: ``str`` """ request = {'repositoryNames': [repository_name]} list_response = self.ecr_connection.request( ROOT, method='POST', data=json.dumps(request), headers=self._get_ecr_headers('DescribeRepositories') ).object repository_id = list_response['repositories'][0]['registryId'] return repository_id
def _get_ecr_host(self, repository_id): return self.ecr_repository_host % ( repository_id, self.region) def _get_headers(self, action): """ Get the default headers for a request to the ECS API """ return {'x-amz-target': '%s.%s' % (ECS_TARGET_BASE, action), 'Content-Type': 'application/x-amz-json-1.1' } def _get_ecr_headers(self, action): """ Get the default headers for a request to the ECR API """ return {'x-amz-target': '%s.%s' % (ECR_TARGET_BASE, action), 'Content-Type': 'application/x-amz-json-1.1' } def _to_clusters(self, data): clusters = [] for cluster in data['clusters']: clusters.append(self._to_cluster(cluster)) return clusters def _to_cluster(self, data): return ContainerCluster( id=data['clusterArn'], name=data['clusterName'], driver=self.connection.driver ) def _to_containers(self, data, task_definition_arn): clusters = [] for cluster in data['containers']: clusters.append(self._to_container(cluster, task_definition_arn)) return clusters def _to_container(self, data, task_definition_arn): return Container( id=data['containerArn'], name=data['name'], image=ContainerImage( id=None, name=data['name'], path=None, version=None, driver=self.connection.driver ), ip_addresses=None, state=self.status_map.get(data['lastStatus'], None), extra={ 'taskArn': data['taskArn'], 'taskDefinitionArn': task_definition_arn }, driver=self.connection.driver ) def _to_images(self, data, host, repository_name): images = [] for image in data: images.append(self._to_image(image, host, repository_name)) return images def _to_image(self, data, host, repository_name): path = '%s/%s:%s' % ( host, repository_name, data['imageTag'] ) return ContainerImage( id=None, name=path, path=path, version=data['imageTag'], driver=self.connection.driver )