Source code for libcloud.storage.drivers.oss

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=unexpected-keyword-arg

import base64
import codecs
import hmac
import os
import time
import sys
from hashlib import sha1

try:
    from lxml import etree as ET
except ImportError:
    from xml.etree import ElementTree as ET

try:
    from lxml.etree import Element, SubElement
except ImportError:
    from xml.etree.ElementTree import Element, SubElement

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlquote
from libcloud.utils.py3 import urlencode
from libcloud.utils.py3 import b
from libcloud.utils.py3 import tostring
from libcloud.utils.py3 import PY3
from libcloud.utils.xml import fixxpath, findtext
from libcloud.utils.files import guess_file_mime_type, read_in_chunks, \
    exhaust_iterator
from libcloud.common.types import InvalidCredsError, LibcloudError
from libcloud.common.base import ConnectionUserAndKey, RawResponse, \
    XmlResponse
from libcloud.common.types import MalformedResponseError
from libcloud.storage.base import Object, Container, StorageDriver, \
    DEFAULT_CONTENT_TYPE
from libcloud.storage.types import ContainerError
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.types import ObjectHashMismatchError

__all__ = [
    'OSSStorageDriver',
    'OSSMultipartUpload',

    'EXPIRATION_SECONDS',
    'CHUNK_SIZE',
    'MAX_UPLOADS_PER_RESPONSE'
]

GMT_TIME_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
EXPIRATION_SECONDS = 15 * 60

# OSS multi-part chunks must be great than 100KB except the last one
CHUNK_SIZE = 100 * 1024

# Desired number of items in each response inside a paginated request in
# ex_iterate_multipart_uploads.
MAX_UPLOADS_PER_RESPONSE = 1000


class OSSResponse(XmlResponse):
    namespace = None
    valid_response_codes = [httplib.NOT_FOUND, httplib.CONFLICT,
                            httplib.BAD_REQUEST]

    def success(self):
        i = int(self.status)
        return i >= 200 and i <= 299 or i in self.valid_response_codes

    def parse_body(self):
        """
        OSSResponse body is in utf-8 encoding.
        """
        if len(self.body) == 0 and not self.parse_zero_length_body:
            return self.body

        try:
            if PY3:
                parser = ET.XMLParser(encoding='utf-8')
                body = ET.XML(self.body.encode('utf-8'), parser=parser)
            else:
                body = ET.XML(self.body)
        except:
            raise MalformedResponseError('Failed to parse XML',
                                         body=self.body,
                                         driver=self.connection.driver)
        return body

    def parse_error(self):
        if self.status in [httplib.UNAUTHORIZED, httplib.FORBIDDEN]:
            raise InvalidCredsError(self.body)
        elif self.status == httplib.MOVED_PERMANENTLY:
            raise LibcloudError('This bucket is located in a different ' +
                                'region. Please use the correct driver.',
                                driver=OSSStorageDriver)
        elif self.status == httplib.METHOD_NOT_ALLOWED:
            raise LibcloudError('The method is not allowed. Status code: %d, '
                                'headers: %s' % (self.status, self.headers))
        raise LibcloudError('Unknown error. Status code: %d, body: %s' %
                            (self.status, self.body),
                            driver=OSSStorageDriver)


class OSSRawResponse(OSSResponse, RawResponse):
    pass


