# Source code for libcloud.storage.drivers.cloudfiles

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import hmac
import atexit
from time import time
from hashlib import sha1

from libcloud.utils.py3 import b, httplib, urlquote, urlencode
from libcloud.common.base import Response, RawResponse
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import LibcloudError, MalformedResponseError
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import (
    ObjectDoesNotExistError,
    ObjectHashMismatchError,
    ContainerIsNotEmptyError,
    InvalidContainerNameError,
    ContainerDoesNotExistError,
    ContainerAlreadyExistsError,
)
from libcloud.common.openstack import OpenStackDriverMixin, OpenStackBaseConnection
from libcloud.common.rackspace import AUTH_URL
from libcloud.storage.providers import Provider

try:
    import simplejson as json
except ImportError:
    import json  # type: ignore

from io import FileIO as file

CDN_HOST = "cdn.clouddrive.com"
API_VERSION = "v1.0"

# Keys which are used to select a correct endpoint from the service catalog.
INTERNAL_ENDPOINT_KEY = "internalURL"
PUBLIC_ENDPOINT_KEY = "publicURL"


class CloudFilesResponse(Response):
    """Response wrapper for the Rackspace Cloud Files API."""

    # 404 and 409 carry meaning for storage operations (missing
    # object/container, non-empty container) and are handled by the
    # driver rather than treated as transport failures.
    valid_response_codes = [httplib.NOT_FOUND, httplib.CONFLICT]

    def success(self):
        """Return True for any 2xx status or a whitelisted error code."""
        status = int(self.status)
        if 200 <= status <= 299:
            return True
        return status in self.valid_response_codes

    def parse_body(self):
        """
        Deserialize the response body.

        JSON payloads are decoded into Python objects; any other content
        type (including text/plain) is returned verbatim. An empty body
        yields ``None``.
        """
        if not self.body:
            return None

        # Header names may or may not be lower-cased depending on the
        # transport layer.
        if "content-type" in self.headers:
            header_name = "content-type"
        elif "Content-Type" in self.headers:
            header_name = "Content-Type"
        else:
            raise LibcloudError("Missing content-type header")

        # Strip parameters such as "; charset=UTF-8".
        content_type = self.headers[header_name]
        if content_type.find(";") != -1:
            content_type = content_type.split(";")[0]

        if content_type == "application/json":
            try:
                return json.loads(self.body)
            except Exception:
                raise MalformedResponseError(
                    "Failed to parse JSON",
                    body=self.body,
                    driver=CloudFilesStorageDriver,
                )

        # text/plain and everything else: hand back the raw body
        # (the original code had two identical branches here).
        return self.body
class CloudFilesRawResponse(CloudFilesResponse, RawResponse):
    """Raw (streaming) variant of :class:`CloudFilesResponse`."""

    pass
class OpenStackSwiftConnection(OpenStackBaseConnection):
    """
    Connection class for the OpenStack Swift endpoint.
    """

    responseCls = CloudFilesResponse
    rawResponseCls = CloudFilesRawResponse
    auth_url = AUTH_URL
    _auth_version = "1.0"

    # TODO: Reverse the relationship - Swift -> CloudFiles
    def __init__(self, user_id, key, secure=True, **kwargs):
        # "use_internal_url" is CloudFiles-specific; ignore it here.
        kwargs.pop("use_internal_url", None)
        super().__init__(user_id, key, secure=secure, **kwargs)

        self.api_version = API_VERSION
        self.accept_format = "application/json"

        # Service-catalog selectors, overridable via ex_force_* kwargs.
        self._service_type = self._ex_force_service_type or "object-store"
        self._service_name = self._ex_force_service_name or "swift"
        self._service_region = self._ex_force_service_region or None

    def get_endpoint(self, *args, **kwargs):
        """
        Look up the Swift endpoint URL in the service catalog.

        :raises LibcloudError: if no matching endpoint can be found.
        """
        version = self._auth_version
        if ("2.0" in version) or ("3.x" in version):
            endpoint = self.service_catalog.get_endpoint(
                service_type=self._service_type,
                name=self._service_name,
                region=self._service_region,
            )
        elif ("1.1" in version) or ("1.0" in version):
            # v1 catalogs are not keyed by service type.
            endpoint = self.service_catalog.get_endpoint(
                name=self._service_name, region=self._service_region
            )
        else:
            endpoint = None

        if not endpoint:
            raise LibcloudError("Could not find specified endpoint")
        return endpoint.url

    def request(
        self,
        action,
        params=None,
        data="",
        headers=None,
        method="GET",
        raw=False,
        cdn_request=False,
    ):
        """Issue a request, always asking the API for JSON output."""
        headers = headers or {}
        params = params or {}

        self.cdn_request = cdn_request
        params["format"] = "json"

        if method in ["POST", "PUT"] and "Content-Type" not in headers:
            headers["Content-Type"] = "application/json; charset=UTF-8"

        return super().request(
            action=action,
            params=params,
            data=data,
            method=method,
            headers=headers,
            raw=raw,
        )
