libcloud.storage.drivers.dummy module
- class libcloud.storage.drivers.dummy.DummyFileObject(yield_count=5, chunk_len=10)[source]
Bases:
FileIO
- class libcloud.storage.drivers.dummy.DummyStorageDriver(api_key, api_secret)[source]
Bases:
StorageDriver
Dummy Storage driver.
>>> from libcloud.storage.drivers.dummy import DummyStorageDriver >>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container(container_name='test container') >>> container <Container: name=test container, provider=Dummy Storage Provider> >>> container.name 'test container' >>> container.extra['object_count'] 0
- Parameters:
api_key (
str
) – API key or username to used (required)api_secret (
str
) – Secret password to be used (required)
- Return type:
None
- create_container(container_name)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container = driver.create_container( ... container_name='test container 1') ... Traceback (most recent call last): ContainerAlreadyExistsError:
@inherits:
StorageDriver.create_container
- delete_container(container)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container = Container(name = 'test container', ... extra={'object_count': 0}, driver=driver) >>> driver.delete_container(container=container) ... Traceback (most recent call last): ContainerDoesNotExistError: >>> container = driver.create_container( ... container_name='test container 1') ... >>> len(driver._containers) 1 >>> driver.delete_container(container=container) True >>> len(driver._containers) 0 >>> container = driver.create_container( ... container_name='test container 1') ... >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> driver.delete_container(container=container) ... Traceback (most recent call last): ContainerIsNotEmptyError:
@inherits:
StorageDriver.delete_container
- delete_object(obj)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj <Object: name=test object, size=50, ...> >>> container.delete_object(obj=obj) True >>> obj = Object(name='test object 2', ... size=1000, hash=None, extra=None, ... meta_data=None, container=container,driver=None) >>> container.delete_object(obj=obj) Traceback (most recent call last): ObjectDoesNotExistError:
@inherits:
StorageDriver.delete_object
- download_object(obj, destination_path, overwrite_existing=False, delete_on_failure=True)[source]
Download an object to the specified destination path.
- Parameters:
obj (
libcloud.storage.base.Object
) – Object instance.destination_path (
str
) – Full path to a file or a directory where the incoming file will be saved.overwrite_existing (
bool
) – True to overwrite an existing file, defaults to False.delete_on_failure (
bool
) – True to delete a partially downloaded file if the download was not successful (hash mismatch / file size).
- Returns:
True if an object has been successfully downloaded, False otherwise.
- Return type:
bool
- download_object_as_stream(obj, chunk_size=None)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> stream = container.download_object_as_stream(obj) >>> stream <...closed...>
@inherits:
StorageDriver.download_object_as_stream
- get_container(container_name)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_container('unknown') Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> driver.get_container('test container 1') <Container: name=test container 1, provider=Dummy Storage Provider>
@inherits:
StorageDriver.get_container
- get_container_cdn_url(container)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_container('unknown') Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> container.get_cdn_url() 'http://www.test.com/container/test_container_1'
@inherits:
StorageDriver.get_container_cdn_url
- get_meta_data()[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_meta_data()['object_count'] 0 >>> driver.get_meta_data()['container_count'] 0 >>> driver.get_meta_data()['bytes_used'] 0 >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container_name = 'test container 2' >>> container = driver.create_container(container_name=container_name) >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> driver.get_meta_data()['object_count'] 1 >>> driver.get_meta_data()['container_count'] 2 >>> driver.get_meta_data()['bytes_used'] 50
- Return type:
dict
- get_object(container_name, object_name)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> driver.get_object('unknown', 'unknown') ... Traceback (most recent call last): ContainerDoesNotExistError: >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> driver.get_object( ... 'test container 1', 'unknown') Traceback (most recent call last): ObjectDoesNotExistError: >>> obj = container.upload_object_via_stream(object_name='test object', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj.name 'test object' >>> obj.size 50
@inherits:
StorageDriver.get_object
- get_object_cdn_url(obj)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> obj = container.upload_object_via_stream( ... object_name='test object 5', ... iterator=DummyFileObject(5, 10), extra={}) >>> obj.name 'test object 5' >>> obj.get_cdn_url() 'http://www.test.com/object/test_object_5'
@inherits:
StorageDriver.get_object_cdn_url
- iterate_container_objects(container, prefix=None, ex_prefix=None)[source]
Return a iterator of objects for the given container.
- Parameters:
container (
libcloud.storage.base.Container
) – Container instanceprefix (
str
) – Filter objects starting with a prefix.ex_prefix (
str
) – (Deprecated.) Filter objects starting with a prefix.
- Returns:
A iterator of Object instances.
- Return type:
iterator
oflibcloud.storage.base.Object
- iterate_containers()[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> list(driver.iterate_containers()) [] >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 1, provider=Dummy Storage Provider> >>> container.name 'test container 1' >>> container_name = 'test container 2' >>> container = driver.create_container(container_name=container_name) >>> container <Container: name=test container 2, provider=Dummy Storage Provider> >>> container = driver.create_container( ... container_name='test container 2') ... Traceback (most recent call last): ContainerAlreadyExistsError: >>> container_list=list(driver.iterate_containers()) >>> sorted([c.name for c in container_list]) ['test container 1', 'test container 2']
@inherits:
StorageDriver.iterate_containers
- upload_object(file_path, container, object_name, extra=None, verify_hash=True, headers=None)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container_name = 'test container 1' >>> container = driver.create_container(container_name=container_name) >>> container.upload_object(file_path='/tmp/inexistent.file', ... object_name='test') Traceback (most recent call last): LibcloudError: >>> file_path = path = os.path.abspath(__file__) >>> file_size = os.path.getsize(file_path) >>> obj = container.upload_object(file_path=file_path, ... object_name='test') >>> obj <Object: name=test, size=...> >>> obj.size == file_size True
@inherits:
StorageDriver.upload_object
- upload_object_via_stream(iterator, container, object_name, extra=None, headers=None)[source]
>>> driver = DummyStorageDriver('key', 'secret') >>> container = driver.create_container( ... container_name='test container 1') ... >>> obj = container.upload_object_via_stream( ... object_name='test object', iterator=DummyFileObject(5, 10), ... extra={}) >>> obj <Object: name=test object, size=50, ...>
@inherits:
StorageDriver.upload_object_via_stream
- website = 'http://example.com'