#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-08-06
# @Filename: notifier.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import asyncio
import contextlib
import enum
__all__ = ["EventNotifier", "EventListener"]
[docs]
class EventNotifier(object):
"""A registry of clients to be notified of events.
Allows to register a listener queue in which to announce events.
"""
def __init__(self):
self.listeners = []
[docs]
def register_listener(self, listener):
"""Registers a listener.
Parameters
----------
listener : .EventListener
An `.EventListener` instance to which to send events for
processing.
"""
assert isinstance(listener, EventListener), "invalid listener type."
if listener not in self.listeners:
self.listeners.append(listener)
[docs]
def remove_listener(self, listener):
"""Removes a listener."""
if listener not in self.listeners:
raise ValueError("listener is not registered.")
self.listeners.remove(listener)
[docs]
def notify(self, event, payload={}):
"""Sends an event to all listeners.
Parameters
----------
event : ~enum.Enum
An enumeration value belonging to the ``event_class``.
payload : dict
A dictionary with the information associated with the event.
"""
assert isinstance(event, enum.Enum), "event is not an enum."
for listener in self.listeners:
if listener.filter_events and event not in listener.filter_events:
return False
listener.put_nowait((event, payload))
return True
[docs]
class EventListener(asyncio.Queue):
"""An event queue with callbacks.
Parameters
----------
filter_events : list
A list of enum values of which to be notified. If `None`,
all events will be notified.
autostart : bool
Whether to start the listener as soon as the object is created.
"""
def __init__(self, loop=None, filter_events=None, autostart=True):
asyncio.Queue.__init__(self)
self.callbacks = []
self.loop = loop or asyncio.get_running_loop()
self.filter_events = filter_events
if self.filter_events and not isinstance(self.filter_events, (list, tuple)):
self.filter_events = [self.filter_events]
self.listerner_task = None
self._event_waiter = None
self.__events = set() # A list of events received to be used by wait_for
if autostart:
self.listerner_task = self.loop.create_task(self._process_queue())
async def _process_queue(self):
"""Processes the queue and calls callbacks."""
while True:
try:
event, payload = await self.get()
except TypeError:
continue
for callback in self.callbacks:
cb = callback(event, payload)
if asyncio.iscoroutine(cb):
self.loop.create_task(cb)
if self._event_waiter:
self.__events.add(event)
self._event_waiter.set()
[docs]
async def start_listening(self):
"""Starts the listener task. The queue will be initially purged."""
if self.listerner_task is not None:
await self.stop_listening()
# Purges the queue
while True:
try:
self.get_nowait()
except asyncio.QueueEmpty:
break
self.listerner_task = self.loop.create_task(self._process_queue())
[docs]
async def stop_listening(self):
"""Stops the listener task."""
if self.listerner_task is None:
return
self.listerner_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.listerner_task
[docs]
def register_callback(self, callback):
"""Registers a callback to be called when an event is read.
Parameters
----------
callback:
A function or coroutine function to be called. The callback
receives the event (an enumeration value) as the first argument
and the payload associated with that event as a dictionary.
If the callback is a coroutine, it is scheduled as a task.
"""
if callback not in self.callbacks:
self.callbacks.append(callback)
[docs]
def remove_callback(self, callback):
"""De-registers a callback."""
if callback in self.callbacks:
self.callbacks.remove(callback)
else:
raise ValueError("callback not registered.")
[docs]
async def wait_for(self, events, timeout=None):
"""Blocks until a certain event happens.
Parameters
----------
events
The event to wait for. It can be a list of events, in which case
it returns when any of the events has been seen.
timeout : float or None
Timeout in seconds. If `None`, blocks until the event is received.
Returns
-------
result : bool
Returns a `set` of events received that intersects with the
``events`` that were being waited for. Normally this is a single
event, the first one to be seen, but it can be more than one if
multiple events that were being waited for happen at the same time.
Returns `False` if the routine timed out before receiving the event.
"""
# We need __events to be a list because if two events arrive too close
# we may miss some of them.
self._event_waiter = asyncio.Event()
self.__events = set()
events = set(events) if isinstance(events, (list, tuple)) else set([events])
async def _waiter():
assert self._event_waiter
while not events.intersection(self.__events):
await self._event_waiter.wait()
# Clear the event waiter. If __last_event == event
# then it doesn't matter. If _last_event != event,
# this will block in the next loop.
self._event_waiter.clear()
return True
try:
await asyncio.wait_for(_waiter(), timeout)
event_inters = events.intersection(self.__events)
return event_inters
except asyncio.TimeoutError:
return False
finally:
self._event_waiter.set()
self._event_waiter = None
self.__events = set()