hikari.api.event_manager
Core interface for components that manage events in the library.
EventManager
Bases: ABC
Base interface for event manager implementations.
This is a listener of a hikari.events.base_events.Event object and a consumer of raw event payloads, and is expected to invoke one or more corresponding event listeners where appropriate.
consume_raw_event abstractmethod
consume_raw_event(event_name: str, shard: GatewayShard, payload: JSONObject) -> None
Consume a raw event.
PARAMETER | DESCRIPTION |
---|---|
event_name | The case-insensitive name of the event being triggered. TYPE: str |
shard | Object of the shard that received this event. TYPE: GatewayShard |
payload | Payload of the event being triggered. TYPE: JSONObject |
RAISES | DESCRIPTION |
---|---|
LookupError | If there is no consumer for the event. |
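The routing described above can be pictured as a name-to-consumer table. The following is a minimal sketch under stated assumptions, not hikari's implementation: `ToyRawConsumer` and its `register` method are hypothetical, and the shard and payload are plain Python objects rather than a GatewayShard and a JSONObject.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in: a real consumer receives a GatewayShard and a JSONObject.
Consumer = Callable[[Any, Dict[str, Any]], None]


class ToyRawConsumer:
    """Toy model of raw-event routing; not hikari's implementation."""

    def __init__(self) -> None:
        self._consumers: Dict[str, Consumer] = {}

    def register(self, event_name: str, consumer: Consumer) -> None:
        # Store names lower-cased so that lookups are case-insensitive.
        self._consumers[event_name.lower()] = consumer

    def consume_raw_event(self, event_name: str, shard: Any, payload: Dict[str, Any]) -> None:
        try:
            consumer = self._consumers[event_name.lower()]
        except KeyError:
            # Report a missing consumer as a LookupError with a readable message.
            raise LookupError(f"no consumer for event {event_name!r}") from None
        consumer(shard, payload)


seen = []
router = ToyRawConsumer()
router.register("MESSAGE_CREATE", lambda shard, payload: seen.append(payload["id"]))

# The lower-case name still reaches the consumer registered in upper case.
router.consume_raw_event("message_create", shard=None, payload={"id": "123"})
```

Calling `consume_raw_event` with a name that was never registered raises `LookupError` in this sketch, mirroring the RAISES table.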
dispatch abstractmethod
Dispatch an event.
PARAMETER | DESCRIPTION |
---|---|
event | The event to dispatch. TYPE: |
Examples:
We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.
import attrs
from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attrs.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attrs.field()

    author: User = attrs.field()
    '''The user who mentioned everyone.'''

    content: str = attrs.field()
    '''The message that was sent.'''

    message_id: Snowflake = attrs.field()
    '''The message ID.'''

    channel_id: Snowflake = attrs.field()
    '''The channel ID.'''
We can then dispatch our event as we see fit.
from hikari.events.messages import MessageCreateEvent

@bot.listen(MessageCreateEvent)
async def on_message(event):
    # content may be None, so check it before searching for mentions.
    if event.content and ("@everyone" in event.content or "@here" in event.content):
        event = EveryoneMentionedEvent(
            app=event.app,  # required: the event class declares an app field
            author=event.author,
            content=event.content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )
        bot.dispatch(event)
This event can be listened to elsewhere by subscribing to it with hikari.api.event_manager.EventManager.subscribe.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    # The custom event defines author, not user.
    print(event.author, "just pinged everyone in", event.channel_id)
RETURNS | DESCRIPTION |
---|---|
Future[Any] | A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later. |
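The "optionally awaited" behaviour in the RETURNS table can be illustrated with plain asyncio. This is a sketch only, assuming the returned future behaves like one produced by `asyncio.gather`; `ToyDispatcher` is a hypothetical class and says nothing about how hikari builds its future.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

Listener = Callable[[Any], Awaitable[None]]


class ToyDispatcher:
    """Toy model of dispatch(); not hikari's implementation."""

    def __init__(self) -> None:
        self.listeners: List[Listener] = []

    def dispatch(self, event: Any) -> "asyncio.Future[Any]":
        # gather() wraps every listener coroutine in a task right away, so the
        # listeners run whether or not the caller awaits the returned future.
        return asyncio.gather(*(listener(event) for listener in self.listeners))


async def main() -> List[str]:
    log: List[str] = []

    async def listener(event: Any) -> None:
        log.append(f"handled {event}")

    dispatcher = ToyDispatcher()
    dispatcher.listeners.append(listener)

    await dispatcher.dispatch("first")  # awaited: resumes once listeners finish
    dispatcher.dispatch("second")  # not awaited: listeners run in the background
    log.append("dispatch returned")
    await asyncio.sleep(0.01)  # give the background listener time to run
    return log


log = asyncio.run(main())
```

In this sketch the un-awaited dispatch returns before its listener has run, which is why "dispatch returned" is logged ahead of "handled second".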
See Also
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
get_listeners abstractmethod
get_listeners(event_type: type[EventT], /, *, polymorphic: bool = True) -> Collection[CallbackT[EventT]]
Get the listeners for a given event type, if there are any.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to look for. TYPE: type[EventT] |
polymorphic | If TYPE: |
RETURNS | DESCRIPTION |
---|---|
Collection[Callable[[T], Coroutine[Any, Any, None]]] | A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered. |
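As an illustration of the lookup, here is a small sketch. It assumes that polymorphic=True also collects listeners registered on parent event types (the parameter description above is incomplete, so this is an assumption), and it uses a hypothetical module-level `registry` and toy event classes rather than hikari's own.

```python
from typing import Awaitable, Callable, Dict, List, Type


class Event: ...
class MessageEvent(Event): ...
class MessageCreateEvent(MessageEvent): ...


Callback = Callable[[Event], Awaitable[None]]

# Hypothetical storage: event type -> callbacks subscribed to exactly that type.
registry: Dict[Type[Event], List[Callback]] = {}


def get_listeners(event_type: Type[Event], /, *, polymorphic: bool = True) -> List[Callback]:
    if not polymorphic:
        # list(...) returns a copy, so callers cannot mutate the registry.
        return list(registry.get(event_type, ()))
    found: List[Callback] = []
    # Assumption: walk the class hierarchy and include listeners of parent types.
    for cls in event_type.__mro__:
        found.extend(registry.get(cls, ()))
    return found


async def on_any_message(event: Event) -> None: ...
async def on_create(event: Event) -> None: ...


registry[MessageEvent] = [on_any_message]
registry[MessageCreateEvent] = [on_create]

everything = get_listeners(MessageCreateEvent)
exact_only = get_listeners(MessageCreateEvent, polymorphic=False)
```

Both calls return fresh lists, and a type with nothing registered yields an empty list rather than an error.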
listen abstractmethod
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
PARAMETER | DESCRIPTION |
---|---|
*event_types | The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. TYPE: |
RETURNS | DESCRIPTION |
---|---|
Callable[[T], T] | A decorator for a coroutine function that passes it to |
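"Second-order decorator" means that listen is called first with the event types and returns the actual decorator. A minimal sketch follows, assuming the decorator forwards the callback to a subscribe-style registry and infers the type from the first parameter's annotation when none is given; `subscriptions`, `subscribe` and `PingEvent` are hypothetical names used only for illustration.

```python
import inspect
import typing
from typing import Any, Callable, Dict, List


class Event: ...
class PingEvent(Event): ...


# Hypothetical registry standing in for an event manager's subscriptions.
subscriptions: Dict[type, List[Callable[..., Any]]] = {}


def subscribe(event_type: type, callback: Callable[..., Any]) -> None:
    subscriptions.setdefault(event_type, []).append(callback)


def listen(*event_types: type) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(callback: Callable[..., Any]) -> Callable[..., Any]:
        types = event_types
        if not types:
            # No explicit types: fall back to the first parameter's type hint.
            first_param = next(iter(inspect.signature(callback).parameters))
            types = (typing.get_type_hints(callback)[first_param],)
        for event_type in types:
            subscribe(event_type, callback)
        return callback  # the function is returned unchanged

    return decorator


@listen(PingEvent)
async def explicit(event): ...


@listen()
async def inferred(event: PingEvent) -> None: ...
```

Because the decorator returns the original function, `explicit` and `inferred` remain ordinary coroutine functions after decoration.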
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
stream abstractmethod
stream(event_type: type[EventT], /, timeout: Union[float, int, None], limit: Optional[int] = None) -> EventStream[EventT]
Return a stream iterator for the given event and sub-events.
Warning
If you use stream.open() to start the stream, you must also close it with stream.close(); otherwise it may queue events in memory indefinitely.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will also listen for subclasses of this type. TYPE: type[EventT] |
timeout | How long this streamer should wait for the next event before ending the iteration. If |
limit | The limit for how many events this should queue at one time before dropping extra incoming events, leave this as |
RETURNS | DESCRIPTION |
---|---|
EventStream[Event] | The async iterator to handle streamed events. This must be started with |
Examples:
with bot.stream(events.ReactionAddEvent, timeout=30).filter(
    ("message_id", message.id)
) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
    ...
stream.close()
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
subscribe abstractmethod
Subscribe a given callback to a given event type.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will also listen for any subclasses of the given type. |
callback | Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded. TYPE: |
Examples:
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent
async def on_message(event): ...
bot.subscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
unsubscribe abstractmethod
Unsubscribe a given callback from a given event type, if present.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to unsubscribe from. This must be exactly the type the callback was originally subscribed with, or the callback will not be removed. |
callback | The callback to unsubscribe. TYPE: |
Examples:
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent
async def on_message(event): ...
bot.unsubscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
wait_for abstractmethod async
wait_for(event_type: type[EventT], /, timeout: Union[float, int, None], predicate: Optional[PredicateT[EventT]] = None) -> EventT
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will also listen for subclasses of this type. TYPE: type[EventT] |
predicate | A function taking the event as the single parameter. This should return TYPE: |
timeout | The amount of time to wait before raising an |
RETURNS | DESCRIPTION |
---|---|
Event | The event that was provided. |
RAISES | DESCRIPTION |
---|---|
TimeoutError |
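The interplay of event type, synchronous predicate and timeout can be sketched with a future per waiter. This is an illustrative model built on asyncio only; `ToyWaiter` and its `dispatch` method are hypothetical and do not describe hikari's internals.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple


class ToyWaiter:
    """Toy model of wait_for(); not hikari's implementation."""

    def __init__(self) -> None:
        self._waiters: List[Tuple[type, Optional[Callable[[Any], bool]], asyncio.Future]] = []

    def dispatch(self, event: Any) -> None:
        for entry in list(self._waiters):
            event_type, predicate, future = entry
            # isinstance() also matches subclasses; the predicate is a plain function.
            if isinstance(event, event_type) and (predicate is None or predicate(event)):
                if not future.done():
                    future.set_result(event)
                self._waiters.remove(entry)

    async def wait_for(self, event_type: type, /, timeout: Optional[float],
                       predicate: Optional[Callable[[Any], bool]] = None) -> Any:
        future = asyncio.get_running_loop().create_future()
        entry = (event_type, predicate, future)
        self._waiters.append(entry)
        try:
            # Raises asyncio.TimeoutError if no matching event arrives in time.
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            if entry in self._waiters:
                self._waiters.remove(entry)


async def main() -> Tuple[Any, bool]:
    waiter = ToyWaiter()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, waiter.dispatch, 3)   # rejected by the predicate
    loop.call_later(0.02, waiter.dispatch, 42)  # accepted
    got = await waiter.wait_for(int, timeout=1, predicate=lambda n: n > 10)
    try:
        await waiter.wait_for(str, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return got, timed_out


result = asyncio.run(main())
```

The waiter is removed in a `finally` block so that a timed-out call does not leave a stale entry behind.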
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
EventStream
Bases: LazyIterator[EventT], ABC
A base abstract class for all event streamers.
Unlike hikari.iterators.LazyIterator (which this extends), an event streamer must be started and closed.
Examples:
A streamer may either be started and closed using with syntax, where hikari.api.event_manager.EventStream.open and hikari.api.event_manager.EventStream.close are implicitly called based on context.
with EventStream(app, EventType, timeout=50) as stream:
    async for entry in stream:
        ...
A streamer may also be directly started and closed using hikari.api.event_manager.EventStream.open and hikari.api.event_manager.EventStream.close. Note that if you don't call hikari.api.event_manager.EventStream.close after opening a streamer when you're finished with it, it may queue events in memory indefinitely.
stream = EventStream(app, EventType, timeout=50)
# open() and close() are documented as returning None, so they are not awaited.
stream.open()
async for event in stream:
    ...
stream.close()
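To make the open/close lifecycle concrete, here is a toy streamer written with asyncio only. It is a sketch under assumptions: `ToyStream` and its `feed` hook are hypothetical, the timeout is assumed to end iteration when no event arrives in time, and a limit is assumed to drop events once the queue is full.

```python
import asyncio
from typing import Any, List, Optional


class ToyStream:
    """Toy streamer that only queues events between open() and close()."""

    def __init__(self, timeout: Optional[float], limit: Optional[int] = None) -> None:
        self._queue: "asyncio.Queue[Any]" = asyncio.Queue()
        self._timeout = timeout
        self._limit = limit
        self._active = False

    def open(self) -> None:
        self._active = True  # calling this on an open streamer changes nothing

    def close(self) -> None:
        self._active = False
        while not self._queue.empty():  # release anything still queued
            self._queue.get_nowait()

    def feed(self, event: Any) -> None:
        # Hypothetical hook that an event manager would call for each event.
        if not self._active:
            return
        if self._limit is not None and self._queue.qsize() >= self._limit:
            return  # queue is full: drop the extra event
        self._queue.put_nowait(event)

    def __enter__(self) -> "ToyStream":
        self.open()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()

    def __aiter__(self) -> "ToyStream":
        return self

    async def __anext__(self) -> Any:
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=self._timeout)
        except asyncio.TimeoutError:
            raise StopAsyncIteration from None  # no event in time: end iteration


async def main() -> List[Any]:
    stream = ToyStream(timeout=0.05, limit=2)
    stream.feed("ignored")  # not open yet, so nothing is queued
    with stream:
        for number in range(3):
            stream.feed(number)  # the third event exceeds the limit
        received = [event async for event in stream]
    return received


received = asyncio.run(main())
```

Leaving the with block calls close(), which is the step that stops events from piling up in memory.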
See Also
LazyIterator: hikari.iterators.LazyIterator.
close abstractmethod
close() -> None
Mark this streamer as closed to stop it from queueing and receiving events.
If called on an already closed streamer then this will do nothing.
Note
with streamer may be used as a short-cut for opening and closing a streamer.
filter abstractmethod
Filter the items by one or more conditions.
Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.
All conditions must evaluate to True for the item to be returned. If they do not, the item is discarded and the next matching item, if there is one, is returned instead.
PARAMETER | DESCRIPTION |
---|---|
*predicates | Predicates to invoke. These are functions that take a value and return TYPE: |
**attrs | Alternative to passing 2-tuples. Cannot specify nested attributes using this method. TYPE: |
RETURNS | DESCRIPTION |
---|---|
EventStream[ValueT] | The current stream with the new filter applied. |
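The three ways of expressing a condition (callables, 2-tuples and keyword attributes) can be combined into one check. The sketch below is an assumption-laden illustration: `build_filter` is a hypothetical helper, 2-tuples are assumed to be (attribute path, expected value) pairs that may use dotted paths, and keyword arguments are limited to top-level attributes as stated in the table.

```python
import operator
from types import SimpleNamespace
from typing import Any, Callable, List, Tuple, Union

Predicate = Callable[[Any], bool]


def build_filter(*predicates: Union[Predicate, Tuple[str, Any]], **attrs: Any) -> Predicate:
    checks: List[Predicate] = []
    for predicate in predicates:
        if isinstance(predicate, tuple):
            path, expected = predicate
            # attrgetter accepts dotted paths such as "emoji.name".
            getter = operator.attrgetter(path)
            checks.append(lambda item, g=getter, e=expected: g(item) == e)
        else:
            checks.append(predicate)
    for name, expected in attrs.items():
        # Keyword form: top-level attributes only, no nesting.
        checks.append(lambda item, n=name, e=expected: getattr(item, n) == e)
    # Every condition must hold for the item to pass.
    return lambda item: all(check(item) for check in checks)


items = [
    SimpleNamespace(message_id=1, user_id=10, emoji=SimpleNamespace(name="ok")),
    SimpleNamespace(message_id=1, user_id=11, emoji=SimpleNamespace(name="no")),
    SimpleNamespace(message_id=2, user_id=10, emoji=SimpleNamespace(name="ok")),
]

matches = build_filter(
    ("emoji.name", "ok"),          # 2-tuple with a nested attribute
    lambda item: item.user_id < 11,  # plain predicate
    message_id=1,                  # keyword attribute
)
kept = [item.user_id for item in items if matches(item)]
```

Items failing any single condition are skipped, matching the "all conditions must evaluate to True" rule.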
open abstractmethod
open() -> None
Mark this streamer as opened to let it start receiving and queueing events.
If called on an already started streamer then this will do nothing.
Note
with streamer may be used as a short-cut for opening and closing a streamer.