#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-01-10
# @Filename: exposure.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import functools
import os
import pathlib
import re
import shutil
import tempfile
import warnings
from typing import Callable, List, Optional, Tuple, Union
import astropy
import astropy.io.fits as fits
import astropy.time
import astropy.wcs
import numpy
from astropy.io.fits import BinTableHDU, HDUList, ImageHDU
from sdsstools.time import get_sjd
import basecam.camera
import basecam.models
from basecam.exceptions import ExposureError
from .utils import gzip_async
__all__ = ["Exposure", "ImageNamer"]
[docs]
class Exposure(object):
"""Exposure class. Represents an exposure data and its metadata.
An `.Exposure` is defined by the raw image taken by the camera, a series
of attributes that define the exposure (exposure time, shutter, etc.) and
a data model that is used to generate the FITS file for the exposure.
Parameters
----------
camera
The instance of a subclass of `.BaseCamera` that took this exposure.
filename
The filename of the FITS image generated.
data
The exposure raw data, as a 2D array.
fits_model
The `model <.FITSModel>` to create the FITS image. If `None`, a single
extension with a basic header will be used.
wcs
The WCS object describing the astrometry of this exposure.
Attributes
----------
image_type
The type of image, one of ``bias``, ``flat``, ``dark``, ``object``.
exptime : float
The exposure time, in seconds, of a single integration.
exptime_n : float
The total exposure time, in seconds. If the image is stacked, this is
the total time, i.e., the sum of the exposure time of each stacked
image.
stack : int
Number of exposures stacked.
stack_function
Name of the function used for stacking.
filename
The path where to write the image.
wcs : ~astropy.wcs.WCS
The WCS object describing the astrometry of this exposure.
"""
def __init__(
self,
camera: basecam.camera.BaseCamera,
filename: Optional[str] = None,
data: Optional[numpy.ndarray] = None,
fits_model: Optional[basecam.models.FITSModel] = None,
wcs: Optional[astropy.wcs.WCS] = None,
):
self.camera = camera
self.data = data
self.fits_model = fits_model.copy() if fits_model else None
self.filename = filename
self._obstime: astropy.time.Time = astropy.time.Time.now()
self.exptime: Optional[float] = None
self.exptime_n: Optional[float] = None
self.stack: int = 1
self.stack_function: Optional[Callable[..., numpy.ndarray]] = None
self.image_type: Optional[str] = None
self._extra_hdus: List[Tuple[Union[BinTableHDU, ImageHDU], Optional[int]]] = []
self.wcs = wcs
@property
def obstime(self) -> astropy.time.Time:
"""The time at the beginning of the observation.
It must be an `astropy.time.Time` object or a datetime in ISO
format; in the latter case, UTC scale is assumed.
"""
return self._obstime
@obstime.setter
def obstime(self, value: astropy.time.Time):
if isinstance(value, astropy.time.Time):
self._obstime = value
elif isinstance(value, str):
self._obstime = astropy.time.Time(value, format="iso", scale="utc")
else:
raise ExposureError(f"invalid obstime {value}")
[docs]
def add_hdu(self, hdu: Union[BinTableHDU, ImageHDU], index: Optional[int] = None):
"""Adds an HDU to the list of extensions.
Parameters
----------
hdu
The `~astropy.io.fits.BinTableHDU` or `~astropy.io.fits.ImageHDU` HDU to
append.
index
The index where the extension will be added. Extra HDUs are inserted in
order after the FITS model has been generated. ``index=None`` appends the
new HDU at the end of the list. Note that ``astropy.io.fits`` may change
the final order of the extensions to ensure that a primary HDU remains
as the first HDU.
"""
assert isinstance(hdu, (BinTableHDU, ImageHDU)), "invalid HDU type"
self._extra_hdus.append((hdu, index))
[docs]
def to_hdu(self, context={}) -> HDUList:
"""Return an `~astropy.io.fits.HDUList` for the image.
Parameters
----------
context
A dictionary of arguments used to evaluate the FITS model.
Returns
-------
hdulist
A list of HDUs in which the FITS data model has been evaluated
for this exposure.
"""
fits_model = self.fits_model or basecam.models.FITSModel()
hdulist = fits_model.to_hdu(self, context=context)
for hdu, index in self._extra_hdus:
if index:
hdulist.insert(index, hdu)
else:
hdulist.append(hdu)
return hdulist
[docs]
async def write(
self,
filename: Optional[str] = None,
context={},
overwrite=False,
checksum=True,
retry=True,
) -> HDUList:
"""Writes the image to disk.
Parameters
----------
filename
The path where to write the file. If not provided, uses
``Exposure.filename``.
context
A dictionary of arguments used to evaluate the FITS model.
overwrite
Whether to overwrite the image if it exists.
checksum
When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards to the
headers of all HDUs written to the file.
retry
If `True` and the image fails to write, tries again. This can be
useful when writing to network volumen where failures are more
frequent.
Returns
-------
hdulist
A list of HDUs in which the FITS data model has been evaluated
for this exposure.
"""
filename = filename or self.filename
if not filename:
raise ExposureError("filename not set.")
hdulist = self.to_hdu(context=context)
dirname = os.path.realpath(os.path.dirname(filename))
os.makedirs(dirname, exist_ok=True)
loop = asyncio.get_event_loop()
filename = str(filename)
for ntry in range(2):
try:
if filename.endswith(".gz"):
# We compress in a local temporary file, which is faster when we are
# going to save a file across the network.
tmp_name = tempfile.NamedTemporaryFile(suffix=".gz").name
# Astropy compresses with gzip -9 which takes forever.
# Instead we compress manually with -1, which is still pretty good.
writeto_partial = functools.partial(
hdulist.writeto,
tmp_name[:-3],
overwrite=overwrite,
checksum=checksum,
)
await loop.run_in_executor(None, writeto_partial)
await gzip_async(tmp_name[:-3], complevel=1)
shutil.move(tmp_name, filename)
else:
writeto_partial = functools.partial(
hdulist.writeto,
filename,
overwrite=overwrite,
checksum=checksum,
)
await loop.run_in_executor(None, writeto_partial)
break
except Exception as err:
if ntry == 0 and retry is True:
warnings.warn(
f"Retrying after exposure writing failed with error: {err}"
)
continue
else:
raise ExposureError(f"Failed writing exposure to disk: {err}")
# Horrible hack to try to fix compressed headers.
update_hdu = fits.open(filename, mode="update")
for ext in update_hdu:
try:
if "BSCALE" in ext.header:
BSCALE = ext.header.pop("BSCALE", 1)
BZERO = ext.header.pop("BZERO", 2**15)
ext.header["BSCALE"] = BSCALE
ext.header["BZERO"] = BZERO
except Exception:
raise
pass
update_hdu.close()
return hdulist
[docs]
class ImageNamer(object):
"""Creates a new sequential filename for an image.
Parameters
----------
basename
The basename of the image filenames. Must contain a placeholder
``num`` in the place where to insert the sequence number. For example,
``'test-{num:04d}.fits'`` will produce image names ``test-0001.fits``,
``test-0002.fits``, etc. It's also possible to use placeholders for
camera values, e.g. ``{camera.name}-{num}.fits``.
dirname
The directory for the images. Can include an expression based on the
``date`` substitution which is a `~astropy.time.Time.now` object. For
example: ``dirname='/data/{camera.uid}/{int(date.mjd)}'``.
overwrite
If `True`, the sequence will start at 1 regardless of the existing
images. If `False`, the first element in the sequence will be selected
to avoid colliding with any image already existing in the directory.
camera
A `.BaseCamera` instance. It can also be passed when calling the
instance.
reset_sequence
Resets the sequence number when the directory changes (for example when
the MJD rolls over).
Examples
--------
>>> namer = ImageNamer('{camera.name}-{num:04d}.fits', dirname='testdir')
>>> namer(camera=camera)
PosixPath('testdir/my_camera-0001.fits')
>>> namer(camera=camera)
PosixPath('testdir/my_camera-0002.fits')
"""
def __init__(
self,
basename: str = "{camera.name}-{num:04d}.fits",
dirname: str = ".",
overwrite: bool = False,
camera: Optional[basecam.camera.BaseCamera] = None,
reset_sequence: bool = True,
):
assert re.match(r".+(\{num.+\}).+", basename), "invalid basename."
self._basename: str
self.basename = basename
self.dirname: Union[pathlib.Path, str] = pathlib.Path(dirname)
self.overwrite: bool = overwrite
self._last_num: int = 0
self._previous_dirname: str | None = None
self._reset_sequence = reset_sequence
self.camera = camera
@property
def basename(self) -> str:
"""The image name pattern."""
return self._basename
@basename.setter
def basename(self, value: str):
"""Sets the basename."""
# We want to expand everything except the num first so we "double-escape" it.
self._basename = re.sub(r"(\{num.+\})", r"{\1}", value)
[docs]
def get_dirname(self) -> pathlib.Path:
"""Returns the evaluated dirname."""
date = astropy.time.Time.now()
sjd = get_sjd(raise_error=False)
dirname = pathlib.Path(
eval(
f'f"{self.dirname}"',
{},
{"date": date, "camera": self.camera, "sjd": sjd},
)
)
if self._previous_dirname and self._previous_dirname != str(dirname):
if self._reset_sequence:
self._last_num = 0
self._previous_dirname = str(dirname)
return dirname
def _get_num(self, basename: str) -> int:
"""Returns the counter value."""
if self.overwrite:
return self._last_num + 1
regex = re.compile(re.sub(r"\{num.+\}", "(?P<num>[0-9]*?)", basename))
dirname = self.get_dirname()
all_files = list(map(str, dirname.glob("*")))
match_files = list(filter(regex.search, all_files))
if len(match_files) == 0:
return self._last_num + 1
matches = [regex.search(file) for file in match_files]
values = [int(match.group(1)) for match in matches if match is not None]
return max(values) + 1
def __call__(
self,
camera: Optional[basecam.camera.BaseCamera] = None,
update_num: bool = True,
num: Optional[int] = None,
) -> pathlib.Path:
camera = camera or self.camera
if camera:
expanded_basename = self.basename.format(camera=camera)
else:
expanded_basename = self.basename.format()
dirname = self.get_dirname()
num = num or self._get_num(expanded_basename)
path = dirname / expanded_basename.format(num=num)
if update_num:
self._last_num = num
return path