class OSSConnection(ConnectionUserAndKey):
    """
    Represents a single connection to the Aliyun OSS Endpoint
    """

    _domain = 'aliyuncs.com'
    _default_location = 'oss'
    responseCls = OSSResponse
    rawResponseCls = OSSRawResponse

    @staticmethod
    def _get_auth_signature(method, headers, params, expires, secret_key, path,
                            vendor_prefix):
        """
        Signature = base64(hmac-sha1(AccessKeySecret,
          VERB + "\n"
          + CONTENT-MD5 + "\n"
          + CONTENT-TYPE + "\n"
          + EXPIRES + "\n"
          + CanonicalizedOSSHeaders
          + CanonicalizedResource))
        """
        special_headers = {'content-md5': '',
                           'content-type': '',
                           'expires': ''}
        vendor_headers = {}

        for key, value in list(headers.items()):
            key_lower = key.lower()
            if key_lower in special_headers:
                special_headers[key_lower] = value.strip()
            elif key_lower.startswith(vendor_prefix):
                vendor_headers[key_lower] = value.strip()

        if expires:
            special_headers['expires'] = str(expires)

        buf = [method]
        for _, value in sorted(special_headers.items()):
            buf.append(value)
        string_to_sign = '\n'.join(buf)

        buf = []
        for key, value in sorted(vendor_headers.items()):
            buf.append('%s:%s' % (key, value))
        header_string = '\n'.join(buf)

        values_to_sign = []
        for value in [string_to_sign, header_string, path]:
            if value:
                values_to_sign.append(value)

        string_to_sign = '\n'.join(values_to_sign)
        b64_hmac = base64.b64encode(
            hmac.new(b(secret_key), b(string_to_sign), digestmod=sha1).digest()
        )
        return b64_hmac

    @staticmethod
    def _get_expires(params):
        """
        Get expires timeout seconds from parameters.
        """
        expires = None
        if 'expires' in params:
            expires = params['expires']
        elif 'Expires' in params:
            expires = params['Expires']
        if expires:
            try:
                return int(expires)
            except Exception:
                pass
        return int(time.time()) + EXPIRATION_SECONDS

    def add_default_params(self, params):
        expires_at = self._get_expires(params)
        expires = str(expires_at)
        params['OSSAccessKeyId'] = self.user_id
        params['Expires'] = expires
        return params

    def add_default_headers(self, headers):
        headers['Date'] = time.strftime(GMT_TIME_FORMAT, time.gmtime())
        return headers

    def pre_connect_hook(self, params, headers):
        if self._container:
            path = '/%s%s' % (self._container.name, self.action)
        else:
            path = self.action
        params['Signature'] = self._get_auth_signature(
            method=self.method, headers=headers, params=params,
            expires=params['Expires'], secret_key=self.key, path=path,
            vendor_prefix=self.driver.http_vendor_prefix)
        return params, headers

    def request(self, action, params=None, data=None, headers=None,
                method='GET', raw=False, container=None):
        self.host = '%s.%s' % (self._default_location, self._domain)
        self._container = container
        if container and container.name:
            if 'location' in container.extra:
                self.host = '%s.%s.%s' % (container.name,
                                          container.extra['location'],
                                          self._domain)
            else:
                self.host = '%s.%s' % (container.name, self.host)
        return super(OSSConnection, self).request(action=action,
                                                  params=params,
                                                  data=data,
                                                  headers=headers,
                                                  method=method,
                                                  raw=raw)


