src.event_bus module¶
- class src.event_bus.EventBus¶
Bases:
object- add_listener(event_name: str, listener: Callable[[...], Any]) None¶
This method adds listener to the set associated with event_name.
- Parameters:
event_name (str) – The name of the event with which to associate listener.
listener (Callable[..., Any]) – The function to be called whenever event_name is emitted.
- Returns:
None
- Return type:
None
- emit(event_name: str, event: Any | None = None) None¶
Creates an asyncio.Task to invoke every listener associated with event_name whenever a publisher invokes emit (this method) with said event_name. Each listener will be passed an optional event object containing data about the event.
- Parameters:
event_name (str) – Name of the event whose listeners to trigger.
- Event:
Arbitrary event data.
- Type:
Optional[Any]
- Returns:
None
- Return type:
None
- remove_listener(event_name: str, listener: Callable[[...], Any]) None¶
This method removes listener from the set associated with event_name, and deletes the evet_name key if listener was the last listener registered to it.
- Parameters:
event_name (str) – The name of the event with which listener is associated.
listener (Callable[..., Any]) – The function associated with event_name to remove from the list of listeners.
- Returns:
None
- Return type:
None