Source code for libcloud.common.aliyun

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import hashlib
import hmac
import sys
import time
import uuid

try:
    from lxml import etree as ET
except ImportError:
    from xml.etree import ElementTree as ET

from libcloud.common.base import ConnectionUserAndKey, XmlResponse
from libcloud.common.types import MalformedResponseError
from libcloud.utils.py3 import b, u, urlquote, PY3
from libcloud.utils.xml import findtext

__all__ = [
    'AliyunXmlResponse',
    'AliyunRequestSigner',
    'AliyunRequestSignerAlgorithmV1_0',
    'SignedAliyunConnection',
    'AliyunConnection',

    'SIGNATURE_VERSION_1_0',
    'DEFAULT_SIGNATURE_VERSION'
]

SIGNATURE_VERSION_1_0 = '1.0'
DEFAULT_SIGNATURE_VERSION = SIGNATURE_VERSION_1_0


[docs]class AliyunXmlResponse(XmlResponse): namespace = None
[docs] def success(self): return self.status >= 200 and self.status < 300
[docs] def parse_body(self): """ Each response from Aliyun contains a request id and a host id. The response body is in utf-8 encoding. """ if len(self.body) == 0 and not self.parse_zero_length_body: return self.body try: if PY3: parser = ET.XMLParser(encoding='utf-8') body = ET.XML(self.body.encode('utf-8'), parser=parser) else: body = ET.XML(self.body) except: raise MalformedResponseError('Failed to parse XML', body=self.body, driver=self.connection.driver) self.request_id = findtext(element=body, xpath='RequestId', namespace=self.namespace) self.host_id = findtext(element=body, xpath='HostId', namespace=self.namespace) return body
[docs] def parse_error(self): """ Parse error responses from Aliyun. """ body = super(AliyunXmlResponse, self).parse_error() code, message = self._parse_error_details(element=body) request_id = findtext(element=body, xpath='RequestId', namespace=self.namespace) host_id = findtext(element=body, xpath='HostId', namespace=self.namespace) error = {'code': code, 'message': message, 'request_id': request_id, 'host_id': host_id} return u(error)
def _parse_error_details(self, element): """ Parse error code and message from the provided error element. :return: ``tuple`` with two elements: (code, message) :rtype: ``tuple`` """ code = findtext(element=element, xpath='Code', namespace=self.namespace) message = findtext(element=element, xpath='Message', namespace=self.namespace) return (code, message)
[docs]class AliyunRequestSigner(object): """ Class handles signing the outgoing Aliyun requests. """ def __init__(self, access_key, access_secret, version): """ :param access_key: Access key. :type access_key: ``str`` :param access_secret: Access secret. :type access_secret: ``str`` :param version: API version. :type version: ``str`` """ self.access_key = access_key self.access_secret = access_secret self.version = version
[docs] def get_request_params(self, params, method='GET', path='/'): return params
[docs] def get_request_headers(self, params, headers, method='GET', path='/'): return params, headers
[docs]class AliyunRequestSignerAlgorithmV1_0(AliyunRequestSigner): """Aliyun request signer using signature version 1.0."""
[docs] def get_request_params(self, params, method='GET', path='/'): params['Format'] = 'XML' params['Version'] = self.version params['AccessKeyId'] = self.access_key params['SignatureMethod'] = 'HMAC-SHA1' params['SignatureVersion'] = SIGNATURE_VERSION_1_0 params['SignatureNonce'] = _get_signature_nonce() # TODO: Support 'ResourceOwnerAccount' params['Timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) params['Signature'] = self._sign_request(params, method, path) return params
def _sign_request(self, params, method, path): """ Sign Aliyun requests parameters and get the signature. StringToSign = HTTPMethod + '&' + percentEncode('/') + '&' + percentEncode(CanonicalizedQueryString) """ keys = list(params.keys()) keys.sort() pairs = [] for key in keys: pairs.append('%s=%s' % (_percent_encode(key), _percent_encode(params[key]))) qs = urlquote('&'.join(pairs), safe='-_.~') string_to_sign = '&'.join((method, urlquote(path, safe=''), qs)) b64_hmac = base64.b64encode( hmac.new(b(self._get_access_secret()), b(string_to_sign), digestmod=hashlib.sha1).digest() ) return b64_hmac.decode('utf8') def _get_access_secret(self): return '%s&' % self.access_secret
[docs]class AliyunConnection(ConnectionUserAndKey): pass
[docs]class SignedAliyunConnection(AliyunConnection): api_version = None def __init__(self, user_id, key, secure=True, host=None, port=None, url=None, timeout=None, proxy_url=None, retry_delay=None, backoff=None, api_version=None, signature_version=DEFAULT_SIGNATURE_VERSION): super(SignedAliyunConnection, self).__init__(user_id=user_id, key=key, secure=secure, host=host, port=port, url=url, timeout=timeout, proxy_url=proxy_url, retry_delay=retry_delay, backoff=backoff) self.signature_version = str(signature_version) if self.signature_version == '1.0': signer_cls = AliyunRequestSignerAlgorithmV1_0 else: raise ValueError('Unsupported signature_version: %s' % signature_version) if api_version is not None: self.api_version = str(api_version) else: if self.api_version is None: raise ValueError('Unsupported null api_version') self.signer = signer_cls(access_key=self.user_id, access_secret=self.key, version=self.api_version)
[docs] def add_default_params(self, params): params = self.signer.get_request_params(params=params, method=self.method, path=self.action) return params
def _percent_encode(encode_str): """ Encode string to utf8, quote for url and replace '+' with %20, '*' with %2A and keep '~' not converted. :param src_str: ``str`` in the same encoding with sys.stdin, default to encoding cp936. :return: ``str`` represents the encoded result :rtype: ``str`` """ encoding = sys.stdin.encoding or 'cp936' decoded = str(encode_str) if PY3: if isinstance(encode_str, bytes): decoded = encode_str.decode(encoding) else: decoded = str(encode_str).decode(encoding) res = urlquote( decoded.encode('utf8'), '') res = res.replace('+', '%20') res = res.replace('*', '%2A') res = res.replace('%7E', '~') return res def _get_signature_nonce(): return str(uuid.uuid4())