Source code for toasty.pipeline.cli

# -*- mode: python; coding: utf-8 -*-
# Copyright 2019-2020 the AAS WorldWide Telescope project.
# Licensed under the MIT License.

"""
Entrypoints for the "toasty pipeline" command-line tools.
"""

__all__ = '''
pipeline_getparser
pipeline_impl
'''.split()

import argparse
from fnmatch import fnmatch
import glob
import os.path
import sys
from wwt_data_formats.cli import EnsureGlobsExpandedAction

from ..cli import die, warn
from . import NotActionableError


def evaluate_imageid_args(searchdir, args):
    """
    Figure out which image IDs to process.
    """

    matched_ids = set()
    globs_todo = set()

    for arg in args:
        if glob.has_magic(arg):
            globs_todo.add(arg)
        else:
            # If an ID is explicitly (non-globbily) added, always add it to the
            # list, without checking if it exists in `searchdir`. We could check
            # for it in searchdir now, but we'll have to check later anyway, so
            # we don't bother.
            matched_ids.add(arg)

    if globs_todo:
        for filename in os.listdir(searchdir):
            for g in globs_todo:
                if fnmatch(filename, g):
                    matched_ids.add(filename)
                    break

    return sorted(matched_ids)

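# A sketch of the matching behavior above, with hypothetical IDs. Given a
# search directory containing entries ``ngc1234`` and ``ngc5678``:
#
#     evaluate_imageid_args('candidates', ['ngc1234', 'ngc5*'])
#     # => ['ngc1234', 'ngc5678']
#
# Explicit (non-glob) IDs pass through without an existence check; glob
# patterns are matched against the directory listing.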

# The "approve" subcommand

def approve_setup_parser(parser):
    parser.add_argument(
        '--workdir',
        metavar = 'PATH',
        default = '.',
        help = 'The working directory for this processing session'
    )
    parser.add_argument(
        'cand_ids',
        nargs = '+',
        action = EnsureGlobsExpandedAction,
        metavar = 'IMAGE-ID',
        help = 'Name(s) of image(s) to approve for publication (globs accepted)'
    )


def approve_impl(settings):
    from wwt_data_formats.folder import Folder, make_absolutizing_url_mutator
    from . import PipelineManager

    mgr = PipelineManager(settings.workdir)
    mgr.ensure_config()

    pub_url_prefix = mgr._config.get('publish_url_prefix')
    if pub_url_prefix:
        if pub_url_prefix[-1] != '/':
            pub_url_prefix += '/'
    else:
        die('no "publish_url_prefix" is set in the pipeline configuration')

    proc_dir = mgr._ensure_dir('processed')
    app_dir = mgr._ensure_dir('approved')

    for cid in evaluate_imageid_args(proc_dir, settings.cand_ids):
        if not os.path.isdir(os.path.join(proc_dir, cid)):
            die(f'no such processed candidate ID {cid!r}')

        index_path = os.path.join(proc_dir, cid, 'index.wtml')
        prefix = pub_url_prefix + cid + '/'

        try:
            f = Folder.from_file(os.path.join(proc_dir, cid, 'index_rel.wtml'))
            f.mutate_urls(make_absolutizing_url_mutator(prefix))

            with open(index_path, 'wt', encoding='utf8') as f_out:
                f.write_xml(f_out)
        except Exception:
            print('error: failed to create index.wtml from index_rel.wtml', file=sys.stderr)

            try:
                os.remove(index_path)
            except Exception:
                pass

            raise

        os.rename(os.path.join(proc_dir, cid), os.path.join(app_dir, cid))

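# For reference, the absolutization step above behaves like this sketch
# (hypothetical paths and URL; the Folder API is the one imported above):
#
#     f = Folder.from_file('processed/myimage/index_rel.wtml')
#     f.mutate_urls(make_absolutizing_url_mutator('https://example.org/pub/myimage/'))
#     # a relative URL such as "thumb.jpg" becomes
#     # "https://example.org/pub/myimage/thumb.jpg"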

# The "fetch" subcommand

def fetch_setup_parser(parser):
    parser.add_argument(
        '--workdir',
        metavar = 'PATH',
        default = '.',
        help = 'The working directory for this processing session'
    )
    parser.add_argument(
        'cand_ids',
        nargs = '+',
        action = EnsureGlobsExpandedAction,
        metavar = 'CAND-ID',
        help = 'Name(s) of candidate(s) to fetch and prepare for processing (globs accepted)'
    )


def fetch_impl(settings):
    from . import PipelineManager

    mgr = PipelineManager(settings.workdir)
    cand_dir = mgr._ensure_dir('candidates')
    rej_dir = mgr._ensure_dir('rejects')
    src = mgr.get_image_source()

    for cid in evaluate_imageid_args(cand_dir, settings.cand_ids):
        # The funky nesting here tries to ensure that cdata is closed if a
        # NotActionableError happens, so that we can move the directory on Windows.
        try:
            try:
                cdata = open(os.path.join(cand_dir, cid), 'rb')
            except FileNotFoundError:
                die(f'no such candidate ID {cid!r}')

            try:
                print(f'fetching {cid} ... ', end='')
                sys.stdout.flush()
                cachedir = mgr._ensure_dir('cache_todo', cid)
                src.fetch_candidate(cid, cdata, cachedir)
                print('done')
            finally:
                cdata.close()
        except NotActionableError as e:
            print('not usable:', e)
            os.rename(os.path.join(cand_dir, cid), os.path.join(rej_dir, cid))
            os.rmdir(cachedir)

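# Working-directory layout implied by the `_ensure_dir` calls above
# (directory names are from the code; the annotations are descriptive):
#
#     <workdir>/
#         candidates/<cand-id>    # serialized candidate records
#         cache_todo/<cand-id>/   # fetched data awaiting processing
#         rejects/<cand-id>       # candidates deemed not actionable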

# The "init" subcommand

def init_setup_parser(parser):
    parser.add_argument(
        '--azure-conn-env',
        metavar = 'ENV-VAR-NAME',
        help = 'The name of an environment variable containing an Azure Storage '
                'connection string'
    )
    parser.add_argument(
        '--azure-container',
        metavar = 'CONTAINER-NAME',
        help = 'The name of a blob container in the Azure storage account'
    )
    parser.add_argument(
        '--azure-path-prefix',
        metavar = 'PATH-PREFIX',
        help = 'A slash-separated path prefix for blob I/O within the container'
    )
    parser.add_argument(
        '--local',
        metavar = 'PATH',
        help = 'Use the local-disk I/O backend'
    )
    parser.add_argument(
        'workdir',
        nargs = '?',
        metavar = 'PATH',
        default = '.',
        help = 'The working directory for this processing session'
    )


def _pipeline_io_from_settings(settings):
    from . import azure_io, local_io

    if settings.local:
        return local_io.LocalPipelineIo(settings.local)

    if settings.azure_conn_env:
        conn_str = os.environ.get(settings.azure_conn_env)
        if not conn_str:
            die('--azure-conn-env=%s provided, but that environment variable is unset'
                % settings.azure_conn_env)

        if not settings.azure_container:
            die('--azure-container must be provided if --azure-conn-env is')

        path_prefix = settings.azure_path_prefix or ''

        azure_io.assert_enabled()

        return azure_io.AzureBlobPipelineIo(
            conn_str,
            settings.azure_container,
            path_prefix
        )

    die('an I/O backend must be specified with either --local or the --azure-* options')


def init_impl(settings):
    pipeio = _pipeline_io_from_settings(settings)
    os.makedirs(settings.workdir, exist_ok=True)
    pipeio.save_config(os.path.join(settings.workdir, 'toasty-store-config.yaml'))

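# Example invocations of the "init" subcommand (hypothetical values):
#
#     toasty pipeline init --local /data/feed myworkdir
#     toasty pipeline init --azure-conn-env AZURE_STORAGE_CONN \
#         --azure-container feed --azure-path-prefix images myworkdir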

# The "refresh" subcommand
#
# TODO: for large feeds, we should potentially add features to make it so that
# we don't re-check every single candidate that's ever been posted.

def refresh_setup_parser(parser):
    parser.add_argument(
        '--workdir',
        metavar = 'PATH',
        default = '.',
        help = 'The working directory for this processing session'
    )


def refresh_impl(settings):
    from . import PipelineManager

    mgr = PipelineManager(settings.workdir)
    cand_dir = mgr._ensure_dir('candidates')
    rej_dir = mgr._ensure_dir('rejects')
    src = mgr.get_image_source()
    n_cand = 0
    n_saved = 0
    n_done = 0
    n_skipped = 0
    n_rejected = 0

    for cand in src.query_candidates():
        n_cand += 1
        uniq_id = cand.get_unique_id()

        if mgr._pipeio.check_exists(uniq_id, 'index.wtml'):
            n_done += 1
            continue  # skip already-done inputs

        if mgr._pipeio.check_exists(uniq_id, 'skip.flag'):
            n_skipped += 1
            continue  # skip inputs that are explicitly flagged

        cand_path = os.path.join(cand_dir, uniq_id)

        try:
            with open(cand_path, 'wb') as f:
                cand.save(f)
            n_saved += 1
        except NotActionableError:
            os.remove(cand_path)

            # For now, just touch a file in the rejects directory to mark this ID.
            with open(os.path.join(rej_dir, uniq_id), 'wb'):
                pass

            n_rejected += 1

    print(f'analyzed {n_cand} candidates from the image source')
    print(f'  - {n_saved} processing candidates saved')
    print(f'  - {n_rejected} rejected as definitely unusable')
    print(f'  - {n_done} were already done')
    print(f'  - {n_skipped} were already marked to be ignored')
    print()
    print('See the `candidates` directory for candidate image IDs.')

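# The candidate objects yielded by ``src.query_candidates()`` are used above
# through just two methods (names taken from the calls in `refresh_impl`):
#
#     uniq_id = cand.get_unique_id()  # stable ID, also used as a filename
#     cand.save(f)                    # serialize to an open binary stream;
#                                     # raises NotActionableError if unusable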

# Other subcommands not yet split out.

def pipeline_getparser(parser):
    subparsers = parser.add_subparsers(dest='pipeline_command')

    approve_setup_parser(subparsers.add_parser('approve'))
    fetch_setup_parser(subparsers.add_parser('fetch'))
    init_setup_parser(subparsers.add_parser('init'))

    parser = subparsers.add_parser('process-todos')
    parser.add_argument(
        '--workdir',
        metavar = 'WORKDIR',
        default = '.',
        help = 'The local working directory',
    )

    parser = subparsers.add_parser('publish')
    parser.add_argument(
        '--workdir',
        metavar = 'WORKDIR',
        default = '.',
        help = 'The local working directory',
    )

    refresh_setup_parser(subparsers.add_parser('refresh'))

def pipeline_impl(settings):
    from . import PipelineManager

    if settings.pipeline_command is None:
        print('Run the "pipeline" command with `--help` for help on its subcommands')
        return

    if settings.pipeline_command == 'approve':
        approve_impl(settings)
    elif settings.pipeline_command == 'fetch':
        fetch_impl(settings)
    elif settings.pipeline_command == 'init':
        init_impl(settings)
    elif settings.pipeline_command == 'process-todos':
        mgr = PipelineManager(settings.workdir)
        mgr.process_todos()
    elif settings.pipeline_command == 'publish':
        mgr = PipelineManager(settings.workdir)
        mgr.publish()
    elif settings.pipeline_command == 'refresh':
        refresh_impl(settings)
    else:
        die('unrecognized "pipeline" subcommand ' + settings.pipeline_command)
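
# Minimal wiring sketch for these entry points (hypothetical standalone
# driver; the real `toasty` CLI performs this wiring itself):
#
#     import argparse
#     parser = argparse.ArgumentParser(prog='toasty pipeline')
#     pipeline_getparser(parser)
#     settings = parser.parse_args()
#     pipeline_impl(settings)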