# -*- mode: python; coding: utf-8 -*-
# Copyright 2020 the AAS WorldWide Telescope project
# Licensed under the MIT License.
"""A framework for automating the ingest of source images into the formats
used by AAS WorldWide Telescope.
"""
from __future__ import absolute_import, division, print_function
# The public names exported by a star-import of this module.
__all__ = [
    'CandidateInput',
    'ImageSource',
    'IMAGE_SOURCE_CLASS_LOADERS',
    'NotActionableError',
    'PipelineIo',
    'PIPELINE_IO_LOADERS',
]
from abc import ABC, abstractclassmethod, abstractmethod
from datetime import datetime, timezone
import numpy as np
import os.path
import shutil
import sys
from urllib.parse import urlsplit, quote as urlquote
from wwt_data_formats import write_xml_doc
from wwt_data_formats.folder import Folder
from wwt_data_formats.imageset import ImageSet
from wwt_data_formats.place import Place
import yaml
class NotActionableError(Exception):
    """The error raised when an image has been handed to the pipeline, but it
    cannot be converted into a WWT-compatible form.

    Parameters
    ----------
    reason : str
        A description of why the image cannot be processed. It becomes the
        exception's message.
    """

    def __init__(self, reason):
        super().__init__(reason)
def _load_local_pio(config):
    """Build a local-filesystem I/O backend from its saved configuration."""
    # Imported lazily so that the backend module is only loaded when needed.
    from .local_io import LocalPipelineIo

    return LocalPipelineIo._new_from_config(config)


def _load_azure_blob_pio(config):
    """Build an Azure Blob I/O backend from its saved configuration."""
    # Imported lazily so that the backend module is only loaded when needed.
    from .azure_io import AzureBlobPipelineIo

    return AzureBlobPipelineIo._new_from_config(config)


# Maps the "_type" value of a saved I/O configuration to a function that
# takes the configuration dict and returns the matching backend instance.
PIPELINE_IO_LOADERS = {
    'local': _load_local_pio,
    'azure-blob': _load_azure_blob_pio,
}
class PipelineIo(ABC):
    """
    An abstract base class for I/O relating to pipeline processing. An instance
    of this class might be used to fetch files from, and send them to, a cloud
    storage system like S3 or Azure Storage.
    """

    @abstractmethod
    def _export_config(self):
        """
        Export this object's configuration for serialization.

        Returns
        -------
        A dictionary of settings that can be saved as YAML format. There should
        be a key named "_type" with a string value identifying the I/O
        implementation type.
        """

    def save_config(self, path):
        """
        Save this object's configuration to the specified filesystem path.

        Parameters
        ----------
        path : path-like
            Where to write the configuration. The file is created exclusively
            (``O_EXCL``), so the call fails if *path* already exists.
        """
        cfg = self._export_config()

        # The config contains secrets, so create it privately and securely:
        # owner-only permissions, and never reuse a pre-existing file.
        def private_opener(file_path, _flags):
            return os.open(
                file_path,
                os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                mode=0o600,
            )

        with open(path, 'wt', opener=private_opener, encoding='utf8') as f:
            yaml.dump(cfg, f, yaml.SafeDumper)

    @classmethod
    @abstractmethod
    def _new_from_config(cls, config):
        """
        Create a new instance of this class based on serialized configuration.

        Parameters
        ----------
        config : dict
            A dict of configuration that was created with ``_export_config``

        Returns
        -------
        A new instance of the class.
        """

    @classmethod
    def load_from_config(cls, path):
        """
        Create a new I/O backend from saved configuration.

        Parameters
        ----------
        path : path-like
            The path where the configuration was saved.

        Returns
        -------
        A new instance implementing the PipelineIO abstract base class.

        Raises
        ------
        Exception
            If the file does not contain a YAML mapping, or if its "_type"
            value has no entry in ``PIPELINE_IO_LOADERS``.
        """
        with open(path, 'rt', encoding='utf8') as f:
            config = yaml.safe_load(f)

        # An empty file deserializes to None, and other YAML documents may be
        # lists or scalars; report those clearly rather than failing on .get().
        if not isinstance(config, dict):
            raise Exception(
                f'pipeline I/O configuration file {path!r} does not contain a mapping'
            )

        ty = config.get('_type')
        loader = PIPELINE_IO_LOADERS.get(ty)
        if loader is None:
            raise Exception(f'unrecognized pipeline I/O storage type {ty!r}')

        return loader(config)

    @abstractmethod
    def check_exists(self, *path):
        """Test whether an item at the specified path exists.

        Parameters
        ----------
        *path : strings
            The path to the item, interpreted as components in a folder hierarchy.

        Returns
        -------
        A boolean indicating whether the item in question exists.
        """

    @abstractmethod
    def get_item(self, *path, dest=None):
        """Fetch a file-like item at the specified path, writing its contents into the
        specified file-like object *dest*.

        Parameters
        ----------
        *path : strings
            The path to the item, interpreted as components in a folder hierarchy.
        dest : writeable file-like object
            The object into which the item's data will be written as bytes.

        Returns
        -------
        None.
        """

    @abstractmethod
    def put_item(self, *path, source=None):
        """Put a file-like item at the specified path, reading its contents from the
        specified file-like object *source*.

        Parameters
        ----------
        *path : strings
            The path to the item, interpreted as components in a folder hierarchy.
        source : readable file-like object
            The object from which the item's data will be read, as bytes.

        Returns
        -------
        None.
        """

    @abstractmethod
    def list_items(self, *path):
        """List the items contained in the folder at the specified path.

        Parameters
        ----------
        *path : strings
            The path to the item, interpreted as components in a folder hierarchy.

        Returns
        -------
        An iterable of ``(stem, is_folder)``, where *stem* is the "basename" of an
        item contained within the specified folder and *is_folder* is a boolean
        indicating whether this item appears to be a folder itself.
        """
