# -*- mode: python; coding: utf-8 -*-
# Copyright 2019-2022 the AAS WorldWide Telescope project.
# Licensed under the MIT License.
"""
Entrypoints for the "toasty pipeline" command-line tools.
"""
__all__ = """
pipeline_getparser
pipeline_impl
""".split()
from fnmatch import fnmatch
import glob
import os.path
import shutil
import sys
from wwt_data_formats.cli import EnsureGlobsExpandedAction
from ..cli import die
from . import NotActionableError
def evaluate_imageid_args(searchdir, args):
"""
Figure out which image-ID's to process.
"""
matched_ids = set()
globs_todo = set()
for arg in args:
if glob.has_magic(arg):
globs_todo.add(arg)
else:
            # If an ID is explicitly (non-globbily) specified, always add it
            # to the list without checking whether it exists in `searchdir`.
            # We would have to check again later anyway, so we don't bother
            # checking now.
matched_ids.add(arg)
    if globs_todo:
for filename in os.listdir(searchdir):
for g in globs_todo:
if fnmatch(filename, g):
matched_ids.add(filename)
break
return sorted(matched_ids)
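
# A quick illustration of the semantics above, with hypothetical IDs. If
# `searchdir` contains the entries "m101" and "m31":
#
#   evaluate_imageid_args(searchdir, ["m*"])         # => ["m101", "m31"]
#   evaluate_imageid_args(searchdir, ["m31", "xx9"]) # => ["m31", "xx9"]
#
# Note that the explicit (non-glob) ID "xx9" is returned even though it does
# not exist in `searchdir`; its existence is checked later by the callers.
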
# The "approve" subcommand
def approve_setup_parser(parser):
parser.add_argument(
"--workdir",
metavar="PATH",
default=".",
help="The working directory for this processing session",
)
parser.add_argument(
"cand_ids",
nargs="+",
action=EnsureGlobsExpandedAction,
metavar="IMAGE-ID",
help="Name(s) of image(s) to approve for publication (globs accepted)",
)
def approve_impl(settings):
from wwt_data_formats.folder import Folder, make_absolutizing_url_mutator
from . import PipelineManager
mgr = PipelineManager(settings.workdir)
mgr.ensure_config()
    pub_url_prefix = mgr._config.get("publish_url_prefix")
    if not pub_url_prefix:
        die("the pipeline configuration does not specify `publish_url_prefix`")
    if pub_url_prefix[-1] != "/":
        pub_url_prefix += "/"
proc_dir = mgr._ensure_dir("processed")
app_dir = mgr._ensure_dir("approved")
for cid in evaluate_imageid_args(proc_dir, settings.cand_ids):
if not os.path.isdir(os.path.join(proc_dir, cid)):
die(f"no such processed candidate ID {cid!r}")
index_path = os.path.join(proc_dir, cid, "index.wtml")
prefix = pub_url_prefix + cid + "/"
try:
f = Folder.from_file(os.path.join(proc_dir, cid, "index_rel.wtml"))
f.mutate_urls(make_absolutizing_url_mutator(prefix))
with open(index_path, "wt", encoding="utf8") as f_out:
f.write_xml(f_out)
        except Exception:
            print(
                f"error: failed to create index.wtml for {cid!r} from index_rel.wtml",
                file=sys.stderr,
            )
try:
os.remove(index_path)
except Exception:
pass
raise
os.rename(os.path.join(proc_dir, cid), os.path.join(app_dir, cid))
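
# Hypothetical command-line usage of the above (the image IDs are made up):
#
#   toasty pipeline approve --workdir=work "vla_*" first_image
#
# Each named candidate's `index_rel.wtml` is rewritten into an `index.wtml`
# with absolute URLs rooted at `publish_url_prefix`, and the candidate is
# moved from `processed/` to `approved/`.
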
# The "fetch" subcommand
def fetch_setup_parser(parser):
parser.add_argument(
"--workdir",
metavar="PATH",
default=".",
help="The working directory for this processing session",
)
parser.add_argument(
"cand_ids",
nargs="+",
action=EnsureGlobsExpandedAction,
metavar="CAND-ID",
help="Name(s) of candidate(s) to fetch and prepare for processing (globs accepted)",
)
def fetch_impl(settings):
from . import PipelineManager
mgr = PipelineManager(settings.workdir)
cand_dir = mgr._ensure_dir("candidates")
rej_dir = mgr._ensure_dir("rejects")
src = mgr.get_image_source()
for cid in evaluate_imageid_args(cand_dir, settings.cand_ids):
        # The funky nested structure here tries to ensure that `cdata` gets
        # closed if a NotActionableError happens, so that the candidate file
        # can be moved on Windows (where open files cannot be renamed).
try:
try:
cdata = open(os.path.join(cand_dir, cid), "rb")
except FileNotFoundError:
die(f"no such candidate ID {cid!r}")
try:
print(f"fetching {cid} ... ", end="")
sys.stdout.flush()
cachedir = mgr._ensure_dir("cache_todo", cid)
src.fetch_candidate(cid, cdata, cachedir)
print("done")
finally:
cdata.close()
        except NotActionableError as e:
            print("not usable:", e)
            os.rename(os.path.join(cand_dir, cid), os.path.join(rej_dir, cid))
            # The cache directory may contain partially fetched data; remove
            # whatever is there.
            shutil.rmtree(cachedir, ignore_errors=True)
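
# Hypothetical usage sketch (the candidate IDs are made up):
#
#   toasty pipeline fetch --workdir=work "apod_2021*"
#
# Fetched inputs land in `cache_todo/<id>/`; candidates that the image source
# reports as not actionable are moved into `rejects/` instead.
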
# The "init" subcommand
def init_setup_parser(parser):
parser.add_argument(
"--azure-conn-env",
metavar="ENV-VAR-NAME",
help="The name of an environment variable contain an Azure Storage "
"connection string",
)
parser.add_argument(
"--azure-container",
metavar="CONTAINER-NAME",
help="The name of a blob container in the Azure storage account",
)
parser.add_argument(
"--azure-path-prefix",
metavar="PATH-PREFIX",
help="A slash-separated path prefix for blob I/O within the container",
)
    parser.add_argument(
        "--local",
        metavar="PATH",
        help="Use the local-disk I/O backend, with data stored at PATH",
    )
parser.add_argument(
"workdir",
nargs="?",
metavar="PATH",
default=".",
help="The working directory for this processing session",
)
def _pipeline_io_from_settings(settings):
from . import azure_io, local_io
if settings.local:
return local_io.LocalPipelineIo(settings.local)
if settings.azure_conn_env:
conn_str = os.environ.get(settings.azure_conn_env)
if not conn_str:
die(
"--azure-conn-env=%s provided, but that environment variable is unset"
% settings.azure_conn_env
)
if not settings.azure_container:
die("--azure-container-name must be provided if --azure-conn-env is")
path_prefix = settings.azure_path_prefix
if not path_prefix:
path_prefix = ""
azure_io.assert_enabled()
return azure_io.AzureBlobPipelineIo(
conn_str, settings.azure_container, path_prefix
)
die("An I/O backend must be specified with the arguments --local or --azure-*")
def init_impl(settings):
pipeio = _pipeline_io_from_settings(settings)
os.makedirs(settings.workdir, exist_ok=True)
pipeio.save_config(os.path.join(settings.workdir, "toasty-store-config.yaml"))
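
# Hypothetical initialization examples for the two I/O backends (the names
# and paths are made up):
#
#   toasty pipeline init --local=/data/feeds/myfeed work
#
#   export AZURE_CONN='DefaultEndpointsProtocol=...;AccountKey=...'
#   toasty pipeline init --azure-conn-env=AZURE_CONN \
#       --azure-container=feeds --azure-path-prefix=myfeed work
#
# Taking the connection string from an environment variable keeps the secret
# out of the shell history.
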
# The "refresh" subcommand
#
# TODO: for large feeds, we should potentially add features to make it so that
# we don't re-check every single candidate that's ever been posted.
def refresh_setup_parser(parser):
    parser.add_argument(
        "--workdir",
        metavar="PATH",
        default=".",
        help="The working directory for this processing session",
    )
def refresh_impl(settings):
from . import PipelineManager
mgr = PipelineManager(settings.workdir)
cand_dir = mgr._ensure_dir("candidates")
rej_dir = mgr._ensure_dir("rejects")
src = mgr.get_image_source()
n_cand = 0
n_saved = 0
n_done = 0
n_skipped = 0
n_rejected = 0
for cand in src.query_candidates():
n_cand += 1
uniq_id = cand.get_unique_id()
if mgr._pipeio.check_exists(uniq_id, "index.wtml"):
n_done += 1
continue # skip already-done inputs
if mgr._pipeio.check_exists(uniq_id, "skip.flag"):
n_skipped += 1
continue # skip inputs that are explicitly flagged
cand_path = os.path.join(cand_dir, uniq_id)
try:
with open(cand_path, "wb") as f:
cand.save(f)
n_saved += 1
        except NotActionableError:
            os.remove(cand_path)
            # Record the rejection by touching an empty marker file.
            with open(os.path.join(rej_dir, uniq_id), "wb"):
                pass
            n_rejected += 1
print(f"analyzed {n_cand} candidates from the image source")
print(f" - {n_saved} processing candidates saved")
print(f" - {n_rejected} rejected as definitely unusable")
print(f" - {n_done} were already done")
print(f" - {n_skipped} were already marked to be ignored")
print()
print("See the `candidates` directory for candidate image IDs.")
# Other subcommands not yet split out.
def pipeline_getparser(parser):
subparsers = parser.add_subparsers(dest="pipeline_command")
def add_manager_command(name):
subp = subparsers.add_parser(name)
        subp.add_argument(
            "--workdir",
            metavar="WORKDIR",
            default=".",
            help="The local working directory",
        )
return subp
approve_setup_parser(subparsers.add_parser("approve"))
fetch_setup_parser(subparsers.add_parser("fetch"))
add_manager_command("ignore-rejects")
init_setup_parser(subparsers.add_parser("init"))
add_manager_command("process-todos")
add_manager_command("publish")
refresh_setup_parser(subparsers.add_parser("refresh"))
def pipeline_impl(settings):
from . import PipelineManager
if settings.pipeline_command is None:
print('Run the "pipeline" command with `--help` for help on its subcommands')
return
if settings.pipeline_command == "approve":
approve_impl(settings)
elif settings.pipeline_command == "fetch":
fetch_impl(settings)
elif settings.pipeline_command == "ignore-rejects":
mgr = PipelineManager(settings.workdir)
mgr.ignore_rejects()
elif settings.pipeline_command == "init":
init_impl(settings)
elif settings.pipeline_command == "process-todos":
mgr = PipelineManager(settings.workdir)
mgr.process_todos()
elif settings.pipeline_command == "publish":
mgr = PipelineManager(settings.workdir)
mgr.publish()
elif settings.pipeline_command == "refresh":
refresh_impl(settings)
else:
die('unrecognized "pipeline" subcommand ' + settings.pipeline_command)
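
# A minimal programmatic sketch of wiring these entry points together,
# assuming the caller supplies its own argparse parser (the subcommand
# arguments below are made up):
#
#   import argparse
#
#   parser = argparse.ArgumentParser(prog="toasty pipeline")
#   pipeline_getparser(parser)
#   settings = parser.parse_args(["refresh", "--workdir", "work"])
#   pipeline_impl(settings)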