Source code for fitsmap.convert

# MIT License
# Copyright 2023 Ryan Hausen and contributers

# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""Converts image files and catalogs into a leafletJS map."""

import copy
import csv
import logging
import os
import sys
from functools import partial
from itertools import (
    chain,
    filterfalse,
    groupby,
    islice,
    product,
    repeat,
    starmap,
)
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import cbor2
import mapbox_vector_tile as mvt
import matplotlib as mpl

mpl.use("Agg")  # need to use this for processes safe matplotlib
import numpy as np
import ray
import ray.util.queue as queue
from astropy.io import fits
from astropy.visualization import simple_norm
from astropy.wcs import WCS
from PIL import Image

import fitsmap.cartographer as cartographer
import fitsmap.padded_array as pa
import fitsmap.utils as utils
from fitsmap.output_manager import OutputManager
from fitsmap.supercluster import Supercluster

# https://github.com/zimeon/iiif/issues/11#issuecomment-131129062
Image.MAX_IMAGE_PIXELS = sys.maxsize

Shape = Tuple[int, int]

IMG_FORMATS = [".fits", ".fits.gz", ".jpg", ".png"]
CAT_FORMAT = ["cat"]
IMG_ENGINE_PIL = "PIL"
IMG_ENGINE_MPL = "MPL"
MPL_CMAP = "gray"

# MPL SINGLETON ENGINE =========================================================
mpl_f, mpl_img, mpl_alpha_f, mpl_norm = None, None, None, None
# ==============================================================================

MIXED_WHITESPACE_DELIMITER = "mixed_ws"


