# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from libcloud.common.base import ConnectionUserAndKey, JsonResponse
from libcloud.compute.types import InvalidCredsError

from libcloud.utils.py3 import b
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import base64_encode_string

    import simplejson as json
except ImportError:
    import json  # type: ignore

[docs]class BrightboxResponse(JsonResponse):
[docs] def success(self): return httplib.OK <= self.status < httplib.BAD_REQUEST
[docs] def parse_body(self): if self.headers['content-type'].split(';')[0] == 'application/json': return super(BrightboxResponse, self).parse_body() else: return self.body
[docs] def parse_error(self): response = super(BrightboxResponse, self).parse_body() if 'error' in response: if response['error'] in ['invalid_client', 'unauthorized_client']: raise InvalidCredsError(response['error']) return response['error'] elif 'error_name' in response: return '%s: %s' % (response['error_name'], response['errors'][0]) return self.body
[docs]class BrightboxConnection(ConnectionUserAndKey): """ Connection class for the Brightbox driver """ host = '' responseCls = BrightboxResponse def _fetch_oauth_token(self): body = json.dumps({'client_id': self.user_id, 'grant_type': 'none'}) authorization = 'Basic ' + str(base64_encode_string(b('%s:%s' % (self.user_id, self.key)))).rstrip() self.connect() headers = { 'Host':, 'User-Agent': self._user_agent(), 'Authorization': authorization, 'Content-Type': 'application/json', 'Content-Length': str(len(body)) } # pylint: disable=assignment-from-no-return response = self.connection.request(method='POST', url='/token', body=body, headers=headers) if response.status == httplib.OK: return json.loads(['access_token'] else: responseCls = BrightboxResponse( response=response.getresponse(), connection=self) message = responseCls.parse_error() raise InvalidCredsError(message)
[docs] def add_default_headers(self, headers): try: headers['Authorization'] = 'OAuth ' + self.token except AttributeError: self.token = self._fetch_oauth_token() headers['Authorization'] = 'OAuth ' + self.token return headers
[docs] def encode_data(self, data): return json.dumps(data)