Source code for toasty.pipeline.azure_io

# -*- mode: python; coding: utf-8 -*-
# Copyright 2020 the AAS WorldWide Telescope project
# Licensed under the MIT License.

"""
Azure Blob Storage I/O backend for the pipeline framework.

This module requires that the ``azure.storage.blob`` Python module be available.
If it is not, this module will still be importable, but it won't work. Check the
``ENABLED`` boolean variable or call :func:`assert_enabled` to raise an
exception offering guidance if the needed support is missing.

"""

__all__ = '''
AzureBlobPipelineIo
ENABLED
assert_enabled
'''.split()

import shutil

from . import PipelineIo

try:
    from azure.storage.blob import BlobServiceClient
    ENABLED = True
except ImportError:
    ENABLED = False


[docs] def assert_enabled(): if not ENABLED: raise Exception('Azure pipeline I/O backend is needed but unavailable -' ' install the `azure-storage-blob` package')
[docs] class AzureBlobPipelineIo(PipelineIo): """I/O for pipeline processing that uses Microsoft Azure Blob Storage. Parameters ---------- connection_string : str The Azure "connection string" to use container_name : str The name of the blob container within the storage account path_prefix : str or iterable of str A list folder names within the blob container that will be prepended to all paths accessed through this object. """ _connection_string = None _svc_client = None _cnt_client = None _container_name = None _path_prefix = None def __init__(self, connection_string, container_name, path_prefix): assert_enabled() if isinstance(path_prefix, str): path_prefix = (path_prefix, ) else: try: path_prefix = tuple(path_prefix) for item in path_prefix: assert isinstance(item, str) except Exception: raise ValueError('path_prefix should be a string or iterable of strings; ' 'got %r' % (path_prefix, )) self._connection_string = connection_string self._container_name = container_name self._svc_client = BlobServiceClient.from_connection_string(connection_string) self._cnt_client = self._svc_client.get_container_client(container_name) self._path_prefix = path_prefix def _export_config(self): return { '_type': 'azure-blob', 'connection_secret': self._connection_string, 'container_name': self._container_name, 'path_prefix': self._path_prefix, } @classmethod def _new_from_config(cls, config): return cls( config['connection_secret'], config['container_name'], config['path_prefix'], ) def _make_blob_name(self, path_array): """TODO: is this actually correct? Escaping?""" return '/'.join(self._path_prefix + tuple(path_array))
[docs] def check_exists(self, *path): from azure.core.exceptions import ResourceNotFoundError blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) try: blob_client.get_blob_properties() except ResourceNotFoundError: return False return True
[docs] def get_item(self, *path, dest=None): blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) blob_client.download_blob().readinto(dest)
[docs] def put_item(self, *path, source=None): blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) blob_client.upload_blob(source)
[docs] def list_items(self, *path): from azure.storage.blob.models import BlobPrefix prefix = self._make_blob_name(path) + '/' for item in self._cnt_client.list_blobs( prefix = prefix, delimiter = '/' ): assert item.name.startswith(prefix) stem = item.name[len(prefix):] is_folder = isinstance(item, BlobPrefix) if is_folder: # Returned names end with a '/' too assert stem[-1] == '/' stem = stem[:-1] yield stem, is_folder