Source code for libcloud.common.durabledns

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import Dict, List
from xml.etree import ElementTree as ET  # noqa

from libcloud.common.base import XmlResponse, ConnectionUserAndKey

# Host name of the DurableDNS API; DurableConnection uses it as its ``host``.
API_HOST = "durabledns.com"


def _schema_builder(urn_nid, method, attributes):
    """
    Build the XML skeleton (SOAP body) used to perform an API request.

    The returned ``soap:Body`` element contains a single
    ``urn:<urn_nid>:<method>`` child which in turn holds one empty
    ``urn:<urn_nid>:<attribute>`` element per requested attribute, in the
    order given.

    :param urn_nid: API urn namespace id.
    :type urn_nid: ``str``

    :param method: API method.
    :type method: ``str``

    :param attributes: List of attributes to include.
    :type attributes: ``list`` of ``str``

    :rtype: :class:`Element`
    """
    # NOTE(review): the ``xmlns:m`` namespace is derived from ``method`` while
    # every tag below is derived from ``urn_nid`` -- confirm this is intended.
    namespace = "https://durabledns.com/services/dns/%s" % method
    body = ET.Element("soap:Body", {"xmlns:m": namespace})

    call = ET.SubElement(body, f"urn:{urn_nid}:{method}")
    # One empty placeholder element per attribute.
    for name in attributes:
        ET.SubElement(call, f"urn:{urn_nid}:{name}")

    return body


# Credentials that lead the attribute list of every API call.
_CREDENTIAL_ATTRIBUTES = ("apiuser", "apikey")

# Zone settings accepted by both createZone and updateZone.
_ZONE_SETTING_ATTRIBUTES = (
    "ns",
    "mbox",
    "refresh",
    "retry",
    "expire",
    "minimum",
    "ttl",
    "xfer",
    "update_acl",
)

# Maps an action name to the arguments needed by ``_schema_builder``:
# the urn namespace id (always "<method>wsdl"), the API method name and the
# ordered list of attributes the request carries. Every entry gets its own
# ``attributes`` list.
SCHEMA_BUILDER_MAP = {
    action: {
        "urn_nid": method + "wsdl",
        "method": method,
        "attributes": [*_CREDENTIAL_ATTRIBUTES, *call_attributes],
    }
    for action, method, call_attributes in (
        ("list_zones", "listZones", ()),
        ("list_records", "listRecords", ("zonename",)),
        ("get_zone", "getZone", ("zonename",)),
        ("get_record", "getRecord", ("zonename", "recordid")),
        ("create_zone", "createZone", ("zonename", *_ZONE_SETTING_ATTRIBUTES)),
        (
            "create_record",
            "createRecord",
            ("zonename", "name", "type", "data", "aux", "ttl", "ddns_enabled"),
        ),
        ("update_zone", "updateZone", ("zonename", *_ZONE_SETTING_ATTRIBUTES)),
        (
            "update_record",
            "updateRecord",
            ("zonename", "id", "name", "aux", "data", "ttl", "ddns_enabled"),
        ),
        ("delete_zone", "deleteZone", ("zonename",)),
        ("delete_record", "deleteRecord", ("zonename", "id")),
    )
}


class DurableDNSException(Exception):
    """
    Exception carrying an error code and a message for a failed
    DurableDNS API call.

    :param code: Error code of the failure.
    :param message: Human readable description of the failure.
    """

    def __init__(self, code, message):
        # Populates ``args`` with ``(code, message)``.
        super().__init__(code, message)
        self.code = code
        self.message = message

    def __str__(self):
        return f"{self.code} {self.message}"

    def __repr__(self):
        return f"DurableDNSException {self.code} {self.message}"
