Source code for toasty.fits_tiler

# -*- mode: python; coding: utf-8 -*-
# Copyright 2013-2021 Chris Beaumont and the AAS WorldWide Telescope project
# Licensed under the MIT License.

"""
Tools for automagically tiling FITS files.

Instead of using these interfaces directly, most users should probably use the
all-in-one function :func:`toasty.tile_fits`.
"""

import os.path
from shutil import copyfile
from subprocess import Popen, PIPE, STDOUT, run
import tempfile

from astropy.coordinates import Angle
from astropy.utils.data import download_file
import reproject
from wwt_data_formats.enums import ProjectionType

from . import builder, pyramid, multi_tan, multi_wcs, TilingMethod


__all__ = [
    "FitsTiler",
]


[docs] class FitsTiler(object): """ A class to manage the tiling of a FITS file collection. Parameters ---------- coll : :class:`~toasty.collection.ImageCollection` A collection of one or more input FITS files out_dir : optional str, default None The output directory for the tiled FITS data. If None, a sensible default next to the first input file will be chosen. tiling_method : optional :class:`~toasty.TilingMethod` Whether the tiling process will be auto-detected or manually chosen. Defaults to auto detection based on the angular size of the imageset. """ coll = None "The input :class:`~toasty.collection.ImageCollection`." out_dir = None """The output directory. This can be set upon creation or left as None. If the latter, this attribute will be filled in after a successful invocation of the :meth:`tile` method. """ tiling_method = TilingMethod.AUTO_DETECT """Whether the tiling process will be auto-detected or manually chosen.""" builder = None """A :class:`~toasty.builder.Builder` describing the output dataset. This attribute is None upon creation of a class instance. It will be filled in after a successful invocation of the :meth:`tile` method. """ def __init__( self, coll, out_dir=None, tiling_method=TilingMethod.AUTO_DETECT, add_place_for_toast=True, ): self.coll = coll self.out_dir = out_dir self.tiling_method = tiling_method self.add_place_for_toast = add_place_for_toast if self.tiling_method == TilingMethod.AUTO_DETECT: if self._fits_covers_large_area(): self.tiling_method = TilingMethod.TOAST else: self.tiling_method = TilingMethod.TAN
[docs] def tile( self, cli_progress=False, parallel=None, override=False, **kwargs, ): """ Process the collection into a tile pyramid, using either a common tangential projection, TOAST, or HiPS. Parameters ---------- cli_progress : optional boolean, defaults to False If true, progress messages will be printed as the FITS files are being processed. parallel : integer or None (the default) The level of parallelization to use. If unspecified, defaults to using all CPUs. If the OS does not support fork-based multiprocessing, parallel processing is not possible and serial processing will be forced. Pass ``1`` to force serial processing. override : optional boolean, defaults to False By default, if the output directory already exists, the tiling process is skipped. If this argument is true, an existing output directory will be deleted if it exists. kwargs Settings for the tiling process. For example, ``blankval``. Returns ------- Self. Notes ----- After this function returns, the attributes :attr:`out_dir` and :attr:`builder` of *self* will be fully initialized. """ if self.out_dir is None: # Kind of hacky here ... Note that export_simple() might return a # generator so we can't just index the return value. for tup in self.coll.export_simple(): first_fits_path = tup[0] break first_file_name = first_fits_path.split(".gz")[0] self.out_dir = first_file_name[: first_file_name.rfind(".")] + "_tiled" if self.tiling_method == TilingMethod.HIPS: self.out_dir += "_HiPS" if self.tiling_method == TilingMethod.TOAST: self.out_dir += "_TOAST" if cli_progress: print(f"Tile output directory is `{self.out_dir}`") pio = pyramid.PyramidIO(self.out_dir, default_format="fits") self.builder = builder.Builder(pio) self.builder.set_name(self.out_dir.split("/")[-1]) if os.path.isdir(self.out_dir): if override: if cli_progress: print(f"Tile directory already exists -- removing") import shutil shutil.rmtree(self.out_dir) else: if cli_progress: print("Tile directory already exists -- reusing") if os.path.exists(os.path.join(self.out_dir, "properties")): self._copy_hips_properties_to_builder() return if self.tiling_method == TilingMethod.HIPS: self._tile_hips(cli_progress, parallel) elif self.tiling_method == TilingMethod.TOAST: self._tile_toast(cli_progress, parallel, **kwargs) else: self._tile_tan(cli_progress, parallel, **kwargs) self.builder.write_index_rel_wtml( add_place_for_toast=self.add_place_for_toast, ) return self
def _tile_tan(self, cli_progress, parallel, **kwargs): if self.coll._is_multi_tan(): if cli_progress: print("Tiling base layer in multi-TAN mode (step 1 of 2)") tile_processor = multi_tan.MultiTanProcessor(self.coll) tile_processor.compute_global_pixelization(self.builder) tile_processor.tile( self.builder.pio, cli_progress=cli_progress, parallel=parallel, **kwargs ) else: if cli_progress: print("Tiling base layer in multi-WCS mode (step 1 of 2)") tile_processor = multi_wcs.MultiWcsProcessor(self.coll) tile_processor.compute_global_pixelization(self.builder) tile_processor.tile( self.builder.pio, reproject.reproject_interp, cli_progress=cli_progress, parallel=parallel, **kwargs, ) if cli_progress: print("Downsampling (step 2 of 2)") self.builder.cascade(cli_progress=cli_progress, **kwargs) def _tile_toast(self, cli_progress, parallel, **kwargs): start = kwargs.pop("start", None) if start is None: start = 1 for image in self.coll.images(): if image.has_wcs(): level = pyramid.guess_base_layer_level(wcs=image.wcs) if level > start: start = level if cli_progress: import math print( "Assuming level {} (~{} arcmin/pixel) is appropriate".format( start, 21.095 / math.pow(2, start - 1) ) ) if cli_progress: print("Tiling base layer (step 1 of 2)") from .samplers import WcsSampler filters = [] for image in self.coll.images(): wcs_sampler = WcsSampler(data=image.asarray(), wcs=image.wcs) tile_filter = wcs_sampler.filter() sampler = wcs_sampler.sampler() self.builder.toast_base( sampler, start, cli_progress=cli_progress, tile_filter=tile_filter, parallel=parallel, ) filters.append(tile_filter) # Saving the WCS of the last image to have something to # point the imageset to. Of course, the WCS of any image # in the collection would result in an acceptable position. # but in case of image overlap, the last image is always # put on top, since it is processed last. last_wcs = image.wcs if cli_progress: print("Downsampling (step 2 of 2)") def tile_filters(tile): for filter in filters: if filter(tile): return True return False self.builder.cascade( cli_progress=cli_progress, tile_filter=tile_filters, parallel=parallel ) height, width = last_wcs._naxis self.builder.apply_wcs_info(wcs=last_wcs, width=width, height=height) def _tile_hips(self, cli_progress, _parallel): """ For later: we could try to honor _parallel here. """ if not self._is_java_installed(): raise Exception("Java is required to run to hipsgen") cached_hipsgen_path = download_file( "https://aladin.unistra.fr/java/Hipsgen.jar", show_progress=cli_progress, cache=True, pkgname="toasty", ) # Get the "simple export" version of the collection that we can pass to # hipsgen. try: export_info = self.coll.export_simple() except Exception as e: raise Exception( 'cannot export FITS collection in "simple" mode for passing to hipsgen' ) from e fits_paths = [] hdu_indexes = [] for tup in export_info: fits_paths.append(tup[0]) hdu_indexes.append(str(tup[1])) # If we give hipsgen a relative output directory it will end up # inside `in_dir` abs_out_dir = os.path.abspath(self.out_dir) out_base = os.path.basename(self.out_dir) with self._create_hipsgen_input_dir(fits_paths) as in_dir_path: # Some antivirus programs (at least Avast) disallow "java -jar" # invocation of files lacking ".jar" file extensions. Since we # cannot set the file extension of the file in the astropy cache, # we have to create a temporary copy with a ".jar" extension. hipsgen_path = os.path.join(in_dir_path, "hipsgen.jar") copyfile(cached_hipsgen_path, hipsgen_path) argv = [ "java", "-jar", "{0}".format(hipsgen_path), "in={0}".format(in_dir_path), "out={0}".format(abs_out_dir), "creator_did=ivo://aas.wwt.toasty/{0}".format(out_base), "hdu={0}".format(",".join(hdu_indexes)), "INDEX", "TILES", ] p = Popen( argv, stdout=PIPE, stderr=STDOUT, shell=False, ) # Even if we don't want to print the output, this loop is still useful since it waits until the HiPSgen process # is completed. for line in p.stdout: if cli_progress: print(line.decode("UTF-8"), end="") if p.wait() != 0: if cli_progress: m = "see its output printed above" else: m = "set `cli_progress=True` to see its output" raise Exception(f"an error occurred running hipsgen; {m}") if not os.path.isdir(self.out_dir): if cli_progress: m = "see its output printed above" else: m = "set `cli_progress=True` to see its output" raise Exception( f"output directory `{self.out_dir}` does not exist, meaning `hipsgen` probably failed; {m}" ) self._copy_hips_properties_to_builder() def _fits_covers_large_area(self): """ Here, "large" means "large enough that the TAN projection 'study' mode yields visually unacceptable results." """ corners = [] for desc in self.coll.descriptions(): corners.append(desc.wcs.pixel_to_world(0, 0)) corners.append(desc.wcs.pixel_to_world(desc.shape[0], 0)) corners.append(desc.wcs.pixel_to_world(desc.shape[0], desc.shape[1])) corners.append(desc.wcs.pixel_to_world(0, desc.shape[1])) # This is a naive N^2 search but the input would have to be pretty # pathological for it to make sense to try to be more efficient here. max_distance = Angle("0d") for compare_index in range(len(corners)): for index in range(compare_index + 1, len(corners)): distance = corners[compare_index].separation(corners[index]) if distance > max_distance: max_distance = distance return max_distance > Angle("20d") def _is_java_installed(self): try: java_version = run( ["java", "-version"], capture_output=True, text=True, check=True ) return ( "version" in java_version.stdout.lower() or "version" in java_version.stderr.lower() ) except: return False def _create_hipsgen_input_dir(self, fits_list): dir = tempfile.TemporaryDirectory() for fits_file in fits_list: link_name = os.path.basename(fits_file) absolute_path = os.path.abspath(fits_file) link_path = os.path.join(dir.name, link_name) os.symlink(src=absolute_path, dst=link_path) return dir def _copy_hips_properties_to_builder(self): hips_properties = dict() with open(os.path.join(self.out_dir, "properties")) as prop: for line in prop: if line[0] != "#": key_value_pair = line.split("=") hips_properties[key_value_pair[0].strip()] = key_value_pair[ 1 ].strip() self.builder.imgset.projection = ProjectionType.HEALPIX self.builder.imgset.file_type = "fits" self.builder.imgset.tile_levels = int(hips_properties["hips_order"]) self.builder.place.dec_deg = float(hips_properties["hips_initial_dec"]) self.builder.place.ra_hr = float(hips_properties["hips_initial_ra"]) / 15.0 self.builder.place.zoom_level = float(hips_properties["hips_initial_fov"]) self.builder.imgset.center_x = float(hips_properties["hips_initial_ra"]) self.builder.imgset.center_y = float(hips_properties["hips_initial_dec"]) self.builder.imgset.base_degrees_per_tile = float( hips_properties["hips_initial_fov"] ) pixel_cut = hips_properties.get("hips_pixel_cut") if pixel_cut is not None: pixel_cut = pixel_cut.split(" ") self.builder.imgset.pixel_cut_low = float(pixel_cut[0]) self.builder.imgset.pixel_cut_high = float(pixel_cut[1]) data_range = hips_properties.get("hips_data_range") if data_range is not None: data_range = data_range.split(" ") self.builder.imgset.data_min = float(data_range[0]) self.builder.imgset.data_max = float(data_range[1]) self.builder.imgset.url = "Norder{0}/Dir{1}/Npix{2}"