[docs] def build_path(z, y, x, out_dir) -> str: """Maps zoom and coordinate location to a subdir in ``out_dir`` Args: z (int): The zoom level for the tiles y (int): The zoom level for the tiles x (int): The zoom level for the tiles out_dir (str): The root directory the tiles are saved in Returns: The str path to save the tile in """ z, y, x = str(z), str(y), str(x) z_dir = os.path.join(out_dir, z) y_dir = os.path.join(z_dir, y) img_path = os.path.join(y_dir, "{}.png".format(x)) return img_path
[docs] def slice_idx_generator( shape: Tuple[int, int], zoom: int, tile_size: int ) -> Iterable[Tuple[int, int, int, slice, slice]]: dim0_tile_fraction = shape[0] / tile_size dim1_tile_fraction = shape[1] / tile_size if dim0_tile_fraction < 1 or dim1_tile_fraction < 1: raise StopIteration() num_tiles_dim0 = int(np.ceil(dim0_tile_fraction)) num_tiles_dim1 = int(np.ceil(dim1_tile_fraction)) tile_idxs_dim0 = [i * tile_size for i in range(num_tiles_dim0 + 1)] tile_idxs_dim1 = [i * tile_size for i in range(num_tiles_dim1 + 1)] def pair_runner(coll: List[int]) -> List[slice]: return [slice(c0, c1) for c0, c1 in zip(coll[:-1], coll[1:])] row_slices = pair_runner(tile_idxs_dim0) col_slices = pair_runner(tile_idxs_dim1) rows = zip(range(num_tiles_dim0 - 1, -1, -1), row_slices) cols = enumerate(col_slices) rows_cols = product(rows, cols) def transform_iteration(row_col): ((y, slice_y), (x, slice_x)) = row_col return (zoom, y, x, slice_y, slice_x) return map(transform_iteration, rows_cols)
[docs] def balance_array(array: np.ndarray) -> pa.PaddedArray: """Pads input array with zeros so that the long side is a multiple of the short side. Args: array (np.ndarray): array to balance Returns: a balanced version of ``array`` via a PaddedArray """ dim0, dim1 = array.shape[0], array.shape[1] exp_val = np.ceil(np.log2(max(dim0, dim1))) total_size = 2**exp_val pad_dim0 = int(total_size - dim0) pad_dim1 = int(total_size - dim1) return pa.PaddedArray(array, (pad_dim0, pad_dim1))
[docs] def get_array(file_location: str) -> pa.PaddedArray: """Opens the array at ``file_location`` can be an image or a fits file Args: file_location (str): the path to the image Returns: A numpy array representing the image. """ if file_location.endswith(".fits") or file_location.endswith(".fits.gz"): array = fits.getdata(file_location).astype(np.float32) shape = array.shape if len(shape) != 2: raise ValueError("FitsMap only supports 2D FITS files.") else: array = np.flipud(Image.open(file_location)).astype(np.float32) return balance_array(array)
[docs] def filter_on_extension( files: List[str], extensions: List[str], exclude_predicate: Callable = None ) -> List[str]: """Filters out files from ``files`` based on ``extensions`` and ``exclude_predicate`` Args: files (List[str]): A list of file paths to be filtered extensions (List[str]): List of extensions to filter ``files`` on exclude_predicate (Callable): A function that accepts a single str as input and returns a True if the file should be excluded, and False if it should be included Returns: A list of files which have an extension thats in ``extensions`` and for which `exclude_predicate(file)==False`` """ neg_predicate = exclude_predicate if exclude_predicate else lambda x: False return list( filter( lambda s: any((s.endswith(e) for e in extensions)) and not neg_predicate(s), files, ) )
[docs] def make_dirs(out_dir: str, min_zoom: int, max_zoom: int) -> None: """Builds the directory tree for storing image tiles. Args: out_dir (str): The root directory to generate the tree in min_zoom (int): The minimum zoom level the image will be tiled at max_zoom (int): The maximum zoom level the image will be tiled at Returns: None """ def build_z_ys(z, ys): list( map( lambda y: os.makedirs(os.path.join(out_dir, f"{z}/{y}"), exist_ok=True), ys, ) ) def build_zs(z): ys = range(int(np.sqrt(4**z))) build_z_ys(z, ys) zs = range(min_zoom, max_zoom + 1) list(map(build_zs, zs))
[docs] def get_zoom_range( shape: Tuple[int, int], tile_size: Tuple[int, int] ) -> Tuple[int, int]: """Returns the supported native zoom range for an give image size and tile size. Args: shape (Tuple[int, int]): The shape that is going to be tiled tile_size (Tuple[int, int]): The size of the image tiles Returns: A tuple containing the (minimum zoom level, maximum zoom level) """ long_side = max(shape[:2]) short_side = min(shape[:2]) max_zoom = int(np.log2(long_side / tile_size[0])) min_zoom = int(np.ceil(np.log2(long_side / short_side))) return min_zoom, max_zoom
[docs] def get_total_tiles(min_zoom: int, max_zoom: int) -> int: """Returns the total number of tiles that will be generated from an image. Args: min_zoom (int): The minimum zoom level te image will be tiled at max_zoom (int): The maximum zoom level te image will be tiled at Returns: The total number of tiles that will be generated """ return int(sum([4**i for i in range(min_zoom, max_zoom + 1)]))
[docs] def imread_default(path: str, default: np.ndarray) -> np.ndarray: """Opens an image if it exists, if not returns a tranparent image. Args: path (str): Image file location size (int, optional): The image size, assumed square. Defaults to 256. Returns: np.ndarray: the image if it exists. if not, a transparent image of size (size, size, 4). """ try: return np.flipud(Image.open(path)) except FileNotFoundError: return default
[docs] def make_tile_mpl( mpl_norm: mpl.colors.Normalize, mpl_cmap: mpl.colors.Colormap, tile: np.ndarray ) -> np.ndarray: """Converts array data into an image using matplotlib. Args: mpl_f (mpl.figure.Figure): The matplotlib figure to use mpl_img (mpl.image.AxesImage): The matplotlib image to use mpl_alpha_f (Callable[[np.ndarray], np.ndarray]): A function that converts the input array into an RGBA tile (np.ndarray): The array data Returns: np.ndarray: The array data converted into an image using Matplotlib """ if isinstance(mpl_norm, ray._raylet.ObjectRef): mpl_norm = ray.get(mpl_norm) mpl_cmap = ray.get(mpl_cmap) tile_image = mpl_cmap(mpl_norm(np.flipud(tile))) return Image.fromarray((tile_image * 255).astype(np.uint8))
[docs] def make_tile_pil(tile: np.ndarray) -> np.ndarray: """Converts the input array into an image using PIL Args: tile (np.ndarray): The array data to be converted Returns: np.ndarray: an RGBA version of the input data """ # black and white pngs have a single channel if len(tile.shape) < 3: img_tile = np.dstack([tile, tile, tile, np.ones_like(tile) * 255]) # RGB images need an alpha channel added elif tile.shape[2] == 3: img_tile = np.concatenate( (tile, np.ones(list(tile.shape[:-1]) + [1], dtype=np.float32) * 255), axis=2, ) # RGBA images are already in the correct format elif tile.shape[2] == 4: img_tile = np.array(tile) else: raise ValueError("Image must be 2D or 3D with 1, 3, or 4 channels") ys, xs = np.where(np.isnan(np.atleast_3d(tile)[:, :, 0])) img_tile[ys, xs, :] = np.array([0, 0, 0, 0], dtype=np.float32) img = Image.fromarray(np.flipud(img_tile).astype(np.uint8)) return img
[docs] def mem_safe_make_tile( out_dir: str, tile_f: Callable[[np.ndarray], np.ndarray], array: np.ndarray, job: Union[ Tuple[int, int, int, slice, slice], List[Tuple[int, int, int, slice, slice]] ], ) -> None: """Extracts a tile from ``array`` and saves it at the proper place in ``out_dir`` using PIL. Args: out_dir (str): The directory to save tile in tile_f (Callable[[np.ndarray], np.ndarray]): A function that converts a subset of the image array into an png tile. Is one of `make_tile_pil` or `make_tile_mpl` array (np.ndarray): Array to extract a slice from job (Tuple[int, int, int, slice, slice]): A tuple containing z, y, x, dim0_slices, dim1_slices. Where (z, y, x) define the zoom and the coordinates, and (dim0_slices, and dim1_slices) are slice objects that extract the tile. Returns: None """ default_array = np.zeros([256, 256, 4], dtype=np.uint8) get_img = partial(imread_default, default=default_array) # If this is being run in parallel it will be a List of Tuples if its being # run serially it will be a single Tuple, wrap the Tuple in a list to keep # it simple jobs = [job] if isinstance(job, tuple) else job for z, y, x, slice_ys, slice_xs in jobs: try: img_path = build_path(z, y, x, out_dir) with_out_dir = partial(os.path.join, out_dir) if os.path.exists(with_out_dir(f"{z + 1}")): top_left = get_img( with_out_dir(f"{z + 1}", f"{2 * y + 1}", f"{2 * x}.png") ) top_right = get_img( with_out_dir(f"{z + 1}", f"{2 * y + 1}", f"{2 * x + 1}.png") ) bottom_left = get_img( with_out_dir(f"{z + 1}", f"{2 * y}", f"{2 * x}.png") ) bottom_right = get_img( with_out_dir(f"{z + 1}", f"{2 * y}", f"{2 * x + 1}.png") ) tile = np.concatenate( [ np.concatenate([top_left, top_right], axis=1), np.concatenate([bottom_left, bottom_right], axis=1), ], axis=0, ) img = Image.fromarray(np.flipud(tile)) else: tile = array[slice_ys, slice_xs] img = tile_f(tile) # if the tile is all transparent, don't save to disk if np.any(np.array(img)[..., -1]): img.thumbnail([256, 256], Image.Resampling.LANCZOS) img.convert("RGBA").save(img_path, "PNG") else: pass except Exception as e: print("Tile creation failed for tile:", z, y, x, slice_ys, slice_xs) print(e)
[docs] def build_mpl_objects( array: np.ndarray, norm_kwargs: Dict[str, Any] ) -> Tuple[mpl.colors.Normalize, mpl.colors.Colormap]: """Builds the matplotlib objects that norm and convert fits data to RGB. Args: array (np.ndarray): The array to be tiled norm_kwargs (Dict[str, Any]): The kwargs to be passed to `simple_norm` Returns: Tuple[mpl.colors.Normalize, mpl.colors.Colormap]: The matplotlib objects needed to create a tile """ mpl_norm = simple_norm(array, **norm_kwargs) mpl_cmap = copy.copy(mpl.colormaps[MPL_CMAP]) mpl_cmap.set_bad(color=(0, 0, 0, 0)) return mpl_norm, mpl_cmap
[docs] def tile_img( file_location: str, pbar_ref: Tuple[int, queue.Queue], tile_size: Shape = [256, 256], min_zoom: int = 0, out_dir: str = ".", mp_procs: int = 0, norm_kwargs: dict = {}, batch_size: int = 1000, ) -> None: """Extracts tiles from the array at ``file_location``. Args: file_location (str): The file location of the image to tile pbar_loc (int): The index of the location of to print the tqdm bar tile_size (Tuple[int, int]): The pixel size of the tiles in the map min_zoom (int): The minimum zoom to create tiles for. The default value is 0, but if it can be helpful to set it to a value greater than zero if your running out of memory as the lowest zoom images can be the most memory intensive. out_dir (str): The root directory to save the tiles in mp_procs (int): The number of multiprocessing processes to use for generating tiles. norm_kwargs (dict): Optional normalization keyword arguments passed to `astropy.visualization.simple_norm`. The default is linear scaling using min/max values. See documentation for more information: https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html batch_size (int): The number of tiles to process at a time, when tiling in parallel. The default is 1000. Returns: None """ _, fname = os.path.split(file_location) if get_map_layer_name(file_location) in os.listdir(out_dir): OutputManager.write(pbar_ref, f"{fname} already tiled. Skipping tiling.") return # get image array = get_array(file_location) # if we're using matplotlib we need to instantiate the matplotlib objects # before we pass them to ray image_engine = ( IMG_ENGINE_MPL if (file_location.endswith(".fits") or file_location.endswith(".fits.gz")) else IMG_ENGINE_PIL ) if image_engine == IMG_ENGINE_MPL: image_norm = norm_kwargs.get(os.path.basename(file_location), norm_kwargs) mpl_norm, mpl_cmap = build_mpl_objects(array.array, image_norm) zooms = get_zoom_range(array.shape, tile_size) min_zoom = max(min_zoom, zooms[0]) max_zoom = zooms[1] # build directory structure name = get_map_layer_name(file_location) tile_dir = os.path.join(out_dir, name) if name not in os.listdir(out_dir): os.mkdir(tile_dir) make_dirs(tile_dir, min_zoom, max_zoom) tile_params = chain.from_iterable( [ slice_idx_generator(array.shape, z, 256 * (2**i)) for (i, z) in enumerate(range(max_zoom, min_zoom - 1, -1), start=0) ] ) total_tiles = get_total_tiles(min_zoom, max_zoom) if mp_procs > 1: # We need to process batches to offset the cost of spinning up a process def batch_params(iter, batch_size): while True: batch = list(islice(iter, batch_size)) if batch: yield batch else: break # Put the array in the ray object store. After it's there remove the # reference to it so that it can be garbage collected. arr_obj_id = ray.put(array) del array if image_engine == IMG_ENGINE_MPL: mpl_norm = ray.put(mpl_norm) mpl_cmap = ray.put(mpl_cmap) tile_f = partial( make_tile_mpl, mpl_norm, mpl_cmap, ) else: tile_f = make_tile_pil make_tile_f = ray.remote(num_cpus=1)(mem_safe_make_tile) OutputManager.set_description(pbar_ref, f"Converting {name}") OutputManager.set_units_total(pbar_ref, unit="tile", total=total_tiles) # in parallel we have to go one zoom level at a time because the images # at lower zoom levels are dependent on the tiles are higher zoom levels for zoom, jobs in groupby(tile_params, lambda z: z[0]): if zoom == max_zoom: utils.backpressure_queue_ray( make_tile_f.remote, list( zip( repeat(tile_dir), repeat(tile_f), repeat(arr_obj_id), batch_params(jobs, batch_size), ) ), pbar_ref, mp_procs, batch_size, ) else: if "arr_obj_id" in locals(): del arr_obj_id work = list( zip( repeat(tile_dir), repeat(make_tile_pil), repeat(None), batch_params(jobs, batch_size), ) ) utils.backpressure_queue_ray( make_tile_f.remote, work, pbar_ref, mp_procs, batch_size, ) else: if image_engine == IMG_ENGINE_MPL: tile_f = partial(make_tile_mpl, mpl_norm, mpl_cmap) else: tile_f = make_tile_pil work = partial(mem_safe_make_tile, tile_dir, tile_f, array) jobs = tile_params OutputManager.set_description(pbar_ref, f"Converting {name}") OutputManager.set_units_total(pbar_ref, "tile", total_tiles) for job in jobs: if job[0] == max_zoom: work(job) OutputManager.update(pbar_ref, 1) else: break del array work = partial(mem_safe_make_tile, tile_dir, image_engine, None) work(job) OutputManager.update(pbar_ref, 1) for job in jobs: work(job) OutputManager.update(pbar_ref, 1) OutputManager.update_done(pbar_ref)
[docs] def get_map_layer_name(file_location: str) -> str: """Tranforms a ``file_location`` into the javascript layer name. Args: file_location (str): The file location to convert Returns: The javascript name that will be used in the HTML map """ _, fname = os.path.split(file_location) name = ( os.path.splitext(fname)[0] .replace(".", "_") .replace("-", "_") .replace("(", "_") .replace(")", "_") ) return name
[docs] def get_marker_file_name(file_location: str) -> str: """Tranforms a ``file_location`` into the javascript marker file name. Args: file_location (str): The file location to convert Returns: The javascript name that will be used in the HTML map """ return os.path.split(file_location)[1] + ".js"
[docs] def line_to_cols(raw_col_vals: str) -> List[str]: """Transform a raw text line of column names into a list of column names Args: raw_line (str): String from textfile Returns: A list of the column names in order """ change_case = [ "RA", "DEC", "Ra", "Dec", "X", "Y", "ID", "iD", "Id", "A", "B", "THETA", "Theta", ] # make ra and dec lowercase for ease of access raw_cols = list( map( lambda s: s.lower() if s in change_case else s, raw_col_vals, ) ) # if header line starts with a '#' exclude it if raw_cols[0] == "#": return raw_cols[1:] else: return raw_cols
[docs] def line_to_json( wcs: WCS, columns: List[str], catalog_assets_path: str, src_vals: List[str], catalog_starts_at_one: bool = 1, ) -> Dict[str, Any]: """Transform a raw text line attribute values into a JSON marker Args: raw_line (str): String from the marker file Returns: A list of the column names in order """ src_id = str(src_vals[columns.index("id")]) if "x" in columns and "y" in columns: img_x = float(src_vals[columns.index("x")]) img_y = float(src_vals[columns.index("y")]) else: ra = float(src_vals[columns.index("ra")]) dec = float(src_vals[columns.index("dec")]) [[img_x, img_y]] = wcs.wcs_world2pix([[ra, dec]], int(catalog_starts_at_one)) if "a" in columns and "b" in columns and "theta" in columns: a = float(src_vals[columns.index("a")]) b = float(src_vals[columns.index("b")]) b = a if np.isnan(b) else b theta = float(src_vals[columns.index("theta")]) theta = 0 if np.isnan(theta) else theta else: a = -1 b = -1 theta = -1 # Leaflet is 0 indexed, so if the catalog starts at 1 we need to offset # The convention for leaflet is to place markers at the lower left corner # of a pixel, to center the marker on the pixel, we subtract 1 to bring it # to leaflet convention and add 0.5 to move it to the center of the pixel. x = (img_x - int(catalog_starts_at_one)) + 0.5 y = (img_y - int(catalog_starts_at_one)) + 0.5 src_vals += [y, x, catalog_assets_path.split(os.sep)[-1]] src_json = os.path.join(catalog_assets_path, f"{src_id}.cbor") with open(src_json, "wb") as f: cbor2.dump(dict(id=src_id, v=src_vals), f) return dict( geometry=dict( coordinates=[x, y], ), tags=dict( a=a, b=b, theta=theta, catalog_id=src_id, cat_path=os.path.basename(catalog_assets_path), ), )
[docs] def process_catalog_file_chunk( process_f: Callable, fname: str, delimiter: str, q: queue.Queue, start: int, end: int, ) -> List[dict]: # newline="" for csv reader, see # https://docs.python.org/3/library/csv.html#csv.reader f = open(fname, "r", newline="") f.seek(start) f.readline() # id start==0 skip cols, else advance to next complete line json_lines = [] raw_lines = [] reader = csv.reader(raw_lines, delimiter=delimiter, skipinitialspace=True) current = f.tell() update_every = 10000 count = 1 while current < end: raw_lines.append(f.readline().strip()) processed_lines = next(reader) json_lines.append(process_f(processed_lines)) current = f.tell() count += 1 if count % update_every == 0: q.put(count) count = 1 if count > 1: q.put(count) f.close() return json_lines
def _simplify_mixed_ws(catalog_fname: str) -> None: with open(catalog_fname, "r") as f: lines = [line.strip() for line in f.readlines()] with open(catalog_fname, "w") as f: for line in lines: f.write(" ".join([token.strip() for token in line.split()]) + "\n")
[docs] def make_marker_tile( cluster: Supercluster, out_dir: str, job: Union[Tuple[int, Tuple[int, int]], List[Tuple[int, Tuple[int, int]]]], ) -> None: jobs = [job] if isinstance(job, tuple) else job for z, (y, x) in jobs: if not os.path.exists(os.path.join(out_dir, str(z))): os.mkdir(os.path.join(out_dir, str(z))) if not os.path.exists(os.path.join(out_dir, str(z), str(y))): os.mkdir(os.path.join(out_dir, str(z), str(y))) out_path = os.path.join(out_dir, str(z), str(y), f"{x}.pbf") tile_sources = cluster.get_tile(z, x, y) if tile_sources: tile_sources["name"] = "Points" for i in range(len(tile_sources["features"])): tile_sources["features"][i]["geometry"] = ( "POINT(0 0)" # we dont' use this ) encoded_tile = mvt.encode([tile_sources], extents=256) with open(out_path, "wb") as f: f.write(encoded_tile)
[docs] def tile_markers( wcs_file: str, out_dir: str, catalog_delim: str, mp_procs: int, prefer_xy: bool, cluster_min_points: int, cluster_radius: float, cluster_node_size: int, min_zoom: int, max_zoom: int, tile_size: int, max_x: int, max_y: int, catalog_starts_at_one: bool, catalog_file: str, pbar_ref: Tuple[int, queue.Queue], batch_size: int = 500, ) -> None: _, fname = os.path.split(catalog_file) catalog_layer_name = get_map_layer_name(catalog_file) if catalog_layer_name in os.listdir(out_dir): OutputManager.write(pbar_ref, f"{fname} already tiled. Skipping tiling.") return else: os.mkdir(os.path.join(out_dir, catalog_layer_name)) if catalog_delim == MIXED_WHITESPACE_DELIMITER: _simplify_mixed_ws(catalog_file) catalog_delim = " " with open(catalog_file, "r", newline="") as f: csv_reader = csv.reader(f, delimiter=catalog_delim, skipinitialspace=True) columns = line_to_cols(next(csv_reader)) ra_dec_coords = "ra" in columns and "dec" in columns x_y_coords = "x" in columns and "y" in columns use_xy = (not ra_dec_coords) or (prefer_xy) assert ra_dec_coords or x_y_coords, ( catalog_file + " is missing coordinate columns (ra/dec, xy)," ) assert "id" in columns, catalog_file + " missing 'id' column" assert use_xy or (wcs_file is not None), ( catalog_file + " uses ra/dec coords, but a WCS file wasn't provided." ) wcs = WCS(wcs_file) if wcs_file else None with open(os.path.join(out_dir, f"{catalog_layer_name}.columns"), "w") as f: f.write(",".join(columns)) catalog_assets_parent_path = os.path.join(out_dir, "catalog_assets") if "catalog_assets" not in os.listdir(out_dir): os.mkdir(catalog_assets_parent_path) catalog_assets_path = os.path.join(catalog_assets_parent_path, catalog_layer_name) if catalog_layer_name not in os.listdir(catalog_assets_parent_path): os.mkdir(catalog_assets_path) line_func = partial( line_to_json, wcs, columns, catalog_assets_path, catalog_starts_at_one=catalog_starts_at_one, ) OutputManager.set_description(pbar_ref, f"Parsing {catalog_file}") catalog_file_size = os.path.getsize(catalog_file) q = queue.Queue() process_f = partial( process_catalog_file_chunk, line_func, catalog_file, catalog_delim, q ) if mp_procs > 1: process_f = ray.remote(num_cpus=1)(process_catalog_file_chunk) boundaries = np.linspace(0, catalog_file_size, mp_procs + 1, dtype=np.int64) remote_val_ids = list( starmap( process_f.remote, zip( repeat(line_func), repeat(catalog_file), repeat(catalog_delim), repeat(q), boundaries[:-1], boundaries[1:], ), ) ) _, remaining = ray.wait(remote_val_ids, timeout=0.01, fetch_local=False) while remaining: try: lines_processed = sum([q.get(timeout=0.0001) for _ in range(mp_procs)]) OutputManager.update(pbar_ref, lines_processed) except queue.Empty: pass _, remaining = ray.wait(remaining, timeout=0.01, fetch_local=False) q.shutdown() catalog_values = list(chain.from_iterable(list(ray.get(remote_val_ids)))) del q else: catalog_values = process_f(0, catalog_file_size) q.shutdown() del q OutputManager.set_description( pbar_ref, f"Clustering {catalog_file}(THIS MAY TAKE A WHILE)" ) OutputManager.set_units_total( pbar_ref, unit="zoom levels", total=max_zoom + 1 - min_zoom ) if cluster_radius is None: cluster_radius = max(max(max_x, max_y) / tile_size, 40) if cluster_node_size is None: cluster_node_size = np.log2(len(catalog_values)) * 2 # cluster the parsed sources # need to get super cluster stuff in here clusterer = Supercluster( min_zoom=min_zoom, max_zoom=max_zoom - 1, min_points=cluster_min_points, extent=tile_size, radius=cluster_radius, node_size=cluster_node_size, alternate_CRS=(max_x, max_y), update_f=lambda: OutputManager.update(pbar_ref, 1), log=True, ).load(catalog_values) # Don't push any updates from the clustering algorithm after clustering # has been completed clusterer.update_f = None clusterer.log = False # tile the sources and save using protobuf zs = range(min_zoom, max_zoom + 1) ys = [range(2**z) for z in zs] xs = [range(2**z) for z in zs] tile_idxs = list( chain.from_iterable( [zip(repeat(zs[i]), product(ys[i], xs[i])) for i in range(len(zs))] ) ) catalog_layer_dir = os.path.join(out_dir, catalog_layer_name) OutputManager.set_description(pbar_ref, f"Tiling {catalog_file}") OutputManager.set_units_total(pbar_ref, unit="tile", total=len(tile_idxs)) if mp_procs > 1: # We need to process batches to offset the cost of spinning up a process def batch_params(iter, batch_size): while True: batch = list(islice(iter, batch_size)) if batch: yield batch else: break tile_f = ray.remote(num_cpus=1)(make_marker_tile) cluster_remote_id = ray.put(clusterer) utils.backpressure_queue_ray( tile_f.remote, list( zip( repeat(cluster_remote_id), repeat(catalog_layer_dir), batch_params(iter(tile_idxs), batch_size), ) ), pbar_ref, mp_procs, batch_size, ) else: for zyx in tile_idxs: make_marker_tile(clusterer, catalog_layer_dir, zyx) OutputManager.update(pbar_ref, 1)
[docs] def files_to_map( files: List[str], out_dir: str = ".", title: str = "FitsMap", task_procs: int = 0, procs_per_task: int = 0, catalog_delim: str = ",", cat_wcs_fits_file: Optional[str] = None, max_catalog_zoom: int = -1, tile_size: Tuple[int, int] = [256, 256], norm_kwargs: dict = {}, rows_per_column: Optional[int] = None, n_columns: int = 1, prefer_xy: bool = False, catalog_starts_at_one: bool = True, img_tile_batch_size: int = 1000, pixel_scale: float = 1.0, units_are_pixels: bool = True, cluster_min_points: int = 2, cluster_radius: Optional[float] = None, cluster_node_size: Optional[int] = None, ) -> None: """Converts a list of files into a LeafletJS map. Args: files (List[str]): List of files to convert into a map, can include image files (.fits, .png, .jpg) and catalog files (.cat) out_dir (str): Directory to place the genreated web page and associated subdirectories title (str): The title to placed on the webpage task_procs (int): The number of tasks to run in parallel procs_per_task (int): The number of tiles to process in parallel catalog_delim (str): The delimited for catalog (.cat) files. Deault is whitespace. cat_wcs_fits_file (str): A fits file that has the WCS that will be used to map ra and dec coordinates from the catalog files to x and y coordinates in the map max_catalog_zoom (int): The zoom level to stop clustering on, the default is the max zoom level of the image. For images with a high source density, setting this higher than the max zoom will help with performance. tile_size (Tuple[int, int]): The tile size for the leaflet map. Currently only [256, 256] is supported. norm_kwargs (Union[Dict[str, Any], Dict[str, Dict[str, Any]]]): Optional normalization keyword arguments passed to `astropy.visualization.simple_norm`. Can either be a single dictionary of keyword arguments, or a dictionary of keyword arguments for each image where the keys are the image names not full paths. The default is linear scaling using min/max values. See documentation for more information: https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html rows_per_column (Optional[int]): (deprecated) please use `n_columns` instead n_columns (int): If converting a catalog, the number of columns to use when displaying the values in the popup. The default displays everything in a single column. prefer_xy (bool): If True x/y coordinates should be preferred if both ra/dec and x/y are present in a catalog catalog_starts_at_one (bool): True if the catalog is 1 indexed, False if the catalog is 0 indexed img_tile_batch_size (int): The number of image tiles to process in parallel when task_procs > 1 pixel_scale (float): The pixel scale of the image in either arcsec/pix or pixels, depending on the value of `units_are_pixels` units_are_pixels (bool): If True, the pixel scale is in pixels, if False, the pixel scale is in arcsec/pix cluster_min_points (int): The minimum points to form a catalog cluster cluster_radius (Optional[float]): The radius of each cluster in pixels. cluster_node_size (Optional[int]): The size for the kd-tree leaf mode, afftects performance. Example of image specific norm_kwargs vs single norm_kwargs: >>> norm_kwargs = { >>> "test.fits": {"stretch": "log", "min_percent": 1, "max_percent": 99.5}, >>> "test2.fits": {"stretch": "linear", "min_percent": 5, "max_percent": 99.5}, >>> } >>> # or >>> norm_kwargs = {"stretch": "log", "min_percent": 1, "max_percent": 99.5} Returns: None """ if rows_per_column is not None: raise DeprecationWarning( "rows_per_column is deprecated, use `n_columns` instead" ) assert len(files) > 0, "No files provided `files` is an empty list" unlocatable_files = list(filterfalse(os.path.exists, files)) assert len(unlocatable_files) == 0, f"Files not found:{unlocatable_files}" if not os.path.exists(out_dir): os.makedirs(out_dir) if "js" not in os.listdir(out_dir): os.mkdir(os.path.join(out_dir, "js")) if "css" not in os.listdir(out_dir): os.mkdir(os.path.join(out_dir, "css")) # build image tasks img_files = filter_on_extension(files, IMG_FORMATS) img_layer_names = list(map(get_map_layer_name, img_files)) img_f_kwargs = dict( tile_size=tile_size, out_dir=out_dir, mp_procs=procs_per_task, norm_kwargs=norm_kwargs, batch_size=img_tile_batch_size, ) # we want to init ray in such a way that it doesn't print any output # to the console. These should be changed during development debug = os.getenv("FITSMAP_DEBUG", "False").lower() == "true" if ray.is_initialized(): ray.shutdown() ray.init( include_dashboard=debug, # during dev == True configure_logging=not debug, # during dev == False logging_level=( logging.INFO if debug else logging.CRITICAL ), # during dev == logging.INFO, test == logging.CRITICAL log_to_driver=debug, # during dev = True ) if task_procs > 1: img_task_f = ray.remote(num_cpus=1)(tile_img).remote else: img_task_f = tile_img # build catalog tasks cat_files = filter_on_extension(files, CAT_FORMAT) cat_layer_names = list(map(get_map_layer_name, cat_files)) max_x, max_y = utils.peek_image_info(img_files) max_dim = max(max_x, max_y) if len(cat_files) > 0: # get highlevel image info for catalogging function max_zoom = int(np.log2(2 ** np.ceil(np.log2(max_dim)) / tile_size[0])) max_dim = 2**max_zoom * tile_size[0] if max_catalog_zoom == -1: max_zoom = int(np.log2(2 ** np.ceil(np.log2(max_dim)) / tile_size[0])) else: max_zoom = max_catalog_zoom if task_procs > 1: cat_task_f = ray.remote(num_cpus=1)(tile_markers).remote else: cat_task_f = tile_markers cat_f_kwargs = dict( wcs_file=cat_wcs_fits_file, out_dir=out_dir, catalog_delim=catalog_delim, mp_procs=procs_per_task, prefer_xy=prefer_xy, min_zoom=0, max_zoom=max_zoom, tile_size=tile_size[0], max_x=max_dim, max_y=max_dim, catalog_starts_at_one=catalog_starts_at_one, cluster_min_points=cluster_min_points, cluster_radius=cluster_radius, cluster_node_size=cluster_node_size, ) else: cat_task_f = None cat_f_kwargs = dict() output_manager = OutputManager() img_tasks = zip( repeat(img_task_f), map( lambda x: dict(**x[0], **x[1]), zip( repeat(img_f_kwargs), starmap( lambda x, y: dict(file_location=x, pbar_ref=y), zip(img_files, output_manager.make_bar()), ), ), ), ) cat_tasks = zip( repeat(cat_task_f), map( lambda x: dict(**x[0], **x[1]), zip( repeat(cat_f_kwargs), starmap( lambda x, y: dict(catalog_file=x, pbar_ref=y), zip(cat_files, output_manager.make_bar()), ), ), ), ) tasks = chain(img_tasks, cat_tasks) if task_procs > 1: # start runnning task_procs number of tasks in_progress = list( starmap( lambda i, func_kwargs: func_kwargs[0](**func_kwargs[1]), zip(range(task_procs), tasks), ) ) while in_progress: _, in_progress = ray.wait(in_progress, timeout=0.003) output_manager.check_for_updates() if len(in_progress) < task_procs: try: # try to get a task with kwargs from the iterator func, kwargs = next(tasks) in_progress.append(func(**kwargs)) print(in_progress) except StopIteration: # all of the tasks are in progress or completed if not in_progress: # we're done! break else: # the tasks 'in_progress' are still running pass else: def f(func_args): func_args[0](**func_args[1]) any(map(f, tasks)) output_manager.close_up() ray.shutdown() print("Building index.html") cat_wcs = WCS(cat_wcs_fits_file) if cat_wcs_fits_file else None cartographer.chart( out_dir, title, img_layer_names, cat_layer_names, cat_wcs, n_columns, (max_x, max_y), pixel_scale, units_are_pixels, ) print("Done.")
[docs] def dir_to_map( directory: str, out_dir: str = ".", exclude_predicate: Callable = lambda f: False, title: str = "FitsMap", task_procs: int = 0, procs_per_task: int = 0, catalog_delim: str = ",", cat_wcs_fits_file: str = None, max_catalog_zoom: int = -1, tile_size: Shape = [256, 256], norm_kwargs: Union[Dict[str, Any], Dict[str, Dict[str, Any]]] = {}, rows_per_column: int = None, n_columns: int = 1, prefer_xy: bool = False, catalog_starts_at_one: bool = True, img_tile_batch_size: int = 1000, pixel_scale: float = 1.0, units_are_pixels: bool = True, cluster_min_points: int = 2, cluster_radius: float = None, cluster_node_size: int = None, ) -> None: """Converts a list of files into a LeafletJS map. Args: directory (str): Path to directory containing the files to be converted out_dir (str): Directory to place the genreated web page and associated subdirectories exclude_predicate (Callable): A function that is applied to every file in ``directory`` and returns True if the file should not be processed as a part of the map, and False if it should be processed title (str): The title to placed on the webpage task_procs (int): The number of tasks to run in parallel procs_per_task (int): The number of tiles to process in parallel catalog_delim (str): The delimiter for catalog (.cat) files. Deault is comma. cat_wcs_fits_file (str): A fits file that has the WCS that will be used to map ra and dec coordinates from the catalog files to x and y coordinates in the map. Note, that this file isn't subject to the ``exlclude_predicate``, so you can exclude a fits file from being tiled, but still use its header for WCS. max_catalog_zoom (int): The zoom level to stop clustering on, the default is the max zoom level of the image. For images with a high source density, setting this higher than the max zoom will help with performance. tile_size (Tuple[int, int]): The tile size for the leaflet map. Currently only [256, 256] is supported. norm_kwargs (Union[Dict[str, Any], Dict[str, Dict[str, Any]]]): Optional normalization keyword arguments passed to `astropy.visualization.simple_norm`. Can either be a single dictionary of keyword arguments, or a dictionary of keyword arguments for each image where the keys are the image names not full paths. The default is linear scaling using min/max values. See documentation for more information: https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html rows_per_column (Optional[int]): (deprecated) please use `n_columns` instead n_columns (int): If converting a catalog, the number of columns to use when displaying the values in the popup. The default displays everything in a single column. prefer_xy (bool): If True x/y coordinates should be preferred if both ra/dec and x/y are present in a catalog catalog_starts_at_one (bool): True if the catalog is 1 indexed, False if the catalog is 0 indexed img_tile_batch_size (int): The number of image tiles to process in parallel when task_procs > 1 pixel_scale (float): The pixel scale of the image in either arcsec/pix or pixels, depending on the value of `units_are_pixels` units_are_pixels (bool): If True, the pixel scale is in pixels, if False, the pixel scale is in arcsec/pix cluster_min_points (int): The minimum points to form a catalog cluster cluster_radius (Optional[float]): The radius of each cluster in pixels. cluster_node_size (Optional[int]): The size for the kd-tree leaf mode, afftects performance. Example of image specific norm_kwargs vs single norm_kwargs: >>> norm_kwargs = { >>> "test.fits": {"stretch": "log", "min_percent": 1, "max_percent": 99.5}, >>> "test2.fits": {"stretch": "linear", "min_percent": 5, "max_percent": 99.5}, >>> } >>> # or >>> norm_kwargs = {"stretch": "log", "min_percent": 1, "max_percent": 99.5} Returns: None Raises: ValueError if the dir is empty, there are no convertable files or if ``exclude_predicate`` exlcudes all files """ dir_files = list( map( lambda d: os.path.join(directory, d), filterfalse( exclude_predicate, os.listdir(directory), ), ) ) assert len(dir_files) > 0, ( "No files in `directory` or `exclude_predicate` excludes everything" ) files_to_map( sorted(dir_files), out_dir=out_dir, title=title, task_procs=task_procs, procs_per_task=procs_per_task, catalog_delim=catalog_delim, cat_wcs_fits_file=cat_wcs_fits_file, max_catalog_zoom=max_catalog_zoom, tile_size=tile_size, norm_kwargs=norm_kwargs, rows_per_column=rows_per_column, n_columns=n_columns, prefer_xy=prefer_xy, catalog_starts_at_one=catalog_starts_at_one, img_tile_batch_size=img_tile_batch_size, pixel_scale=pixel_scale, units_are_pixels=units_are_pixels, cluster_min_points=cluster_min_points, cluster_radius=cluster_radius, cluster_node_size=cluster_node_size, )