Source code for libcloud.pricing

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A class which handles loading the pricing files.
"""


import re
import os.path
from typing import Dict, Union, Optional
from os.path import join as pjoin

try:
    import simplejson as json

    try:
        JSONDecodeError = json.JSONDecodeError
    except AttributeError:
        # simplejson < 2.1.0 does not have the JSONDecodeError exception class
        JSONDecodeError = ValueError  # type: ignore
except ImportError:
    import json  # type: ignore

    JSONDecodeError = ValueError  # type: ignore

__all__ = [
    "get_pricing",
    "get_size_price",
    "get_image_price",
    "set_pricing",
    "clear_pricing_data",
    "download_pricing_file",
]

# Default URL to the pricing file in a git repo
DEFAULT_FILE_URL_GIT = "https://git.apache.org/repos/asf?p=libcloud.git;a=blob_plain;f=libcloud/data/pricing.json"  # NOQA

DEFAULT_FILE_URL_S3_BUCKET = "https://libcloud-pricing-data.s3.amazonaws.com/pricing.json"  # NOQA

CURRENT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
DEFAULT_PRICING_FILE_PATH = pjoin(CURRENT_DIRECTORY, "data/pricing.json")
CUSTOM_PRICING_FILE_PATH = os.path.expanduser("~/.libcloud/pricing.json")

# Pricing data cache
PRICING_DATA = {"compute": {}, "storage": {}}  # type: Dict[str, Dict]

VALID_PRICING_DRIVER_TYPES = ["compute", "storage"]

# Set this to True to cache all the pricing data in memory instead of just the
# one for the drivers which are used
CACHE_ALL_PRICING_DATA = False


def get_pricing_file_path(file_path=None):
    # type: (Optional[str]) -> str
    if os.path.exists(CUSTOM_PRICING_FILE_PATH) and os.path.isfile(CUSTOM_PRICING_FILE_PATH):
        # Custom pricing file is available, use it
        return CUSTOM_PRICING_FILE_PATH

    return DEFAULT_PRICING_FILE_PATH


[docs]def get_pricing(driver_type, driver_name, pricing_file_path=None, cache_all=False): # type: (str, str, Optional[str], bool) -> Optional[dict] """ Return pricing for the provided driver. NOTE: This method will also cache data for the requested driver memory. We intentionally only cache data for the requested driver and not all the pricing data since the whole pricing data is quite large (~2 MB). This way we avoid unnecessary memory overhead. :type driver_type: ``str`` :param driver_type: Driver type ('compute' or 'storage') :type driver_name: ``str`` :param driver_name: Driver name :type pricing_file_path: ``str`` :param pricing_file_path: Custom path to a price file. If not provided it uses a default path. :type cache_all: ``bool`` :param cache_all: True to cache pricing data in memory for all the drivers and not just for the requested one. :rtype: ``dict`` :return: Dictionary with pricing where a key name is size ID and the value is a price. """ cache_all = cache_all or CACHE_ALL_PRICING_DATA if driver_type not in VALID_PRICING_DRIVER_TYPES: raise AttributeError("Invalid driver type: %s", driver_type) if driver_name in PRICING_DATA[driver_type]: return PRICING_DATA[driver_type][driver_name] if not pricing_file_path: pricing_file_path = get_pricing_file_path(file_path=pricing_file_path) with open(pricing_file_path) as fp: content = fp.read() pricing_data = json.loads(content) driver_pricing = pricing_data[driver_type][driver_name] # NOTE: We only cache prices in memory for the the requested drivers. # This way we avoid storing massive pricing data for all the drivers in # memory if cache_all: for driver_type in VALID_PRICING_DRIVER_TYPES: # pylint: disable=maybe-no-member pricing = pricing_data.get(driver_type, None) if not pricing: continue PRICING_DATA[driver_type] = pricing else: set_pricing(driver_type=driver_type, driver_name=driver_name, pricing=driver_pricing) return driver_pricing
[docs]def set_pricing(driver_type, driver_name, pricing): # type: (str, str, dict) -> None """ Populate the driver pricing dictionary. :type driver_type: ``str`` :param driver_type: Driver type ('compute' or 'storage') :type driver_name: ``str`` :param driver_name: Driver name :type pricing: ``dict`` :param pricing: Dictionary where a key is a size ID and a value is a price. """ PRICING_DATA[driver_type][driver_name] = pricing
[docs]def get_size_price(driver_type, driver_name, size_id, region=None): # type: (str, str, Union[str,int], Optional[str]) -> Optional[float] """ Return price for the provided size. :type driver_type: ``str`` :param driver_type: Driver type ('compute' or 'storage') :type driver_name: ``str`` :param driver_name: Driver name :type size_id: ``str`` or ``int`` :param size_id: Unique size ID (can be an integer or a string - depends on the driver) :rtype: ``float`` :return: Size price. """ pricing = get_pricing(driver_type=driver_type, driver_name=driver_name) assert pricing is not None price = None # Type: Optional[float] try: if region is None: price = float(pricing[size_id]) else: price = float(pricing[size_id][region]) except KeyError: # Price not available price = None return price
[docs]def get_image_price(driver_name, image_name, size_name=None, cores=1): # for now only images of GCE have pricing data if driver_name == "gce_images": return _get_gce_image_price(image_name=image_name, size_name=size_name, cores=cores) return 0
def _get_gce_image_price(image_name, size_name, cores=1): """ Return price per hour for an gce image. Price depends on the size of the VM. :type image_name: ``str`` :param image_name: GCE image full name. Can be found from GCENodeImage.name :type size_name: ``str`` :param size_name: Size name of the machine running the image. Can be found from GCENodeSize.name :type cores: ``int`` :param cores: The number of the CPUs the machine running the image has. Can be found from GCENodeSize.extra['guestCpus'] :rtype: ``float`` :return: Image price """ # helper function to get image family for gce images def _get_gce_image_family(image_name): image_family = None # Decide if the image is a premium image if "sql" in image_name: image_family = "SQL Server" elif "windows" in image_name: image_family = "Windows Server" elif "rhel" in image_name and "sap" in image_name: image_family = "RHEL with Update Services" elif "sles" in image_name and "sap" in image_name: image_family = "SLES for SAP" elif "rhel" in image_name: image_family = "RHEL" elif "sles" in image_name: image_family = "SLES" return image_family image_family = _get_gce_image_family(image_name) # if there is no premium image return 0 if not image_family: return 0 pricing = get_pricing(driver_type="compute", driver_name="gce_images") try: price_dict = pricing[image_family] except KeyError: # Price not available return 0 size_type = "any" if "f1" in size_name: size_type = "f1" elif "g1" in size_name: size_type = "g1" price_dict_keys = price_dict.keys() # search keys to find the one we want for key in price_dict_keys: if key == "description": continue # eg. 4vcpu or less if re.search(".{1}vcpu or less", key) and cores <= int(key[0]): return float(price_dict[key]["price"]) # eg. 1-2vcpu if re.search(".{1}-.{1}vcpu", key) and str(cores) in key: return float(price_dict[key]["price"]) # eg 6vcpu or more if re.search(".{1}vcpu or more", key) and cores >= int(key[0]): return float(price_dict[key]["price"]) if key in {"standard", "enterprise", "web"} and key in image_name: return float(price_dict[key]["price"]) if key in {"f1", "g1"} and size_type == key: return float(price_dict[key]["price"]) elif key == "any": price = float(price_dict[key]["price"]) return price * cores if "sles" not in image_name else price # fallback return 0 def invalidate_pricing_cache(): # type: () -> None """ Invalidate pricing cache for all the drivers. """ PRICING_DATA["compute"] = {} PRICING_DATA["storage"] = {}
[docs]def clear_pricing_data(): # type: () -> None """ Invalidate pricing cache for all the drivers. Note: This method does the same thing as invalidate_pricing_cache and is here for backward compatibility reasons. """ invalidate_pricing_cache()
def invalidate_module_pricing_cache(driver_type, driver_name): # type: (str, str) -> None """ Invalidate the cache for the specified driver. :type driver_type: ``str`` :param driver_type: Driver type ('compute' or 'storage') :type driver_name: ``str`` :param driver_name: Driver name """ if driver_name in PRICING_DATA[driver_type]: del PRICING_DATA[driver_type][driver_name]
[docs]def download_pricing_file(file_url=DEFAULT_FILE_URL_S3_BUCKET, file_path=CUSTOM_PRICING_FILE_PATH): # type: (str, str) -> None """ Download pricing file from the file_url and save it to file_path. :type file_url: ``str`` :param file_url: URL pointing to the pricing file. :type file_path: ``str`` :param file_path: Path where a download pricing file will be saved. """ from libcloud.utils.connection import get_response_object dir_name = os.path.dirname(file_path) if not os.path.exists(dir_name): # Verify a valid path is provided msg = "Can't write to {}, directory {}, doesn't exist".format(file_path, dir_name) raise ValueError(msg) if os.path.exists(file_path) and os.path.isdir(file_path): msg = "Can't write to %s file path because it's a" " directory" % (file_path) raise ValueError(msg) response = get_response_object(file_url) body = response.body # Verify pricing file is valid try: data = json.loads(body) except JSONDecodeError: msg = "Provided URL doesn't contain valid pricing data" raise Exception(msg) # pylint: disable=maybe-no-member if not data.get("updated", None): msg = "Provided URL doesn't contain valid pricing data" raise Exception(msg) # No need to stream it since file is small with open(file_path, "w") as file_handle: file_handle.write(body)