Abstract methods#
basecam relies on the concept of abstract classes: classes that cannot be instantiated until they are subclassed and a series of methods and properties have been overridden and implemented. CameraSystem, BaseCamera, and the mixins are defined as abc abstract classes. Wrapping a new camera entails subclassing these base classes and overriding the necessary methods with the concrete implementation for the camera. In addition, some methods, such as CameraSystem.setup, are not abstract but may need to be overridden depending on the camera implementation.
In this section we describe each one of the existing abstract methods (mandatory and optional) and provide details about how they must be implemented. The abstract methods for mixins are described in their own section.
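As a reminder of how abc abstract classes behave, here is a minimal sketch that uses only the standard library. BaseSystem and MySystem are hypothetical stand-ins, not basecam classes; they only mirror the split between a mandatory abstract method and an optional one with a default.

```python
import abc


class BaseSystem(abc.ABC):
    """Hypothetical stand-in for an abstract base class such as CameraSystem."""

    @abc.abstractmethod
    def list_available_cameras(self):
        """Mandatory: subclasses must override this method."""

    def setup(self):
        """Optional: does nothing unless overridden."""
        return self


class MySystem(BaseSystem):
    """Concrete subclass that implements the mandatory method."""

    def list_available_cameras(self):
        return ["cam-001"]


try:
    BaseSystem()  # Fails: the abstract method has not been implemented.
except TypeError as err:
    print(f"cannot instantiate: {err}")

system = MySystem().setup()
print(system.list_available_cameras())
```

The base class cannot be instantiated directly, while the subclass can because it provides every abstract method.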
CameraSystem#
For CameraSystem the only required abstract method is list_available_cameras. This method must be overridden to return a list or tuple of the unique identifiers of the cameras that the native camera system detects as connected. For example:
def list_available_cameras(self):
    devices_id = self.lib.list_cameras()
    # Get the serial number as UID.
    serial_numbers = []
    for device_id in devices_id:
        device = flicamera.lib.FLIDevice(device_id, self.lib.libc)
        serial_numbers.append(device.serial)
    return serial_numbers
This method can be called manually, and it is also used by the camera poller to determine whether cameras have been connected or disconnected and to call add_camera or remove_camera automatically.
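The comparison a poller needs to make can be sketched with plain sets. This is only an illustration of the idea, not basecam's actual poller; diff_cameras is a hypothetical helper name.

```python
def diff_cameras(known_uids, detected_uids):
    """Return (added, removed) UIDs given the known and newly detected ones."""
    known, detected = set(known_uids), set(detected_uids)
    added = sorted(detected - known)    # Candidates for add_camera.
    removed = sorted(known - detected)  # Candidates for remove_camera.
    return added, removed


# "A1" was unplugged and "C3" was plugged in since the last poll.
added, removed = diff_cameras(["A1", "B2"], ["B2", "C3"])
print(added, removed)
```

Here the call reports C3 as newly connected and A1 as disconnected, which is why list_available_cameras must return stable unique identifiers.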
Some camera libraries need to be initialised in a particular way, or you may want to set some attributes. For that, you can override CameraSystem.setup:
def setup(self):
    self.lib = flicamera.lib.LibFLI()
    return self
Note that this method must return self. This is so that the camera system can conveniently be instantiated and set up as
camera_system = CameraSystem(CameraClass).setup()
If setup is not overridden, it does nothing.
Finally, you’ll also need to override the __version__ property, which must return the version of the camera system. The easiest way of doing this is
class MyCameraSystem(CameraSystem):
    __version__ = '0.0.1'
BaseCamera#
BaseCamera must implement all the minimal features to control the camera. This includes connecting and disconnecting it, retrieving its status and unique identifier, and exposing and reading the camera. The camera system and camera configuration passed by add_camera can be accessed from the camera_system and camera_config attributes, respectively.
async def _connect_internal#
Opens the camera and makes sure it’s ready to be accessed. It must raise a CameraConnectionError if the connection fails. The signature of the method must include any keyword parameter that is needed to connect the camera, and these must be the same as the parameters in the connection_params section of the camera configuration.
async def _connect_internal(self, serial=None):
    if not serial:
        raise basecam.exceptions.CameraConnectionError('unknown serial number.')
    self._device = self.camera_system.lib.get_camera(serial)
    if self._device is None:
        raise basecam.exceptions.CameraConnectionError(
            f'cannot find camera with serial {serial}.')
    return self
_connect_internal is called by the public method connect, which takes care of passing the necessary arguments and notifying the listeners of the CAMERA_CONNECTED or CAMERA_CONNECT_FAILED events. If the connection is successful, connect will set BaseCamera.connected to True.
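The division of labour between connect and _connect_internal can be sketched as follows. This is a simplified illustration, not basecam's implementation: SketchCamera, its events list, and the local CameraConnectionError are hypothetical stand-ins, and the event names are recorded as plain strings.

```python
import asyncio


class CameraConnectionError(Exception):
    """Stand-in for basecam.exceptions.CameraConnectionError."""


class SketchCamera:
    """Hypothetical camera illustrating a connect wrapper; not basecam code."""

    def __init__(self, connection_params):
        self.connection_params = connection_params
        self.connected = False
        self.events = []  # Records notifications instead of emitting them.

    async def _connect_internal(self, serial=None):
        if not serial:
            raise CameraConnectionError("unknown serial number.")
        return self

    async def connect(self):
        try:
            # Keyword names must match the keys in connection_params.
            await self._connect_internal(**self.connection_params)
        except CameraConnectionError:
            self.events.append("CAMERA_CONNECT_FAILED")
            raise
        self.connected = True
        self.events.append("CAMERA_CONNECTED")
        return self


camera = asyncio.run(SketchCamera({"serial": "ML0001"}).connect())
print(camera.connected, camera.events)
```

Because the wrapper unpacks connection_params as keyword arguments, a mismatch between the configuration keys and the signature of _connect_internal would surface as a TypeError at connection time.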
def _status_internal (optional)#
Called by get_status. Must return a dictionary of all the relevant status parameters the camera is aware of (temperature, firmware version, serial, binning, etc.). By default, it returns an empty dictionary.
def _status_internal(self):
    device = self._device
    device._update_temperature()
    return dict(model=device.model,
                serial=device.serial,
                fwrev=device.fwrev,
                hwrev=device.hwrev,
                hbin=device.hbin,
                vbin=device.vbin,
                visible_area=device.get_visible_area(),
                image_area=device.area,
                temperature_ccd=device._temperature['CCD'],
                temperature_base=device._temperature['base'],
                exposure_time_left=device.get_exposure_time_left(),
                cooler_power=device.get_cooler_power())
async def _expose_internal#
Called by expose. This method must implement the exposing and reading of a camera frame. The method receives an Exposure instance for the frame, which contains the type of image to take and the exposure time. After taking and reading the exposure, it must set exposure.data to a numpy array with the data just read. It must raise an ExposureError if something goes wrong. An example implementation, taken from flicamera:
async def _expose_internal(self, exposure, **kwargs):
    TIMEOUT = 5
    device = self._device
    device.cancel_exposure()
    device.set_exposure_time(exposure.exptime)
    image_type = exposure.image_type
    frametype = 'dark' if image_type in ['dark', 'bias'] else 'normal'
    device.start_exposure(frametype)
    exposure.obstime = astropy.time.Time.now()
    self.notify(CameraEvent.EXPOSURE_INTEGRATING)
    start_time = time.time()
    time_left = exposure.exptime
    while True:
        await asyncio.sleep(time_left)
        time_left = device.get_exposure_time_left() / 1000.
        if time_left == 0:
            self.notify(CameraEvent.EXPOSURE_READING)
            array = await self.loop.run_in_executor(None, device.read_frame)
            exposure.data = array
            return
        if time.time() - start_time > exposure.exptime + TIMEOUT:
            raise ExposureError('timeout waiting for exposure to finish.')
There are a few things to note here. Because some cameras may not differentiate between the process of integrating and reading, _expose_internal must take care of both. That means that the method is responsible for emitting notifications when integration and reading start by calling notify with the appropriate CameraEvent. Other exposure events are handled by expose.
Long-running processes such as integration (if it’s not asynchronous) and reading must be run in an executor to avoid blocking the loop (which can be accessed as self.loop). You can use the executor in any way you want, whether it is the default executor or any subclass of concurrent.futures.Executor.
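The effect of running a blocking call in an executor can be seen in a small standalone sketch. read_frame is a hypothetical blocking readout, and the sketch uses asyncio.get_running_loop() where a camera method would use self.loop. A heartbeat task counts how often the loop gets to run while the readout is in progress.

```python
import asyncio
import time


def read_frame():
    """Hypothetical blocking readout standing in for a vendor library call."""
    time.sleep(0.2)
    return [[0, 1], [2, 3]]


async def heartbeat(counter):
    """Count wake-ups to show the event loop keeps running."""
    while True:
        await asyncio.sleep(0.01)
        counter["ticks"] += 1


async def main():
    loop = asyncio.get_running_loop()
    counter = {"ticks": 0}
    task = asyncio.create_task(heartbeat(counter))
    # None selects the default executor; an Executor instance also works.
    frame = await loop.run_in_executor(None, read_frame)
    task.cancel()
    return frame, counter["ticks"]


frame, ticks = asyncio.run(main())
print(frame, ticks > 0)
```

If read_frame were called directly inside the coroutine instead, the heartbeat would not advance until the readout finished, and the same would happen to any other task sharing the loop.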
expose sets Exposure.obstime just before calling _expose_internal. However, if you want additional precision on when exactly the exposure started, you can set the value again; it must be an astropy.time.Time object.
Note that you don’t need to care about stacking or saving the image; that’s all taken care of in expose (the public interface is described in Exposures). However, _expose_internal must take care of operating the shutter if this is not done automatically by the API when exposing. For this, two attributes, has_shutter and auto_shutter, can be set when subclassing BaseCamera to indicate whether the camera has a shutter and whether it opens and closes automatically when an exposure is commanded:
class MyCamera(BaseCamera):
    has_shutter = True
    auto_shutter = False
These parameters can also be set in the configuration file:
cameras: {
    my_camera: {
        has_shutter: true,
        auto_shutter: false
    }
}
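When has_shutter is true and auto_shutter is false, _expose_internal has to drive the shutter itself. The sketch below shows one way to do that under stated assumptions: FakeDevice, its set_shutter and integrate methods, and SketchCamera are hypothetical, and the exposure is a plain namespace rather than a basecam Exposure.

```python
import asyncio
from types import SimpleNamespace


class FakeDevice:
    """Hypothetical device whose shutter must be driven by hand."""

    def __init__(self):
        self.log = []

    def set_shutter(self, is_open):
        self.log.append("open" if is_open else "close")

    async def integrate(self, exptime):
        self.log.append("integrate")
        await asyncio.sleep(exptime)


class SketchCamera:
    has_shutter = True
    auto_shutter = False

    def __init__(self, device):
        self._device = device

    async def _expose_internal(self, exposure, **kwargs):
        # Drive the shutter only if it exists and the API does not handle it.
        manual = self.has_shutter and not self.auto_shutter
        needs_light = exposure.image_type not in ("dark", "bias")
        if manual and needs_light:
            self._device.set_shutter(True)
        try:
            await self._device.integrate(exposure.exptime)
        finally:
            # Close the shutter even if the integration fails.
            if manual and needs_light:
                self._device.set_shutter(False)


device = FakeDevice()
exposure = SimpleNamespace(image_type="object", exptime=0.01)
asyncio.run(SketchCamera(device)._expose_internal(exposure))
print(device.log)
```

The shutter is closed in a finally clause so that a failed integration does not leave it open; dark and bias frames skip the shutter entirely.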
async def _post_process_internal (optional)#
Often we want to run additional processing on the exposure we have just taken. For example, we may want to subtract a bias level or do source extraction. One way of accomplishing that is to override the _post_process_internal method. If overridden, the method is called after _expose_internal completes but before the exposure is returned or written to disk. _post_process_internal is called with the Exposure object and any additional arguments that have been passed to expose (these are the same keyword arguments passed to _expose_internal). The method must return the exposure and must take care of issuing event notifications.
async def _post_process_internal(self, exposure, **kwargs):
    self.notify(CameraEvent.EXPOSURE_POST_PROCESSING)
    bias = numpy.median(exposure.data)
    exposure.data -= bias
    self.notify(CameraEvent.EXPOSURE_POST_PROCESS_DONE)
    return exposure
Alternatively, we could add the bias as a new extension, as described in Additional HDUs:
async def _post_process_internal(self, exposure, **kwargs):
    self.notify(CameraEvent.EXPOSURE_POST_PROCESSING)
    bias_image = exposure.data.copy()
    bias_image[bias_image > 1000] = numpy.median(bias_image)  # do NOT do this!
    exposure.add_hdu(ImageHDU(data=bias_image, name='BIAS'))
    self.notify(CameraEvent.EXPOSURE_POST_PROCESS_DONE)
    return exposure
If post-processing fails, there are two possibilities. One can raise an ExposureError, which will be propagated and will prevent the exposure from being returned or written to disk. Alternatively, one can notify using EXPOSURE_POST_PROCESS_FAILED but return the original image.
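The two failure strategies can be contrasted in one sketch. Everything here is illustrative: SketchCamera, its strict flag, and the local ExposureError are hypothetical stand-ins, events are recorded as strings, and the image is a plain list so that the example needs only the standard library.

```python
import asyncio
from types import SimpleNamespace


class ExposureError(Exception):
    """Stand-in for basecam's ExposureError."""


class SketchCamera:
    def __init__(self, strict):
        self.strict = strict  # Hypothetical switch between the two behaviours.
        self.events = []

    def notify(self, event):
        self.events.append(event)

    async def _post_process_internal(self, exposure, **kwargs):
        self.notify("EXPOSURE_POST_PROCESSING")
        try:
            # Use the middle sorted value as a simple bias estimate.
            bias = sorted(exposure.data)[len(exposure.data) // 2]
            processed = [value - bias for value in exposure.data]
        except (TypeError, IndexError) as err:
            if self.strict:
                # Option 1: propagate, so the exposure is not returned or saved.
                raise ExposureError(f"post-processing failed: {err}") from err
            # Option 2: report the failure but hand back the original image.
            self.notify("EXPOSURE_POST_PROCESS_FAILED")
            return exposure
        exposure.data = processed
        self.notify("EXPOSURE_POST_PROCESS_DONE")
        return exposure


broken = SimpleNamespace(data=None)  # Simulates an unusable frame.
lenient = SketchCamera(strict=False)
result = asyncio.run(lenient._post_process_internal(broken))
print(result is broken, lenient.events)
```

Which option is appropriate depends on whether an unprocessed frame is still useful to the caller; raising is the safer choice when downstream code assumes the processing has been applied.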
async def _disconnect_internal (optional)#
Called by disconnect. By default it does nothing, but it can be overridden to close the camera. It must raise a CameraConnectionError if a problem is found.
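A minimal sketch of such an override, assuming a vendor handle with a close method that raises RuntimeError on failure. FakeDevice, SketchCamera, and the local CameraConnectionError are hypothetical stand-ins used to keep the example self-contained.

```python
import asyncio


class CameraConnectionError(Exception):
    """Stand-in for basecam.exceptions.CameraConnectionError."""


class FakeDevice:
    """Hypothetical vendor handle that can only be closed once."""

    def __init__(self):
        self.is_open = True

    def close(self):
        if not self.is_open:
            raise RuntimeError("device already closed")
        self.is_open = False


class SketchCamera:
    def __init__(self, device):
        self._device = device

    async def _disconnect_internal(self):
        try:
            self._device.close()
        except RuntimeError as err:
            # Translate the vendor error into the expected exception type.
            raise CameraConnectionError(f"failed to close camera: {err}") from err


device = FakeDevice()
camera = SketchCamera(device)
asyncio.run(camera._disconnect_internal())
print(device.is_open)
```

Translating the library error keeps callers of disconnect independent of the vendor API.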
Summary of abstract methods#
Class | Name | Type | Optional | Purpose |
---|---|---|---|---|
CameraSystem | list_available_cameras | method | No | Return list of unique identifiers of system cameras. |
CameraSystem | setup | method | Yes | Set up the camera system. Must return self. |
CameraSystem | __version__ | property | No | Return the version of the camera system. |
BaseCamera | _connect_internal | async method | No | Establish connection with the camera and make it ready. |
BaseCamera | _status_internal | method | Yes | Return a dictionary with status parameters. |
BaseCamera | _expose_internal | async method | No | Expose and read the camera and populate exposure.data. |
BaseCamera | _post_process_internal | async method | Yes | Run additional post-processing steps on the newly exposed frame. |
BaseCamera | _disconnect_internal | async method | Yes | Disconnect the camera. |