class ImageSource(ABC):
    """An abstract base class representing a source of images to be processed in
    the image-processing pipeline. An instance of this class might fetch
    images from an RSS feed or an AstroPix search.
    """

    @classmethod
    @abstractmethod
    def get_config_key(cls):
        """Get the name of the section key used for this source's configuration data.

        Returns
        -------
        A string giving a key name usable in a YAML file.
        """

    @classmethod
    @abstractmethod
    def deserialize(cls, data):
        """Create an instance of this class by deserializing configuration data.

        Parameters
        ----------
        data : dict-like object
            A dict-like object containing configuration items deserialized from
            a format such as JSON or YAML. The particular contents can vary
            depending on the implementation.

        Returns
        -------
        An instance of *cls*
        """

    @abstractmethod
    def query_candidates(self):
        """
        Generate a sequence of candidate input images that the pipeline may want
        to process.

        Returns
        -------
        A generator that yields a sequence of :class:`toasty.pipeline.CandidateInput` instances.
        """

    @abstractmethod
    def fetch_candidate(self, unique_id, cand_data_stream, cachedir):
        """
        Download a candidate image and prepare it for processing.

        Parameters
        ----------
        unique_id : str
            The unique ID returned by the :class:`toasty.pipeline.CandidateInput` instance
            that was returned from the initial query.
        cand_data_stream : readable stream returning bytes
            A data stream returning the data that were saved when the candidate
            was queried (:meth:`toasty.pipeline.CandidateInput.save`).
        cachedir : path-like
            A path pointing to a local directory inside of which the
            full image data and metadata should be cached.
        """

    @abstractmethod
    def process(self, unique_id, cand_data_stream, cachedir, builder):
        """
        Process an input into WWT format.

        Parameters
        ----------
        unique_id : str
            The unique ID returned by the :class:`toasty.pipeline.CandidateInput` instance
            that was returned from the initial query.
        cand_data_stream : readable stream returning bytes
            A data stream returning the data that were saved when the candidate
            was queried (:meth:`toasty.pipeline.CandidateInput.save`).
        cachedir : path-like
            A path pointing to a local directory inside of which the
            full image data and metadata should be cached.
        builder : :class:`toasty.builder.Builder`
            State object for constructing the WWT data files.

        Notes
        -----
        Your image processor should run the tile cascade, if needed, but the
        caller will take care of emitting the ``index_rel.wtml`` file.
        """
