Source code for toasty.pipeline.local_io
# -*- mode: python; coding: utf-8 -*-
# Copyright 2020 the AAS WorldWide Telescope project
# Licensed under the MIT License.
"""
Pipeline I/O using the local filesystem as a storage backend.
Note that pipeline processing *always* uses the local filesystem for
intermediate steps. This module is for scenarios where the final long-term
storage of processed data should also involve the local filesystem.
"""
# Names exported by ``from <this module> import *``.
__all__ = ['LocalPipelineIo']
import os.path
import shutil
from . import PipelineIo
[docs]
class LocalPipelineIo(PipelineIo):
    """
    I/O for pipeline processing using the local disk.

    Every item is addressed by a sequence of path components, which are
    joined onto the configured path prefix with :func:`os.path.join`.

    Parameters
    ----------
    path_prefix : str
        A path prefix that will be used for all I/O operations.

    """

    # Prefix joined in front of every item's path components; set in __init__.
    _path_prefix = None

    def __init__(self, path_prefix):
        self._path_prefix = path_prefix

    def _export_config(self):
        """
        Describe this backend as a plain dictionary.

        Returns
        -------
        dict
            A dict with ``'_type'`` set to ``'local'`` and ``'path'`` set to
            the path prefix.

        """
        return {
            '_type': 'local',
            'path': self._path_prefix,
        }

    @classmethod
    def _new_from_config(cls, config):
        """
        Create an instance from a configuration dictionary.

        Parameters
        ----------
        config : dict
            Must contain a ``'path'`` entry, which is used as the path prefix.

        Returns
        -------
        An instance of *cls*.

        """
        return cls(config['path'])

    def _make_item_name(self, path_array):
        """
        Join the path prefix and the given components into one filesystem path.

        Parameters
        ----------
        path_array : iterable of str
            The path components identifying an item.

        Returns
        -------
        str
            The result of ``os.path.join(prefix, *path_array)``.

        """
        return os.path.join(self._path_prefix, *path_array)

    def check_exists(self, *path):
        """
        Test whether an item exists on disk.

        Parameters
        ----------
        *path : str
            The path components identifying the item.

        Returns
        -------
        bool
            The result of :func:`os.path.exists` on the item's full path.

        """
        return os.path.exists(self._make_item_name(path))

    def get_item(self, *path, dest=None):
        """
        Copy the contents of an item into a writable binary file-like object.

        Parameters
        ----------
        *path : str
            The path components identifying the item.
        dest : writable binary file-like object
            Receives the item's bytes via :func:`shutil.copyfileobj`. Although
            it defaults to None, a real object must be supplied.

        """
        with open(self._make_item_name(path), 'rb') as f_in:
            shutil.copyfileobj(f_in, dest)

    def put_item(self, *path, source=None):
        """
        Store the contents of a readable binary file-like object as an item.

        Missing parent directories of the destination are created. Any
        existing file at the destination is overwritten.

        Parameters
        ----------
        *path : str
            The path components identifying the item.
        source : readable binary file-like object
            Supplies the item's bytes via :func:`shutil.copyfileobj`. Although
            it defaults to None, a real object must be supplied.

        """
        item_path = self._make_item_name(path)
        parent_dir = os.path.dirname(item_path)

        # ``os.makedirs('')`` raises FileNotFoundError, so only create the
        # parent when the item path actually has a directory component (it
        # does not when the prefix is empty and a single component is given).
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(item_path, 'wb') as f_out:
            shutil.copyfileobj(source, f_out)

    def list_items(self, *path):
        """
        Iterate over the entries of a directory.

        Parameters
        ----------
        *path : str
            The path components identifying the directory to list.

        Yields
        ------
        (str, bool)
            For each name returned by :func:`os.listdir`, the name itself and
            whether :func:`os.path.isdir` reports it as a directory.

        """
        dir_path = self._make_item_name(path)

        for stem in os.listdir(dir_path):
            yield stem, os.path.isdir(os.path.join(dir_path, stem))