# Source code for libcloud.test.compute.test_ktucloud

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import unittest

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlparse
from libcloud.utils.py3 import parse_qsl

try:
    import simplejson as json
except ImportError:
    import json

from libcloud.compute.drivers.ktucloud import KTUCloudNodeDriver

from libcloud.test import MockHttpTestCase
from libcloud.test.compute import TestCaseMixin
from libcloud.test.file_fixtures import ComputeFileFixtures


class KTUCloudNodeDriverTest(unittest.TestCase, TestCaseMixin):
    """Tests for ``KTUCloudNodeDriver`` run against ``KTUCloudStackMockHttp``.

    Individual tests pick the canned responses they want by reassigning
    ``KTUCloudStackMockHttp.fixture_tag``; ``setUp`` resets it to
    ``'default'`` before every test.
    """

    def setUp(self):
        # Route every connection opened by the driver through the mock class.
        mock_classes = (None, KTUCloudStackMockHttp)
        KTUCloudNodeDriver.connectionCls.conn_classes = mock_classes

        driver = KTUCloudNodeDriver('apikey', 'secret',
                                    path='/test/path',
                                    host='api.dummy.com')
        self.driver = driver
        driver.path = '/test/path'
        # NOTE(review): -1 looks like a placeholder provider type -- confirm.
        driver.type = -1

        KTUCloudStackMockHttp.fixture_tag = 'default'
        # Presumably avoids waiting between async job polls -- confirm
        # against the connection class.
        driver.connection.poll_interval = 0.0

    def _assert_create_node_fails(self, failure_tag):
        # Size and image are fetched while the current fixture tag is still
        # active; only the create_node call runs against ``failure_tag``.
        first_size = self.driver.list_sizes()[0]
        first_image = self.driver.list_images()[0]
        KTUCloudStackMockHttp.fixture_tag = failure_tag
        self.assertRaises(
            Exception,
            self.driver.create_node,
            name='node-name',
            image=first_image,
            size=first_size)

    def test_create_node_immediate_failure(self):
        # create_node must raise when served the 'deployfail' fixtures.
        self._assert_create_node_fails('deployfail')

    def test_create_node_delayed_failure(self):
        # create_node must raise when served the 'deployfail2' fixtures.
        self._assert_create_node_fails('deployfail2')

    def test_list_images_no_images_available(self):
        # The 'notemplates' fixtures must yield an empty image list.
        KTUCloudStackMockHttp.fixture_tag = 'notemplates'
        self.assertEqual(0, len(self.driver.list_images()))

    def test_list_images_available(self):
        # The default fixtures are expected to yield 112 images.
        self.assertEqual(112, len(self.driver.list_images()))

    def test_list_sizes_available(self):
        # The default fixtures are expected to yield 112 sizes.
        self.assertEqual(112, len(self.driver.list_sizes()))

    def test_list_sizes_nodisk(self):
        # With the 'nodisk' fixtures there are two sizes, and the second one
        # must carry the driver's EMPTY_DISKOFFERINGID as its id.
        KTUCloudStackMockHttp.fixture_tag = 'nodisk'
        available = self.driver.list_sizes()
        self.assertEqual(2, len(available))

        second = available[1]
        self.assertTrue(
            second.id == KTUCloudNodeDriver.EMPTY_DISKOFFERINGID)


class KTUCloudStackMockHttp(MockHttpTestCase):
    """Mock HTTP class that answers API requests from JSON fixture files.

    Fixtures are loaded from the ``'ktucloud'`` compute fixture set. The
    class attribute ``fixture_tag`` is part of the fixture file name, so
    reassigning it switches which canned responses are returned.
    """

    fixtures = ComputeFileFixtures('ktucloud')
    fixture_tag = 'default'

    def _load_fixture(self, fixture):
        """Load ``fixture`` and return ``(raw_body, decoded_json)``."""
        raw = self.fixtures.load(fixture)
        return raw, json.loads(raw)

    def _test_path(self, method, url, body, headers):
        """Validate the query string of a request and build its response.

        The query must contain ``apiKey``, ``command``, ``response`` and
        ``signature``, and ``response`` must equal ``'json'``. If a
        ``_cmd_<command>`` method exists it is called with the remaining
        query parameters as keyword arguments; otherwise the fixture
        ``<command>_<fixture_tag>.json`` is returned.

        NOTE(review): presumably invoked by the mock base class for the
        '/test/path' request path -- confirm against MockHttpTestCase.
        """
        parsed = urlparse.urlparse(url)
        params = dict(parse_qsl(parsed.query))

        for required in ('apiKey', 'command', 'response', 'signature'):
            self.assertTrue(required in params)
        self.assertTrue(params['response'] == 'json')

        # Drop the bookkeeping keys so only command arguments remain.
        for consumed in ('apiKey', 'response', 'signature'):
            del params[consumed]
        command = params.pop('command')

        handler_name = '_cmd_' + command
        if not hasattr(self, handler_name):
            fixture_name = '_'.join((command, self.fixture_tag)) + '.json'
            payload, decoded = self._load_fixture(fixture_name)
            return (httplib.OK, payload, decoded,
                    httplib.responses[httplib.OK])

        return getattr(self, handler_name)(**params)

    def _cmd_queryAsyncJobResult(self, jobid):
        """Return the fixture ``queryAsyncJobResult_<jobid>.json``.

        Unlike the generic path, the file name depends on ``jobid`` and not
        on ``fixture_tag``.
        """
        fixture_name = '_'.join(('queryAsyncJobResult', str(jobid))) + '.json'
        payload, decoded = self._load_fixture(fixture_name)
        return (httplib.OK, payload, decoded,
                httplib.responses[httplib.OK])


if __name__ == '__main__':
    # Run this module's tests when it is executed directly as a script.
    sys.exit(unittest.main())