# The PipelineManager class that orchestrates it all
def _load_astropix_image_source_class():
    """Return the AstroPix image source class."""
    # Imported lazily so that the source module is only loaded when needed.
    from .astropix import AstroPixImageSource

    return AstroPixImageSource


def _load_djangoplicity_image_source_class():
    """Return the Djangoplicity image source class."""
    # Imported lazily so that the source module is only loaded when needed.
    from .djangoplicity import DjangoplicityImageSource

    return DjangoplicityImageSource


# Maps a source type name to a zero-argument function that returns the
# corresponding image source class.
IMAGE_SOURCE_CLASS_LOADERS = {
    'astropix': _load_astropix_image_source_class,
    'djangoplicity': _load_djangoplicity_image_source_class,
}
class PipelineManager(object):
    """
    Drives the image-processing pipeline inside a local working directory.

    The manager reads and writes the following entries of the working
    directory: ``toasty-store-config.yaml``, ``toasty-pipeline-config.yaml``,
    ``candidates``, ``cache_todo``, ``cache_done``, ``processed``,
    ``approved``, ``published`` and ``rejects``.

    Parameters
    ----------
    workdir : path-like
        The working directory. Its ``toasty-store-config.yaml`` file is loaded
        immediately to create the pipeline I/O backend.
    """

    _config = None
    _pipeio = None
    _workdir = None
    _img_source = None

    def __init__(self, workdir):
        self._workdir = workdir
        self._pipeio = PipelineIo.load_from_config(self._path('toasty-store-config.yaml'))

    def _path(self, *path):
        """Join *path* components onto the working directory."""
        return os.path.join(self._workdir, *path)

    def _ensure_dir(self, *path):
        """Create the directory at *path* inside the working directory, if
        needed, and return its full path."""
        path = self._path(*path)
        os.makedirs(path, exist_ok=True)
        return path

    def ensure_config(self):
        """
        Load the pipeline configuration, fetching it from storage if needed.

        The configuration is cached in the working directory as
        ``toasty-pipeline-config.yaml`` and, once parsed, on this object.

        Returns
        -------
        The deserialized configuration.

        Raises
        ------
        Exception
            If the configuration file deserializes to nothing.
        """
        if self._config is not None:
            return self._config

        self._ensure_dir()
        cfg_path = self._path('toasty-pipeline-config.yaml')

        if not os.path.exists(cfg_path):  # racey
            try:
                with open(cfg_path, 'wb') as f:
                    self._pipeio.get_item('toasty-pipeline-config.yaml', dest=f)
            except BaseException:
                # Do not leave a truncated file behind: later runs would see
                # that it exists and trust it instead of fetching again.
                try:
                    os.remove(cfg_path)
                except OSError:
                    pass
                raise

        with open(cfg_path, 'rt', encoding='utf8') as f:
            config = yaml.safe_load(f)

        if config is None:
            raise Exception('no toasty-pipeline-config.yaml found in the storage')

        self._config = config
        return self._config

    def get_image_source(self):
        """
        Get the image source described by the pipeline configuration.

        The source class is selected by the configuration's ``source_type``
        key, and is deserialized from the section named by the class's
        ``get_config_key()``. The result is cached on this object.

        Returns
        -------
        The image source instance.

        Raises
        ------
        Exception
            If ``source_type`` is missing or unrecognized, or if the source's
            configuration section is missing.
        """
        if self._img_source is not None:
            return self._img_source

        self.ensure_config()

        source_type = self._config.get('source_type')
        if not source_type:
            raise Exception('toasty pipeline configuration must have a source_type key')

        cls_loader = IMAGE_SOURCE_CLASS_LOADERS.get(source_type)
        if cls_loader is None:
            raise Exception('unrecognized image source type %s' % source_type)

        cls = cls_loader()
        cfg_key = cls.get_config_key()
        source_config = self._config.get(cfg_key)
        if source_config is None:
            raise Exception('no image source configuration key %s in the config file' % cfg_key)

        self._img_source = cls.deserialize(source_config)
        return self._img_source

    def process_todos(self):
        """
        Process every item found in the ``cache_todo`` directory.

        For each item, the image source's ``process`` method is run with the
        saved candidate data and a builder writing into ``processed/<id>``,
        then ``index_rel.wtml`` is written and the item's cache directory is
        moved from ``cache_todo`` to ``cache_done``.
        """
        from ..builder import Builder
        from .. import par_util
        from ..pyramid import PyramidIO

        src = self.get_image_source()
        cand_dir = self._path('candidates')
        self._ensure_dir('cache_done')
        baseoutdir = self._ensure_dir('processed')

        # Lame hack to tidy up output slightly
        par_util.SHOW_INFORMATIONAL_MESSAGES = False

        for uniq_id in os.listdir(self._path('cache_todo')):
            cachedir = self._path('cache_todo', uniq_id)
            outdir = os.path.join(baseoutdir, uniq_id)

            pio = PyramidIO(outdir, scheme='LXY', default_format='png')
            builder = Builder(pio)

            # The context manager closes the candidate data even if the
            # image source raises while processing.
            with open(os.path.join(cand_dir, uniq_id), 'rb') as cdata:
                print(f'processing {uniq_id} ... ', end='', flush=True)
                src.process(uniq_id, cdata, cachedir, builder)

            builder.write_index_rel_wtml()
            print('done')

            # Woohoo, done!
            os.rename(cachedir, self._path('cache_done', uniq_id))

    def publish(self):
        """
        Upload every item in the ``approved`` directory to pipeline storage.

        All files beneath ``approved/<id>`` are uploaded, including those in
        nested subdirectories, using their path components relative to
        ``approved`` as the storage path. Once an item has been uploaded, its
        directory is moved into ``published``.
        """
        done_dir = self._ensure_dir('published')
        todo_dir = self._path('approved')

        def fail_walk(err):
            # os.walk() silently skips unreadable directories by default,
            # which could mark an incomplete upload as published.
            raise err

        for uniq_id in os.listdir(todo_dir):
            item_dir = os.path.join(todo_dir, uniq_id)
            uploads = []  # (local path, storage path components)
            index_upload = None

            for dirpath, dirnames, filenames in os.walk(item_dir, onerror=fail_walk):
                dirnames.sort()  # deterministic traversal order
                rel = os.path.relpath(dirpath, item_dir)
                components = [uniq_id]
                if rel != os.curdir:
                    components.extend(rel.split(os.path.sep))

                for filename in sorted(filenames):
                    entry = (os.path.join(dirpath, filename), components + [filename])
                    if len(components) == 1 and filename == 'index.wtml':
                        index_upload = entry
                    else:
                        uploads.append(entry)

            # If there's a index.wtml file, save it for last -- that will
            # indicate that this directory has uploaded fully successfully.
            if index_upload is not None:
                uploads.append(index_upload)

            print(f'publishing {uniq_id} ...')

            for local_path, components in uploads:
                with open(local_path, 'rb') as f:
                    self._pipeio.put_item(*components, source=f)

            os.rename(item_dir, os.path.join(done_dir, uniq_id))

    def ignore_rejects(self):
        """
        Mark every item in the ``rejects`` directory as permanently ignored.

        A ``skip.flag`` item is uploaded to pipeline storage for each entry of
        the ``rejects`` directory.
        """
        from io import BytesIO

        rejects_dir = self._path('rejects')
        n = 0

        # maybe one day this will be JSON with data?
        flag_data = b'{}'

        for uniq_id in os.listdir(rejects_dir):
            print(f'ignoring {uniq_id} ...')
            # Use a fresh stream for every upload: a shared one would be at
            # EOF after the first read, producing empty flags afterwards.
            self._pipeio.put_item(uniq_id, 'skip.flag', source=BytesIO(flag_data))
            n += 1

        if n > 1:
            print()
            print(f'marked a total of {n} images to be permanently ignored')