# MIT License
# Copyright 2023 Ryan Hausen and contributers
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Converts image files and catalogs into a leafletJS map."""
import copy
import csv
import io
import logging
import os
import sys
from functools import partial
from itertools import (
chain,
filterfalse,
groupby,
islice,
product,
repeat,
starmap,
)
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union
import cbor2
import mapbox_vector_tile as mvt
import matplotlib as mpl
mpl.use("Agg") # need to use this for processes safe matplotlib
import astropy
import matplotlib.pyplot as plt
import numpy as np
import ray
import ray.util.queue as queue
from astropy.io import fits
from astropy.wcs import WCS
from astropy.visualization import simple_norm
from PIL import Image
import fitsmap.utils as utils
import fitsmap.cartographer as cartographer
import fitsmap.padded_array as pa
from fitsmap.output_manager import OutputManager
from fitsmap.supercluster import Supercluster
# https://github.com/zimeon/iiif/issues/11#issuecomment-131129062
Image.MAX_IMAGE_PIXELS = sys.maxsize
Shape = Tuple[int, int]
IMG_FORMATS = ["fits", "jpg", "png"]
CAT_FORMAT = ["cat"]
IMG_ENGINE_PIL = "PIL"
IMG_ENGINE_MPL = "MPL"
MPL_CMAP = "gray"
# MPL SINGLETON ENGINE =========================================================
mpl_f, mpl_img, mpl_alpha_f, mpl_norm = None, None, None, None
# ==============================================================================
MIXED_WHITESPACE_DELIMITER = "mixed_ws"
[docs]def build_path(z, y, x, out_dir) -> str:
"""Maps zoom and coordinate location to a subdir in ``out_dir``
Args:
z (int): The zoom level for the tiles
y (int): The zoom level for the tiles
x (int): The zoom level for the tiles
out_dir (str): The root directory the tiles are saved in
Returns:
The str path to save the tile in
"""
z, y, x = str(z), str(y), str(x)
z_dir = os.path.join(out_dir, z)
y_dir = os.path.join(z_dir, y)
img_path = os.path.join(y_dir, "{}.png".format(x))
return img_path
[docs]def slice_idx_generator(
shape: Tuple[int, int], zoom: int, tile_size: int
) -> Iterable[Tuple[int, int, int, slice, slice]]:
dim0_tile_fraction = shape[0] / tile_size
dim1_tile_fraction = shape[1] / tile_size
if dim0_tile_fraction < 1 or dim1_tile_fraction < 1:
raise StopIteration()
num_tiles_dim0 = int(np.ceil(dim0_tile_fraction))
num_tiles_dim1 = int(np.ceil(dim1_tile_fraction))
tile_idxs_dim0 = [i * tile_size for i in range(num_tiles_dim0 + 1)]
tile_idxs_dim1 = [i * tile_size for i in range(num_tiles_dim1 + 1)]
pair_runner = lambda coll: [slice(c0, c1) for c0, c1 in zip(coll[:-1], coll[1:])]
row_slices = pair_runner(tile_idxs_dim0)
col_slices = pair_runner(tile_idxs_dim1)
rows = zip(range(num_tiles_dim0 - 1, -1, -1), row_slices)
cols = enumerate(col_slices)
rows_cols = product(rows, cols)
def transform_iteration(row_col):
((y, slice_y), (x, slice_x)) = row_col
return (zoom, y, x, slice_y, slice_x)
return map(transform_iteration, rows_cols)
[docs]def balance_array(array: np.ndarray) -> pa.PaddedArray:
"""Pads input array with zeros so that the long side is a multiple of the short side.
Args:
array (np.ndarray): array to balance
Returns:
a balanced version of ``array`` via a PaddedArray
"""
dim0, dim1 = array.shape[0], array.shape[1]
exp_val = np.ceil(np.log2(max(dim0, dim1)))
total_size = 2**exp_val
pad_dim0 = int(total_size - dim0)
pad_dim1 = int(total_size - dim1)
return pa.PaddedArray(array, (pad_dim0, pad_dim1))
[docs]def get_array(file_location: str) -> pa.PaddedArray:
"""Opens the array at ``file_location`` can be an image or a fits file
Args:
file_location (str): the path to the image
Returns:
A numpy array representing the image.
"""
_, ext = os.path.splitext(file_location)
if ext == ".fits":
array = fits.getdata(file_location).astype(np.float32)
shape = array.shape
if len(shape) != 2:
raise ValueError("FitsMap only supports 2D FITS files.")
else:
array = np.flipud(Image.open(file_location)).astype(np.float32)
return balance_array(array)
[docs]def filter_on_extension(
files: List[str], extensions: List[str], exclude_predicate: Callable = None
) -> List[str]:
"""Filters out files from ``files`` based on ``extensions`` and ``exclude_predicate``
Args:
files (List[str]): A list of file paths to be filtered
extensions (List[str]): List of extensions to filter ``files`` on
exclude_predicate (Callable): A function that accepts a single str as
input and returns a True if the file
should be excluded, and False if it should
be included
Returns:
A list of files which have an extension thats in ``extensions`` and
for which `exclude_predicate(file)==False``
"""
neg_predicate = exclude_predicate if exclude_predicate else lambda x: False
return list(
filter(
lambda s: (os.path.splitext(s)[1][1:] in extensions)
and not neg_predicate(s),
files,
)
)
[docs]def make_dirs(out_dir: str, min_zoom: int, max_zoom: int) -> None:
"""Builds the directory tree for storing image tiles.
Args:
out_dir (str): The root directory to generate the tree in
min_zoom (int): The minimum zoom level the image will be tiled at
max_zoom (int): The maximum zoom level the image will be tiled at
Returns:
None
"""
row_count = lambda z: int(np.sqrt(4**z))
def build_z_ys(z, ys):
list(
map(
lambda y: os.makedirs(os.path.join(out_dir, f"{z}/{y}"), exist_ok=True),
ys,
)
)
def build_zs(z):
ys = range(row_count(z))
build_z_ys(z, ys)
zs = range(min_zoom, max_zoom + 1)
list(map(build_zs, zs))
[docs]def get_zoom_range(
shape: Tuple[int, int], tile_size: Tuple[int, int]
) -> Tuple[int, int]:
"""Returns the supported native zoom range for an give image size and tile size.
Args:
shape (Tuple[int, int]): The shape that is going to be tiled
tile_size (Tuple[int, int]): The size of the image tiles
Returns:
A tuple containing the (minimum zoom level, maximum zoom level)
"""
long_side = max(shape[:2])
short_side = min(shape[:2])
max_zoom = int(np.log2(long_side / tile_size[0]))
min_zoom = int(np.ceil(np.log2(long_side / short_side)))
return min_zoom, max_zoom
[docs]def get_total_tiles(min_zoom: int, max_zoom: int) -> int:
"""Returns the total number of tiles that will be generated from an image.
Args:
min_zoom (int): The minimum zoom level te image will be tiled at
max_zoom (int): The maximum zoom level te image will be tiled at
Returns:
The total number of tiles that will be generated
"""
return int(sum([4**i for i in range(min_zoom, max_zoom + 1)]))
[docs]def imread_default(path: str, default: np.ndarray) -> np.ndarray:
"""Opens an image if it exists, if not returns a tranparent image.
Args:
path (str): Image file location
size (int, optional): The image size, assumed square. Defaults to 256.
Returns:
np.ndarray: the image if it exists. if not, a transparent image of
size (size, size, 4).
"""
try:
return np.flipud(Image.open(path))
except FileNotFoundError:
return default
[docs]def make_tile_mpl(
mpl_norm: mpl.colors.Normalize, mpl_cmap: mpl.colors.Colormap, tile: np.ndarray
) -> np.ndarray:
"""Converts array data into an image using matplotlib
Args:
mpl_f (mpl.figure.Figure): The matplotlib figure to use
mpl_img (mpl.image.AxesImage): The matplotlib image to use
mpl_alpha_f (Callable[[np.ndarray], np.ndarray]): A function that
converts the input
array into an RGBA
tile (np.ndarray): The array data
Returns:
np.ndarray: The array data converted into an image using Matplotlib
"""
if type(mpl_norm) == ray._raylet.ObjectRef:
mpl_norm = ray.get(mpl_norm)
mpl_cmap = ray.get(mpl_cmap)
tile_image = mpl_cmap(mpl_norm(np.flipud(tile)))
return Image.fromarray((tile_image * 255).astype(np.uint8))
[docs]def make_tile_pil(tile: np.ndarray) -> np.ndarray:
"""Converts the input array into an image using PIL
Args:
tile (np.ndarray): The array data to be converted
Returns:
np.ndarray: an RGBA version of the input data
"""
if len(tile.shape) == 2:
img_tile = np.dstack([tile, tile, tile, np.ones_like(tile) * 255])
elif tile.shape[2] == 3:
img_tile = np.concatenate(
(tile, np.ones(list(tile.shape[:-1]) + [1], dtype=np.float32) * 255),
axis=2,
)
else:
img_tile = np.copy(tile)
# else the image is already RGBA
ys, xs = np.where(np.isnan(np.atleast_3d(tile)[:, :, 0]))
img_tile[ys, xs, :] = np.array([0, 0, 0, 0], dtype=np.float32)
img = Image.fromarray(np.flipud(img_tile).astype(np.uint8))
return img
[docs]def mem_safe_make_tile(
out_dir: str,
tile_f: Callable[[np.ndarray], np.ndarray],
array: np.ndarray,
job: Union[
Tuple[int, int, int, slice, slice], List[Tuple[int, int, int, slice, slice]]
],
) -> None:
"""Extracts a tile from ``array`` and saves it at the proper place in ``out_dir`` using PIL.
Args:
out_dir (str): The directory to save tile in
tile_f (Callable[[np.ndarray], np.ndarray]): A function that converts a
subset of the image array
into an png tile. Is one
of `make_tile_pil` or
`make_tile_mpl`
array (np.ndarray): Array to extract a slice from
job (Tuple[int, int, int, slice, slice]): A tuple containing z, y, x,
dim0_slices, dim1_slices. Where
(z, y, x) define the zoom and
the coordinates, and (dim0_slices,
and dim1_slices) are slice
objects that extract the tile.
Returns:
None
"""
default_array = np.zeros([256, 256, 4], dtype=np.uint8)
get_img = partial(imread_default, default=default_array)
# If this is being run in parallel it will be a List of Tuples if its being
# run serially it will be a single Tuple, wrap the Tuple in a list to keep
# it simple
jobs = [job] if isinstance(job, tuple) else job
for z, y, x, slice_ys, slice_xs in jobs:
try:
img_path = build_path(z, y, x, out_dir)
with_out_dir = partial(os.path.join, out_dir)
if os.path.exists(with_out_dir(f"{z+1}")):
top_left = get_img(with_out_dir(f"{z+1}", f"{2*y+1}", f"{2*x}.png"))
top_right = get_img(with_out_dir(f"{z+1}", f"{2*y+1}", f"{2*x+1}.png"))
bottom_left = get_img(with_out_dir(f"{z+1}", f"{2*y}", f"{2*x}.png"))
bottom_right = get_img(with_out_dir(f"{z+1}", f"{2*y}", f"{2*x+1}.png"))
tile = np.concatenate(
[
np.concatenate([top_left, top_right], axis=1),
np.concatenate([bottom_left, bottom_right], axis=1),
],
axis=0,
)
img = Image.fromarray(np.flipud(tile))
else:
tile = array[slice_ys, slice_xs]
img = tile_f(tile)
# if the tile is all transparent, don't save to disk
if np.any(np.array(img)[..., -1]):
img.thumbnail([256, 256], Image.Resampling.LANCZOS)
img.convert("RGBA").save(img_path, "PNG")
else:
pass
except Exception as e:
print("Tile creation failed for tile:", z, y, x, slice_ys, slice_xs)
print(e)
[docs]def build_mpl_objects(
array: np.ndarray, norm_kwargs: Dict[str, Any]
) -> Tuple[mpl.colors.Normalize, mpl.colors.Colormap]:
"""Builds the matplotlib objects that norm and convert fits data to RGB.
Args:
array (np.ndarray): The array to be tiled
norm_kwargs (Dict[str, Any]): The kwargs to be passed to `simple_norm`
Returns:
Tuple[mpl.colors.Normalize, mpl.colors.Colormap]: The matplotlib objects
needed to create a tile
"""
mpl_norm = simple_norm(array, **norm_kwargs)
mpl_cmap = copy.copy(mpl.cm.get_cmap(MPL_CMAP))
mpl_cmap.set_bad(color=(0, 0, 0, 0))
return mpl_norm, mpl_cmap
[docs]def tile_img(
file_location: str,
pbar_ref: Tuple[int, queue.Queue],
tile_size: Shape = [256, 256],
min_zoom: int = 0,
out_dir: str = ".",
mp_procs: int = 0,
norm_kwargs: dict = {},
batch_size: int = 1000,
) -> None:
"""Extracts tiles from the array at ``file_location``.
Args:
file_location (str): The file location of the image to tile
pbar_loc (int): The index of the location of to print the tqdm bar
tile_size (Tuple[int, int]): The pixel size of the tiles in the map
min_zoom (int): The minimum zoom to create tiles for. The default value
is 0, but if it can be helpful to set it to a value
greater than zero if your running out of memory as the
lowest zoom images can be the most memory intensive.
out_dir (str): The root directory to save the tiles in
mp_procs (int): The number of multiprocessing processes to use for
generating tiles.
norm_kwargs (dict): Optional normalization keyword arguments passed to
`astropy.visualization.simple_norm`. The default is
linear scaling using min/max values. See documentation
for more information: https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html
batch_size (int): The number of tiles to process at a time, when tiling
in parallel. The default is 1000.
Returns:
None
"""
_, fname = os.path.split(file_location)
if get_map_layer_name(file_location) in os.listdir(out_dir):
OutputManager.write(pbar_ref, f"{fname} already tiled. Skipping tiling.")
return
# get image
array = get_array(file_location)
# if we're using matplotlib we need to instantiate the matplotlib objects
# before we pass them to ray
image_engine = IMG_ENGINE_MPL if file_location.endswith(".fits") else IMG_ENGINE_PIL
if image_engine == IMG_ENGINE_MPL:
image_norm = norm_kwargs.get(os.path.basename(file_location), norm_kwargs)
mpl_norm, mpl_cmap = build_mpl_objects(array.array, image_norm)
zooms = get_zoom_range(array.shape, tile_size)
min_zoom = max(min_zoom, zooms[0])
max_zoom = zooms[1]
# build directory structure
name = get_map_layer_name(file_location)
tile_dir = os.path.join(out_dir, name)
if name not in os.listdir(out_dir):
os.mkdir(tile_dir)
make_dirs(tile_dir, min_zoom, max_zoom)
tile_params = chain.from_iterable(
[
slice_idx_generator(array.shape, z, 256 * (2**i))
for (i, z) in enumerate(range(max_zoom, min_zoom - 1, -1), start=0)
]
)
total_tiles = get_total_tiles(min_zoom, max_zoom)
if mp_procs > 1:
# We need to process batches to offset the cost of spinning up a process
def batch_params(iter, batch_size):
while True:
batch = list(islice(iter, batch_size))
if batch:
yield batch
else:
break
# Put the array in the ray object store. After it's there remove the
# reference to it so that it can be garbage collected.
arr_obj_id = ray.put(array)
del array
if image_engine == IMG_ENGINE_MPL:
mpl_norm = ray.put(mpl_norm)
mpl_cmap = ray.put(mpl_cmap)
tile_f = partial(
make_tile_mpl,
mpl_norm,
mpl_cmap,
)
else:
tile_f = make_tile_pil
make_tile_f = ray.remote(num_cpus=1)(mem_safe_make_tile)
OutputManager.set_description(pbar_ref, f"Converting {name}")
OutputManager.set_units_total(pbar_ref, unit="tile", total=total_tiles)
# in parallel we have to go one zoom level at a time because the images
# at lower zoom levels are dependent on the tiles are higher zoom levels
for zoom, jobs in groupby(tile_params, lambda z: z[0]):
if zoom == max_zoom:
utils.backpressure_queue_ray(
make_tile_f.remote,
list(
zip(
repeat(tile_dir),
repeat(tile_f),
repeat(arr_obj_id),
batch_params(jobs, batch_size),
)
),
pbar_ref,
mp_procs,
batch_size,
)
else:
if "arr_obj_id" in locals():
del arr_obj_id
work = list(
zip(
repeat(tile_dir),
repeat(make_tile_pil),
repeat(None),
batch_params(jobs, batch_size),
)
)
utils.backpressure_queue_ray(
make_tile_f.remote,
work,
pbar_ref,
mp_procs,
batch_size,
)
else:
if image_engine == IMG_ENGINE_MPL:
tile_f = partial(make_tile_mpl, mpl_norm, mpl_cmap)
else:
tile_f = make_tile_pil
work = partial(mem_safe_make_tile, tile_dir, tile_f, array)
jobs = tile_params
OutputManager.set_description(pbar_ref, f"Converting {name}")
OutputManager.set_units_total(pbar_ref, "tile", total_tiles)
for job in jobs:
if job[0] == max_zoom:
work(job)
OutputManager.update(pbar_ref, 1)
else:
break
del array
work = partial(mem_safe_make_tile, tile_dir, image_engine, None)
work(job)
OutputManager.update(pbar_ref, 1)
for job in jobs:
work(job)
OutputManager.update(pbar_ref, 1)
OutputManager.update_done(pbar_ref)
[docs]def get_map_layer_name(file_location: str) -> str:
"""Tranforms a ``file_location`` into the javascript layer name.
Args:
file_location (str): The file location to convert
Returns:
The javascript name that will be used in the HTML map
"""
_, fname = os.path.split(file_location)
name = (
os.path.splitext(fname)[0]
.replace(".", "_")
.replace("-", "_")
.replace("(", "_")
.replace(")", "_")
)
return name
[docs]def get_marker_file_name(file_location: str) -> str:
"""Tranforms a ``file_location`` into the javascript marker file name.
Args:
file_location (str): The file location to convert
Returns:
The javascript name that will be used in the HTML map
"""
return os.path.split(file_location)[1] + ".js"
[docs]def line_to_cols(raw_col_vals: str) -> List[str]:
"""Transform a raw text line of column names into a list of column names
Args:
raw_line (str): String from textfile
Returns:
A list of the column names in order
"""
change_case = [
"RA",
"DEC",
"Ra",
"Dec",
"X",
"Y",
"ID",
"iD",
"Id",
"A",
"B",
"THETA",
"Theta",
]
# make ra and dec lowercase for ease of access
raw_cols = list(
map(
lambda s: s.lower() if s in change_case else s,
raw_col_vals,
)
)
# if header line starts with a '#' exclude it
if raw_cols[0] == "#":
return raw_cols[1:]
else:
return raw_cols
[docs]def line_to_json(
wcs: WCS,
columns: List[str],
catalog_assets_path: str,
src_vals: List[str],
catalog_starts_at_one: bool = 1,
) -> Dict[str, Any]:
"""Transform a raw text line attribute values into a JSON marker
Args:
raw_line (str): String from the marker file
Returns:
A list of the column names in order
"""
src_id = str(src_vals[columns.index("id")])
if "x" in columns and "y" in columns:
img_x = float(src_vals[columns.index("x")])
img_y = float(src_vals[columns.index("y")])
else:
ra = float(src_vals[columns.index("ra")])
dec = float(src_vals[columns.index("dec")])
[[img_x, img_y]] = wcs.wcs_world2pix([[ra, dec]], int(catalog_starts_at_one))
if "a" in columns and "b" in columns and "theta" in columns:
a = float(src_vals[columns.index("a")])
b = float(src_vals[columns.index("b")])
b = a if np.isnan(b) else b
theta = float(src_vals[columns.index("theta")])
theta = 0 if np.isnan(theta) else theta
else:
a = -1
b = -1
theta = -1
# Leaflet is 0 indexed, so if the catalog starts at 1 we need to offset
# The convention for leaflet is to place markers at the lower left corner
# of a pixel, to center the marker on the pixel, we subtract 1 to bring it
# to leaflet convention and add 0.5 to move it to the center of the pixel.
x = (img_x - int(catalog_starts_at_one)) + 0.5
y = (img_y - int(catalog_starts_at_one)) + 0.5
src_vals += [y, x, catalog_assets_path.split(os.sep)[-1]]
src_json = os.path.join(catalog_assets_path, f"{src_id}.cbor")
with open(src_json, "wb") as f:
cbor2.dump(dict(id=src_id, v=src_vals), f)
return dict(
geometry=dict(
coordinates=[x, y],
),
tags=dict(
a=a,
b=b,
theta=theta,
catalog_id=src_id,
cat_path=os.path.basename(catalog_assets_path),
),
)
[docs]def process_catalog_file_chunk(
process_f: Callable,
fname: str,
delimiter: str,
q: queue.Queue,
start: int,
end: int,
) -> List[dict]:
# newline="" for csv reader, see
# https://docs.python.org/3/library/csv.html#csv.reader
f = open(fname, "r", newline="")
f.seek(start)
f.readline() # id start==0 skip cols, else advance to next complete line
json_lines = []
raw_lines = []
reader = csv.reader(raw_lines, delimiter=delimiter, skipinitialspace=True)
current = f.tell()
update_every = 10000
count = 1
while current < end:
raw_lines.append(f.readline().strip())
processed_lines = next(reader)
json_lines.append(process_f(processed_lines))
current = f.tell()
count += 1
if count % update_every == 0:
q.put(count)
count = 1
if count > 1:
q.put(count)
f.close()
return json_lines
def _simplify_mixed_ws(catalog_fname: str) -> None:
with open(catalog_fname, "r") as f:
lines = [l.strip() for l in f.readlines()]
with open(catalog_fname, "w") as f:
for line in lines:
f.write(" ".join([token.strip() for token in line.split()]) + "\n")
[docs]def make_marker_tile(
cluster: Supercluster,
out_dir: str,
job: Union[Tuple[int, Tuple[int, int]], List[Tuple[int, Tuple[int, int]]]],
) -> None:
jobs = [job] if isinstance(job, tuple) else job
for z, (y, x) in jobs:
if not os.path.exists(os.path.join(out_dir, str(z))):
os.mkdir(os.path.join(out_dir, str(z)))
if not os.path.exists(os.path.join(out_dir, str(z), str(y))):
os.mkdir(os.path.join(out_dir, str(z), str(y)))
out_path = os.path.join(out_dir, str(z), str(y), f"{x}.pbf")
tile_sources = cluster.get_tile(z, x, y)
if tile_sources:
tile_sources["name"] = "Points"
for i in range(len(tile_sources["features"])):
tile_sources["features"][i][
"geometry"
] = "POINT(0 0)" # we dont' use this
encoded_tile = mvt.encode([tile_sources], extents=256)
with open(out_path, "wb") as f:
f.write(encoded_tile)
[docs]def tile_markers(
wcs_file: str,
out_dir: str,
catalog_delim: str,
mp_procs: int,
prefer_xy: bool,
min_zoom: int,
max_zoom: int,
tile_size: int,
max_x: int,
max_y: int,
catalog_starts_at_one: bool,
catalog_file: str,
pbar_ref: Tuple[int, queue.Queue],
batch_size: int = 500,
) -> None:
_, fname = os.path.split(catalog_file)
catalog_layer_name = get_map_layer_name(catalog_file)
if catalog_layer_name in os.listdir(out_dir):
OutputManager.write(pbar_ref, f"{fname} already tiled. Skipping tiling.")
return
else:
os.mkdir(os.path.join(out_dir, catalog_layer_name))
if catalog_delim == MIXED_WHITESPACE_DELIMITER:
_simplify_mixed_ws(catalog_file)
catalog_delim = " "
with open(catalog_file, "r", newline="") as f:
csv_reader = csv.reader(f, delimiter=catalog_delim, skipinitialspace=True)
columns = line_to_cols(next(csv_reader))
ra_dec_coords = "ra" in columns and "dec" in columns
x_y_coords = "x" in columns and "y" in columns
use_xy = (not ra_dec_coords) or (prefer_xy)
assert ra_dec_coords or x_y_coords, (
catalog_file + " is missing coordinate columns (ra/dec, xy),"
)
assert "id" in columns, catalog_file + " missing 'id' column"
assert use_xy or (wcs_file is not None), (
catalog_file + " uses ra/dec coords, but a WCS file wasn't provided."
)
wcs = WCS(wcs_file) if wcs_file else None
with open(os.path.join(out_dir, f"{catalog_layer_name}.columns"), "w") as f:
f.write(",".join(columns))
catalog_assets_parent_path = os.path.join(out_dir, "catalog_assets")
if "catalog_assets" not in os.listdir(out_dir):
os.mkdir(catalog_assets_parent_path)
catalog_assets_path = os.path.join(catalog_assets_parent_path, catalog_layer_name)
if catalog_layer_name not in os.listdir(catalog_assets_parent_path):
os.mkdir(catalog_assets_path)
line_func = partial(
line_to_json,
wcs,
columns,
catalog_assets_path,
catalog_starts_at_one=catalog_starts_at_one,
)
OutputManager.set_description(pbar_ref, f"Parsing {catalog_file}")
catalog_file_size = os.path.getsize(catalog_file)
q = queue.Queue()
process_f = partial(
process_catalog_file_chunk, line_func, catalog_file, catalog_delim, q
)
if mp_procs > 1:
process_f = ray.remote(num_cpus=1)(process_catalog_file_chunk)
boundaries = np.linspace(0, catalog_file_size, mp_procs + 1, dtype=np.int64)
remote_val_ids = list(
starmap(
process_f.remote,
zip(
repeat(line_func),
repeat(catalog_file),
repeat(catalog_delim),
repeat(q),
boundaries[:-1],
boundaries[1:],
),
)
)
_, remaining = ray.wait(remote_val_ids, timeout=0.01, fetch_local=False)
while remaining:
try:
lines_processed = sum([q.get(timeout=0.0001) for _ in range(mp_procs)])
OutputManager.update(pbar_ref, lines_processed)
except queue.Empty:
pass
_, remaining = ray.wait(remaining, timeout=0.01, fetch_local=False)
q.shutdown()
catalog_values = list(chain.from_iterable(list(ray.get(remote_val_ids))))
del q
else:
catalog_values = process_f(0, catalog_file_size)
q.shutdown()
del q
OutputManager.set_description(
pbar_ref, f"Clustering {catalog_file}(THIS MAY TAKE A WHILE)"
)
OutputManager.set_units_total(
pbar_ref, unit="zoom levels", total=max_zoom + 1 - min_zoom
)
# cluster the parsed sources
# need to get super cluster stuff in here
clusterer = Supercluster(
min_zoom=min_zoom,
max_zoom=max_zoom - 1,
extent=tile_size,
radius=max(max(max_x, max_y) / tile_size, 40),
node_size=np.log2(len(catalog_values)) * 2,
alternate_CRS=(max_x, max_y),
update_f=lambda: OutputManager.update(pbar_ref, 1),
log=True,
).load(catalog_values)
# Don't push any updates from the clustering algorithm after clustering
# has been completed
clusterer.update_f = None
clusterer.log = False
# tile the sources and save using protobuf
zs = range(min_zoom, max_zoom + 1)
ys = [range(2**z) for z in zs]
xs = [range(2**z) for z in zs]
tile_idxs = list(
chain.from_iterable(
[zip(repeat(zs[i]), product(ys[i], xs[i])) for i in range(len(zs))]
)
)
catalog_layer_dir = os.path.join(out_dir, catalog_layer_name)
OutputManager.set_description(pbar_ref, f"Tiling {catalog_file}")
OutputManager.set_units_total(pbar_ref, unit="tile", total=len(tile_idxs))
if mp_procs > 1:
# We need to process batches to offset the cost of spinning up a process
def batch_params(iter, batch_size):
while True:
batch = list(islice(iter, batch_size))
if batch:
yield batch
else:
break
tile_f = ray.remote(num_cpus=1)(make_marker_tile)
cluster_remote_id = ray.put(clusterer)
utils.backpressure_queue_ray(
tile_f.remote,
list(
zip(
repeat(cluster_remote_id),
repeat(catalog_layer_dir),
batch_params(iter(tile_idxs), batch_size),
)
),
pbar_ref,
mp_procs,
batch_size,
)
else:
for zyx in tile_idxs:
make_marker_tile(clusterer, catalog_layer_dir, zyx)
OutputManager.update(pbar_ref, 1)
[docs]def files_to_map(
files: List[str],
out_dir: str = ".",
title: str = "FitsMap",
task_procs: int = 0,
procs_per_task: int = 0,
catalog_delim: str = ",",
cat_wcs_fits_file: str = None,
max_catalog_zoom: int = -1,
tile_size: Tuple[int, int] = [256, 256],
norm_kwargs: dict = {},
rows_per_column: int = np.inf,
prefer_xy: bool = False,
catalog_starts_at_one: bool = True,
img_tile_batch_size: int = 1000,
) -> None:
"""Converts a list of files into a LeafletJS map.
Args:
files (List[str]): List of files to convert into a map, can include image
files (.fits, .png, .jpg) and catalog files (.cat)
out_dir (str): Directory to place the genreated web page and associated
subdirectories
title (str): The title to placed on the webpage
task_procs (int): The number of tasks to run in parallel
procs_per_task (int): The number of tiles to process in parallel
catalog_delim (str): The delimited for catalog (.cat) files. Deault is
whitespace.
cat_wcs_fits_file (str): A fits file that has the WCS that will be used
to map ra and dec coordinates from the catalog
files to x and y coordinates in the map
max_catalog_zoom (int): The zoom level to stop clustering on, the
default is the max zoom level of the image. For
images with a high source density, setting this
higher than the max zoom will help with
performance.
tile_size (Tuple[int, int]): The tile size for the leaflet map. Currently
only [256, 256] is supported.
norm_kwargs (Union[Dict[str, Any], Dict[str, Dict[str, Any]]]):
Optional normalization keyword arguments passed to
`astropy.visualization.simple_norm`. Can either be
a single dictionary of keyword arguments, or a
dictionary of keyword arguments for each image
where the keys are the image names not full paths.
The default is linear scaling using min/max values.
See documentation for more information:
https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html
rows_per_column (int): If converting a catalog, the number of items in
have in each column of the marker popup.
By default produces all values in a single
column. Setting this value can make it easier to
work with catalogs that have a lot of values for
each object.
prefer_xy (bool): If True x/y coordinates should be preferred if both
ra/dec and x/y are present in a catalog
catalog_starts_at_one (bool): True if the catalog is 1 indexed, False if
the catalog is 0 indexed
img_tile_batch_size (int): The number of image tiles to process in
parallel when task_procs > 1
Example of image specific norm_kwargs vs single norm_kwargs:
>>> norm_kwargs = {
>>> "test.fits": {"stretch": "log", "min_percent": 1, "max_percent": 99.5},
>>> "test2.fits": {"stretch": "linear", "min_percent": 5, "max_percent": 99.5},
>>> }
>>> # or
>>> norm_kwargs = {"stretch": "log", "min_percent": 1, "max_percent": 99.5}
Returns:
None
"""
assert len(files) > 0, "No files provided `files` is an empty list"
unlocatable_files = list(filterfalse(os.path.exists, files))
assert len(unlocatable_files) == 0, f"Files not found:{unlocatable_files}"
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if "js" not in os.listdir(out_dir):
os.mkdir(os.path.join(out_dir, "js"))
if "css" not in os.listdir(out_dir):
os.mkdir(os.path.join(out_dir, "css"))
# build image tasks
img_files = filter_on_extension(files, IMG_FORMATS)
img_layer_names = list(map(get_map_layer_name, img_files))
img_f_kwargs = dict(
tile_size=tile_size,
out_dir=out_dir,
mp_procs=procs_per_task,
norm_kwargs=norm_kwargs,
batch_size=img_tile_batch_size,
)
# we want to init ray in such a way that it doesn't print any output
# to the console. These should be changed during development
debug = os.getenv("FITSMAP_DEBUG", "False").lower() == "true"
ray.init(
include_dashboard=debug, # during dev == True
configure_logging=~debug, # during dev == False
logging_level=logging.INFO
if debug
else logging.CRITICAL, # during dev == logging.INFO, test == logging.CRITICAL
log_to_driver=debug, # during dev = True
)
if task_procs > 1:
img_task_f = ray.remote(num_cpus=1)(tile_img).remote
else:
img_task_f = tile_img
# build catalog tasks
cat_files = filter_on_extension(files, CAT_FORMAT)
cat_layer_names = list(map(get_map_layer_name, cat_files))
max_x, max_y = utils.peek_image_info(img_files)
max_dim = max(max_x, max_y)
if len(cat_files) > 0:
# get highlevel image info for catalogging function
max_zoom = int(np.log2(2 ** np.ceil(np.log2(max_dim)) / tile_size[0]))
max_dim = 2**max_zoom * tile_size[0]
if max_catalog_zoom == -1:
max_zoom = int(np.log2(2 ** np.ceil(np.log2(max_dim)) / tile_size[0]))
else:
max_zoom = max_catalog_zoom
if task_procs > 1:
cat_task_f = ray.remote(num_cpus=1)(tile_markers).remote
else:
cat_task_f = tile_markers
cat_f_kwargs = dict(
wcs_file=cat_wcs_fits_file,
out_dir=out_dir,
catalog_delim=catalog_delim,
mp_procs=procs_per_task,
prefer_xy=prefer_xy,
min_zoom=0,
max_zoom=max_zoom,
tile_size=tile_size[0],
max_x=max_dim,
max_y=max_dim,
catalog_starts_at_one=catalog_starts_at_one,
)
else:
cat_task_f = None
cat_f_kwargs = dict()
output_manager = OutputManager()
img_tasks = zip(
repeat(img_task_f),
map(
lambda x: dict(**x[0], **x[1]),
zip(
repeat(img_f_kwargs),
starmap(
lambda x, y: dict(file_location=x, pbar_ref=y),
zip(img_files, output_manager.make_bar()),
),
),
),
)
cat_tasks = zip(
repeat(cat_task_f),
map(
lambda x: dict(**x[0], **x[1]),
zip(
repeat(cat_f_kwargs),
starmap(
lambda x, y: dict(catalog_file=x, pbar_ref=y),
zip(cat_files, output_manager.make_bar()),
),
),
),
)
tasks = chain(img_tasks, cat_tasks)
if task_procs > 1:
# start runnning task_procs number of tasks
in_progress = list(
starmap(
lambda i, func_kwargs: func_kwargs[0](**func_kwargs[1]),
zip(range(task_procs), tasks),
)
)
while in_progress:
_, in_progress = ray.wait(in_progress, timeout=0.003)
output_manager.check_for_updates()
if len(in_progress) < task_procs:
try:
# try to get a task with kwargs from the iterator
func, kwargs = next(tasks)
in_progress.append(func(**kwargs))
print(in_progress)
except StopIteration:
# all of the tasks are in progress or completed
if not in_progress:
# we're done!
break
else:
# the tasks 'in_progress' are still running
pass
else:
def f(func_args):
func_args[0](**func_args[1])
any(map(f, tasks))
output_manager.close_up()
ray.shutdown()
print("Building index.html")
cat_wcs = WCS(cat_wcs_fits_file) if cat_wcs_fits_file else None
cartographer.chart(
out_dir,
title,
img_layer_names,
cat_layer_names,
cat_wcs,
rows_per_column,
(max_x, max_y),
)
print("Done.")
[docs]def dir_to_map(
directory: str,
out_dir: str = ".",
exclude_predicate: Callable = lambda f: False,
title: str = "FitsMap",
task_procs: int = 0,
procs_per_task: int = 0,
catalog_delim: str = ",",
cat_wcs_fits_file: str = None,
max_catalog_zoom: int = -1,
tile_size: Shape = [256, 256],
norm_kwargs: Union[Dict[str, Any], Dict[str, Dict[str, Any]]] = {},
rows_per_column: int = np.inf,
prefer_xy: bool = False,
catalog_starts_at_one: bool = True,
img_tile_batch_size: int = 1000,
) -> None:
"""Converts a list of files into a LeafletJS map.
Args:
directory (str): Path to directory containing the files to be converted
out_dir (str): Directory to place the genreated web page and associated
subdirectories
exclude_predicate (Callable): A function that is applied to every file
in ``directory`` and returns True if the
file should not be processed as a part of
the map, and False if it should be
processed
title (str): The title to placed on the webpage
task_procs (int): The number of tasks to run in parallel
procs_per_task (int): The number of tiles to process in parallel
catalog_delim (str): The delimiter for catalog (.cat) files. Deault is
comma.
cat_wcs_fits_file (str): A fits file that has the WCS that will be used
to map ra and dec coordinates from the catalog
files to x and y coordinates in the map. Note,
that this file isn't subject to the
``exlclude_predicate``, so you can exclude a
fits file from being tiled, but still use its
header for WCS.
max_catalog_zoom (int): The zoom level to stop clustering on, the
default is the max zoom level of the image. For
images with a high source density, setting this
higher than the max zoom will help with
performance.
tile_size (Tuple[int, int]): The tile size for the leaflet map. Currently
only [256, 256] is supported.
norm_kwargs (Union[Dict[str, Any], Dict[str, Dict[str, Any]]]):
Optional normalization keyword arguments passed to
`astropy.visualization.simple_norm`. Can either be
a single dictionary of keyword arguments, or a
dictionary of keyword arguments for each image
where the keys are the image names not full paths.
The default is linear scaling using min/max values.
See documentation for more information:
https://docs.astropy.org/en/stable/api/astropy.visualization.mpl_normalize.simple_norm.html
rows_per_column (int): If converting a catalog, the number of items in
have in each column of the marker popup.
By default produces all values in a single
column. Setting this value can make it easier to
work with catalogs that have a lot of values for
each object.
prefer_xy (bool): If True x/y coordinates should be preferred if both
ra/dec and x/y are present in a catalog
catalog_starts_at_one (bool): True if the catalog is 1 indexed, False if
the catalog is 0 indexed
img_tile_batch_size (int): The number of image tiles to process in
parallel when task_procs > 1
Example of image specific norm_kwargs vs single norm_kwargs:
>>> norm_kwargs = {
>>> "test.fits": {"stretch": "log", "min_percent": 1, "max_percent": 99.5},
>>> "test2.fits": {"stretch": "linear", "min_percent": 5, "max_percent": 99.5},
>>> }
>>> # or
>>> norm_kwargs = {"stretch": "log", "min_percent": 1, "max_percent": 99.5}
Returns:
None
Raises:
ValueError if the dir is empty, there are no convertable files or if
``exclude_predicate`` exlcudes all files
"""
dir_files = list(
map(
lambda d: os.path.join(directory, d),
filterfalse(
exclude_predicate,
os.listdir(directory),
),
)
)
assert (
len(dir_files) > 0
), "No files in `directory` or `exclude_predicate` excludes everything"
files_to_map(
sorted(dir_files),
out_dir=out_dir,
title=title,
task_procs=task_procs,
procs_per_task=procs_per_task,
catalog_delim=catalog_delim,
cat_wcs_fits_file=cat_wcs_fits_file,
max_catalog_zoom=max_catalog_zoom,
tile_size=tile_size,
norm_kwargs=norm_kwargs,
rows_per_column=rows_per_column,
prefer_xy=prefer_xy,
catalog_starts_at_one=catalog_starts_at_one,
img_tile_batch_size=img_tile_batch_size,
)