[docs]class OSSMultipartUpload(object): """ Class representing an Aliyun OSS multipart upload """ def __init__(self, key, id, initiated): """ Class representing an Aliyun OSS multipart upload :param key: The object/key that was being uploaded :type key: ``str`` :param id: The upload id assigned by Aliyun :type id: ``str`` :param initiated: The date/time at which the upload was started :type created_at: ``str`` """ self.key = key self.id = id self.initiated = initiated def __repr__(self): return ('<OSSMultipartUpload: key=%s>' % (self.key))
[docs]class OSSStorageDriver(StorageDriver): name = 'Aliyun OSS' website = 'http://www.aliyun.com/product/oss' connectionCls = OSSConnection hash_type = 'md5' supports_chunked_encoding = False supports_multipart_upload = True namespace = None http_vendor_prefix = 'x-oss-'
[docs] def iterate_containers(self): response = self.connection.request('/') if response.status == httplib.OK: containers = self._to_containers(obj=response.object, xpath='Buckets/Bucket') return containers raise LibcloudError('Unexpected status code: %s' % (response.status), driver=self)
[docs] def list_container_objects(self, container, ex_prefix=None): """ Return a list of objects for the given container. :param container: Container instance. :type container: :class:`Container` :keyword ex_prefix: Only return objects starting with ex_prefix :type ex_prefix: ``str`` :return: A list of Object instances. :rtype: ``list`` of :class:`Object` """ return list(self.iterate_container_objects(container, ex_prefix=ex_prefix))
[docs] def iterate_container_objects(self, container, ex_prefix=None): """ Return a generator of objects for the given container. :param container: Container instance :type container: :class:`Container` :keyword ex_prefix: Only return objects starting with ex_prefix :type ex_prefix: ``str`` :return: A generator of Object instances. :rtype: ``generator`` of :class:`Object` """ params = {} if ex_prefix: params['prefix'] = ex_prefix last_key = None exhausted = False while not exhausted: if last_key: params['marker'] = last_key response = self.connection.request('/', params=params, container=container) if response.status != httplib.OK: raise LibcloudError('Unexpected status code: %s' % (response.status), driver=self) objects = self._to_objs(obj=response.object, xpath='Contents', container=container) is_truncated = response.object.findtext(fixxpath( xpath='IsTruncated', namespace=self.namespace)).lower() exhausted = (is_truncated == 'false') last_key = None for obj in objects: last_key = obj.name yield obj
[docs] def get_container(self, container_name): for container in self.iterate_containers(): if container.name == container_name: return container raise ContainerDoesNotExistError(value=None, driver=self, container_name=container_name)
[docs] def get_object(self, container_name, object_name): container = self.get_container(container_name=container_name) object_path = self._get_object_path(container, object_name) response = self.connection.request(object_path, method='HEAD', container=container) if response.status == httplib.OK: obj = self._headers_to_object(object_name=object_name, container=container, headers=response.headers) return obj raise ObjectDoesNotExistError(value=None, driver=self, object_name=object_name)
[docs] def create_container(self, container_name, ex_location=None): """ @inherits :class:`StorageDriver.create_container` :keyword ex_location: The desired location where to create container :type keyword: ``str`` """ extra = None if ex_location: root = Element('CreateBucketConfiguration') child = SubElement(root, 'LocationConstraint') child.text = ex_location data = tostring(root) extra = {'location': ex_location} else: data = '' container = Container(name=container_name, extra=extra, driver=self) response = self.connection.request('/', data=data, method='PUT', container=container) if response.status == httplib.OK: return container elif response.status == httplib.CONFLICT: raise InvalidContainerNameError( value='Container with this name already exists. The name must ' 'be unique among all the containers in the system', container_name=container_name, driver=self) elif response.status == httplib.BAD_REQUEST: raise ContainerError( value='Bad request when creating container: %s' % response.body, container_name=container_name, driver=self) raise LibcloudError('Unexpected status code: %s' % (response.status), driver=self)
[docs] def delete_container(self, container): # Note: All the objects in the container must be deleted first response = self.connection.request('/', method='DELETE', container=container) if response.status == httplib.NO_CONTENT: return True elif response.status == httplib.CONFLICT: raise ContainerIsNotEmptyError( value='Container must be empty before it can be deleted.', container_name=container.name, driver=self) elif response.status == httplib.NOT_FOUND: raise ContainerDoesNotExistError(value=None, driver=self, container_name=container.name) return False
[docs] def download_object(self, obj, destination_path, overwrite_existing=False, delete_on_failure=True): obj_path = self._get_object_path(obj.container, obj.name) response = self.connection.request(obj_path, method='GET', raw=True, container=obj.container) return self._get_object(obj=obj, callback=self._save_object, response=response, callback_kwargs={ 'obj': obj, 'response': response.response, 'destination_path': destination_path, 'overwrite_existing': overwrite_existing, 'delete_on_failure': delete_on_failure}, success_status_code=httplib.OK)
[docs] def download_object_as_stream(self, obj, chunk_size=None): obj_path = self._get_object_path(obj.container, obj.name) response = self.connection.request(obj_path, method='GET', raw=True, container=obj.container) return self._get_object(obj=obj, callback=read_in_chunks, response=response, callback_kwargs={'iterator': response.response, 'chunk_size': chunk_size}, success_status_code=httplib.OK)
[docs] def upload_object(self, file_path, container, object_name, extra=None, verify_hash=True, headers=None): upload_func = self._upload_file upload_func_kwargs = {'file_path': file_path} return self._put_object(container=container, object_name=object_name, upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, extra=extra, file_path=file_path, verify_hash=verify_hash)
[docs] def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None): method = 'PUT' params = None if self.supports_multipart_upload: # Initiate the multipart request and get an upload id upload_func = self._upload_multipart upload_func_kwargs = {'iterator': iterator, 'container': container, 'object_name': object_name} method = 'POST' iterator = iter('') params = 'uploads' elif self.supports_chunked_encoding: upload_func = self._stream_data upload_func_kwargs = {'iterator': iterator} else: # In this case, we have to load the entire object to # memory and send it as normal data upload_func = self._upload_data upload_func_kwargs = {} return self._put_object(container=container, object_name=object_name, upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, extra=extra, method=method, query_args=params, iterator=iterator, verify_hash=False)
[docs] def delete_object(self, obj): object_path = self._get_object_path(obj.container, obj.name) response = self.connection.request(object_path, method='DELETE', container=obj.container) if response.status == httplib.NO_CONTENT: return True elif response.status == httplib.NOT_FOUND: raise ObjectDoesNotExistError(value=None, driver=self, object_name=obj.name) return False
[docs] def ex_iterate_multipart_uploads(self, container, prefix=None, delimiter=None, max_uploads=MAX_UPLOADS_PER_RESPONSE): """ Extension method for listing all in-progress OSS multipart uploads. Each multipart upload which has not been committed or aborted is considered in-progress. :param container: The container holding the uploads :type container: :class:`Container` :keyword prefix: Print only uploads of objects with this prefix :type prefix: ``str`` :keyword delimiter: The object/key names are grouped based on being split by this delimiter :type delimiter: ``str`` :keyword max_uploads: The max uplod items returned for one request :type max_uploads: ``int`` :return: A generator of OSSMultipartUpload instances. :rtype: ``generator`` of :class:`OSSMultipartUpload` """ if not self.supports_multipart_upload: raise LibcloudError('Feature not supported', driver=self) request_path = '/?uploads' params = {'max-uploads': max_uploads} if prefix: params['prefix'] = prefix if delimiter: params['delimiter'] = delimiter def finder(node, text): return node.findtext(fixxpath(xpath=text, namespace=self.namespace)) while True: response = self.connection.request(request_path, params=params, container=container) if response.status != httplib.OK: raise LibcloudError('Error fetching multipart uploads. ' 'Got code: %s' % response.status, driver=self) body = response.parse_body() # pylint: disable=maybe-no-member for node in body.findall(fixxpath(xpath='Upload', namespace=self.namespace)): key = finder(node, 'Key') upload_id = finder(node, 'UploadId') initiated = finder(node, 'Initiated') yield OSSMultipartUpload(key, upload_id, initiated) # Check if this is the last entry in the listing # pylint: disable=maybe-no-member is_truncated = body.findtext(fixxpath(xpath='IsTruncated', namespace=self.namespace)) if is_truncated.lower() == 'false': break # Provide params for the next request upload_marker = body.findtext(fixxpath(xpath='NextUploadIdMarker', namespace=self.namespace)) key_marker = body.findtext(fixxpath(xpath='NextKeyMarker', namespace=self.namespace)) params['key-marker'] = key_marker params['upload-id-marker'] = upload_marker
[docs] def ex_abort_all_multipart_uploads(self, container, prefix=None): """ Extension method for removing all partially completed OSS multipart uploads. :param container: The container holding the uploads :type container: :class:`Container` :keyword prefix: Delete only uploads of objects with this prefix :type prefix: ``str`` """ # Iterate through the container and delete the upload ids for upload in self.ex_iterate_multipart_uploads(container, prefix, delimiter=None): object_path = self._get_object_path(container, upload.key) self._abort_multipart(object_path, upload.id, container=container)
def _clean_object_name(self, name): name = urlquote(name) return name def _put_object(self, container, object_name, upload_func, upload_func_kwargs, method='PUT', query_args=None, extra=None, file_path=None, iterator=None, verify_hash=False): """ Create an object and upload data using the given function. """ headers = {} extra = extra or {} content_type = extra.get('content_type', None) meta_data = extra.get('meta_data', None) acl = extra.get('acl', None) if meta_data: for key, value in list(meta_data.items()): key = self.http_vendor_prefix + 'meta-%s' % (key) headers[key] = value if acl: if acl not in ['public-read', 'private', 'public-read-write']: raise AttributeError('invalid acl value: %s' % acl) headers[self.http_vendor_prefix + 'object-acl'] = acl request_path = self._get_object_path(container, object_name) if query_args: request_path = '?'.join((request_path, query_args)) # TODO: Let the underlying exceptions bubble up and capture the SIGPIPE # here. # SIGPIPE is thrown if the provided container does not exist or the # user does not have correct permission result_dict = self._upload_object( object_name=object_name, content_type=content_type, upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, request_path=request_path, request_method=method, headers=headers, file_path=file_path, iterator=iterator, container=container) response = result_dict['response'] bytes_transferred = result_dict['bytes_transferred'] headers = response.headers response = response.response server_hash = headers['etag'].replace('"', '') if (verify_hash and result_dict['data_hash'].upper() != server_hash): raise ObjectHashMismatchError( value='MD5 hash checksum does not match', object_name=object_name, driver=self) elif response.status == httplib.OK: obj = Object( name=object_name, size=bytes_transferred, hash=server_hash, extra={'acl': acl}, meta_data=meta_data, container=container, driver=self) return obj else: raise LibcloudError( 'Unexpected status code, status_code=%s' % (response.status), driver=self) def _upload_multipart(self, response, data, iterator, container, object_name, calculate_hash=True): """ Callback invoked for uploading data to OSS using Aliyun's multipart upload mechanism :param response: Response object from the initial POST request :type response: :class:`OSSRawResponse` :param data: Any data from the initial POST request :type data: ``str`` :param iterator: The generator for fetching the upload data :type iterator: ``generator`` :param container: The container owning the object to which data is being uploaded :type container: :class:`Container` :param object_name: The name of the object to which we are uploading :type object_name: ``str`` :keyword calculate_hash: Indicates if we must calculate the data hash :type calculate_hash: ``bool`` :return: A tuple of (status, checksum, bytes transferred) :rtype: ``tuple`` """ object_path = self._get_object_path(container, object_name) # Get the upload id from the response xml response.body = response.response.read() body = response.parse_body() upload_id = body.find(fixxpath(xpath='UploadId', namespace=self.namespace)).text try: # Upload the data through the iterator result = self._upload_from_iterator(iterator, object_path, upload_id, calculate_hash, container=container) (chunks, data_hash, bytes_transferred) = result # Commit the chunk info and complete the upload etag = self._commit_multipart(object_path, upload_id, chunks, container=container) except Exception: exc = sys.exc_info()[1] # Amazon provides a mechanism for aborting an upload. self._abort_multipart(object_path, upload_id, container=container) raise exc # Modify the response header of the first request. This is used # by other functions once the callback is done response.headers['etag'] = etag return (True, data_hash, bytes_transferred) def _upload_from_iterator(self, iterator, object_path, upload_id, calculate_hash=True, container=None): """ Uploads data from an interator in fixed sized chunks to OSS :param iterator: The generator for fetching the upload data :type iterator: ``generator`` :param object_path: The path of the object to which we are uploading :type object_name: ``str`` :param upload_id: The upload id allocated for this multipart upload :type upload_id: ``str`` :keyword calculate_hash: Indicates if we must calculate the data hash :type calculate_hash: ``bool`` :keyword container: the container object to upload object to :type container: :class:`Container` :return: A tuple of (chunk info, checksum, bytes transferred) :rtype: ``tuple`` """ data_hash = None if calculate_hash: data_hash = self._get_hash_function() bytes_transferred = 0 count = 1 chunks = [] params = {'uploadId': upload_id} # Read the input data in chunk sizes suitable for AWS for data in read_in_chunks(iterator, chunk_size=CHUNK_SIZE, fill_size=True, yield_empty=True): bytes_transferred += len(data) if calculate_hash: data_hash.update(data) chunk_hash = self._get_hash_function() chunk_hash.update(data) chunk_hash = base64.b64encode(chunk_hash.digest()).decode('utf-8') # OSS will calculate hash of the uploaded data and # check this header. headers = {'Content-MD5': chunk_hash} params['partNumber'] = count request_path = '?'.join((object_path, urlencode(params))) resp = self.connection.request(request_path, method='PUT', data=data, headers=headers, container=container) if resp.status != httplib.OK: raise LibcloudError('Error uploading chunk', driver=self) server_hash = resp.headers['etag'] # Keep this data for a later commit chunks.append((count, server_hash)) count += 1 if calculate_hash: data_hash = data_hash.hexdigest() return (chunks, data_hash, bytes_transferred) def _commit_multipart(self, object_path, upload_id, chunks, container=None): """ Makes a final commit of the data. :param object_path: Server side object path. :type object_path: ``str`` :param upload_id: ID of the multipart upload. :type upload_id: ``str`` :param upload_id: A list of (chunk_number, chunk_hash) tuples. :type upload_id: ``list`` :keyword container: The container owning the object to which data is being uploaded :type container: :class:`Container` """ root = Element('CompleteMultipartUpload') for (count, etag) in chunks: part = SubElement(root, 'Part') part_no = SubElement(part, 'PartNumber') part_no.text = str(count) etag_id = SubElement(part, 'ETag') etag_id.text = str(etag) data = tostring(root) params = {'uploadId': upload_id} request_path = '?'.join((object_path, urlencode(params))) response = self.connection.request(request_path, data=data, method='POST', container=container) if response.status != httplib.OK: element = response.object # pylint: disable=maybe-no-member code, message = response._parse_error_details(element=element) msg = 'Error in multipart commit: %s (%s)' % (message, code) raise LibcloudError(msg, driver=self) # Get the server's etag to be passed back to the caller body = response.parse_body() server_hash = body.find(fixxpath(xpath='ETag', namespace=self.namespace)).text return server_hash def _abort_multipart(self, object_path, upload_id, container=None): """ Aborts an already initiated multipart upload :param object_path: Server side object path. :type object_path: ``str`` :param upload_id: ID of the multipart upload. :type upload_id: ``str`` :keyword container: The container owning the object to which data is being uploaded :type container: :class:`Container` """ params = {'uploadId': upload_id} request_path = '?'.join((object_path, urlencode(params))) resp = self.connection.request(request_path, method='DELETE', container=container) if resp.status != httplib.NO_CONTENT: raise LibcloudError('Error in multipart abort. status_code=%d' % (resp.status), driver=self) def _upload_object(self, object_name, content_type, upload_func, upload_func_kwargs, request_path, request_method='PUT', headers=None, file_path=None, iterator=None, container=None): """ Helper function for setting common request headers and calling the passed in callback which uploads an object. """ headers = headers or {} if file_path and not os.path.exists(file_path): raise OSError('File %s does not exist' % (file_path)) if iterator is not None and not hasattr(iterator, 'next') and not \ hasattr(iterator, '__next__'): raise AttributeError('iterator object must implement next() ' + 'method.') if not content_type: if file_path: name = file_path else: name = object_name content_type, _ = guess_file_mime_type(name) if not content_type: if self.strict_mode: raise AttributeError('File content-type could not be ' 'guessed and no content_type value ' 'is provided') else: # Fallback to a content-type content_type = DEFAULT_CONTENT_TYPE file_size = None if iterator: if self.supports_chunked_encoding: headers['Transfer-Encoding'] = 'chunked' upload_func_kwargs['chunked'] = True else: # Chunked transfer encoding is not supported. Need to buffer # all the data in memory so we can determine file size. iterator = read_in_chunks( iterator=iterator) data = exhaust_iterator(iterator=iterator) file_size = len(data) upload_func_kwargs['data'] = data else: file_size = os.path.getsize(file_path) upload_func_kwargs['chunked'] = False if file_size is not None and 'Content-Length' not in headers: headers['Content-Length'] = file_size headers['Content-Type'] = content_type response = self.connection.request(request_path, method=request_method, data=None, headers=headers, raw=True, container=container) upload_func_kwargs['response'] = response success, data_hash, bytes_transferred = upload_func( **upload_func_kwargs) if not success: raise LibcloudError( value='Object upload failed, Perhaps a timeout?', driver=self) result_dict = {'response': response, 'data_hash': data_hash, 'bytes_transferred': bytes_transferred} return result_dict def _to_containers(self, obj, xpath): for element in obj.findall(fixxpath(xpath=xpath, namespace=self.namespace)): yield self._to_container(element) def _to_container(self, element): extra = { 'creation_date': findtext(element=element, xpath='CreationDate', namespace=self.namespace), 'location': findtext(element=element, xpath='Location', namespace=self.namespace) } container = Container(name=findtext(element=element, xpath='Name', namespace=self.namespace), extra=extra, driver=self ) return container def _to_objs(self, obj, xpath, container): return [self._to_obj(element, container) for element in obj.findall(fixxpath(xpath=xpath, namespace=self.namespace))] def _to_obj(self, element, container): owner_id = findtext(element=element, xpath='Owner/ID', namespace=self.namespace) owner_display_name = findtext(element=element, xpath='Owner/DisplayName', namespace=self.namespace) meta_data = {'owner': {'id': owner_id, 'display_name': self._safe_decode( owner_display_name)}} last_modified = findtext(element=element, xpath='LastModified', namespace=self.namespace) extra = {'last_modified': last_modified} name = self._safe_decode(findtext(element=element, xpath='Key', namespace=self.namespace)) obj = Object(name=name, size=int(findtext(element=element, xpath='Size', namespace=self.namespace)), hash=findtext(element=element, xpath='ETag', namespace=self.namespace).replace('"', ''), extra=extra, meta_data=meta_data, container=container, driver=self ) return obj def _safe_decode(self, encoded): """ Decode it as an escaped string and then treate the content as UTF-8 encoded. """ try: if encoded: unescaped, _ign = codecs.escape_decode(encoded) return unescaped.decode('utf-8') return encoded except Exception: return encoded def _get_container_path(self, container): """ Return a container path :param container: Container instance :type container: :class:`Container` :return: A path for this container. :rtype: ``str`` """ return '/%s' % (container.name) def _get_object_path(self, container, object_name): """ Return an object's path. Aliyun OSS api puts the container name in the host, so ignore container here. :param container: Container instance :type container: :class:`Container` :param object_name: Object name :type object_name: :class:`str` :return: A path for this object. :rtype: ``str`` """ object_name_cleaned = self._clean_object_name(object_name) object_path = '/%s' % object_name_cleaned return object_path def _headers_to_object(self, object_name, container, headers): hash = headers['etag'].replace('"', '') extra = {'content_type': headers['content-type'], 'etag': headers['etag']} meta_data = {} if 'last-modified' in headers: extra['last_modified'] = headers['last-modified'] for key, value in headers.items(): if not key.lower().startswith(self.http_vendor_prefix + 'meta-'): continue key = key.replace(self.http_vendor_prefix + 'meta-', '') meta_data[key] = value obj = Object(name=object_name, size=int(headers['content-length']), hash=hash, extra=extra, meta_data=meta_data, container=container, driver=self) return obj