# Source code for libcloud.storage.drivers.atmos

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hmac
import time
import base64
import hashlib
from io import FileIO as file

from libcloud.utils.py3 import b, next, httplib, urlparse, urlquote, urlencode, urlunquote
from libcloud.common.base import XmlResponse, ConnectionUserAndKey
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import LibcloudError
from libcloud.storage.base import CHUNK_SIZE, Object, Container, StorageDriver
from libcloud.storage.types import (
    ObjectDoesNotExistError,
    ContainerIsNotEmptyError,
    ContainerDoesNotExistError,
    ContainerAlreadyExistsError,
)


def collapse(s):
    """Fold runs of spaces in ``s`` into single spaces and trim the ends.

    Only the space character acts as a separator: empty pieces produced by
    consecutive, leading or trailing spaces are dropped. Tabs and newlines
    are left untouched.
    """
    return " ".join(filter(None, s.split(" ")))
class AtmosError(LibcloudError):
    """Error reported by the Atmos REST API.

    :param code: Numeric Atmos error code, taken from the ``<Code>`` element
        of the error response.
    :param message: Human-readable error text.
    :param driver: Driver instance that issued the failing request.
    """

    def __init__(self, code, message, driver=None):
        # Numeric Atmos error code; callers branch on it (e.g. "not found").
        self.code = code
        super().__init__(value=message, driver=driver)
class AtmosResponse(XmlResponse):
    """XML response that converts Atmos error documents into ``AtmosError``."""

    def success(self):
        """Return True when the HTTP status is one Atmos uses for success."""
        return self.status in (
            httplib.OK,
            httplib.CREATED,
            httplib.NO_CONTENT,
            httplib.PARTIAL_CONTENT,
        )

    def parse_error(self):
        """Raise an ``AtmosError`` built from the ``<Code>``/``<Message>`` body.

        :return: ``None`` when the body carries no usable error document
            (no body, or no ``<Code>`` element); otherwise this raises.
        :raises AtmosError: with the integer Atmos error code and message.
        """
        tree = self.parse_body()
        # With an empty body the parser hands back the raw (empty) string
        # rather than an XML element; ``str.find`` would then return -1 and
        # ``.text`` would blow up with AttributeError.
        if tree is None or isinstance(tree, (str, bytes)):
            return None

        code_el = tree.find("Code")
        if code_el is None or code_el.text is None:
            return None

        message_el = tree.find("Message")
        message = message_el.text if message_el is not None else None
        raise AtmosError(
            code=int(code_el.text), message=message, driver=self.connection.driver
        )
