src.event_bus module

class src.event_bus.EventBus

Bases: object

add_listener(event_name: str, listener: Callable[..., Any]) → None

Adds listener to the set of listeners associated with event_name.

Parameters:
  • event_name (str) – The name of the event with which to associate listener.

  • listener (Callable[..., Any]) – The function to be called whenever event_name is emitted.

Returns:

None

Return type:

None
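The registration behavior described above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: SketchEventBus, its _listeners dictionary, and the on_user_created handler are hypothetical names, and the assumption that registering the same listener twice is a no-op follows only from the description of the storage as a set.

```python
from typing import Any, Callable


class SketchEventBus:
    """Hypothetical stand-in for EventBus; only the listener registry is modeled."""

    def __init__(self) -> None:
        # Assumed storage: event name -> set of listeners.
        self._listeners: dict[str, set[Callable[..., Any]]] = {}

    def add_listener(self, event_name: str, listener: Callable[..., Any]) -> None:
        # Create the set on first use, then add the listener to it.
        self._listeners.setdefault(event_name, set()).add(listener)


def on_user_created(event: Any = None) -> None:
    # Hypothetical listener; a real one would act on the event data.
    print("user created:", event)


bus = SketchEventBus()
bus.add_listener("user_created", on_user_created)
# Registering the same callable again leaves the set unchanged.
bus.add_listener("user_created", on_user_created)
```

Because the listeners are kept in a set, each listener must be hashable; plain functions and bound methods are.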

emit(event_name: str, event: Any | None = None) → None

Creates an asyncio.Task to invoke every listener associated with event_name. Each listener is passed event, an optional object carrying data about the event.

Parameters:
  • event_name (str) – Name of the event whose listeners to trigger.

  • event (Optional[Any]) – Arbitrary event data.

Returns:

None

Return type:

None
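One way emit could schedule its listeners is sketched below. The details are assumptions rather than the module's confirmed behavior: this sketch creates one task per listener, awaits the listener's result if it is awaitable, and keeps task references in a hypothetical _tasks set so they are not garbage-collected early. SketchEventBus, _invoke, and on_order_placed are illustrative names only.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


class SketchEventBus:
    """Hypothetical stand-in for EventBus, modeling only add_listener and emit."""

    def __init__(self) -> None:
        self._listeners: dict[str, set[Callable[..., Any]]] = {}
        # Assumption: keep strong references to tasks until they finish.
        self._tasks: set[asyncio.Task] = set()

    def add_listener(self, event_name: str, listener: Callable[..., Any]) -> None:
        self._listeners.setdefault(event_name, set()).add(listener)

    def emit(self, event_name: str, event: Optional[Any] = None) -> None:
        # Iterate over a copy so listeners may change the registry safely.
        for listener in set(self._listeners.get(event_name, ())):
            task = asyncio.create_task(self._invoke(listener, event))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    @staticmethod
    async def _invoke(listener: Callable[..., Any], event: Optional[Any]) -> None:
        # Supports both plain functions and coroutine functions.
        result = listener(event)
        if inspect.isawaitable(result):
            await result


received: list[Any] = []


def on_order_placed(event: Any) -> None:
    # Hypothetical listener that records what it was given.
    received.append(event)


async def main() -> None:
    bus = SketchEventBus()
    bus.add_listener("order_placed", on_order_placed)
    bus.emit("order_placed", {"order_id": 42})  # returns immediately
    # Wait for the scheduled tasks; emit itself does not block on them.
    await asyncio.gather(*bus._tasks)


asyncio.run(main())
```

Since asyncio.create_task needs a running event loop, this sketch only works when emit is called from code already running inside the loop, such as a coroutine or a callback.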

remove_listener(event_name: str, listener: Callable[..., Any]) → None

Removes listener from the set associated with event_name, and deletes the event_name key if listener was the last one registered to it.

Parameters:
  • event_name (str) – The name of the event with which listener is associated.

  • listener (Callable[..., Any]) – The function associated with event_name to remove from the set of listeners.

Returns:

None

Return type:

None
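The removal and key-cleanup behavior can be sketched like this. It is an illustration under stated assumptions, not the actual implementation: SketchEventBus and the two handlers are hypothetical, and silently ignoring an unknown event name or an unregistered listener is a choice made for the sketch, since the documentation does not say what happens in those cases.

```python
from typing import Any, Callable


class SketchEventBus:
    """Hypothetical stand-in for EventBus; only the listener registry is modeled."""

    def __init__(self) -> None:
        self._listeners: dict[str, set[Callable[..., Any]]] = {}

    def add_listener(self, event_name: str, listener: Callable[..., Any]) -> None:
        self._listeners.setdefault(event_name, set()).add(listener)

    def remove_listener(self, event_name: str, listener: Callable[..., Any]) -> None:
        listeners = self._listeners.get(event_name)
        if listeners is None:
            return  # Assumption: unknown event names are ignored.
        listeners.discard(listener)  # Assumption: no error if not registered.
        if not listeners:
            # Last listener removed: drop the key so no empty sets linger.
            del self._listeners[event_name]


def send_email(event: Any = None) -> None:
    """Hypothetical listener."""


def write_audit_log(event: Any = None) -> None:
    """Hypothetical listener."""


bus = SketchEventBus()
bus.add_listener("user_deleted", send_email)
bus.add_listener("user_deleted", write_audit_log)

bus.remove_listener("user_deleted", send_email)
still_registered = "user_deleted" in bus._listeners  # one listener remains

bus.remove_listener("user_deleted", write_audit_log)
key_removed = "user_deleted" not in bus._listeners  # key deleted with last listener
```

Deleting the key once its set is empty keeps the registry from accumulating entries for events that no longer have any listeners.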