Source code for libcloud.test.test_httplib_ssl

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import os.path
import ssl
import socket

import mock
from mock import patch

import libcloud.security

from libcloud.utils.py3 import reload
from libcloud.httplib_ssl import LibcloudHTTPSConnection

from libcloud.test import unittest

ORIGINAL_CA_CERS_PATH = libcloud.security.CA_CERTS_PATH


[docs]class TestHttpLibSSLTests(unittest.TestCase):
[docs] def setUp(self): libcloud.security.VERIFY_SSL_CERT = False libcloud.security.CA_CERTS_PATH = ORIGINAL_CA_CERS_PATH self.httplib_object = LibcloudHTTPSConnection('foo.bar')
[docs] def test_custom_ca_path_using_env_var_doesnt_exist(self): os.environ['SSL_CERT_FILE'] = '/foo/doesnt/exist' try: reload(libcloud.security) except ValueError: e = sys.exc_info()[1] msg = 'Certificate file /foo/doesnt/exist doesn\'t exist' self.assertEqual(str(e), msg) else: self.fail('Exception was not thrown')
[docs] def test_custom_ca_path_using_env_var_is_directory(self): file_path = os.path.dirname(os.path.abspath(__file__)) os.environ['SSL_CERT_FILE'] = file_path expected_msg = 'Certificate file can\'t be a directory' self.assertRaisesRegexp(ValueError, expected_msg, reload, libcloud.security)
[docs] def test_custom_ca_path_using_env_var_exist(self): # When setting a path we don't actually check that a valid CA file is # provided. # This happens later in the code in httplib_ssl.connect method file_path = os.path.abspath(__file__) os.environ['SSL_CERT_FILE'] = file_path reload(libcloud.security) self.assertEqual(libcloud.security.CA_CERTS_PATH, [file_path])
@patch('warnings.warn')
[docs] def test_setup_verify(self, _): libcloud.security.CA_CERTS_PATH = [] # Should throw a runtime error libcloud.security.VERIFY_SSL_CERT = True expected_msg = libcloud.security.CA_CERTS_UNAVAILABLE_ERROR_MSG self.assertRaisesRegexp(RuntimeError, expected_msg, self.httplib_object._setup_verify) libcloud.security.VERIFY_SSL_CERT = False self.httplib_object._setup_verify()
@patch('warnings.warn')
[docs] def test_setup_ca_cert(self, _): # verify = False, _setup_ca_cert should be a no-op self.httplib_object.verify = False self.httplib_object._setup_ca_cert() self.assertEqual(self.httplib_object.ca_cert, None) # verify = True, a valid path is provided, self.ca_cert should be set to # a valid path self.httplib_object.verify = True libcloud.security.CA_CERTS_PATH = [os.path.abspath(__file__)] self.httplib_object._setup_ca_cert() self.assertTrue(self.httplib_object.ca_cert is not None) # verify = True, no CA certs are available, exception should be thrown libcloud.security.CA_CERTS_PATH = [] expected_msg = libcloud.security.CA_CERTS_UNAVAILABLE_ERROR_MSG self.assertRaisesRegexp(RuntimeError, expected_msg, self.httplib_object._setup_ca_cert)
@mock.patch('socket.create_connection', mock.MagicMock()) @mock.patch('socket.socket', mock.MagicMock())
[docs] def test_connect_throws_friendly_error_message_on_ssl_wrap_connection_reset_by_peer(self): mock_wrap_socket = None if getattr(ssl, 'HAS_SNI', False): ssl.SSLContext.wrap_socket = mock.MagicMock() mock_wrap_socket = ssl.SSLContext.wrap_socket else: ssl.wrap_socket = mock.MagicMock() mock_wrap_socket = ssl.wrap_socket # Test that we re-throw a more friendly error message in case # "connection reset by peer" error occurs when trying to establish a # SSL connection libcloud.security.VERIFY_SSL_CERT = True self.httplib_object.verify = True self.httplib_object.http_proxy_used = False # No connection reset by peer, original exception should be thrown mock_wrap_socket.side_effect = Exception('foo bar fail') expected_msg = 'foo bar fail' self.assertRaisesRegexp(Exception, expected_msg, self.httplib_object.connect) # Connection reset by peer, wrapped exception with friendly error # message should be thrown mock_wrap_socket.side_effect = socket.error('Connection reset by peer') expected_msg = 'Failed to establish SSL / TLS connection' self.assertRaisesRegexp(socket.error, expected_msg, self.httplib_object.connect) # Same error but including errno with self.assertRaises(socket.error) as cm: mock_wrap_socket.side_effect = socket.error(104, 'Connection reset by peer') self.httplib_object.connect() e = cm.exception self.assertEqual(e.errno, 104) self.assertTrue(expected_msg in str(e)) # Test original exception is propagated correctly on non reset by peer # error with self.assertRaises(socket.error) as cm: mock_wrap_socket.side_effect = socket.error(105, 'Some random error') self.httplib_object.connect() e = cm.exception self.assertEqual(e.errno, 105) self.assertTrue('Some random error' in str(e))
[docs] def test_certifi_ca_bundle_in_search_path(self): mock_certifi_ca_bundle_path = '/certifi/bundle/path' # Certifi not available import libcloud.security reload(libcloud.security) original_length = len(libcloud.security.CA_CERTS_PATH) self.assertTrue(mock_certifi_ca_bundle_path not in libcloud.security.CA_CERTS_PATH) # Certifi is available mock_certifi = mock.Mock() mock_certifi.where.return_value = mock_certifi_ca_bundle_path sys.modules['certifi'] = mock_certifi # Certifi CA bundle path should be injected at the begining of search list import libcloud.security reload(libcloud.security) self.assertEqual(libcloud.security.CA_CERTS_PATH[0], mock_certifi_ca_bundle_path) self.assertEqual(len(libcloud.security.CA_CERTS_PATH), (original_length + 1)) # Certifi is available, but USE_CERTIFI is set to False os.environ['LIBCLOUD_SSL_USE_CERTIFI'] = 'false' import libcloud.security reload(libcloud.security) self.assertTrue(mock_certifi_ca_bundle_path not in libcloud.security.CA_CERTS_PATH) self.assertEqual(len(libcloud.security.CA_CERTS_PATH), original_length) # And enabled os.environ['LIBCLOUD_SSL_USE_CERTIFI'] = 'true' import libcloud.security reload(libcloud.security) self.assertEqual(libcloud.security.CA_CERTS_PATH[0], mock_certifi_ca_bundle_path) self.assertEqual(len(libcloud.security.CA_CERTS_PATH), (original_length + 1))
if __name__ == '__main__': sys.exit(unittest.main())