Source code for libcloud.storage.base

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Provides base classes for working with storage
"""

# Backward compatibility for Python 2.5
from __future__ import with_statement

from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Type

import os.path                          # pylint: disable-msg=W0404
import hashlib
import warnings
import errno
from os.path import join as pjoin

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

import libcloud.utils.files
from libcloud.common.types import LibcloudError
from libcloud.common.base import Connection
from libcloud.common.base import ConnectionUserAndKey, BaseDriver
from libcloud.storage.types import ObjectDoesNotExistError

__all__ = [
    'Object',
    'Container',
    'StorageDriver',

    'CHUNK_SIZE',
    'DEFAULT_CONTENT_TYPE'
]

CHUNK_SIZE = 8096

# Default Content-Type which is sent when uploading an object if one is not
# supplied and can't be detected when using non-strict mode.
DEFAULT_CONTENT_TYPE = 'application/octet-stream'


[docs]class Object(object): """ Represents an object (BLOB). """ def __init__(self, name, # type: str size, # type: int hash, # type: str extra, # type: dict meta_data, # type: dict container, # type: Container driver, # type: StorageDriver ): """ :param name: Object name (must be unique per container). :type name: ``str`` :param size: Object size in bytes. :type size: ``int`` :param hash: Object hash. :type hash: ``str`` :param container: Object container. :type container: :class:`libcloud.storage.base.Container` :param extra: Extra attributes. :type extra: ``dict`` :param meta_data: Optional object meta data. :type meta_data: ``dict`` :param driver: StorageDriver instance. :type driver: :class:`libcloud.storage.base.StorageDriver` """ self.name = name self.size = size self.hash = hash self.container = container self.extra = extra or {} self.meta_data = meta_data or {} self.driver = driver
[docs] def get_cdn_url(self): # type: () -> str return self.driver.get_object_cdn_url(obj=self)
[docs] def enable_cdn(self): # type: () -> bool return self.driver.enable_object_cdn(obj=self)
[docs] def download(self, destination_path, overwrite_existing=False, delete_on_failure=True): # type: (str, bool, bool) -> bool return self.driver.download_object( obj=self, destination_path=destination_path, overwrite_existing=overwrite_existing, delete_on_failure=delete_on_failure)
[docs] def as_stream(self, chunk_size=None): # type: (Optional[int]) -> Iterator[bytes] return self.driver.download_object_as_stream(obj=self, chunk_size=chunk_size)
[docs] def download_range(self, destination_path, start_bytes, end_bytes=None, overwrite_existing=False, delete_on_failure=True): # type: (str, int, Optional[int], bool, bool) -> bool return self.driver.download_object_range( obj=self, destination_path=destination_path, start_bytes=start_bytes, end_bytes=end_bytes, overwrite_existing=overwrite_existing, delete_on_failure=delete_on_failure)
[docs] def range_as_stream(self, start_bytes, end_bytes=None, chunk_size=None): # type: (int, Optional[int], Optional[int]) -> Iterator[bytes] return self.driver.download_object_range_as_stream( obj=self, start_bytes=start_bytes, end_bytes=end_bytes, chunk_size=chunk_size)
[docs] def delete(self): # type: () -> bool return self.driver.delete_object(self)
def __repr__(self): return ('<Object: name=%s, size=%s, hash=%s, provider=%s ...>' % (self.name, self.size, self.hash, self.driver.name))
[docs]class Container(object): """ Represents a container (bucket) which can hold multiple objects. """ def __init__(self, name, # type: str extra, # type: dict driver, # type: StorageDriver ): """ :param name: Container name (must be unique). :type name: ``str`` :param extra: Extra attributes. :type extra: ``dict`` :param driver: StorageDriver instance. :type driver: :class:`libcloud.storage.base.StorageDriver` """ self.name = name self.extra = extra or {} self.driver = driver
[docs] def iterate_objects(self, prefix=None, ex_prefix=None): # type: (Optional[str], Optional[str]) -> Iterator[Object] return self.driver.iterate_container_objects(container=self, prefix=prefix, ex_prefix=ex_prefix)
[docs] def list_objects(self, prefix=None, ex_prefix=None): # type: (Optional[str], Optional[str]) -> List[Object] return self.driver.list_container_objects(container=self, prefix=prefix, ex_prefix=ex_prefix)
[docs] def get_cdn_url(self): # type: () -> str return self.driver.get_container_cdn_url(container=self)
[docs] def enable_cdn(self): # type: () -> bool return self.driver.enable_container_cdn(container=self)
[docs] def get_object(self, object_name): # type: (str) -> Object return self.driver.get_object(container_name=self.name, object_name=object_name)
[docs] def upload_object(self, file_path, object_name, extra=None, verify_hash=True, headers=None): # type: (str, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object # noqa: E501 return self.driver.upload_object( file_path, self, object_name, extra=extra, verify_hash=verify_hash, headers=headers)
[docs] def upload_object_via_stream(self, iterator, object_name, extra=None, headers=None): # type: (Iterator[bytes], str, Optional[dict], Optional[Dict[str, str]]) -> Object # noqa: E501 return self.driver.upload_object_via_stream( iterator, self, object_name, extra=extra, headers=headers)
[docs] def download_object(self, obj, destination_path, overwrite_existing=False, delete_on_failure=True): # type: (Object, str, bool, bool) -> bool return self.driver.download_object( obj, destination_path, overwrite_existing=overwrite_existing, delete_on_failure=delete_on_failure)
[docs] def download_object_as_stream(self, obj, chunk_size=None): # type: (Object, Optional[int]) -> Iterator[bytes] return self.driver.download_object_as_stream(obj, chunk_size)
[docs] def download_object_range(self, obj, destination_path, start_bytes, end_bytes=None, overwrite_existing=False, delete_on_failure=True): # type: (Object, str, int, Optional[int], bool, bool) -> bool return self.driver.download_object_range( obj=obj, destination_path=destination_path, start_bytes=start_bytes, end_bytes=end_bytes, overwrite_existing=overwrite_existing, delete_on_failure=delete_on_failure)
[docs] def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None, chunk_size=None): # type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes] return self.driver.download_object_range_as_stream( obj=obj, start_bytes=start_bytes, end_bytes=end_bytes, chunk_size=chunk_size)
[docs] def delete_object(self, obj): # type: (Object) -> bool return self.driver.delete_object(obj)
[docs] def delete(self): # type: () -> bool return self.driver.delete_container(self)
def __repr__(self): return ('<Container: name=%s, provider=%s>' % (self.name, self.driver.name))
[docs]class StorageDriver(BaseDriver): """ A base StorageDriver to derive from. """ connectionCls = ConnectionUserAndKey # type: Type[Connection] name = None # type: str hash_type = 'md5' # type: str supports_chunked_encoding = False # type: bool # When strict mode is used, exception will be thrown if no content type is # provided and none can be detected when uploading an object strict_mode = False # type: bool
[docs] def iterate_containers(self): # type: () -> Iterator[Container] """ Return a iterator of containers for the given account :return: A iterator of Container instances. :rtype: ``iterator`` of :class:`libcloud.storage.base.Container` """ raise NotImplementedError( 'iterate_containers not implemented for this driver')
[docs] def list_containers(self): # type: () -> List[Container] """ Return a list of containers. :return: A list of Container instances. :rtype: ``list`` of :class:`Container` """ return list(self.iterate_containers())
[docs] def iterate_container_objects(self, container, prefix=None, ex_prefix=None): # type: (Container, Optional[str], Optional[str]) -> Iterator[Object] """ Return a iterator of objects for the given container. :param container: Container instance :type container: :class:`libcloud.storage.base.Container` :param prefix: Filter objects starting with a prefix. :type prefix: ``str`` :param ex_prefix: (Deprecated.) Filter objects starting with a prefix. :type ex_prefix: ``str`` :return: A iterator of Object instances. :rtype: ``iterator`` of :class:`libcloud.storage.base.Object` """ raise NotImplementedError( 'iterate_container_objects not implemented for this driver')
[docs] def list_container_objects(self, container, prefix=None, ex_prefix=None): # type: (Container, Optional[str], Optional[str]) -> List[Object] """ Return a list of objects for the given container. :param container: Container instance. :type container: :class:`libcloud.storage.base.Container` :param prefix: Filter objects starting with a prefix. :type prefix: ``str`` :param ex_prefix: (Deprecated.) Filter objects starting with a prefix. :type ex_prefix: ``str`` :return: A list of Object instances. :rtype: ``list`` of :class:`libcloud.storage.base.Object` """ return list(self.iterate_container_objects(container, prefix=prefix, ex_prefix=ex_prefix))
def _normalize_prefix_argument(self, prefix, ex_prefix): if ex_prefix: warnings.warn('The ``ex_prefix`` argument is deprecated - ' 'please update code to use ``prefix``', DeprecationWarning) return ex_prefix return prefix def _filter_listed_container_objects(self, objects, prefix): if prefix is not None: warnings.warn('Driver %s does not implement native object ' 'filtering; falling back to filtering the full ' 'object stream.' % self.__class__.__name__) for obj in objects: if prefix is None or obj.name.startswith(prefix): yield obj
[docs] def get_container(self, container_name): # type: (str) -> Container """ Return a container instance. :param container_name: Container name. :type container_name: ``str`` :return: :class:`Container` instance. :rtype: :class:`libcloud.storage.base.Container` """ raise NotImplementedError( 'get_object not implemented for this driver')
[docs] def get_container_cdn_url(self, container): # type: (Container) -> str """ Return a container CDN URL. :param container: Container instance :type container: :class:`libcloud.storage.base.Container` :return: A CDN URL for this container. :rtype: ``str`` """ raise NotImplementedError( 'get_container_cdn_url not implemented for this driver')
[docs] def get_object(self, container_name, object_name): # type: (str, str) -> Object """ Return an object instance. :param container_name: Container name. :type container_name: ``str`` :param object_name: Object name. :type object_name: ``str`` :return: :class:`Object` instance. :rtype: :class:`libcloud.storage.base.Object` """ raise NotImplementedError( 'get_object not implemented for this driver')
[docs] def get_object_cdn_url(self, obj): # type: (Object) -> str """ Return an object CDN URL. :param obj: Object instance :type obj: :class:`libcloud.storage.base.Object` :return: A CDN URL for this object. :rtype: ``str`` """ raise NotImplementedError( 'get_object_cdn_url not implemented for this driver')
[docs] def enable_container_cdn(self, container): # type: (Container) -> bool """ Enable container CDN. :param container: Container instance :type container: :class:`libcloud.storage.base.Container` :rtype: ``bool`` """ raise NotImplementedError( 'enable_container_cdn not implemented for this driver')
[docs] def enable_object_cdn(self, obj): # type: (Object) -> bool """ Enable object CDN. :param obj: Object instance :type obj: :class:`libcloud.storage.base.Object` :rtype: ``bool`` """ raise NotImplementedError( 'enable_object_cdn not implemented for this driver')
[docs] def download_object(self, obj, destination_path, overwrite_existing=False, delete_on_failure=True): # type: (Object, str, bool, bool) -> bool """ Download an object to the specified destination path. :param obj: Object instance. :type obj: :class:`libcloud.storage.base.Object` :param destination_path: Full path to a file or a directory where the incoming file will be saved. :type destination_path: ``str`` :param overwrite_existing: True to overwrite an existing file, defaults to False. :type overwrite_existing: ``bool`` :param delete_on_failure: True to delete a partially downloaded file if the download was not successful (hash mismatch / file size). :type delete_on_failure: ``bool`` :return: True if an object has been successfully downloaded, False otherwise. :rtype: ``bool`` """ raise NotImplementedError( 'download_object not implemented for this driver')
[docs] def download_object_as_stream(self, obj, chunk_size=None): # type: (Object, Optional[int]) -> Iterator[bytes] """ Return a iterator which yields object data. :param obj: Object instance :type obj: :class:`libcloud.storage.base.Object` :param chunk_size: Optional chunk size (in bytes). :type chunk_size: ``int`` :rtype: ``iterator`` of ``bytes`` """ raise NotImplementedError( 'download_object_as_stream not implemented for this driver')
[docs] def download_object_range(self, obj, destination_path, start_bytes, end_bytes=None, overwrite_existing=False, delete_on_failure=True): # type: (Object, str, int, Optional[int], bool, bool) -> bool """ Download part of an object. :param obj: Object instance. :type obj: :class:`libcloud.storage.base.Object` :param destination_path: Full path to a file or a directory where the incoming file will be saved. :type destination_path: ``str`` :param start_bytes: Start byte offset (inclusive) for the range download. Offset is 0 index based so the first byte in file file is "0". :type start_bytes: ``int`` :param end_bytes: End byte offset (non-inclusive) for the range download. If not provided, it will default to the end of the file. :type end_bytes: ``int`` :param overwrite_existing: True to overwrite an existing file, defaults to False. :type overwrite_existing: ``bool`` :param delete_on_failure: True to delete a partially downloaded file if the download was not successful (hash mismatch / file size). :type delete_on_failure: ``bool`` :return: True if an object has been successfully downloaded, False otherwise. :rtype: ``bool`` """ raise NotImplementedError( 'download_object_range not implemented for this driver')
[docs] def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None, chunk_size=None): # type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes] """ Return a iterator which yields range / part of the object data. :param obj: Object instance :type obj: :class:`libcloud.storage.base.Object` :param start_bytes: Start byte offset (inclusive) for the range download. Offset is 0 index based so the first byte in file file is "0". :type start_bytes: ``int`` :param end_bytes: End byte offset (non-inclusive) for the range download. If not provided, it will default to the end of the file. :type end_bytes: ``int`` :param chunk_size: Optional chunk size (in bytes). :type chunk_size: ``int`` :rtype: ``iterator`` of ``bytes`` """ raise NotImplementedError( 'download_object_range_as_stream not implemented for this driver')
[docs] def upload_object(self, file_path, container, object_name, extra=None, verify_hash=True, headers=None): # type: (str, Container, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object # noqa: E501 """ Upload an object currently located on a disk. :param file_path: Path to the object on disk. :type file_path: ``str`` :param container: Destination container. :type container: :class:`libcloud.storage.base.Container` :param object_name: Object name. :type object_name: ``str`` :param verify_hash: Verify hash :type verify_hash: ``bool`` :param extra: Extra attributes (driver specific). (optional) :type extra: ``dict`` :param headers: (optional) Additional request headers, such as CORS headers. For example: headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'} :type headers: ``dict`` :rtype: :class:`libcloud.storage.base.Object` """ raise NotImplementedError( 'upload_object not implemented for this driver')
[docs] def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None): # type: (Iterator[bytes], Container, str, Optional[dict], Optional[Dict[str, str]]) -> Object # noqa: E501 """ Upload an object using an iterator. If a provider supports it, chunked transfer encoding is used and you don't need to know in advance the amount of data to be uploaded. Otherwise if a provider doesn't support it, iterator will be exhausted so a total size for data to be uploaded can be determined. Note: Exhausting the iterator means that the whole data must be buffered in memory which might result in memory exhausting when uploading a very large object. If a file is located on a disk you are advised to use upload_object function which uses fs.stat function to determine the file size and it doesn't need to buffer whole object in the memory. :param iterator: An object which implements the iterator interface. :type iterator: :class:`object` :param container: Destination container. :type container: :class:`libcloud.storage.base.Container` :param object_name: Object name. :type object_name: ``str`` :param extra: (optional) Extra attributes (driver specific). Note: This dictionary must contain a 'content_type' key which represents a content type of the stored object. :type extra: ``dict`` :param headers: (optional) Additional request headers, such as CORS headers. For example: headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'} :type headers: ``dict`` :rtype: ``libcloud.storage.base.Object`` """ raise NotImplementedError( 'upload_object_via_stream not implemented for this driver')
[docs] def delete_object(self, obj): # type: (Object) -> bool """ Delete an object. :param obj: Object instance. :type obj: :class:`libcloud.storage.base.Object` :return: ``bool`` True on success. :rtype: ``bool`` """ raise NotImplementedError( 'delete_object not implemented for this driver')
[docs] def create_container(self, container_name): # type: (str) -> Container """ Create a new container. :param container_name: Container name. :type container_name: ``str`` :return: Container instance on success. :rtype: :class:`libcloud.storage.base.Container` """ raise NotImplementedError( 'create_container not implemented for this driver')
[docs] def delete_container(self, container): # type: (Container) -> bool """ Delete a container. :param container: Container instance :type container: :class:`libcloud.storage.base.Container` :return: ``True`` on success, ``False`` otherwise. :rtype: ``bool`` """ raise NotImplementedError( 'delete_container not implemented for this driver')
def _get_object(self, obj, callback, callback_kwargs, response, success_status_code=None): """ Call passed callback and start transfer of the object' :param obj: Object instance. :type obj: :class:`Object` :param callback: Function which is called with the passed callback_kwargs :type callback: :class:`function` :param callback_kwargs: Keyword arguments which are passed to the callback. :type callback_kwargs: ``dict`` :param response: Response instance. :type response: :class:`Response` :param success_status_code: Status code which represents a successful transfer (defaults to httplib.OK) :type success_status_code: ``int`` :return: ``True`` on success, ``False`` otherwise. :rtype: ``bool`` """ success_status_code = success_status_code or httplib.OK if not isinstance(success_status_code, (list, tuple)): success_status_codes = [success_status_code] else: success_status_codes = success_status_code if response.status in success_status_codes: return callback(**callback_kwargs) elif response.status == httplib.NOT_FOUND: raise ObjectDoesNotExistError(object_name=obj.name, value='', driver=self) raise LibcloudError(value='Unexpected status code: %s' % (response.status), driver=self) def _save_object(self, response, obj, destination_path, overwrite_existing=False, delete_on_failure=True, chunk_size=None, partial_download=False): """ Save object to the provided path. :param response: RawResponse instance. :type response: :class:`RawResponse` :param obj: Object instance. :type obj: :class:`Object` :param destination_path: Destination directory. :type destination_path: ``str`` :param delete_on_failure: True to delete partially downloaded object if the download fails. :type delete_on_failure: ``bool`` :param overwrite_existing: True to overwrite a local path if it already exists. :type overwrite_existing: ``bool`` :param chunk_size: Optional chunk size (defaults to ``libcloud.storage.base.CHUNK_SIZE``, 8kb) :type chunk_size: ``int`` :param partial_download: True if this is a range (partial) save, False otherwise. :type partial_download: ``bool`` :return: ``True`` on success, ``False`` otherwise. :rtype: ``bool`` """ chunk_size = chunk_size or CHUNK_SIZE base_name = os.path.basename(destination_path) if not base_name and not os.path.exists(destination_path): raise LibcloudError( value='Path %s does not exist' % (destination_path), driver=self) if not base_name: file_path = pjoin(destination_path, obj.name) else: file_path = destination_path if os.path.exists(file_path) and not overwrite_existing: raise LibcloudError( value='File %s already exists, but ' % (file_path) + 'overwrite_existing=False', driver=self) bytes_transferred = 0 with open(file_path, 'wb') as file_handle: for chunk in response._response.iter_content(chunk_size): file_handle.write(b(chunk)) bytes_transferred += len(chunk) if not partial_download and int(obj.size) != int(bytes_transferred): # Transfer failed, support retry? # NOTE: We only perform this check if this is a regular and not a # partial / range download if delete_on_failure: try: os.unlink(file_path) except Exception: pass return False return True def _upload_object(self, object_name, content_type, request_path, request_method='PUT', headers=None, file_path=None, stream=None, chunked=False, multipart=False): """ Helper function for setting common request headers and calling the passed in callback which uploads an object. """ headers = headers or {} if file_path and not os.path.exists(file_path): raise OSError('File %s does not exist' % (file_path)) if stream is not None and not hasattr(stream, 'next') and not \ hasattr(stream, '__next__'): raise AttributeError('iterator object must implement next() ' + 'method.') headers['Content-Type'] = self._determine_content_type( content_type, object_name, file_path=file_path) if stream: response = self.connection.request( request_path, method=request_method, data=stream, headers=headers, raw=True) stream_hash, stream_length = self._hash_buffered_stream( stream, self._get_hash_function()) else: with open(file_path, 'rb') as file_stream: response = self.connection.request( request_path, method=request_method, data=file_stream, headers=headers, raw=True) with open(file_path, 'rb') as file_stream: stream_hash, stream_length = self._hash_buffered_stream( file_stream, self._get_hash_function()) if not response.success(): response.parse_error() return {'response': response, 'bytes_transferred': stream_length, 'data_hash': stream_hash} def _determine_content_type(self, content_type, object_name, file_path=None): if content_type: return content_type name = file_path or object_name content_type, _ = libcloud.utils.files.guess_file_mime_type(name) if self.strict_mode and not content_type: raise AttributeError('File content-type could not be guessed for ' '"%s" and no content_type value is provided' % name) return content_type or DEFAULT_CONTENT_TYPE def _hash_buffered_stream(self, stream, hasher, blocksize=65536): total_len = 0 if hasattr(stream, '__next__') or hasattr(stream, 'next'): # Ensure we start from the begining of a stream in case stream is # not at the beginning if hasattr(stream, 'seek'): try: stream.seek(0) except OSError as e: if e.errno != errno.ESPIPE: # This represents "OSError: [Errno 29] Illegal seek" # error. This could either mean that the underlying # handle doesn't support seek operation (e.g. pipe) or # that the invalid seek position is provided. Sadly # there is no good robust way to distinghuish that so # we simply ignore all the "Illeal seek" errors so # this function works correctly with pipes. # See https://github.com/apache/libcloud/pull/1427 for # details raise e for chunk in libcloud.utils.files.read_in_chunks(iterator=stream): hasher.update(b(chunk)) total_len += len(chunk) return (hasher.hexdigest(), total_len) if not hasattr(stream, '__exit__'): for s in stream: hasher.update(s) total_len = total_len + len(s) return (hasher.hexdigest(), total_len) with stream: buf = stream.read(blocksize) while len(buf) > 0: total_len = total_len + len(buf) hasher.update(buf) buf = stream.read(blocksize) return (hasher.hexdigest(), total_len) def _get_hash_function(self): """ Return instantiated hash function for the hash type supported by the provider. """ try: func = getattr(hashlib, self.hash_type)() except AttributeError: raise RuntimeError('Invalid or unsupported hash type: %s' % (self.hash_type)) return func def _validate_start_and_end_bytes(self, start_bytes, end_bytes=None): # type: (int, Optional[int]) -> bool """ Method which validates that start_bytes and end_bytes arguments contain valid values. """ if start_bytes < 0: raise ValueError('start_bytes must be greater than 0') if end_bytes is not None: if start_bytes > end_bytes: raise ValueError('start_bytes must be smaller than end_bytes') elif start_bytes == end_bytes: raise ValueError('start_bytes and end_bytes can\'t be the ' 'same. end_bytes is non-inclusive') return True def _get_standard_range_str(self, start_bytes, end_bytes=None, end_bytes_inclusive=False): # type: (int, Optional[int], bool) -> str """ Return range string which is used as a Range header value for range requests for drivers which follow standard Range header notation This returns range string in the following format: bytes=<start_bytes>-<end bytes>. For example: bytes=1-10 bytes=0-2 bytes=5- bytes=100-5000 :param end_bytes_inclusive: True if "end_bytes" offset should be inclusive (aka opposite from the Python indexing behavior where the end index is not inclusive). """ range_str = 'bytes=%s-' % (start_bytes) if end_bytes is not None: if end_bytes_inclusive: range_str += str(end_bytes) else: range_str += str(end_bytes - 1) return range_str