# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
try:
import simplejson as json
except:
import json
from libcloud.utils.py3 import httplib
from libcloud.common.types import InvalidCredsError
from libcloud.compute.drivers.cloudsigma import CloudSigmaNodeDriver
from libcloud.compute.drivers.cloudsigma import CloudSigma_2_0_NodeDriver
from libcloud.compute.drivers.cloudsigma import CloudSigmaError
from libcloud.compute.types import NodeState
from libcloud.test import unittest
from libcloud.test import MockHttpTestCase
from libcloud.test.file_fixtures import ComputeFileFixtures
[docs]class CloudSigmaAPI20BaseTestCase(object):
[docs] def setUp(self):
self.driver_klass.connectionCls.conn_classes = \
(CloudSigmaMockHttp, CloudSigmaMockHttp)
CloudSigmaMockHttp.type = None
CloudSigmaMockHttp.use_param = 'do'
self.driver = self.driver_klass(*self.driver_args,
**self.driver_kwargs)
self.driver.DRIVE_TRANSITION_SLEEP_INTERVAL = 0.1
self.driver.DRIVE_TRANSITION_TIMEOUT = 1
self.node = self.driver.list_nodes()[0]
[docs] def test_invalid_api_versions(self):
expected_msg = 'Unsupported API version: invalid'
self.assertRaisesRegexp(NotImplementedError, expected_msg,
CloudSigmaNodeDriver, 'username', 'password',
api_version='invalid')
[docs] def test_invalid_credentials(self):
CloudSigmaMockHttp.type = 'INVALID_CREDS'
self.assertRaises(InvalidCredsError, self.driver.list_nodes)
[docs] def test_invalid_region(self):
expected_msg = 'Invalid region:'
self.assertRaisesRegexp(ValueError, expected_msg,
CloudSigma_2_0_NodeDriver, 'foo', 'bar',
region='invalid')
[docs] def test_list_sizes(self):
sizes = self.driver.list_sizes()
size = sizes[0]
self.assertEqual(size.id, 'micro-regular')
[docs] def test_list_images(self):
images = self.driver.list_images()
image = images[0]
self.assertEqual(image.name, 'ubuntu-10.04-toMP')
self.assertEqual(image.extra['image_type'], 'preinst')
self.assertEqual(image.extra['media'], 'disk')
self.assertEqual(image.extra['os'], 'linux')
[docs] def test_list_nodes(self):
nodes = self.driver.list_nodes()
node = nodes[0]
self.assertEqual(len(nodes), 2)
self.assertEqual(node.id, '9de75ed6-fd33-45e2-963f-d405f31fd911')
self.assertEqual(node.name, 'test no drives')
self.assertEqual(node.state, NodeState.RUNNING)
self.assertEqual(node.public_ips, ['185.12.5.181', '178.22.68.55'])
self.assertEqual(node.private_ips, [])
[docs] def test_create_node(self):
image = self.driver.list_images()[0]
size = self.driver.list_sizes()[0]
metadata = {'foo': 'bar'}
node = self.driver.create_node(name='test node', size=size, image=image,
ex_metadata=metadata)
self.assertEqual(node.name, 'test node')
self.assertEqual(len(node.extra['nics']), 1)
self.assertEqual(node.extra['nics'][0]['ip_v4_conf']['conf'], 'dhcp')
[docs] def test_create_node_with_vlan(self):
image = self.driver.list_images()[0]
size = self.driver.list_sizes()[0]
vlan_uuid = '39ae851d-433f-4ac2-a803-ffa24cb1fa3e'
node = self.driver.create_node(name='test node vlan', size=size,
image=image, ex_vlan=vlan_uuid)
self.assertEqual(node.name, 'test node vlan')
self.assertEqual(len(node.extra['nics']), 2)
self.assertEqual(node.extra['nics'][0]['ip_v4_conf']['conf'], 'dhcp')
self.assertEqual(node.extra['nics'][1]['vlan']['uuid'], vlan_uuid)
[docs] def test_destroy_node(self):
status = self.driver.destroy_node(node=self.node)
self.assertTrue(status)
[docs] def test_ex_start_node(self):
status = self.driver.ex_start_node(node=self.node)
self.assertTrue(status)
[docs] def test_ex_start_node_avoid_mode(self):
CloudSigmaMockHttp.type = 'AVOID_MODE'
ex_avoid = ['1', '2']
status = self.driver.ex_start_node(node=self.node,
ex_avoid=ex_avoid)
self.assertTrue(status)
[docs] def test_ex_start_node_already_started(self):
CloudSigmaMockHttp.type = 'ALREADY_STARTED'
expected_msg = 'Cannot start guest in state "started". Guest should ' \
'be in state "stopped'
self.assertRaisesRegexp(CloudSigmaError, expected_msg,
self.driver.ex_start_node, node=self.node)
[docs] def test_ex_stop_node(self):
status = self.driver.ex_stop_node(node=self.node)
self.assertTrue(status)
[docs] def test_ex_stop_node_already_stopped(self):
CloudSigmaMockHttp.type = 'ALREADY_STOPPED'
expected_msg = 'Cannot stop guest in state "stopped"'
self.assertRaisesRegexp(CloudSigmaError, expected_msg,
self.driver.ex_stop_node, node=self.node)
[docs] def test_ex_clone_node(self):
node_to_clone = self.driver.list_nodes()[0]
cloned_node = self.driver.ex_clone_node(node=node_to_clone,
name='test cloned node')
self.assertEqual(cloned_node.name, 'test cloned node')
[docs] def test_ex_open_vnc_tunnel(self):
node = self.driver.list_nodes()[0]
vnc_url = self.driver.ex_open_vnc_tunnel(node=node)
self.assertEqual(vnc_url, 'vnc://direct.lvs.cloudsigma.com:41111')
[docs] def test_ex_close_vnc_tunnel(self):
node = self.driver.list_nodes()[0]
status = self.driver.ex_close_vnc_tunnel(node=node)
self.assertTrue(status)
[docs] def test_ex_list_library_drives(self):
drives = self.driver.ex_list_library_drives()
drive = drives[0]
self.assertEqual(drive.name, 'IPCop 2.0.2')
self.assertEqual(drive.size, 1000000000)
self.assertEqual(drive.media, 'cdrom')
self.assertEqual(drive.status, 'unmounted')
[docs] def test_ex_list_user_drives(self):
drives = self.driver.ex_list_user_drives()
drive = drives[0]
self.assertEqual(drive.name, 'test node 2-drive')
self.assertEqual(drive.size, 13958643712)
self.assertEqual(drive.media, 'disk')
self.assertEqual(drive.status, 'unmounted')
[docs] def test_ex_create_drive(self):
CloudSigmaMockHttp.type = 'CREATE'
name = 'test drive 5'
size = 2000 * 1024 * 1024
drive = self.driver.ex_create_drive(name=name, size=size, media='disk')
self.assertEqual(drive.name, 'test drive 5')
self.assertEqual(drive.media, 'disk')
[docs] def test_ex_clone_drive(self):
drive = self.driver.ex_list_user_drives()[0]
cloned_drive = self.driver.ex_clone_drive(drive=drive,
name='cloned drive')
self.assertEqual(cloned_drive.name, 'cloned drive')
[docs] def test_ex_resize_drive(self):
drive = self.driver.ex_list_user_drives()[0]
size = 1111 * 1024 * 1024
resized_drive = self.driver.ex_resize_drive(drive=drive, size=size)
self.assertEqual(resized_drive.name, 'test drive 5')
self.assertEqual(resized_drive.media, 'disk')
self.assertEqual(resized_drive.size, size)
[docs] def test_ex_list_firewall_policies(self):
policies = self.driver.ex_list_firewall_policies()
policy = policies[1]
rule = policy.rules[0]
self.assertEqual(policy.name, 'My awesome policy')
self.assertEqual(rule.action, 'drop')
self.assertEqual(rule.direction, 'out')
self.assertEqual(rule.dst_ip, '23.0.0.0/32')
self.assertEqual(rule.ip_proto, 'tcp')
self.assertEqual(rule.dst_port, None)
self.assertEqual(rule.src_ip, None)
self.assertEqual(rule.src_port, None)
self.assertEqual(rule.comment, 'Drop traffic from the VM to IP address 23.0.0.0/32')
[docs] def test_ex_create_firewall_policy_no_rules(self):
CloudSigmaMockHttp.type = 'CREATE_NO_RULES'
policy = self.driver.ex_create_firewall_policy(name='test policy 1')
self.assertEqual(policy.name, 'test policy 1')
self.assertEqual(policy.rules, [])
[docs] def test_ex_create_firewall_policy_with_rules(self):
CloudSigmaMockHttp.type = 'CREATE_WITH_RULES'
rules = [
{
'action': 'accept',
'direction': 'out',
'ip_proto': 'tcp',
'src_ip': '127.0.0.1',
'dst_ip': '127.0.0.1'
}
]
policy = self.driver.ex_create_firewall_policy(name='test policy 2',
rules=rules)
rule = policy.rules[0]
self.assertEqual(policy.name, 'test policy 2')
self.assertEqual(len(policy.rules), 1)
self.assertEqual(rule.action, 'accept')
self.assertEqual(rule.direction, 'out')
self.assertEqual(rule.ip_proto, 'tcp')
[docs] def test_ex_attach_firewall_policy(self):
policy = self.driver.ex_list_firewall_policies()[0]
node = self.driver.list_nodes()[0]
CloudSigmaMockHttp.type = 'ATTACH_POLICY'
updated_node = self.driver.ex_attach_firewall_policy(policy=policy,
node=node)
nic = updated_node.extra['nics'][0]
self.assertEqual(nic['firewall_policy']['uuid'],
'461dfb8c-e641-43d7-a20e-32e2aa399086')
[docs] def test_ex_attach_firewall_policy_inexistent_nic(self):
policy = self.driver.ex_list_firewall_policies()[0]
node = self.driver.list_nodes()[0]
nic_mac = 'inexistent'
expected_msg = 'Cannot find the NIC interface to attach a policy to'
self.assertRaisesRegexp(ValueError, expected_msg,
self.driver.ex_attach_firewall_policy,
policy=policy,
node=node,
nic_mac=nic_mac)
[docs] def test_ex_delete_firewall_policy(self):
policy = self.driver.ex_list_firewall_policies()[0]
status = self.driver.ex_delete_firewall_policy(policy=policy)
self.assertTrue(status)
[docs] def test_ex_get_tag(self):
tag = self.driver.ex_get_tag(tag_id='a010ec41-2ead-4630-a1d0-237fa77e4d4d')
self.assertEqual(tag.id, 'a010ec41-2ead-4630-a1d0-237fa77e4d4d')
self.assertEqual(tag.name, 'test tag 2')
[docs] def test_ex_create_tag(self):
tag = self.driver.ex_create_tag(name='test tag 3')
self.assertEqual(tag.name, 'test tag 3')
[docs] def test_ex_create_tag_with_resources(self):
CloudSigmaMockHttp.type = 'WITH_RESOURCES'
resource_uuids = ['1']
tag = self.driver.ex_create_tag(name='test tag 3',
resource_uuids=resource_uuids)
self.assertEqual(tag.name, 'test tag 3')
self.assertEqual(tag.resources, resource_uuids)
[docs] def test_ex_tag_resource(self):
node = self.driver.list_nodes()[0]
tag = self.driver.ex_list_tags()[0]
updated_tag = self.driver.ex_tag_resource(resource=node, tag=tag)
self.assertEqual(updated_tag.name, 'test tag 3')
[docs] def test_ex_tag_resources(self):
nodes = self.driver.list_nodes()
tag = self.driver.ex_list_tags()[0]
updated_tag = self.driver.ex_tag_resources(resources=nodes, tag=tag)
self.assertEqual(updated_tag.name, 'test tag 3')
[docs] def test_ex_tag_resource_invalid_resource_object(self):
tag = self.driver.ex_list_tags()[0]
expected_msg = 'Resource doesn\'t have id attribute'
self.assertRaisesRegexp(ValueError, expected_msg,
self.driver.ex_tag_resource, tag=tag,
resource={})
[docs] def test_ex_delete_tag(self):
tag = self.driver.ex_list_tags()[0]
status = self.driver.ex_delete_tag(tag=tag)
self.assertTrue(status)
[docs] def test_ex_get_balance(self):
balance = self.driver.ex_get_balance()
self.assertEqual(balance['balance'], '10.00')
self.assertEqual(balance['currency'], 'USD')
[docs] def test_ex_get_pricing(self):
pricing = self.driver.ex_get_pricing()
self.assertTrue('current' in pricing)
self.assertTrue('next' in pricing)
self.assertTrue('objects' in pricing)
[docs] def test_ex_get_usage(self):
pricing = self.driver.ex_get_usage()
self.assertTrue('balance' in pricing)
self.assertTrue('usage' in pricing)
[docs] def test_ex_list_subscriptions(self):
subscriptions = self.driver.ex_list_subscriptions()
subscription = subscriptions[0]
self.assertEqual(len(subscriptions), 5)
self.assertEqual(subscription.id, '7272')
self.assertEqual(subscription.resource, 'vlan')
self.assertEqual(subscription.amount, 1)
self.assertEqual(subscription.period, '345 days, 0:00:00')
self.assertEqual(subscription.status, 'active')
self.assertEqual(subscription.price, '0E-20')
[docs] def test_ex_create_subscription(self):
CloudSigmaMockHttp.type = 'CREATE_SUBSCRIPTION'
subscription = self.driver.ex_create_subscription(amount=1,
period='1 month',
resource='vlan')
self.assertEqual(subscription.amount, 1)
self.assertEqual(subscription.period, '1 month')
self.assertEqual(subscription.resource, 'vlan')
self.assertEqual(subscription.price, '10.26666666666666666666666667')
self.assertEqual(subscription.auto_renew, False)
self.assertEqual(subscription.subscribed_object, '2494079f-8376-40bf-9b37-34d633b8a7b7')
[docs] def test_ex_list_subscriptions_status_filterting(self):
CloudSigmaMockHttp.type = 'STATUS_FILTER'
self.driver.ex_list_subscriptions(status='active')
[docs] def test_ex_list_subscriptions_resource_filterting(self):
CloudSigmaMockHttp.type = 'RESOURCE_FILTER'
resources = ['cpu', 'mem']
self.driver.ex_list_subscriptions(resources=resources)
[docs] def test_ex_toggle_subscription_auto_renew(self):
subscription = self.driver.ex_list_subscriptions()[0]
status = self.driver.ex_toggle_subscription_auto_renew(
subscription=subscription)
self.assertTrue(status)
[docs] def test_ex_list_capabilities(self):
capabilities = self.driver.ex_list_capabilities()
self.assertEqual(capabilities['servers']['cpu']['min'], 250)
[docs] def test_ex_list_servers_availability_groups(self):
groups = self.driver.ex_list_servers_availability_groups()
self.assertEqual(len(groups), 3)
self.assertEqual(len(groups[0]), 2)
self.assertEqual(len(groups[2]), 1)
[docs] def test_ex_list_drives_availability_groups(self):
groups = self.driver.ex_list_drives_availability_groups()
self.assertEqual(len(groups), 1)
self.assertEqual(len(groups[0]), 11)
[docs] def test_wait_for_drive_state_transition_timeout(self):
drive = self.driver.ex_list_user_drives()[0]
state = 'timeout'
expected_msg = 'Timed out while waiting for drive transition'
self.assertRaisesRegexp(Exception, expected_msg,
self.driver._wait_for_drive_state_transition,
drive=drive, state=state,
timeout=0.5)
[docs] def test_wait_for_drive_state_transition_success(self):
drive = self.driver.ex_list_user_drives()[0]
state = 'unmounted'
drive = self.driver._wait_for_drive_state_transition(drive=drive,
state=state,
timeout=0.5)
self.assertEqual(drive.status, state)
[docs]class CloudSigmaAPI20DirectTestCase(CloudSigmaAPI20BaseTestCase,
unittest.TestCase):
driver_klass = CloudSigma_2_0_NodeDriver
driver_args = ('foo', 'bar')
driver_kwargs = {}
[docs]class CloudSigmaAPI20IndirectTestCase(CloudSigmaAPI20BaseTestCase,
unittest.TestCase):
driver_klass = CloudSigmaNodeDriver
driver_args = ('foo', 'bar')
driver_kwargs = {'api_version': '2.0'}
[docs]class CloudSigmaMockHttp(MockHttpTestCase):
fixtures = ComputeFileFixtures('cloudsigma_2_0')
def _api_2_0_servers_detail_INVALID_CREDS(self, method, url, body, headers):
body = self.fixtures.load('libdrives.json')
return (httplib.UNAUTHORIZED, body, {},
httplib.responses[httplib.UNAUTHORIZED])
def _api_2_0_libdrives(self, method, url, body, headers):
body = self.fixtures.load('libdrives.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_detail(self, method, url, body, headers):
body = self.fixtures.load('servers_detail_mixed_state.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911(self, method, url, body, headers):
body = ''
return (httplib.NO_CONTENT, body, {}, httplib.responses[httplib.NO_CONTENT])
def _api_2_0_servers(self, method, url, body, headers):
if method == 'POST':
# create_node
parsed = json.loads(body)
if 'vlan' in parsed['name']:
self.assertEqual(len(parsed['nics']), 2)
body = self.fixtures.load('servers_create_with_vlan.json')
else:
body = self.fixtures.load('servers_create.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_start(self, method, url, body, headers):
body = self.fixtures.load('start_success.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_AVOID_MODE_start(self, method, url, body, headers):
self.assertUrlContainsQueryParams(url, {'avoid': '1,2'})
body = self.fixtures.load('start_success.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_ALREADY_STARTED_start(self, method, url, body, headers):
body = self.fixtures.load('start_already_started.json')
return (httplib.FORBIDDEN, body, {}, httplib.responses[httplib.FORBIDDEN])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_stop(self, method, url, body, headers):
body = self.fixtures.load('stop_success.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_ALREADY_STOPPED_stop(self, method, url, body, headers):
body = self.fixtures.load('stop_already_stopped.json')
return (httplib.FORBIDDEN, body, {}, httplib.responses[httplib.FORBIDDEN])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_clone(self, method, url, body, headers):
body = self.fixtures.load('servers_clone.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_open_vnc(self, method, url, body, headers):
body = self.fixtures.load('servers_open_vnc.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_close_vnc(self, method, url, body, headers):
body = self.fixtures.load('servers_close_vnc.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_detail(self, method, url, body, headers):
body = self.fixtures.load('drives_detail.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_b02311e2_a83c_4c12_af10_b30d51c86913(self, method, url, body, headers):
body = self.fixtures.load('drives_get.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809(self, method, url, body, headers):
body = self.fixtures.load('drives_get.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_CREATE(self, method, url, body, headers):
body = self.fixtures.load('drives_create.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809_action_clone(self, method, url, body, headers):
body = self.fixtures.load('drives_clone.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_5236b9ee_f735_42fd_a236_17558f9e12d3_action_clone(self, method, url, body, headers):
body = self.fixtures.load('drives_clone.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_b02311e2_a83c_4c12_af10_b30d51c86913_action_resize(self, method, url, body, headers):
body = self.fixtures.load('drives_resize.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809_action_resize(self, method, url, body, headers):
body = self.fixtures.load('drives_resize.json')
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_fwpolicies_detail(self, method, url, body, headers):
body = self.fixtures.load('fwpolicies_detail.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_fwpolicies_CREATE_NO_RULES(self, method, url, body, headers):
body = self.fixtures.load('fwpolicies_create_no_rules.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_fwpolicies_CREATE_WITH_RULES(self, method, url, body, headers):
body = self.fixtures.load('fwpolicies_create_with_rules.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_ATTACH_POLICY(self, method, url, body, headers):
body = self.fixtures.load('servers_attach_policy.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_fwpolicies_0e339282_0cb5_41ac_a9db_727fb62ff2dc(self, method, url, body, headers):
if method == 'DELETE':
body = ''
return (httplib.NO_CONTENT, body, {},
httplib.responses[httplib.NO_CONTENT])
def _api_2_0_tags_detail(self, method, url, body, headers):
body = self.fixtures.load('tags_detail.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_tags(self, method, url, body, headers):
if method == 'POST':
body = self.fixtures.load('tags_create.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_tags_WITH_RESOURCES(self, method, url, body, headers):
if method == 'POST':
body = self.fixtures.load('tags_create_with_resources.json')
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_tags_a010ec41_2ead_4630_a1d0_237fa77e4d4d(self, method, url, body, headers):
if method == 'GET':
# ex_get_tag
body = self.fixtures.load('tags_get.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == 'PUT':
# ex_tag_resource
body = self.fixtures.load('tags_update.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == 'DELETE':
# ex_delete_tag
body = ''
return (httplib.NO_CONTENT, body, {},
httplib.responses[httplib.NO_CONTENT])
def _api_2_0_balance(self, method, url, body, headers):
body = self.fixtures.load('balance.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_pricing(self, method, url, body, headers):
body = self.fixtures.load('pricing.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_currentusage(self, method, url, body, headers):
body = self.fixtures.load('currentusage.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions(self, method, url, body, headers):
body = self.fixtures.load('subscriptions.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_STATUS_FILTER(self, method, url, body, headers):
self.assertUrlContainsQueryParams(url, {'status': 'active'})
body = self.fixtures.load('subscriptions.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_RESOURCE_FILTER(self, method, url, body, headers):
expected_params = {'resource': 'cpu,mem', 'status': 'all'}
self.assertUrlContainsQueryParams(url, expected_params)
body = self.fixtures.load('subscriptions.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_7272_action_auto_renew(self, method, url, body, headers):
body = ''
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_CREATE_SUBSCRIPTION(self, method, url, body, headers):
body = self.fixtures.load('create_subscription.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_capabilities(self, method, url, body, headers):
body = self.fixtures.load('capabilities.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_availability_groups(self, method, url, body, headers):
body = self.fixtures.load('servers_avail_groups.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_availability_groups(self, method, url, body, headers):
body = self.fixtures.load('drives_avail_groups.json')
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
if __name__ == '__main__':
sys.exit(unittest.main())