#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-06-19
# @Filename: camera.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import abc
import asyncio
import logging
import os
import pathlib
import time
import warnings
from logging import INFO, WARNING
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Type,
TypeVar,
Union,
cast,
)
import numpy
from sdsstools import get_logger, read_yaml_file
from sdsstools.logger import SDSSLogger
from basecam.models.fits import FITSModel
from .events import CameraEvent, CameraSystemEvent
from .exceptions import (
CameraConnectionError,
CameraError,
ExposureError,
ExposureWarning,
)
from .exposure import Exposure, ImageNamer
from .models import basic_fits_model
from .notifier import EventNotifier
from .utils import LoggerMixIn, Poller
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
__all__ = ["CameraSystem", "BaseCamera"]
AnyPath = Union[str, pathlib.Path]
_T_CameraSystem = TypeVar("_T_CameraSystem", bound="CameraSystem")
_T_BaseCamera = TypeVar("_T_BaseCamera", bound="BaseCamera")
[docs]
class CameraSystem(LoggerMixIn, Generic[_T_BaseCamera], metaclass=abc.ABCMeta):
"""A base class for the camera system.
Provides an abstract class for the camera system, including camera
connection/disconnection event handling, adding and removing cameras, etc.
While the instance does not handle the loop, it assumes that an event loop
is running elsewhere, for example via `asyncio.loop.run_forever`.
Parameters
----------
camera_class
The subclass of `.BaseCamera` to use with this camera system.
camera_config
A dictionary with the configuration parameters for the multiple
cameras that can be present in the system, or the path to a YAML file.
Refer to the documentation for details on the accepted format.
include
List of camera UIDs that can be connected.
exclude
List of camera UIDs that will be ignored.
logger
The logger instance to use. If `None`, a new logger will be created.
log_header
A string to be prefixed to each message logged.
log_file
The path to which to log. If set, the log will be saved to that file path.
verbose
Logging level for the console handler If `False`, only warnings and errors
will be logged to the console.
"""
camera_class: Union[Type[_T_BaseCamera], None] = None
include = None
exclude = None
def __init__(
self,
camera_class: Optional[Type[_T_BaseCamera]] = None,
camera_config: Optional[Union[AnyPath, Dict[str, Any]]] = None,
include: Optional[List[Any]] = None,
exclude: Optional[List[Any]] = None,
logger: Optional[SDSSLogger] = None,
log_header: Optional[str] = None,
log_file: Optional[AnyPath] = None,
verbose: Optional[Union[bool, int]] = False,
):
self.camera_class = camera_class or self.camera_class
if not self.camera_class or not issubclass(self.camera_class, BaseCamera):
raise ValueError("camera_class must be a subclass of BaseCamera.")
self.include = self.include or include
self.exclude = self.exclude or exclude
logger_name = self.__class__.__name__.upper()
self.log_header = log_header or f"[{logger_name.upper()}]: "
self.logger = logger or cast(SDSSLogger, get_logger(logger_name))
if verbose:
self.logger.sh.setLevel(int(verbose))
else:
self.logger.sh.setLevel(logging.WARNING)
if log_file:
self.logger.start_file_logger(str(log_file))
if self.logger.fh:
assert self.logger.fh.formatter
self.logger.fh.formatter.converter = time.gmtime
self.log(f"logging to {log_file}")
self.loop = asyncio.get_event_loop()
self.loop.set_exception_handler(self.logger.asyncio_exception_handler)
#: list: The list of cameras being handled.
self.cameras: List[_T_BaseCamera] = []
self._camera_poller = None
#: .EventNotifier: Notifies of `.CameraSystemEvent` and `.CameraEvent` events.
self.notifier = EventNotifier()
self._config: Optional[Dict[str, Any]] = None
if camera_config:
if isinstance(camera_config, dict):
self._config = camera_config.copy()
else:
self._config = cast(Dict[str, Any], read_yaml_file(str(camera_config)))
self.log(f"read configuration file from {camera_config}")
# If the config has a section named "cameras", prefer that.
if self._config is not None:
if isinstance(self._config.get("cameras", None), dict):
self._config = self._config["cameras"]
assert self._config is not None
uids = [self._config[camera]["uid"] for camera in self._config]
if len(uids) != len(set(uids)):
raise ValueError("repeated UIDs in the configuration data.")
self.running: bool = False
[docs]
def setup(self):
"""Setup custom camera system.
To be overridden by the subclass if needed. Must return ``self``.
"""
self.running = True
return self
[docs]
def get_camera_config(
self,
name: Optional[str] = None,
uid: Optional[str] = None,
) -> Union[Dict[str, Any], None]:
"""Gets camera parameters from the configuration.
Parameters
----------
name
The name of the camera.
uid
The unique identifier of the camera.
Returns
-------
camera_params
A dictionary with the camera parameters. If the camera is not
present in the configuration, returns `None`.
"""
assert name or uid, "either a name or unique identifier are required."
if not self._config:
return None
if name:
if name not in self._config:
return None
config_params = {"name": name}
config_params.update(self._config[name])
return config_params
else:
for name_ in self._config:
if self._config[name_]["uid"] == uid:
config_params = {"name": name_}
config_params.update(self._config[name_])
return config_params
return None
[docs]
async def start_camera_poller(
self: _T_CameraSystem,
interval: float = 1.0,
) -> _T_CameraSystem:
"""Monitors changes in the camera list.
Issues calls to `.list_available_cameras` on an interval and compares
the connected cameras with those in `.cameras`. New found cameras
are added and cameras not present are cleanly removed.
This poller should not be initiated if the camera API already provides
a framework to detect camera events. In that case the event handler
should be wrapped to call `.on_camera_connected` and
`.on_camera_disconnected`.
Similarly, if you prefer to handle camera connections manually, avoid
starting this poller.
Parameters
----------
interval
Interval, in seconds, on which the connected cameras are polled
for changes.
"""
if self._camera_poller is None:
self._camera_poller = Poller("camera_poller", self._check_cameras)
self._camera_poller.start(delay=interval)
self.log("started camera poller.")
return self
[docs]
async def stop_camera_poller(self):
"""Stops the camera poller if it is running."""
if self._camera_poller and self._camera_poller.running:
await self._camera_poller.stop()
async def _check_cameras(self):
"""Checks the list of connected cameras.
This is an internal function to be used only by the camera poller.
"""
try:
uids = self.list_available_cameras()
except NotImplementedError:
self.log(
"get_connected cameras is not implemented. Stopping camera poller.",
logging.ERROR,
)
# It's important to not do await self.stop_camera_poller()
# because that would stop the poller from inside the callback
# and that creates a max recursion error.
self.loop.create_task(self.stop_camera_poller())
return False
# Checks cameras that are handled but not connected.
to_remove = []
for camera in self.cameras:
if camera.uid not in uids and not camera.force:
self.log(
f"camera with UID {camera.uid!r} ({camera.name}) has been removed.",
logging.INFO,
)
to_remove.append(camera.name)
for camera_name in to_remove:
await self.remove_camera(camera_name)
# Checks cameras that are connected.
camera_uids = [camera.uid for camera in self.cameras]
for uid in uids:
if uid in camera_uids:
continue
elif self.include and uid not in self.include:
continue
elif self.exclude and uid in self.exclude:
continue
else:
self.log(f"detected new camera with UID {uid!r}.", INFO)
await self.add_camera(uid=uid)
[docs]
@abc.abstractmethod
def list_available_cameras(self) -> List[str]:
"""Lists the connected cameras as reported by the camera system.
Returns
-------
connected_cameras
A list of unique identifiers of cameras connected to the system.
"""
raise NotImplementedError
[docs]
async def add_camera(
self,
name: Optional[str] = None,
uid: Optional[str] = None,
force: bool = False,
autoconnect: bool = True,
**kwargs,
) -> BaseCamera:
"""Adds a new `camera <.BaseCamera>` instance to `.cameras`.
The configuration values (if any) found in the configuration that
match the ``name`` or ``uid`` of the camera will be used. These
parameters can be overridden by providing additional keyword values
for the corresponding parameters.
Parameters
----------
name
The name of the camera.
uid
The unique identifier for the camera.
force
Forces the camera to stay in the `.CameraSystem` list even if it
does not appear in the system camera list.
autoconnect
Whether to autoconnect the camera.
kwargs
Other arguments to be passed to `.BaseCamera` during
initialisation. These parameters override the default configuration
where applicable.
Returns
-------
camera
The new camera instance.
"""
assert name or uid, "either name or uid are required."
camera_params = self.get_camera_config(name=name, uid=uid) or {}
camera_params.update(kwargs)
# At this point we do need to have an UID, either passed directly to
# add_camera or from the configuration.
uid = camera_params.get("uid", uid)
if not uid:
raise CameraError(
"No camera UID provided or found in "
f"the configuration for camera {name!r}."
)
name = camera_params.pop("name", uid)
connected_camera = self.get_camera(name=name, uid=uid)
if connected_camera:
self.log(f"camera {name!r} is already connected.", WARNING)
return connected_camera
self.log(f"adding camera {name!r} with parameters {camera_params!r}")
assert self.camera_class
camera = self.camera_class(
uid, self, name=name, force=force, camera_params=camera_params
)
# If the autoconnect parameter is set, connects the camera.
if autoconnect and camera_params.pop("autoconnect", True):
await camera.connect()
else:
self.log(f"camera {name!r} added but not connected.")
self.cameras.append(camera)
# Notify event
self.notifier.notify(CameraSystemEvent.CAMERA_ADDED, camera_params)
return camera
[docs]
async def remove_camera(
self,
name: Optional[str] = None,
uid: Optional[str] = None,
):
"""Removes a camera, cancelling any ongoing process.
Parameters
----------
name
The name of the camera.
uid
The unique identifier for the camera.
"""
for camera in self.cameras:
if camera.name == name or camera.uid == uid:
await camera.disconnect()
self.cameras.remove(camera)
self.log(f"removed camera {camera.name!r}.")
# Notify event
self.notifier.notify(
CameraSystemEvent.CAMERA_REMOVED,
{"uid": camera.uid, "name": camera.name},
)
return
raise ValueError(f"camera {name} is not connected.")
[docs]
def get_camera(
self,
name: Optional[str] = None,
uid: Optional[str] = None,
) -> Union[BaseCamera, Literal[False]]:
"""Gets a camera matching a name or UID.
If only one camera is connected and the method is called without
arguments, returns the camera.
Parameters
----------
name
The name of the camera.
uid
The unique identifier for the camera.
Returns
-------
camera
The connected camera (must be one of the cameras in `.cameras`)
whose name or UID matches the input parameters. Returns `False` if the
camera was not found.
"""
if name is None and uid is None and len(self.cameras) == 1:
return self.cameras[0]
for camera in self.cameras:
if name and camera.name == name:
if uid:
assert camera.uid == uid, "camera name does not match uid."
return camera
elif uid and camera.uid == uid:
return camera
return False
[docs]
def on_camera_connected(self, uid: str) -> asyncio.Task[BaseCamera]:
"""Event handler for a newly connected camera.
Parameters
----------
uid
The unique identifier for the camera.
Returns
-------
task
The task calling `.add_camera`.
"""
return self.loop.create_task(self.add_camera(uid=uid))
[docs]
def on_camera_disconnected(self, uid: str) -> asyncio.Task[None]:
"""Event handler for a camera that was disconnected.
Parameters
----------
uid
The unique identifier for the camera.
Returns
-------
task
The task calling `.remove_camera`.
"""
return self.loop.create_task(self.remove_camera(uid=uid))
[docs]
async def disconnect(self):
"""Shuts down the system."""
if self._camera_poller and self._camera_poller.running:
await self.stop_camera_poller()
self.running = False
@property
@abc.abstractmethod
def __version__(self) -> str:
"""The version of the camera library."""
raise NotImplementedError
[docs]
class BaseCamera(LoggerMixIn, metaclass=abc.ABCMeta):
"""A base class for wrapping a camera API in a standard implementation.
Instantiating the `.BaseCamera` class does not open the camera and makes
it ready to be used. To do that, call and await `.connect`.
Parameters
----------
uid
The unique identifier for the camera.
camera_system
The camera system handling this camera.
name
The name of the camera. If not set, the ``uid`` will be used.
force
Forces the camera to stay in the `.CameraSystem` list even if it
does not appear in the system camera list.
image_namer
An instance of `.ImageNamer` used to sequentially assign predefined
names to new exposure images, or a dictionary of parameters to be
passed to `.ImageNamer` to create a new instance. If not set, creates
an image namer with format ``{camera.name}-{num:04d}.fits``.
camera_params
Parameters used to define how to connect to the camera, its geometry,
initialisation parameters, etc. The format of the parameters must
follow the structure of the configuration file.
Attributes
----------
connected : bool
Whether the camera is open and connected.
fits_model : .FITSModel
An instance of `.FITSModel` defining the data model of the images
taken by the camera. If not defined, a basic model will be used.
image_namer : .ImageNamer
And instance of `.ImageNamer` to determine the default file path for
new exposures. If not provided, uses ``'{camera.name}-{num:04d}.fits'``
where ``camera.name`` is the name of the camera, and ``num`` is a
sequential counter.
"""
fits_model = basic_fits_model
image_namer = ImageNamer(
"{camera.name}-{num:04d}.fits",
dirname=".",
overwrite=False,
)
def __init__(
self,
uid: str,
camera_system: CameraSystem,
name: Optional[str] = None,
force: bool = False,
image_namer: Optional[Union[ImageNamer, dict]] = None,
camera_params={},
):
self.uid = uid
self.name = name or self.uid
self.camera_system = camera_system
self.loop = camera_system.loop
self.connected = False
self.force = force
self.camera_params = camera_params
if "uid" in camera_params and camera_params["uid"] != self.uid:
raise CameraError("Mismatching UIDs between input and camera parameters.")
self._status = {}
if isinstance(image_namer, ImageNamer):
self.image_namer = image_namer
self.image_namer.camera = self
elif isinstance(image_namer, dict):
self.image_namer = ImageNamer(**image_namer, camera=self)
elif image_namer is None:
pass
else:
raise CameraError(f"Invalid image namer {image_namer!r}")
self.image_namer.camera = self
self.__version__ = self.camera_system.__version__
# Get the same logger as the camera system but use the UID or
# name of the camera as prefix for messages from this camera.
self.log_header = f"[{self.name.upper()}]: "
self.logger = self.camera_system.logger
[docs]
async def connect(
self: _T_BaseCamera,
force: bool = False,
**kwargs,
) -> _T_BaseCamera:
"""Connects the camera and performs all the necessary setup.
Parameters
----------
force
Forces the camera to reconnect even if it's already connected.
kwargs
A series of keyword arguments to be passed to the internal
implementation of ``connect`` for a the camera. If provided,
they override the ``connection`` settings in the
configuration for this camera.
"""
if self.connected and not force:
raise CameraConnectionError("the camera is already connected.")
conn_params = self.camera_params.get("connection", {}).copy()
conn_params.update(kwargs)
try:
await self._connect_internal(**conn_params)
self.connected = True
except CameraConnectionError as ee:
self.connected = False
self.notify(CameraEvent.CAMERA_CONNECT_FAILED)
raise CameraConnectionError(f"failed to connect: {ee}")
self.log("camera connected.")
self.notify(CameraEvent.CAMERA_CONNECTED)
return self
[docs]
def notify(
self,
event: CameraEvent,
extra_payload: Optional[Dict[str, Any]] = None,
):
"""Notifies an event."""
payload = self._get_basic_payload()
payload.update(extra_payload or {})
self.camera_system.notifier.notify(event, payload)
[docs]
def _get_basic_payload(self) -> Dict[str, Any]:
"""Returns a dictionary with basic payload for notifying events."""
return {"uid": self.uid, "name": self.name, "camera": self}
[docs]
@abc.abstractmethod
async def _connect_internal(self, **conn_params):
"""Internal method to connect the camera.
Must raise `.CameraConnectionError` if the connection fails.
"""
raise NotImplementedError
@property
def status(self):
"""Returns the cached status. Equivalent to ``get_status(update=False)``."""
return self.get_status(update=False)
[docs]
def get_status(self, update: bool = False) -> Dict[str, Any]:
"""Returns a dictionary with the camera status values.
Parameters
----------
update
If `True`, retrieves the status from the camera; otherwise returns
the last cached status.
"""
if update or not self._status:
try:
self._status = self._status_internal()
except Exception as err:
self.log(f"Failed to receive status: {err}", WARNING)
self._status = {}
return self._status
[docs]
def _status_internal(self) -> Dict[str, Any]:
"""Gets a dictionary with the status of the camera.
This method is intended to be overridden by the specific camera.
Returns
-------
status
A dictionary with status values from the camera (e.g.,
temperature, cooling status, firmware information, etc.)
"""
return {}
[docs]
async def expose(
self,
exptime: float,
image_type: str = "object",
stack: int = 1,
stack_function: Callable[..., numpy.ndarray] = numpy.median,
fits_model: Optional[FITSModel] = None,
filename: Optional[str] = None,
num: Optional[int] = None,
write: bool = False,
postprocess: bool = True,
**kwargs,
) -> Exposure:
"""Exposes the camera.
This is the general method to expose the camera. It receives the
exposure time and type of exposure, along with other necessary
arguments, and returns an `.Exposure` instance with the data and
metadata for the image.
The `.Exposure` object is created and populated by `.expose` and passed
to the parent mixins for the camera class. It is also passed to the
internal expose method where the concrete implementation of the camera
expose system happens.
Parameters
----------
exptime
The exposure time for the image, in seconds.
image_type
The type of image (``{'bias', 'dark', 'object', 'flat'}``).
stack
Number of images to stack.
stack_function
The function to apply to the several images taken to generate the
final stacked image. Defaults to the median.
fits_model
A `.FITSModel` that can be used to override the default model
for the camera.
filename
The path where to write the image. If not given, a new name is
automatically assigned based on the camera instance of
`.ImageNamer`.
num
If specified, a ``num`` value to pass wot `.ImageNamer` to define
the sequence number of the exposure filename.
write
If `True`, writes the image to disk immediately.
postprocess
Whether to run the post-process stage. This argument is ignored if the
subclass of `.BaseCamera` does not override ``_post_process_internal``.
kwargs
Other keyword arguments to pass to the internal expose and post-process
methods.
Returns
-------
exposure
An `.Exposure` object containing the image data, exposure time,
and header datamodel.
"""
exptime = exptime or 0.0
fits_model = fits_model or self.fits_model
if exptime < 0:
raise ExposureError("exposure time cannot be < 0")
if image_type == "bias" and exptime > 0:
warnings.warn("setting exposure time for bias to 0", ExposureWarning)
exptime = 0.0
exposures: List[Exposure] = []
notif_payload = {
"exptime": exptime,
"remaining_time": exptime,
"image_type": image_type,
"n_stack": stack,
"current_stack": 1,
}
for idx in range(stack):
notif_payload.update({"current_stack": idx + 1})
exposure = Exposure(self, fits_model=fits_model)
exposure.image_type = image_type
exposure.exptime = exptime
self.notify(CameraEvent.EXPOSURE_INTEGRATING, notif_payload)
try:
await self._expose_internal(exposure, **kwargs)
except Exception as err:
self.notify(CameraEvent.EXPOSURE_FAILED, {"error": str(err)})
raise ExposureError(str(err))
if exposure.data is None:
error = "data was not taken."
self.notify(CameraEvent.EXPOSURE_FAILED, {"error": error})
raise ExposureError(error)
exposures.append(exposure)
if len(exposures) > 1:
data = [
exp.data.astype(numpy.float32)
for exp in exposures
if exp.data is not None
]
stacked_data = stack_function(numpy.stack(data), axis=0)
exposures[0].data = stacked_data
exposure = exposures[0]
exposure.exptime_n = exptime * stack
exposure.stack = stack
exposure.stack_function = stack_function
if exposure.filename is not None:
if filename:
exposure.filename = str(filename)
else:
exposure.filename = str(filename or self.image_namer(self, num=num))
notif_payload = {
"exptime_total": exposure.exptime_n,
"image_type": image_type,
"current_stack": 0,
"n_stack": 0,
}
self.notify(CameraEvent.EXPOSURE_DONE, notif_payload)
if postprocess:
try:
exposure = await self._post_process_internal(exposure, **kwargs)
except ExposureError:
self.notify(CameraEvent.EXPOSURE_POST_PROCESS_FAILED)
raise
if write:
filename = os.path.realpath(str(exposure.filename))
self.notify(CameraEvent.EXPOSURE_WRITING, {"filename": filename})
try:
await exposure.write()
except Exception as err:
raise ExposureError(f"Failed writing image to disk: {err}")
self.notify(CameraEvent.EXPOSURE_WRITTEN, {"filename": filename})
return exposure
[docs]
@abc.abstractmethod
async def _expose_internal(self, exposure: Exposure, **kwargs) -> Exposure:
"""Internal method to handle camera exposures.
This method handles the entire process of exposing and reading the
camera and return an array or FITS object with the exposed frame.
If necessary, it must handle the correct operation of the shutter
before and after the exposure.
The method receives an `.Exposure` instance in which the exposure
time, image type, and other parameters have been set by `.expose`.
Additional parameters can be passed via the ``kwargs`` arguments.
The camera instance can be accessed via the ``Exposure.camera``
attribute.
The method is responsible for adding any relevant attributes in the
exposure instance. The time of the start of the exposure is initially
set just before `._expose_internal` is called, but if necessary it
must be updated when the camera is actually commanded to expose (or,
if flushing occurs, when the integration starts). Finally, it must
set ``Exposure.data`` with the image as a Numpy array.
Parameters
----------
exposure
The exposure being taken.
kwargs
Other keyword arguments to configure the exposure.
"""
raise NotImplementedError
[docs]
async def _post_process_internal(self, exposure: Exposure, **kwargs) -> Exposure:
"""Performs post-process on an exposure.
This method can be overridden to perform additional processing on the already
read exposure. It's called by `.expose` after `._expose_internal` is complete
but before the image is returned to the user or written to disk.
The user is responsible for issuing any events. If an `.ExposureError` is
raised, the error will be propagated and the exposure will not be returned.
"""
return exposure
[docs]
async def disconnect(self) -> bool:
"""Shuts down the camera."""
try:
await self._disconnect_internal()
except CameraConnectionError as ee:
self.notify(CameraEvent.CAMERA_DISCONNECT_FAILED)
raise CameraConnectionError(f"failed to disconnect: {ee}")
self.log("camera has been disconnected.")
self.notify(CameraEvent.CAMERA_DISCONNECTED)
return True
[docs]
async def _disconnect_internal(self):
"""Internal method to disconnect a camera.
Must raise a `.CameraConnectionError` if the shutdown fails.
"""
pass