Source code for basecam.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-08-05
# @Filename: utils.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging
import os
import pathlib
from contextlib import suppress
from subprocess import CalledProcessError

from sdsstools.logger import SDSSLogger


__all__ = ["LoggerMixIn", "Poller", "cancel_task", "gzip_async", "subprocess_run_async"]


[docs] class LoggerMixIn(object): """A mixin to provide easy logging with a header.""" log_header = "" logger: SDSSLogger
[docs] def log(self, message, level=logging.DEBUG, use_header=True): """Logs a message with a header.""" header = (self.log_header or "") if use_header else "" self.logger.log(level, header + message)
[docs] class Poller(object): """A task that runs a callback periodically. Parameters ---------- name : str The name of the poller. callback : function or coroutine A function or coroutine to call periodically. delay : float Initial delay between calls to the callback. loop : event loop The event loop to which to attach the task. """ def __init__(self, name, callback, delay=1, loop=None): self.name = name self.callback = callback self._orig_delay = delay self.delay = delay self.loop = loop or asyncio.get_event_loop() # Create two tasks, one for the sleep timer and another for the poller # itself. We do this because we want to be able to cancell the sleep # coroutine if we are going to change the delay. self._sleep_task = None self._task = None
[docs] async def poller(self): """The polling loop.""" while True: try: if asyncio.iscoroutinefunction(self.callback): await self.callback() else: self.callback() except Exception as ee: self.loop.call_exception_handler( {"message": "failed running callback", "exception": ee} ) self._sleep_task = self.loop.create_task(asyncio.sleep(self.delay)) await self._sleep_task
[docs] async def set_delay(self, delay=None, immediate=False): """Sets the delay for polling. Parameters ---------- delay : float The delay between calls to the callback. If `None`, restores the original delay. immediate : bool If `True`, stops the currently running task and sets the new delay. Otherwise waits for the current task to complete. """ # Only change delay if the difference is significant. if delay and abs(self.delay - delay) < 1e-6: return if not self.running: return if immediate: await self.stop() self.start(delay) else: self.delay = delay or self._orig_delay
[docs] def start(self, delay=None): """Starts the poller. Parameters ---------- delay : float The delay between calls to the callback. If not specified, restores the original delay used when the class was instantiated. """ self.delay = delay or self._orig_delay if self.running: return self._task = self.loop.create_task(self.poller()) return self
[docs] async def stop(self): """Cancel the poller.""" if not self.running: return if self._task: self._task.cancel() with suppress(asyncio.CancelledError): await self._task
[docs] async def call_now(self): """Calls the callback immediately.""" restart = False delay = self.delay if self.running: await self.stop() restart = True if asyncio.iscoroutinefunction(self.callback): await self.loop.create_task(self.callback()) else: self.callback() if restart: self.start(delay=delay)
@property def running(self): """Returns `True` if the poller is running.""" if self._task and not self._task.cancelled(): return True return False
[docs] async def cancel_task(task): """Cleanly cancels a task.""" if task and not task.done(): task.cancel() with suppress(asyncio.CancelledError): await task
[docs] async def subprocess_run_async(*args, shell=False): """Runs a command asynchronously. If ``shell=True`` the command will be executed through the shell. In that case the argument must be a single string with the full command. Otherwise, must receive a list of program arguments. Returns the output of stdout. """ if shell: cmd = await asyncio.create_subprocess_shell( args[0], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) cmd_str = args[0] else: cmd = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) cmd_str = " ".join(args) stdout, stderr = await cmd.communicate() if cmd.returncode and cmd.returncode > 0: raise CalledProcessError( cmd.returncode, cmd=cmd_str, output=stdout, stderr=stderr, ) if stdout: return stdout.decode()
[docs] async def gzip_async(file: pathlib.Path | str, complevel=1): """Compresses a file with gzip asynchronously.""" file = str(file) if not os.path.exists(file): raise FileNotFoundError(f"File not found: {file!r}") try: await subprocess_run_async( "gzip", "-" + str(complevel), file, ) except Exception as err: raise OSError(f"Failed compressing file {file}: {err}")