Source code for libcloud.common.cloudstack

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import hashlib
import copy
import hmac

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlencode
from libcloud.utils.py3 import urlquote
from libcloud.utils.py3 import b

from libcloud.common.types import ProviderError
from libcloud.common.base import ConnectionUserAndKey, PollingConnection
from libcloud.common.base import JsonResponse
from libcloud.common.types import MalformedResponseError
from libcloud.compute.types import InvalidCredsError


[docs]class CloudStackResponse(JsonResponse):
[docs] def parse_error(self): if self.status == httplib.UNAUTHORIZED: raise InvalidCredsError('Invalid provider credentials') value = None body = self.parse_body() if hasattr(body, 'values'): values = list(body.values())[0] if 'errortext' in values: value = values['errortext'] if value is None: value = self.body if not value: value = 'WARNING: error message text sent by provider was empty.' error = ProviderError(value=value, http_code=self.status, driver=self.connection.driver) raise error
[docs]class CloudStackConnection(ConnectionUserAndKey, PollingConnection): responseCls = CloudStackResponse poll_interval = 1 request_method = '_sync_request' timeout = 600 ASYNC_PENDING = 0 ASYNC_SUCCESS = 1 ASYNC_FAILURE = 2
[docs] def encode_data(self, data): """ Must of the data is sent as part of query params (eeww), but in newer versions, userdata argument can be sent as a urlencoded data in the request body. """ if data: data = urlencode(data) return data
def _make_signature(self, params): signature = [(k.lower(), v) for k, v in list(params.items())] signature.sort(key=lambda x: x[0]) pairs = [] for pair in signature: key = urlquote(str(pair[0]), safe='[]') value = urlquote(str(pair[1]), safe='[]*') item = '%s=%s' % (key, value) pairs .append(item) signature = '&'.join(pairs) signature = signature.lower().replace('+', '%20') signature = hmac.new(b(self.key), msg=b(signature), digestmod=hashlib.sha1) return base64.b64encode(b(signature.digest()))
[docs] def add_default_params(self, params): params['apiKey'] = self.user_id params['response'] = 'json' return params
[docs] def pre_connect_hook(self, params, headers): params['signature'] = self._make_signature(params) return params, headers
def _async_request(self, command, action=None, params=None, data=None, headers=None, method='GET', context=None): if params: context = copy.deepcopy(params) else: context = {} # Command is specified as part of GET call context['command'] = command result = super(CloudStackConnection, self).async_request( action=action, params=params, data=data, headers=headers, method=method, context=context) return result['jobresult']
[docs] def get_request_kwargs(self, action, params=None, data='', headers=None, method='GET', context=None): command = context['command'] request_kwargs = {'command': command, 'action': action, 'params': params, 'data': data, 'headers': headers, 'method': method} return request_kwargs
[docs] def get_poll_request_kwargs(self, response, context, request_kwargs): job_id = response['jobid'] params = {'jobid': job_id} kwargs = {'command': 'queryAsyncJobResult', 'params': params} return kwargs
[docs] def has_completed(self, response): status = response.get('jobstatus', self.ASYNC_PENDING) if status == self.ASYNC_FAILURE: msg = response.get('jobresult', {}).get('errortext', status) raise Exception(msg) return status == self.ASYNC_SUCCESS
def _sync_request(self, command, action=None, params=None, data=None, headers=None, method='GET'): """ This method handles synchronous calls which are generally fast information retrieval requests and thus return 'quickly'. """ # command is always sent as part of "command" query parameter if params: params = copy.deepcopy(params) else: params = {} params['command'] = command # pylint: disable=maybe-no-member result = self.request(action=self.driver.path, params=params, data=data, headers=headers, method=method) command = command.lower() # Work around for older verions which don't return "response" suffix # in delete ingress rule response command name if (command == 'revokesecuritygroupingress' and 'revokesecuritygroupingressresponse' not in result.object): command = command elif (command == 'restorevirtualmachine' and 'restorevmresponse' in result.object): command = "restorevmresponse" else: command = command + 'response' if command not in result.object: raise MalformedResponseError( "Unknown response format {}".format(command), body=result.body, driver=self.driver) result = result.object[command] return result
[docs]class CloudStackDriverMixIn(object): host = None path = None connectionCls = CloudStackConnection def __init__(self, key, secret=None, secure=True, host=None, port=None): host = host or self.host super(CloudStackDriverMixIn, self).__init__(key, secret, secure, host, port) def _sync_request(self, command, action=None, params=None, data=None, headers=None, method='GET'): return self.connection._sync_request(command=command, action=action, params=params, data=data, headers=headers, method=method) def _async_request(self, command, action=None, params=None, data=None, headers=None, method='GET', context=None): return self.connection._async_request(command=command, action=action, params=params, data=data, headers=headers, method=method, context=context)