class CloudFilesConnection(OpenStackSwiftConnection):
    """
    Base connection class for the Cloudfiles driver.
    """

    responseCls = CloudFilesResponse
    rawResponseCls = CloudFilesRawResponse
    auth_url = AUTH_URL
    _auth_version = "2.0"

    def __init__(self, user_id, key, secure=True, use_internal_url=False, **kwargs):
        super().__init__(user_id, key, secure=secure, **kwargs)
        self.api_version = API_VERSION
        self.accept_format = "application/json"
        # Toggled per-request: selects the CDN catalog entry in
        # get_endpoint() when True.
        self.cdn_request = False
        self.use_internal_url = use_internal_url

    def get_endpoint(self):
        """
        Return the storage endpoint URL — or the CDN endpoint URL when the
        current request is a CDN request.

        :raises LibcloudError: for unsupported auth versions or when no
            endpoint is found.
        """
        # NOTE(review): assumes _ex_force_service_region is always set;
        # .upper() would raise AttributeError on None — confirm callers.
        region = self._ex_force_service_region.upper()

        if "2.0" not in self._auth_version:
            raise LibcloudError('Auth version "%s" not supported' % (self._auth_version))

        endpoint = self.service_catalog.get_endpoint(
            service_type="object-store",
            name="cloudFiles",
            region=region,
            endpoint_type="internal" if self.use_internal_url else "external",
        )
        cdn_endpoint = self.service_catalog.get_endpoint(
            service_type="rax:object-cdn",
            name="cloudFilesCDN",
            region=region,
            # Cloud Files has no concept of an "internal" CDN.
            endpoint_type="external",
        )

        # If this is a CDN request, return the CDN URL instead.
        if self.cdn_request:
            endpoint = cdn_endpoint

        if not endpoint or not endpoint.url:
            raise LibcloudError("Could not find specified endpoint")
        return endpoint.url

    def request(
        self,
        action,
        params=None,
        data="",
        headers=None,
        method="GET",
        raw=False,
        cdn_request=False,
    ):
        """Issue a request, remembering whether it targets the CDN."""
        headers = headers or {}
        params = params or {}

        self.cdn_request = cdn_request
        params["format"] = "json"

        if method in ["POST", "PUT"] and "Content-Type" not in headers:
            headers["Content-Type"] = "application/json; charset=UTF-8"

        return super().request(
            action=action,
            params=params,
            data=data,
            method=method,
            headers=headers,
            raw=raw,
            cdn_request=cdn_request,
        )
class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin):
    """
    CloudFiles driver.
    """

    name = "CloudFiles"
    website = "http://www.rackspace.com/"
    connectionCls = CloudFilesConnection
    hash_type = "md5"
    supports_chunked_encoding = True

    def __init__(
        self,
        key,
        secret=None,
        secure=True,
        host=None,
        port=None,
        region="ord",
        use_internal_url=False,
        **kwargs,
    ):
        """
        @inherits: :class:`StorageDriver.__init__`

        :param region: ID of the region which should be used.
        :type region: ``str``
        """
        # Kept for backward compatibility: older callers passed the region
        # via the ex_force_service_region keyword.
        if "ex_force_service_region" in kwargs:
            region = kwargs["ex_force_service_region"]

        self.use_internal_url = use_internal_url
        OpenStackDriverMixin.__init__(self, **kwargs)
        super().__init__(
            key=key,
            secret=secret,
            secure=secure,
            host=host,
            port=port,
            region=region,
            **kwargs,
        )
