Source code for libcloud.common.nfsn

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import random
import string
import time

from libcloud.common.base import ConnectionUserAndKey
from libcloud.common.base import JsonResponse
from libcloud.common.types import InvalidCredsError, ProviderError
from libcloud.utils.py3 import basestring, httplib, urlencode

SALT_CHARACTERS = string.ascii_letters + string.digits

[docs]class NFSNException(ProviderError): def __init__(self, value, http_code, code, driver=None): self.code = code super(NFSNException, self).__init__(value, http_code, driver)
[docs]class NFSNResponse(JsonResponse):
[docs] def parse_error(self): if self.status == httplib.UNAUTHORIZED: raise InvalidCredsError("Invalid provider credentials") body = self.parse_body() if isinstance(body, basestring): return body + " (HTTP Code: %d)" % self.status error = body.get("error", None) debug = body.get("debug", None) # If we only have one of "error" or "debug", use the one that we have. # If we have both, use both, with a space character in between them. value = "No message specified" if error is not None: value = error if debug is not None: value = debug if error is not None and value is not None: value = error + " " + value value = value + " (HTTP Code: %d)" % self.status return value
[docs]class NFSNConnection(ConnectionUserAndKey): host = "" responseCls = NFSNResponse allow_insecure = False def _header(self, action, data): """Build the contents of the X-NFSN-Authentication HTTP header. See for more explanation.""" login = self.user_id timestamp = self._timestamp() salt = self._salt() api_key = self.key data = urlencode(data) data_hash = hashlib.sha1(data.encode("utf-8")).hexdigest() string = ";".join((login, timestamp, salt, api_key, action, data_hash)) string_hash = hashlib.sha1(string.encode("utf-8")).hexdigest() return ";".join((login, timestamp, salt, string_hash))
[docs] def request(self, action, params=None, data="", headers=None, method="GET"): """Add the X-NFSN-Authentication header to an HTTP request.""" if not headers: headers = {} if not params: params = {} header = self._header(action, data) headers["X-NFSN-Authentication"] = header if method == "POST": headers["Content-Type"] = "application/x-www-form-urlencoded" return ConnectionUserAndKey.request(self, action, params, data, headers, method)
[docs] def encode_data(self, data): """NFSN expects the body to be regular key-value pairs that are not JSON-encoded.""" if data: data = urlencode(data) return data
def _salt(self): """Return a 16-character alphanumeric string.""" r = random.SystemRandom() return "".join(r.choice(SALT_CHARACTERS) for _ in range(16)) def _timestamp(self): """Return the current number of seconds since the Unix epoch, as a string.""" return str(int(time.time()))