class AtmosConnection(ConnectionUserAndKey):
    """Connection that adds Atmos identity headers and an HMAC-SHA1 signature."""

    responseCls = AtmosResponse

    def add_default_headers(self, headers):
        """Add the uid, date and default content headers to ``headers``.

        ``Content-Type`` and ``Accept`` are only filled in when the caller did
        not supply them. The same timestamp is sent as both ``Date`` and
        ``x-emc-date``.
        """
        headers["x-emc-uid"] = self.user_id
        headers["Date"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
        headers["x-emc-date"] = headers["Date"]

        if "Content-Type" not in headers:
            headers["Content-Type"] = "application/octet-stream"
        if "Accept" not in headers:
            headers["Accept"] = "*/*"

        return headers

    def pre_connect_hook(self, params, headers):
        """Sign the request just before it is sent."""
        headers["x-emc-signature"] = self._calculate_signature(params, headers)
        return params, headers

    def _calculate_signature(self, params, headers):
        """Return the base64 HMAC-SHA1 ``x-emc-signature`` value for a request.

        The string to sign is, newline-separated: the HTTP method,
        Content-Type, Range, Date, the lower-cased resource path (driver path
        prefix removed, query parameters appended), then one
        ``name:value`` line per x-emc-* header, sorted by name.

        ``self.key`` is the base64-encoded shared secret.
        """
        pathstring = urlunquote(self.action)
        driver_path = self.driver.path  # pylint: disable=no-member
        if pathstring.startswith(driver_path):
            pathstring = pathstring[len(driver_path) :]

        if params:
            if isinstance(params, dict):
                params = list(params.items())
            pathstring += "?" + urlencode(params)
        pathstring = pathstring.lower()

        # The Atmos signature uses lower-cased x-emc-* header names. Match
        # case-insensitively so caller-supplied headers such as "X-Emc-Meta"
        # are not sent unsigned (which would make the server reject them).
        xhdrs = [
            (name.lower(), value)
            for name, value in headers.items()
            if name.lower().startswith("x-emc-")
        ]
        xhdrs.sort(key=lambda item: item[0])

        parts = [
            self.method,
            headers.get("Content-Type", ""),
            headers.get("Range", ""),
            headers.get("Date", ""),
            pathstring,
        ]
        parts.extend(name + ":" + collapse(value) for name, value in xhdrs)
        string_to_sign = "\n".join(parts)

        key = base64.b64decode(self.key)
        digest = hmac.new(b(key), b(string_to_sign), hashlib.sha1).digest()
        return base64.b64encode(b(digest)).decode("utf-8")
class AtmosDriver(StorageDriver):
    """Storage driver for the EMC Atmos REST API.

    Containers are directories in the Atmos namespace and objects are the
    regular files inside them. ``path`` is the URL prefix under which the
    ``/rest/namespace`` and ``/rest/objects`` endpoints live; it is ``None``
    here, so it presumably has to be provided by a subclass.
    """

    connectionCls = AtmosConnection

    host = None  # type: str
    path = None  # type: str
    api_name = "atmos"
    supports_chunked_encoding = True
    website = "http://atmosonline.com/"
    name = "atmos"

    DEFAULT_CDN_TTL = 60 * 60 * 24 * 7  # 1 week

    # Atmos error codes this driver maps onto libcloud exceptions.
    _ERROR_NOT_FOUND = 1003
    _ERROR_ALREADY_EXISTS = 1016
    _ERROR_NOT_EMPTY = 1023

    def __init__(self, key, secret=None, secure=True, host=None, port=None):
        # Fall back to the class-level ``host`` attribute when none is given.
        host = host or self.host
        super().__init__(key, secret, secure, host, port)

    def iterate_containers(self):
        """Yield a :class:`Container` for every directory in the namespace root."""
        result = self.connection.request(self._namespace_path(""))
        entries = self._list_objects(result.object, object_type="directory")
        for entry in entries:
            extra = {"object_id": entry["id"]}
            yield Container(entry["name"], extra, self)

    def get_container(self, container_name):
        """Return the container, raising ``ContainerDoesNotExistError`` if absent."""
        path = self._namespace_path(container_name) + "/?metadata/system"
        try:
            result = self.connection.request(path)
        except AtmosError as e:
            if e.code != self._ERROR_NOT_FOUND:
                raise
            raise ContainerDoesNotExistError(e, self, container_name)
        meta = self._emc_meta(result)
        extra = {"object_id": meta["objectid"]}
        return Container(container_name, extra, self)

    def create_container(self, container_name):
        """Create a namespace directory and return it as a :class:`Container`."""
        path = self._namespace_path(container_name) + "/"
        try:
            self.connection.request(path, method="POST")
        except AtmosError as e:
            if e.code != self._ERROR_ALREADY_EXISTS:
                raise
            raise ContainerAlreadyExistsError(e, self, container_name)
        return self.get_container(container_name)

    def delete_container(self, container):
        """Delete an (empty) container.

        :raises ContainerDoesNotExistError: if the directory does not exist.
        :raises ContainerIsNotEmptyError: if the directory still has entries.
        :raises AtmosError: for any other Atmos failure.
        """
        try:
            self.connection.request(
                self._namespace_path(container.name) + "/", method="DELETE"
            )
        except AtmosError as e:
            if e.code == self._ERROR_NOT_FOUND:
                raise ContainerDoesNotExistError(e, self, container.name)
            if e.code == self._ERROR_NOT_EMPTY:
                raise ContainerIsNotEmptyError(e, self, container.name)
            # Previously any other error was swallowed and True was returned
            # even though nothing had been deleted.
            raise
        return True

    def get_object(self, container_name, object_name):
        """Return an :class:`Object` built from the object's system and user metadata."""
        container = self.get_container(container_name)
        object_name_cleaned = self._clean_object_name(object_name)
        path = self._namespace_path(container_name) + "/" + object_name_cleaned

        try:
            result = self.connection.request(path + "?metadata/system")
            system_meta = self._emc_meta(result)

            result = self.connection.request(path + "?metadata/user")
            user_meta = self._emc_meta(result)
        except AtmosError as e:
            if e.code != self._ERROR_NOT_FOUND:
                raise
            raise ObjectDoesNotExistError(e, self, object_name)

        last_modified = time.strptime(system_meta["mtime"], "%Y-%m-%dT%H:%M:%SZ")
        last_modified = time.strftime("%a, %d %b %Y %H:%M:%S GMT", last_modified)
        extra = {"object_id": system_meta["objectid"], "last_modified": last_modified}
        # The upload methods store the MD5 as user metadata; surface it as
        # the object hash instead of as user metadata.
        data_hash = user_meta.pop("md5", "")
        return Object(
            object_name,
            int(system_meta["size"]),
            data_hash,
            extra,
            user_meta,
            container,
            self,
        )

    def upload_object(
        self,
        file_path,
        container,
        object_name,
        extra=None,
        verify_hash=True,
        headers=None,
    ):
        """Upload a local file, then record its MD5 and user metadata.

        :param extra: Optional dict; ``content_type`` and ``meta_data`` keys
            are honoured. It is not modified.
        :param verify_hash: Accepted for API compatibility; not used here.
        :param headers: Additional request headers for the data upload.
        """
        extra = extra or {}
        object_name_cleaned = self._clean_object_name(object_name)
        request_path = self._namespace_path(container.name) + "/" + object_name_cleaned
        content_type = extra.get("content_type", None)

        method = self._object_write_method(request_path)

        result_dict = self._upload_object(
            object_name=object_name,
            content_type=content_type,
            request_path=request_path,
            request_method=method,
            # Forward the caller's headers (they used to be dropped); pass a
            # copy so the caller's dict is never modified.
            headers=dict(headers or {}),
            file_path=file_path,
        )
        bytes_transferred = result_dict["bytes_transferred"]
        data_hash = result_dict["data_hash"]

        # Work on a copy so the caller's extra["meta_data"] is not mutated.
        meta_data = dict(extra.get("meta_data") or {})
        meta_data["md5"] = data_hash
        self._set_user_meta(request_path, meta_data)
        del meta_data["md5"]

        result = self.connection.request(request_path + "?metadata/system")
        meta = self._emc_meta(result)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }

        return Object(
            object_name,
            bytes_transferred,
            data_hash,
            extra,
            meta_data,
            container,
            self,
        )

    def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None):
        """Upload data from ``iterator`` chunk by chunk, then record its metadata.

        The first chunk creates (POST) or replaces (PUT) the object; every
        later non-empty chunk is written with a ``Range`` PUT at the current
        offset. A running MD5 is sent with each chunk and the final one is
        stored as user metadata.
        """
        if isinstance(iterator, file):
            iterator = iter(iterator)

        extra = extra or {}
        extra_headers = headers or {}
        data_hash = hashlib.md5()
        generator = read_in_chunks(iterator, CHUNK_SIZE, True)
        bytes_transferred = 0
        try:
            chunk = next(generator)
        except StopIteration:
            chunk = ""

        path = self._namespace_path(container.name + "/" + object_name)
        content_type = self._determine_content_type(extra.get("content_type", None), object_name)
        method = self._object_write_method(path)

        while True:
            end = bytes_transferred + len(chunk) - 1
            data_hash.update(b(chunk))
            request_headers = dict(extra_headers)
            request_headers.update(
                {
                    "x-emc-meta": "md5=" + data_hash.hexdigest(),
                    "Content-Type": content_type,
                }
            )
            if len(chunk) > 0 and bytes_transferred > 0:
                # Continuation chunk: append at the current offset.
                request_headers["Range"] = "Bytes=%d-%d" % (bytes_transferred, end)
                method = "PUT"
            self.connection.request(path, method=method, data=chunk, headers=request_headers)
            bytes_transferred += len(chunk)
            try:
                chunk = next(generator)
            except StopIteration:
                break
            if len(chunk) == 0:
                break

        data_hash = data_hash.hexdigest()

        # Work on a copy so the caller's extra["meta_data"] is not mutated.
        meta_data = dict(extra.get("meta_data") or {})
        meta_data["md5"] = data_hash
        self._set_user_meta(path, meta_data)

        result = self.connection.request(path + "?metadata/system")
        meta = self._emc_meta(result)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }

        return Object(object_name, bytes_transferred, data_hash, extra, meta_data, container, self)

    def download_object(
        self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
    ):
        """Download ``obj`` to ``destination_path`` via the base ``_save_object``."""
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=self._save_object,
            response=response,
            callback_kwargs={
                "obj": obj,
                "response": response.response,
                "destination_path": destination_path,
                "overwrite_existing": overwrite_existing,
                "delete_on_failure": delete_on_failure,
            },
            success_status_code=httplib.OK,
        )

    def download_object_as_stream(self, obj, chunk_size=None):
        """Return an iterator over the object's content in ``chunk_size`` pieces."""
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=read_in_chunks,
            response=response,
            callback_kwargs={"iterator": response.response, "chunk_size": chunk_size},
            success_status_code=httplib.OK,
        )

    def delete_object(self, obj):
        """Delete ``obj``, raising ``ObjectDoesNotExistError`` if it is absent."""
        path = self._namespace_path(obj.container.name) + "/" + self._clean_object_name(obj.name)
        try:
            self.connection.request(path, method="DELETE")
        except AtmosError as e:
            if e.code != self._ERROR_NOT_FOUND:
                raise
            raise ObjectDoesNotExistError(e, self, obj.name)
        return True

    def enable_object_cdn(self, obj):
        """No-op: shareable URLs need no server-side enabling here."""
        return True

    def get_object_cdn_url(self, obj, expiry=None, use_object=False):
        """
        Return an object CDN URL.

        :param obj: Object instance
        :type  obj: :class:`Object`

        :param expiry: Expiry (UNIX timestamp); defaults to now plus
                       ``DEFAULT_CDN_TTL``.
        :type expiry: ``str``

        :param use_object: Address the object by its Atmos object id instead
                           of its namespace path.
        :type use_object: ``bool``

        :rtype: ``str``
        """
        if use_object:
            # Listed objects keep the id in meta_data; objects returned by
            # get_object / upload_* keep it in extra.
            object_id = obj.meta_data.get("object_id") or obj.extra["object_id"]
            path = "/rest/objects/" + object_id
        else:
            path = "/rest/namespace/" + obj.container.name + "/" + obj.name

        protocol = "https" if self.secure else "http"

        expiry = str(expiry or int(time.time()) + self.DEFAULT_CDN_TTL)
        params = [
            ("uid", self.key),
            ("expires", expiry),
        ]
        params.append(("signature", self._cdn_signature(path, params, expiry)))

        params = urlencode(params)
        path = self.path + path
        return urlparse.urlunparse((protocol, self.host, path, "", params, ""))

    def _cdn_signature(self, path, params, expiry):
        """Return the base64 HMAC-SHA1 signature for a shareable GET URL."""
        key = base64.b64decode(self.secret)
        string_to_sign = "\n".join(["GET", path.lower(), self.key, expiry])
        # hmac requires bytes for the message on Python 3.
        digest = hmac.new(key, b(string_to_sign), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("utf-8")

    def _object_write_method(self, path):
        """Return "PUT" if an object exists at ``path``, or "POST" to create it."""
        try:
            self.connection.request(path + "?metadata/system")
        except AtmosError as e:
            if e.code != self._ERROR_NOT_FOUND:
                raise
            return "POST"
        return "PUT"

    def _set_user_meta(self, path, meta_data):
        """Store ``meta_data`` as Atmos user metadata ("k=v, k=v") on ``path``."""
        user_meta = ", ".join(k + "=" + str(v) for k, v in meta_data.items())
        self.connection.request(
            path + "?metadata/user", method="POST", headers={"x-emc-meta": user_meta}
        )

    def _list_objects(self, tree, object_type=None):
        """Return id/type/name dicts for directory entries, optionally filtered by type."""
        listing = tree.find(self._emc_tag("DirectoryList"))
        entries = []
        for entry in listing.findall(self._emc_tag("DirectoryEntry")):
            file_type = entry.find(self._emc_tag("FileType")).text
            if object_type is not None and object_type != file_type:
                continue
            entries.append(
                {
                    "id": entry.find(self._emc_tag("ObjectID")).text,
                    "type": file_type,
                    "name": entry.find(self._emc_tag("Filename")).text,
                }
            )
        return entries

    def _clean_object_name(self, name):
        """URL-quote an (ASCII-only) object name."""
        return urlquote(name.encode("ascii"))

    def _namespace_path(self, path):
        """Return the request path for ``path`` in the Atmos namespace."""
        return self.path + "/rest/namespace/" + urlquote(path.encode("ascii"))

    def _object_path(self, object_id):
        """Return the request path for an object addressed by its Atmos id."""
        # Concatenating bytes (object_id.encode("ascii")) onto a str raised
        # TypeError on Python 3.
        return self.path + "/rest/objects/" + object_id

    @staticmethod
    def _emc_tag(tag):
        """Qualify ``tag`` with the EMC XML namespace."""
        return "{http://www.emc.com/cos/}" + tag

    def _emc_meta(self, response):
        """Parse the "k=v, k=v" ``x-emc-meta`` response header into a dict."""
        meta = response.headers.get("x-emc-meta", "")
        if len(meta) == 0:
            return {}
        meta = meta.split(", ")
        return dict([x.split("=", 1) for x in meta])

    def _entries_to_objects(self, container, entries):
        """Yield lightweight :class:`Object`s (size 0, no hash) for listing entries."""
        for entry in entries:
            metadata = {"object_id": entry["id"]}
            yield Object(entry["name"], 0, "", {}, metadata, container, self)

    def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
        """
        Return a generator of objects for the given container.

        :param container: Container instance
        :type container: :class:`Container`

        :param prefix: Filter objects starting with a prefix.
                       Filtering is performed client-side.
        :type  prefix: ``str``

        :param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
                          Filtering is performed client-side.
        :type  ex_prefix: ``str``

        :return: A generator of Object instances.
        :rtype: ``generator`` of :class:`Object`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)

        headers = {"x-emc-include-meta": "1"}
        path = self._namespace_path(container.name) + "/"
        result = self.connection.request(path, headers=headers)
        entries = self._list_objects(result.object, object_type="regular")
        objects = self._entries_to_objects(container, entries)
        return self._filter_listed_container_objects(objects, prefix)