[docs] @classmethod def list_regions(cls): return ["ord", "dfw", "iad", "lon", "hkg", "syd"]
[docs] def iterate_containers(self): response = self.connection.request("") if response.status == httplib.NO_CONTENT: return [] elif response.status == httplib.OK: return self._to_container_list(json.loads(response.body)) raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def get_container(self, container_name): container_name_encoded = self._encode_container_name(container_name) response = self.connection.request("/%s" % (container_name_encoded), method="HEAD") if response.status == httplib.NO_CONTENT: container = self._headers_to_container(container_name, response.headers) return container elif response.status == httplib.NOT_FOUND: raise ContainerDoesNotExistError(None, self, container_name) raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def get_object(self, container_name, object_name): container = self.get_container(container_name) container_name_encoded = self._encode_container_name(container_name) object_name_encoded = self._encode_object_name(object_name) response = self.connection.request( "/{}/{}".format(container_name_encoded, object_name_encoded), method="HEAD" ) if response.status in [httplib.OK, httplib.NO_CONTENT]: obj = self._headers_to_object(object_name, container, response.headers) return obj elif response.status == httplib.NOT_FOUND: raise ObjectDoesNotExistError(None, self, object_name) raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def get_container_cdn_url(self, container, ex_ssl_uri=False): # pylint: disable=unexpected-keyword-arg container_name_encoded = self._encode_container_name(container.name) response = self.connection.request( "/%s" % (container_name_encoded), method="HEAD", cdn_request=True ) if response.status == httplib.NO_CONTENT: if ex_ssl_uri: cdn_url = response.headers["x-cdn-ssl-uri"] else: cdn_url = response.headers["x-cdn-uri"] return cdn_url elif response.status == httplib.NOT_FOUND: raise ContainerDoesNotExistError(value="", container_name=container.name, driver=self) raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def get_object_cdn_url(self, obj): container_cdn_url = self.get_container_cdn_url(container=obj.container) return "{}/{}".format(container_cdn_url, obj.name)
[docs] def enable_container_cdn(self, container, ex_ttl=None): """ @inherits: :class:`StorageDriver.enable_container_cdn` :param ex_ttl: cache time to live :type ex_ttl: ``int`` """ container_name = self._encode_container_name(container.name) headers = {"X-CDN-Enabled": "True"} if ex_ttl: headers["X-TTL"] = ex_ttl # pylint: disable=unexpected-keyword-arg response = self.connection.request( "/%s" % (container_name), method="PUT", headers=headers, cdn_request=True ) return response.status in [httplib.CREATED, httplib.ACCEPTED]
[docs] def create_container(self, container_name): container_name_encoded = self._encode_container_name(container_name) response = self.connection.request("/%s" % (container_name_encoded), method="PUT") if response.status == httplib.CREATED: # Accepted mean that container is not yet created but it will be # eventually extra = {"object_count": 0} container = Container(name=container_name, extra=extra, driver=self) return container elif response.status == httplib.ACCEPTED: error = ContainerAlreadyExistsError(None, self, container_name) raise error raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def delete_container(self, container): name = self._encode_container_name(container.name) # Only empty container can be deleted response = self.connection.request("/%s" % (name), method="DELETE") if response.status == httplib.NO_CONTENT: return True elif response.status == httplib.NOT_FOUND: raise ContainerDoesNotExistError(value="", container_name=name, driver=self) elif response.status == httplib.CONFLICT: # @TODO: Add "delete_all_objects" parameter? raise ContainerIsNotEmptyError(value="", container_name=name, driver=self)
[docs] def download_object( self, obj, destination_path, overwrite_existing=False, delete_on_failure=True ): container_name = obj.container.name object_name = obj.name response = self.connection.request( "/{}/{}".format(container_name, object_name), method="GET", raw=True ) return self._get_object( obj=obj, callback=self._save_object, response=response, callback_kwargs={ "obj": obj, "response": response.response, "destination_path": destination_path, "overwrite_existing": overwrite_existing, "delete_on_failure": delete_on_failure, }, success_status_code=httplib.OK, )
[docs] def download_object_as_stream(self, obj, chunk_size=None): container_name = obj.container.name object_name = obj.name response = self.connection.request( "/{}/{}".format(container_name, object_name), method="GET", raw=True ) return self._get_object( obj=obj, callback=read_in_chunks, response=response, callback_kwargs={ "iterator": response.iter_content(chunk_size), "chunk_size": chunk_size, }, success_status_code=httplib.OK, )
[docs] def download_object_range( self, obj, destination_path, start_bytes, end_bytes=None, overwrite_existing=False, delete_on_failure=True, ): self._validate_start_and_end_bytes(start_bytes=start_bytes, end_bytes=end_bytes) container_name = obj.container.name object_name = obj.name headers = {"Range": self._get_standard_range_str(start_bytes, end_bytes)} response = self.connection.request( "/{}/{}".format(container_name, object_name), method="GET", headers=headers, raw=True, ) return self._get_object( obj=obj, callback=self._save_object, response=response, callback_kwargs={ "obj": obj, "response": response.response, "destination_path": destination_path, "overwrite_existing": overwrite_existing, "delete_on_failure": delete_on_failure, "partial_download": True, }, success_status_code=httplib.PARTIAL_CONTENT, )
[docs] def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None, chunk_size=None): self._validate_start_and_end_bytes(start_bytes=start_bytes, end_bytes=end_bytes) container_name = obj.container.name object_name = obj.name headers = {"Range": self._get_standard_range_str(start_bytes, end_bytes)} response = self.connection.request( "/{}/{}".format(container_name, object_name), headers=headers, method="GET", raw=True, ) return self._get_object( obj=obj, callback=read_in_chunks, response=response, callback_kwargs={ "iterator": response.iter_content(chunk_size), "chunk_size": chunk_size, }, success_status_code=httplib.PARTIAL_CONTENT, )
[docs] def upload_object( self, file_path, container, object_name, extra=None, verify_hash=True, headers=None, ): """ Upload an object. Note: This will override file with a same name if it already exists. """ return self._put_object( container=container, object_name=object_name, extra=extra, file_path=file_path, verify_hash=verify_hash, headers=headers, )
[docs] def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None): if isinstance(iterator, file): iterator = iter(iterator) return self._put_object( container=container, object_name=object_name, extra=extra, stream=iterator, headers=headers, )
[docs] def delete_object(self, obj): container_name = self._encode_container_name(obj.container.name) object_name = self._encode_object_name(obj.name) response = self.connection.request( "/{}/{}".format(container_name, object_name), method="DELETE" ) if response.status == httplib.NO_CONTENT: return True elif response.status == httplib.NOT_FOUND: raise ObjectDoesNotExistError(value="", object_name=object_name, driver=self) raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def ex_purge_object_from_cdn(self, obj, email=None): """ Purge edge cache for the specified object. :param email: Email where a notification will be sent when the job completes. (optional) :type email: ``str`` """ container_name = self._encode_container_name(obj.container.name) object_name = self._encode_object_name(obj.name) headers = {"X-Purge-Email": email} if email else {} # pylint: disable=unexpected-keyword-arg response = self.connection.request( "/{}/{}".format(container_name, object_name), method="DELETE", headers=headers, cdn_request=True, ) return response.status == httplib.NO_CONTENT
[docs] def ex_get_meta_data(self): """ Get meta data :rtype: ``dict`` """ response = self.connection.request("", method="HEAD") if response.status == httplib.NO_CONTENT: container_count = response.headers.get("x-account-container-count", "unknown") object_count = response.headers.get("x-account-object-count", "unknown") bytes_used = response.headers.get("x-account-bytes-used", "unknown") temp_url_key = response.headers.get("x-account-meta-temp-url-key", None) return { "container_count": int(container_count), "object_count": int(object_count), "bytes_used": int(bytes_used), "temp_url_key": temp_url_key, } raise LibcloudError("Unexpected status code: %s" % (response.status))
[docs] def ex_multipart_upload_object( self, file_path, container, object_name, chunk_size=33554432, extra=None, verify_hash=True, ): object_size = os.path.getsize(file_path) if object_size < chunk_size: return self.upload_object( file_path, container, object_name, extra=extra, verify_hash=verify_hash ) iter_chunk_reader = FileChunkReader(file_path, chunk_size) for index, iterator in enumerate(iter_chunk_reader): self._upload_object_part( container=container, object_name=object_name, part_number=index, iterator=iterator, verify_hash=verify_hash, ) return self._upload_object_manifest( container=container, object_name=object_name, extra=extra, verify_hash=verify_hash, )
[docs] def ex_enable_static_website(self, container, index_file="index.html"): """ Enable serving a static website. :param container: Container instance :type container: :class:`Container` :param index_file: Name of the object which becomes an index page for every sub-directory in this container. :type index_file: ``str`` :rtype: ``bool`` """ container_name = container.name headers = {"X-Container-Meta-Web-Index": index_file} # pylint: disable=unexpected-keyword-arg response = self.connection.request( "/%s" % (container_name), method="POST", headers=headers, cdn_request=False ) return response.status in [httplib.CREATED, httplib.ACCEPTED]
[docs] def ex_set_error_page(self, container, file_name="error.html"): """ Set a custom error page which is displayed if file is not found and serving of a static website is enabled. :param container: Container instance :type container: :class:`Container` :param file_name: Name of the object which becomes the error page. :type file_name: ``str`` :rtype: ``bool`` """ container_name = container.name headers = {"X-Container-Meta-Web-Error": file_name} # pylint: disable=unexpected-keyword-arg response = self.connection.request( "/%s" % (container_name), method="POST", headers=headers, cdn_request=False ) return response.status in [httplib.CREATED, httplib.ACCEPTED]
[docs] def ex_set_account_metadata_temp_url_key(self, key): """ Set the metadata header X-Account-Meta-Temp-URL-Key on your Cloud Files account. :param key: X-Account-Meta-Temp-URL-Key :type key: ``str`` :rtype: ``bool`` """ headers = {"X-Account-Meta-Temp-URL-Key": key} # pylint: disable=unexpected-keyword-arg response = self.connection.request("", method="POST", headers=headers, cdn_request=False) return response.status in [ httplib.OK, httplib.NO_CONTENT, httplib.CREATED, httplib.ACCEPTED, ]
[docs] def ex_get_object_temp_url(self, obj, method="GET", timeout=60): """ Create a temporary URL to allow others to retrieve or put objects in your Cloud Files account for as long or as short a time as you wish. This method is specifically for allowing users to retrieve or update an object. :param obj: The object that you wish to make temporarily public :type obj: :class:`Object` :param method: Which method you would like to allow, 'PUT' or 'GET' :type method: ``str`` :param timeout: Time (in seconds) after which you want the TempURL to expire. :type timeout: ``int`` :rtype: ``bool`` """ # pylint: disable=no-member self.connection._populate_hosts_and_request_paths() expires = int(time() + timeout) path = "{}/{}/{}".format(self.connection.request_path, obj.container.name, obj.name) try: key = self.ex_get_meta_data()["temp_url_key"] assert key is not None except Exception: raise KeyError( "You must first set the " + "X-Account-Meta-Temp-URL-Key header on your " + "Cloud Files account using " + "ex_set_account_metadata_temp_url_key before " + "you can use this method." ) hmac_body = "{}\n{}\n{}".format(method, expires, path) sig = hmac.new(b(key), b(hmac_body), sha1).hexdigest() params = urlencode({"temp_url_sig": sig, "temp_url_expires": expires}) temp_url = "https://{}/{}/{}?{}".format( self.connection.host + self.connection.request_path, obj.container.name, obj.name, params, ) return temp_url
def _upload_object_part(self, container, object_name, part_number, iterator, verify_hash=True): part_name = object_name + "/%08d" % part_number extra = {"content_type": "application/octet-stream"} self._put_object( container=container, object_name=part_name, extra=extra, stream=iterator, verify_hash=verify_hash, ) def _upload_object_manifest(self, container, object_name, extra=None, verify_hash=True): extra = extra or {} meta_data = extra.get("meta_data") container_name_encoded = self._encode_container_name(container.name) object_name_encoded = self._encode_object_name(object_name) request_path = "/{}/{}".format(container_name_encoded, object_name_encoded) # pylint: disable=no-member headers = { "X-Auth-Token": self.connection.auth_token, "X-Object-Manifest": "{}/{}/".format(container_name_encoded, object_name_encoded), } data = "" response = self.connection.request( request_path, method="PUT", data=data, headers=headers, raw=True ) object_hash = None if verify_hash: hash_function = self._get_hash_function() hash_function.update(b(data)) data_hash = hash_function.hexdigest() object_hash = response.headers.get("etag") if object_hash != data_hash: raise ObjectHashMismatchError( value=("MD5 hash checksum does not match (expected=%s, " + "actual=%s)") % (data_hash, object_hash), object_name=object_name, driver=self, ) obj = Object( name=object_name, size=0, hash=object_hash, extra=None, meta_data=meta_data, container=container, driver=self, ) return obj
[docs] def iterate_container_objects(self, container, prefix=None, ex_prefix=None): """ Return a generator of objects for the given container. :param container: Container instance :type container: :class:`Container` :param prefix: Only get objects with names starting with prefix :type prefix: ``str`` :param ex_prefix: (Deprecated.) Only get objects with names starting with ex_prefix :type ex_prefix: ``str`` :return: A generator of Object instances. :rtype: ``generator`` of :class:`Object` """ prefix = self._normalize_prefix_argument(prefix, ex_prefix) params = {} if prefix: params["prefix"] = prefix while True: container_name_encoded = self._encode_container_name(container.name) response = self.connection.request("/%s" % (container_name_encoded), params=params) if response.status == httplib.NO_CONTENT: # Empty or non-existent container break elif response.status == httplib.OK: objects = self._to_object_list(json.loads(response.body), container) if len(objects) == 0: break for obj in objects: yield obj params["marker"] = obj.name else: raise LibcloudError("Unexpected status code: %s" % (response.status))
def _put_object( self, container, object_name, extra=None, file_path=None, stream=None, verify_hash=True, headers=None, ): extra = extra or {} container_name_encoded = self._encode_container_name(container.name) object_name_encoded = self._encode_object_name(object_name) content_type = extra.get("content_type", None) meta_data = extra.get("meta_data", None) content_disposition = extra.get("content_disposition", None) headers = headers or {} if meta_data: for key, value in list(meta_data.items()): key = "X-Object-Meta-%s" % (key) headers[key] = value if content_disposition is not None: headers["Content-Disposition"] = content_disposition request_path = "/{}/{}".format(container_name_encoded, object_name_encoded) result_dict = self._upload_object( object_name=object_name, content_type=content_type, request_path=request_path, request_method="PUT", headers=headers, file_path=file_path, stream=stream, ) response = result_dict["response"] bytes_transferred = result_dict["bytes_transferred"] server_hash = result_dict["response"].headers.get("etag", None) if response.status == httplib.EXPECTATION_FAILED: raise LibcloudError(value="Missing content-type header", driver=self) elif verify_hash and not server_hash: raise LibcloudError(value="Server didn't return etag", driver=self) elif verify_hash and result_dict["data_hash"] != server_hash: raise ObjectHashMismatchError( value=("MD5 hash checksum does not match (expected=%s, " + "actual=%s)") % (result_dict["data_hash"], server_hash), object_name=object_name, driver=self, ) elif response.status == httplib.CREATED: obj = Object( name=object_name, size=bytes_transferred, hash=server_hash, extra=None, meta_data=meta_data, container=container, driver=self, ) return obj else: # @TODO: Add test case for this condition (probably 411) raise LibcloudError("status_code=%s" % (response.status), driver=self) def _encode_container_name(self, name): """ Encode container name so it can be used as part of the HTTP request. 
""" if name.startswith("/"): name = name[1:] name = urlquote(name) if name.find("/") != -1: raise InvalidContainerNameError( value="Container name cannot" " contain slashes", container_name=name, driver=self, ) if len(name) > 256: raise InvalidContainerNameError( value="Container name cannot be longer than 256 bytes", container_name=name, driver=self, ) return name def _encode_object_name(self, name): name = urlquote(name) return name def _to_container_list(self, response): # @TODO: Handle more than 10k containers - use "lazy list"? for container in response: extra = { "object_count": int(container["count"]), "size": int(container["bytes"]), } yield Container(name=container["name"], extra=extra, driver=self) def _to_object_list(self, response, container): objects = [] for obj in response: name = obj["name"] size = int(obj["bytes"]) hash = obj["hash"] extra = { "content_type": obj["content_type"], "last_modified": obj["last_modified"], } objects.append( Object( name=name, size=size, hash=hash, extra=extra, meta_data=None, container=container, driver=self, ) ) return objects def _headers_to_container(self, name, headers): size = int(headers.get("x-container-bytes-used", 0)) object_count = int(headers.get("x-container-object-count", 0)) extra = {"object_count": object_count, "size": size} container = Container(name=name, extra=extra, driver=self) return container def _headers_to_object(self, name, container, headers): size = int(headers.pop("content-length", 0)) last_modified = headers.pop("last-modified", None) etag = headers.pop("etag", None) content_type = headers.pop("content-type", None) meta_data = {} for key, value in list(headers.items()): if key.find("x-object-meta-") != -1: key = key.replace("x-object-meta-", "") meta_data[key] = value extra = {"content_type": content_type, "last_modified": last_modified} obj = Object( name=name, size=size, hash=etag, extra=extra, meta_data=meta_data, container=container, driver=self, ) return obj def 
_ex_connection_class_kwargs(self): kwargs = self.openstack_connection_kwargs() kwargs["ex_force_service_region"] = self.region kwargs["use_internal_url"] = self.use_internal_url return kwargs
class OpenStackSwiftStorageDriver(CloudFilesStorageDriver):
    """
    Storage driver for the OpenStack Swift.
    """

    type = Provider.CLOUDFILES_SWIFT
    name = "OpenStack Swift"
    connectionCls = OpenStackSwiftConnection

    # TODO: Reverse the relationship - Swift -> CloudFiles
    def __init__(self, key, secret=None, secure=True, host=None, port=None, region=None, **kwargs):
        super().__init__(
            key=key,
            secret=secret,
            secure=secure,
            host=host,
            port=port,
            region=region,
            **kwargs,
        )
class FileChunkReader:
    """Iterate over a file, yielding one ChunkStreamReader per chunk."""

    def __init__(self, file_path, chunk_size):
        self.file_path = file_path
        self.total = os.path.getsize(file_path)
        self.chunk_size = chunk_size
        # Offset of the next chunk to hand out.
        self.bytes_read = 0
        self.stop_iteration = False

    def __iter__(self):
        return self

    def next(self):
        """Return a reader for the next chunk or raise StopIteration."""
        if self.stop_iteration:
            raise StopIteration

        chunk_start = self.bytes_read
        chunk_end = chunk_start + self.chunk_size
        if chunk_end >= self.total:
            # Final (possibly short) chunk.
            chunk_end = self.total
            self.stop_iteration = True
        self.bytes_read += chunk_end - chunk_start

        return ChunkStreamReader(
            file_path=self.file_path,
            start_block=chunk_start,
            end_block=chunk_end,
            chunk_size=8192,
        )

    def __next__(self):
        return self.next()
class ChunkStreamReader:
    """Iterate over bytes of a file slice [start_block, end_block)."""

    def __init__(self, file_path, start_block, end_block, chunk_size):
        self.fd = open(file_path, "rb")
        self.fd.seek(start_block)
        self.start_block = start_block
        self.end_block = end_block
        self.chunk_size = chunk_size
        self.bytes_read = 0
        self.stop_iteration = False

        # Work around to make sure the file descriptor is closed even if
        # the iterator is never read from or not fully exhausted.
        def close_file(fd):
            try:
                fd.close()
            except Exception:
                pass

        atexit.register(close_file, self.fd)

    def __iter__(self):
        return self

    def next(self):
        """Return the next block of bytes or raise StopIteration."""
        if self.stop_iteration:
            self.fd.close()
            raise StopIteration

        remaining = (self.end_block - self.start_block) - self.bytes_read
        block_size = self.chunk_size
        if block_size > remaining:
            # Last read of the slice (may be empty when the slice length
            # is an exact multiple of chunk_size — preserved behaviour).
            block_size = remaining
            self.stop_iteration = True

        block = self.fd.read(block_size)
        self.bytes_read += block_size
        return block

    def __next__(self):
        return self.next()