class DurableResponse(XmlResponse):
    """
    Response class for DurableDNS SOAP replies.

    On construction the body is parsed into ``objects`` (plain dictionaries
    describing zones / records) and ``errors``. If any error was found a
    :class:`DurableDNSException` built from the first one is raised.
    """

    errors = []  # type: List[Dict]
    objects = []  # type: List[Dict]

    def __init__(self, response, connection):
        super().__init__(response=response, connection=connection)

        self.objects, self.errors = self.parse_body_and_error()

        if self.errors:
            raise self._make_excp(self.errors[0])

    def parse_body_and_error(self):
        """
        Used to parse body from httplib.HttpResponse object.

        The first child of the first child of the parsed document is taken
        as the method response; its tag selects how it is decoded.

        :return: ``(objects, errors)`` where ``objects`` holds one ``dict``
                 per parsed zone / record and ``errors`` holds dictionaries
                 with the keys ``ERRORMESSAGE`` and ``ERRORCODE``.
        :rtype: ``tuple`` of (``list`` of ``dict``, ``list`` of ``dict``)
        """
        objects = []
        errors = []

        xml_obj = self.parse_body()
        # pylint: disable=no-member
        envelop_body = list(xml_obj)[0]
        method_resp = list(envelop_body)[0]
        tag = method_resp.tag

        # SOAP fault: report the fault string with the HTTP status as code.
        if "Fault" in tag:
            fault = [child for child in method_resp if child.tag == "faultstring"][0]
            errors.append({"ERRORMESSAGE": fault.text.strip(), "ERRORCODE": self.status})

        # listZonesResponse: the first child of every entry holds the zone id.
        if "listZonesResponse" in tag:
            answer = list(method_resp)[0]
            for element in answer:
                objects.append({"id": list(element)[0].text})

        # listRecordsResponse: one object per entry, carrying only its id.
        if "listRecordsResponse" in tag:
            answer = list(method_resp)[0]
            for element in answer:
                record = {}
                for child in element:
                    if child.tag == "id":
                        record["id"] = child.text.strip()
                objects.append(record)

        # getZoneResponse: "origin" doubles as id and domain, "ttl" is kept
        # on the zone itself and every other field goes into "extra".
        if "getZoneResponse" in tag:
            zone = {}
            extra = {}
            for child in method_resp:
                if child.tag == "origin":
                    zone["id"] = child.text.strip()
                    zone["domain"] = child.text.strip()
                elif child.tag == "ttl":
                    zone["ttl"] = int(child.text.strip())
                elif child.tag in ("retry", "expire", "minimum"):
                    extra[child.tag] = int(child.text.strip())
                else:
                    extra[child.tag] = child.text.strip() if child.text else ""
            # Attached once, after the loop, so the zone always has "extra".
            zone["extra"] = extra
            objects.append(zone)

        # getRecordResponse: copy the known, non-empty fields. The children
        # are iterated directly (no indexing) so that an empty reply yields
        # the "Record does not exist" error below instead of an IndexError.
        if "getRecordResponse" in tag:
            record = {}
            for child in method_resp:
                if child.tag in ("id", "name", "type", "data", "aux", "ttl") and child.text:
                    record[child.tag] = child.text.strip()
            if not record:
                errors.append({"ERRORMESSAGE": "Record does not exist", "ERRORCODE": 404})
            objects.append(record)

        # createZoneResponse: the "return" element, when filled, is the id.
        if "createZoneResponse" in tag:
            answer = list(method_resp)[0]
            created = {}
            if answer.tag == "return" and answer.text:
                created["id"] = answer.text.strip()
            objects.append(created)

        # catch Record does not exists error when deleting record
        if "deleteRecordResponse" in tag:
            answer = list(method_resp)[0]
            # An empty element has ``text`` set to None; treat it as "".
            message = (answer.text or "").strip()
            if "Record does not exists" in message:
                errors.append({"ERRORMESSAGE": message, "ERRORCODE": self.status})

        # createRecordResponse: the first child holds the new record id.
        if "createRecordResponse" in tag:
            answer = list(method_resp)[0]
            objects.append({"id": answer.text.strip()})

        return (objects, errors)

    def parse_body(self):
        """
        Clean up the raw body and then parse it with the parent class.
        """
        # A problem arise in the api response because there are undeclared
        # xml namespaces. In order to fix that at the moment, we use the
        # _fix_response method to clean up since we won't always have lxml
        # library.
        self._fix_response()
        body = super().parse_body()
        return body

    def success(self):
        """
        Used to determine if the request was successful.

        :rtype: ``bool``
        """
        return not self.errors

    def _make_excp(self, error):
        """
        Turn an error dictionary (``ERRORCODE`` / ``ERRORMESSAGE``) into a
        :class:`DurableDNSException`.
        """
        return DurableDNSException(error["ERRORCODE"], error["ERRORMESSAGE"])

    def _fix_response(self):
        """
        Rewrite ``self.body`` so that ``<ns1:name xmlns:ns1="">`` elements
        become plain ``<name>`` elements (opening and closing tags).
        """
        # ``[^>]+`` keeps every match inside a single tag. A greedy ``.+``
        # would, on a single-line body, span from the first such tag to the
        # last one and the replacement below would drop everything between.
        items = re.findall(r'<ns1:[^>]+ xmlns:ns1="">', self.body, flags=0)
        for item in items:
            parts = item.split(" ")
            qualified_name = parts[0].replace("<", "")
            prefix = qualified_name.split(":")[1]
            new_item = "<" + prefix + ">"
            close_tag = "</" + qualified_name + ">"
            new_close_tag = "</" + prefix + ">"
            self.body = self.body.replace(item, new_item)
            self.body = self.body.replace(close_tag, new_close_tag)
class DurableConnection(ConnectionUserAndKey):
    """
    Connection class for the DurableDNS API; replies are handled by
    :class:`DurableResponse`.
    """

    host = API_HOST
    responseCls = DurableResponse

    def add_default_params(self, params):
        """
        Add the connection's ``user_id`` and ``key`` to ``params`` (in place)
        and return it.
        """
        params.update({"user_id": self.user_id, "key": self.key})
        return params

    def add_default_headers(self, headers):
        """
        Add the content type / encoding headers to ``headers`` (in place)
        and return it.
        """
        headers.update(
            {
                "Content-Type": "text/xml",
                "Content-Encoding": "gzip; charset=ISO-8859-1",
            }
        )
        return headers