class LoggerMixIn(object):
    """Mixin adding a ``log`` helper that prefixes messages with a header.

    Classes using the mixin are expected to provide a ``logger`` attribute
    and may override ``log_header`` to set the prefix.
    """

    log_header = ""
    logger: SDSSLogger

    def log(self, message, level=logging.DEBUG, use_header=True):
        """Send ``message`` to ``self.logger``, optionally prefixed.

        Parameters
        ----------
        message : str
            The text to log.
        level : int
            The logging level. Defaults to ``logging.DEBUG``.
        use_header : bool
            If `True`, ``log_header`` is prepended to the message. An unset
            (falsy) header is treated as an empty string.

        """

        prefix = ""
        if use_header and self.log_header:
            prefix = self.log_header

        self.logger.log(level, prefix + message)
class Poller(object):
    """A task that runs a callback periodically.

    The instance must be created while an event loop is running: the running
    loop is captured at instantiation and stored in the ``loop`` attribute.

    Parameters
    ----------
    name : str
        The name of the poller.
    callback : function or coroutine
        A function or coroutine function to call periodically. It is called
        without arguments.
    delay : float
        Initial delay between calls to the callback.

    """

    def __init__(self, name, callback, delay=1):
        self.name = name
        self.callback = callback

        # Keep the initial delay so that it can be restored later.
        self._orig_delay = delay
        self.delay = delay

        # Raises RuntimeError if there is no running event loop.
        self.loop = asyncio.get_running_loop()

        # Create two tasks, one for the sleep timer and another for the poller
        # itself. We do this because we want to be able to cancel the sleep
        # coroutine if we are going to change the delay.
        # NOTE(review): ``_sleep_task`` is presumably assigned by the polling
        # coroutine, which is not part of this section; confirm.
        self._sleep_task = None
        self._task = None

    async def set_delay(self, delay=None, immediate=False):
        """Sets the delay for polling.

        Nothing is changed if the poller is not running or if ``delay``
        differs from the current delay by less than 1e-6.

        Parameters
        ----------
        delay : float
            The delay between calls to the callback. If `None`, restores the
            original delay.
        immediate : bool
            If `True`, stops the currently running task and sets the
            new delay. Otherwise waits for the current task to complete.

        """

        # Only change delay if the difference is significant.
        if delay and abs(self.delay - delay) < 1e-6:
            return

        if not self.running:
            return

        if immediate:
            await self.stop()
            self.start(delay)
        else:
            self.delay = delay or self._orig_delay

    def start(self, delay=None):
        """Starts the poller.

        Parameters
        ----------
        delay : float
            The delay between calls to the callback. If not specified,
            restores the original delay used when the class was instantiated.

        Returns
        -------
        poller
            The instance itself if a new polling task was created, or `None`
            if the poller was already running. The delay is updated in both
            cases.

        """

        self.delay = delay or self._orig_delay

        if self.running:
            return

        self._task = asyncio.create_task(self.poller())

        return self

    async def stop(self):
        """Cancels the polling task and waits until it has finished."""

        if not self.running:
            return

        task = self._task
        if task:
            task.cancel()
            with suppress(asyncio.CancelledError):
                await task

    async def call_now(self):
        """Calls the callback immediately.

        If the poller is running it is stopped before the call and restarted
        with the same delay afterwards, even if the callback raises. An
        exception raised by the callback is propagated to the caller.

        """

        restart = False
        delay = self.delay

        if self.running:
            await self.stop()
            restart = True

        try:
            if asyncio.iscoroutinefunction(self.callback):
                await asyncio.create_task(self.callback())
            else:
                self.callback()
        finally:
            # Restart in a ``finally`` so that a failing callback does not
            # leave a previously running poller stopped.
            if restart:
                self.start(delay=delay)

    @property
    def running(self):
        """Returns `True` if the poller is running."""

        # ``done()`` also covers tasks that were cancelled, returned, or
        # failed with an exception; checking only ``cancelled()`` would report
        # a crashed task as still running and prevent ``start`` from
        # creating a new one.
        return self._task is not None and not self._task.done()
async def cancel_task(task):
    """Cancels a task and waits until the cancellation has been processed.

    Does nothing if ``task`` is falsy (e.g. `None`) or has already finished.
    """

    if not task or task.done():
        return

    task.cancel()

    # Awaiting a cancelled task raises CancelledError; swallow it here.
    with suppress(asyncio.CancelledError):
        await task
async def subprocess_run_async(*args, shell=False):
    """Runs a command asynchronously.

    If ``shell=True`` the command will be executed through the shell. In that
    case the argument must be a single string with the full command. Otherwise,
    must receive a list of program arguments.

    Returns
    -------
    stdout : str or None
        The decoded standard output of the command, or `None` if the command
        did not write anything to stdout.

    Raises
    ------
    CalledProcessError
        If the command exits with a non-zero return code. This includes
        negative codes, which indicate that the process was terminated by a
        signal.

    """

    if shell:
        cmd_str = args[0]
        cmd = await asyncio.create_subprocess_shell(
            cmd_str,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    else:
        # Build the display string before spawning, and convert each argument
        # with str() so that path-like arguments do not make the join fail
        # after the process has already been created.
        cmd_str = " ".join(str(arg) for arg in args)
        cmd = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    stdout, stderr = await cmd.communicate()

    # Any non-zero code is a failure; a negative value means the process was
    # killed by a signal and its output cannot be trusted.
    if cmd.returncode != 0:
        raise CalledProcessError(
            cmd.returncode,
            cmd=cmd_str,
            output=stdout,
            stderr=stderr,
        )

    if stdout:
        return stdout.decode()

    return None
[docs]asyncdefgzip_async(file:pathlib.Path|str,complevel=1):"""Compresses a file with gzip asynchronously."""file=str(file)ifnotos.path.exists(file):raiseFileNotFoundError(f"File not found: {file!r}")try:awaitsubprocess_run_async("gzip","-"+str(complevel),file,)exceptExceptionaserr:raiseOSError(f"Failed compressing file {file}: {err}")