class EventNotifier:
    """A registry of clients to be notified of events.

    Allows to register listener queues to which events are announced.

    Attributes
    ----------
    listeners : list
        The registered listeners, in registration order.

    """

    def __init__(self):
        self.listeners = []

    def register_listener(self, listener):
        """Registers a listener.

        Registering the same listener more than once has no effect.

        Parameters
        ----------
        listener : .EventListener
            An `.EventListener` instance to which to send events for
            processing.

        Raises
        ------
        AssertionError
            If ``listener`` is not an `.EventListener`.

        """

        # Kept as an assert so that callers catching AssertionError keep
        # working; note that asserts are skipped when running with ``-O``.
        assert isinstance(listener, EventListener), "invalid listener type."

        if listener not in self.listeners:
            self.listeners.append(listener)

    def remove_listener(self, listener):
        """Removes a listener.

        Raises
        ------
        ValueError
            If ``listener`` is not currently registered.

        """

        if listener not in self.listeners:
            raise ValueError("listener is not registered.")

        self.listeners.remove(listener)

    def notify(self, event, payload=None):
        """Sends an event to all listeners.

        Every registered listener is considered. A listener that defines a
        non-empty ``filter_events`` that does not contain ``event`` is
        skipped, but the remaining listeners are still notified.

        Parameters
        ----------
        event : ~enum.Enum
            An enumeration value belonging to the ``event_class``.
        payload : dict
            A dictionary with the information associated with the event.
            Defaults to a new, empty dictionary. The same object is queued
            to every listener.

        Returns
        -------
        result : bool
            `True` if the event was queued to every registered listener
            (including the case in which there are no listeners). `False`
            if at least one listener filtered the event out.

        Raises
        ------
        AssertionError
            If ``event`` is not an `~enum.Enum` value.

        """

        assert isinstance(event, enum.Enum), "event is not an enum."

        # Avoid a mutable default argument shared between calls.
        if payload is None:
            payload = {}

        accepted_by_all = True

        for listener in self.listeners:
            if listener.filter_events and event not in listener.filter_events:
                # Skip only this listener. Returning here would prevent the
                # listeners registered after it from seeing the event.
                accepted_by_all = False
                continue
            listener.put_nowait((event, payload))

        return accepted_by_all
class EventListener(asyncio.Queue):
    """An event queue with callbacks.

    Items put in the queue are expected to be ``(event, payload)`` tuples.
    While the listener task is running, each item is read from the queue
    and passed to all the registered callbacks.

    Parameters
    ----------
    filter_events : list
        A list of enum values of which to be notified. If `None`, all events
        will be notified. A single value is wrapped in a list; tuples, sets,
        and frozensets are kept as they are.
    autostart : bool
        Whether to start the listener as soon as the object is created.
        This calls `asyncio.create_task`, so it requires a running event
        loop.

    """

    def __init__(self, filter_events=None, autostart=True):

        super().__init__()

        self.callbacks = []

        # A bare value is wrapped so that ``event in filter_events`` works.
        # Sets are accepted as collections instead of being wrapped (which
        # would make the filter reject every event).
        collection_types = (list, tuple, set, frozenset)
        if filter_events and not isinstance(filter_events, collection_types):
            filter_events = [filter_events]
        self.filter_events = filter_events

        # NOTE: the attribute name is misspelled ("listerner"); it is kept
        # as is because it is part of the object's visible interface.
        self.listerner_task = None

        self._event_waiter = None
        self.__events = set()  # Events received, to be used by wait_for.

        # Strong references to the tasks scheduled for coroutine callbacks.
        # The event loop only keeps weak references to tasks, so without
        # this a callback task could be garbage collected before finishing.
        self._callback_tasks = set()

        if autostart:
            self.listerner_task = asyncio.create_task(self._process_queue())

    async def _process_queue(self):
        """Processes the queue and calls callbacks."""

        while True:

            item = await self.get()

            try:
                event, payload = item
            except (TypeError, ValueError):
                # Ignore anything that is not an (event, payload) pair.
                continue

            for callback in self.callbacks:
                result = callback(event, payload)
                if asyncio.iscoroutine(result):
                    task = asyncio.create_task(result)
                    self._callback_tasks.add(task)
                    task.add_done_callback(self._callback_tasks.discard)

            # Only record events while wait_for() is active.
            if self._event_waiter:
                self.__events.add(event)
                self._event_waiter.set()

    async def start_listening(self):
        """Starts the listener task. The queue will be initially purged.

        If a listener task already exists it is stopped first.

        """

        if self.listerner_task is not None:
            await self.stop_listening()

        # Purges the queue
        while True:
            try:
                self.get_nowait()
            except asyncio.QueueEmpty:
                break

        self.listerner_task = asyncio.create_task(self._process_queue())

    async def stop_listening(self):
        """Stops the listener task.

        Does nothing if the listener task was never created.

        """

        if self.listerner_task is None:
            return

        self.listerner_task.cancel()

        with contextlib.suppress(asyncio.CancelledError):
            await self.listerner_task

    def register_callback(self, callback):
        """Registers a callback to be called when an event is read.

        Registering the same callback more than once has no effect.

        Parameters
        ----------
        callback
            A function or coroutine function to be called. The callback
            receives the event (an enumeration value) as the first argument
            and the payload associated with that event as a dictionary. If
            the callback is a coroutine, it is scheduled as a task.

        """

        if callback not in self.callbacks:
            self.callbacks.append(callback)

    def remove_callback(self, callback):
        """De-registers a callback.

        Raises
        ------
        ValueError
            If ``callback`` is not registered.

        """

        if callback in self.callbacks:
            self.callbacks.remove(callback)
        else:
            raise ValueError("callback not registered.")

    async def wait_for(self, events, timeout=None):
        """Blocks until a certain event happens.

        Events are only seen while the listener task is running. The waiting
        state is stored on the instance, so concurrent calls on the same
        listener share it and interfere with each other.

        Parameters
        ----------
        events
            The event to wait for. It can be a list, tuple, set, or
            frozenset of events, in which case it returns when any of the
            events has been seen.
        timeout : float or None
            Timeout in seconds. If `None`, blocks until the event is
            received.

        Returns
        -------
        result : set or bool
            Returns a `set` of events received that intersects with the
            ``events`` that were being waited for. Normally this is a single
            event, the first one to be seen, but it can be more than one if
            multiple events that were being waited for happen at the same
            time. Returns `False` if the routine timed out before receiving
            the event.

        """

        # Received events are accumulated in a set because, if two events
        # arrive too close to each other, keeping only the last one could
        # miss the one we are waiting for.
        self._event_waiter = asyncio.Event()
        self.__events = set()

        if isinstance(events, (list, tuple, set, frozenset)):
            events = set(events)
        else:
            events = {events}

        async def _waiter():
            assert self._event_waiter
            while not events.intersection(self.__events):
                await self._event_waiter.wait()
                # Clear the flag. If one of the awaited events has been
                # received the loop ends; otherwise this makes the next
                # iteration block until another event arrives.
                self._event_waiter.clear()
            return True

        try:
            await asyncio.wait_for(_waiter(), timeout)
            return events.intersection(self.__events)
        except asyncio.TimeoutError:
            return False
        finally:
            # Leave the listener in the "nobody is waiting" state.
            self._event_waiter.set()
            self._event_waiter = None
            self.__events = set()