hikari.impl.event_manager_base

A base implementation for an event manager.

Module Contents

class hikari.impl.event_manager_base.EventStream(event_manager, event_type, *, timeout, limit=None)

Bases: hikari.api.event_manager.EventStream[hikari.events.base_events.EventT]

An implementation of the EventStream class.

Note

Calling EventStream.filter on an active (opened) event stream returns a wrapping lazy iterator, whereas calling it on an inactive (closed) event stream returns the event stream itself and adds the given predicates to it.

hikari.impl.event_manager_base.filtered(event_types, cache_components=config.CacheComponents.NONE, /)

Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched.

Parameters
event_types

Types of the events this raw consumer method may dispatch. This may be either a single type or a sequence of types.

Other Parameters
cache_components : hikari.api.config.CacheComponents

Bitfield of the cache components this event may make altering calls to. This defaults to hikari.api.config.CacheComponents.NONE.

class hikari.impl.event_manager_base.EventManagerBase(event_factory, intents, *, cache_components=config.CacheComponents.NONE)

Bases: hikari.api.event_manager.EventManager

Provides functionality to consume and dispatch events.

Specific event handlers should be in functions named on_xxx where xxx is the raw event name being dispatched in lower-case.

consume_raw_event(event_name, shard, payload)

Consume a raw event.

Parameters
event_name : str

The case-insensitive name of the event being triggered.

shard : hikari.api.shard.GatewayShard

Object of the shard that received this event.

payload : hikari.internal.data_binding.JSONObject

Payload of the event being triggered.

Raises
LookupError

If there is no consumer for the event.
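The on_xxx naming convention and the LookupError behaviour can be illustrated with a small standalone sketch. It does not use hikari: ToyConsumerTable, on_message_create and the payload shape are hypothetical names chosen for illustration, and the real implementation may well differ.

```python
import inspect


class ToyConsumerTable:
    """Toy illustration of name-based consumer lookup (not hikari's code)."""

    def __init__(self):
        # Collect every method called on_xxx, keyed by the "xxx" part.
        self._consumers = {
            name[len("on_"):]: method
            for name, method in inspect.getmembers(self, inspect.ismethod)
            if name.startswith("on_")
        }

    def consume_raw_event(self, event_name, payload):
        # Lower-casing the name makes the lookup case-insensitive.
        try:
            consumer = self._consumers[event_name.lower()]
        except KeyError:
            raise LookupError(f"No consumer for event {event_name!r}") from None
        return consumer(payload)

    def on_message_create(self, payload):
        # Hypothetical handler for a raw "MESSAGE_CREATE" event.
        return f"message: {payload['content']}"


table = ToyConsumerTable()
result = table.consume_raw_event("MESSAGE_CREATE", {"content": "hi"})
```

In this sketch an unknown name such as "unknown_event" raises LookupError because no on_unknown_event method exists.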

subscribe(event_type, callback, *, _nested=0)

Subscribe a given callback to a given event type.

Parameters
event_type : typing.Type[T]

The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.

callback

Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.

Examples

The following demonstrates subscribing a callback to message creation events.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.subscribe(MessageCreateEvent, on_message)

get_listeners(event_type, /, *, polymorphic=True)

Get the listeners for a given event type, if there are any.

Parameters
event_type : typing.Type[T]

The event type to look for. T must be a subclass of hikari.events.base_events.Event.

polymorphic : bool

If True, this will also return the listeners for all the event types event_type will dispatch. If False, then only listeners for this class specifically are returned. The default is True.

Returns
typing.Collection[typing.Callable[[T], typing.Coroutine[typing.Any, typing.Any, None]]]

A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
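To make the effect of polymorphic concrete, here is a standalone sketch that assumes listeners are stored in a plain dict keyed by event class and that an event type dispatches to itself and its base classes. The classes and the get_listeners helper below are hypothetical stand-ins, not hikari's own code.

```python
class Event:
    """Hypothetical base event."""


class MessageEvent(Event):
    """Hypothetical intermediate event."""


class MessageCreateEvent(MessageEvent):
    """Hypothetical concrete event."""


async def on_any_event(event):
    ...


async def on_message_create(event):
    ...


def get_listeners(listeners, event_type, *, polymorphic=True):
    if not polymorphic:
        # Only listeners registered for this exact class.
        return tuple(listeners.get(event_type, ()))
    found = []
    # Walk the class and its bases, collecting listeners for each.
    for cls in event_type.__mro__:
        found.extend(listeners.get(cls, ()))
    # A tuple is a copy, so callers cannot mutate the registry.
    return tuple(found)


registry = {Event: [on_any_event], MessageCreateEvent: [on_message_create]}

polymorphic_result = get_listeners(registry, MessageCreateEvent)
exact_result = get_listeners(registry, MessageCreateEvent, polymorphic=False)
unregistered_result = get_listeners(registry, MessageEvent, polymorphic=False)
```

Here the polymorphic lookup also finds the listener registered on the base Event class, while the non-polymorphic lookup returns only the listener registered for MessageCreateEvent itself.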

unsubscribe(event_type, callback)

Unsubscribe a given callback from a given event type, if present.

Parameters
event_type : typing.Type[T]

The event type to unsubscribe from. This must be exactly the same type that the callback was originally subscribed with for it to be removed correctly. T must derive from hikari.events.base_events.Event.

callback

The callback to unsubscribe.

Examples

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.unsubscribe(MessageCreateEvent, on_message)

listen(*event_types)

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

Parameters
*event_types : typing.Optional[typing.Type[T]]

The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.

Returns
typing.Callable[[T], T]

A decorator for a coroutine function that passes it to EventManager.subscribe before returning the function reference.
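The second-order decorator pattern and the type-hint fallback can be sketched in plain Python. ToyManager, Event and PingEvent are hypothetical stand-ins, and inferring the type from the first parameter's annotation is an assumption about how such inference could work, not a description of hikari's internals.

```python
import inspect
import typing


class Event:
    """Hypothetical stand-in for a base event class."""


class PingEvent(Event):
    """Hypothetical event subclass."""


class ToyManager:
    def __init__(self):
        self.listeners = {}

    def subscribe(self, event_type, callback):
        self.listeners.setdefault(event_type, []).append(callback)

    def listen(self, *event_types):
        # The outer call takes the event types and returns the real decorator.
        def decorator(callback):
            resolved = event_types
            if not resolved:
                # No types given: use the annotation on the first parameter.
                hints = typing.get_type_hints(callback)
                first_param = next(iter(inspect.signature(callback).parameters))
                resolved = (hints[first_param],)
            for event_type in resolved:
                self.subscribe(event_type, callback)
            # Return the original function unchanged.
            return callback

        return decorator


manager = ToyManager()


@manager.listen()
async def on_ping(event: PingEvent) -> None:
    ...


@manager.listen(Event)
async def on_any(event):
    ...
```

Because the decorator returns the original function, on_ping and on_any remain ordinary coroutine functions after registration.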

dispatch(event)

Dispatch an event.

Parameters
event : hikari.events.base_events.Event

The event to dispatch.

Returns
asyncio.Future[typing.Any]

A future that can optionally be awaited. If awaited, the future completes once all corresponding event listeners have been invoked. If not awaited, the listeners are still scheduled and run in the background.
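The "optionally awaitable future" behaviour can be illustrated with asyncio.gather, which schedules its coroutines as soon as it is called and returns a future for the combined result. This is a standalone sketch under that assumption; the dispatch helper and the listeners below are hypothetical and do not reflect how hikari builds its future.

```python
import asyncio

calls = []


async def first_listener(event):
    calls.append(("first", event))


async def second_listener(event):
    await asyncio.sleep(0)  # Yield to the loop once before recording.
    calls.append(("second", event))


def dispatch(listeners, event):
    # gather() wraps each coroutine in a task immediately, so the listeners
    # run whether or not the caller awaits the returned future.
    return asyncio.gather(*(listener(event) for listener in listeners))


async def main():
    future = dispatch([first_listener, second_listener], "hello")
    # Awaiting is optional; doing so waits for every listener to finish.
    await future
    return list(calls)


results = asyncio.run(main())
```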

Examples

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

from hikari.events.messages import MessageCreateEvent

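@bot.listen(MessageCreateEvent)
async def on_message(event):
    # event.content is None when the message has no text content.
    content = event.content or ""
    if "@everyone" in content or "@here" in content:
        # Use a new name rather than rebinding the incoming event.
        mention_event = EveryoneMentionedEvent(
            app=event.app,
            author=event.author,
            content=content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )

        bot.dispatch(mention_event)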

This event can be listened to elsewhere by subscribing to it with EventManager.subscribe or, as here, with the EventManager.listen decorator.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)

stream(event_type, /, timeout, limit=None)

Return a stream iterator for the given event and sub-events.

Warning

If you use stream.open() to start the stream, then you must also close it with stream.close(); otherwise it may queue events in memory indefinitely.

Parameters
event_type : typing.Type[hikari.events.base_events.Event]

The event type to listen for. This will also listen for subclasses of this type.

timeout : typing.Union[float, int, None]

How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.

limit : typing.Optional[int]

The maximum number of events this should queue at one time before dropping extra incoming events. Leave this as None for the queue size to be unlimited.

Returns
EventStream[hikari.events.base_events.Event]

The async iterator to handle streamed events. This must be started, either by entering it as a context manager (with stream:) or by calling stream.open(), before asynchronously iterating over it.
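How timeout and limit interact can be seen in a small standalone model of a stream. ToyStream and its push method are hypothetical; the sketch assumes a simple queue-backed buffer and is not hikari's EventStream.

```python
import asyncio


class ToyStream:
    """Toy async iterator with a bounded buffer and a per-item timeout."""

    def __init__(self, *, timeout, limit=None):
        self._timeout = timeout
        self._limit = limit
        self._queue = None  # None means the stream is closed.

    def open(self):
        self._queue = asyncio.Queue()

    def close(self):
        self._queue = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *exc_info):
        self.close()

    def push(self, event):
        # Hypothetical hook a manager would call for each incoming event.
        if self._queue is None:
            return  # Closed streams ignore events.
        if self._limit is not None and self._queue.qsize() >= self._limit:
            return  # Buffer is full: drop the extra event.
        self._queue.put_nowait(event)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._queue is None:
            raise TypeError("stream must be opened before iterating")
        try:
            return await asyncio.wait_for(self._queue.get(), self._timeout)
        except asyncio.TimeoutError:
            # No event arrived in time, so end the iteration.
            raise StopAsyncIteration from None


async def main():
    received = []
    with ToyStream(timeout=0.05, limit=2) as stream:
        for number in range(5):
            stream.push(number)  # Only the first two fit in the buffer.
        async for event in stream:
            received.append(event)
    return received


received = asyncio.run(main())
```

With limit=2, the three events pushed while the buffer is full are dropped, and the loop ends once no new event arrives within the timeout.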

Examples

with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

Or, using open() and close():

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50):
    ...

stream.close()

async wait_for(event_type, /, timeout, predicate=None)

Wait for a given event to occur once, then return the event.

Warning

Async predicates are not supported.

Parameters
event_type : typing.Type[hikari.events.base_events.Event]

The event type to listen for. This will also listen for subclasses of this type.

predicate

A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.

timeout : typing.Union[float, int, None]

The amount of time, in seconds, to wait before giving up and raising an asyncio.TimeoutError. If None, this will wait indefinitely. Waiting without a timeout is not recommended, because calls made in an uncontrolled way can leak coroutines that never complete.

Returns
hikari.events.base_events.Event

The event that was received.

Raises
asyncio.TimeoutError

If the timeout is not None and is reached before an event is received that the predicate returns True for.
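The predicate and timeout semantics can be modelled with a future that is resolved by the first matching event. This is a standalone sketch: ToyWaiter and its dispatch method are hypothetical, event types are omitted for brevity, and none of it is hikari's implementation.

```python
import asyncio


class ToyWaiter:
    """Toy model of waiting for the first event that passes a predicate."""

    def __init__(self):
        self._waiters = []  # (predicate, future) pairs

    def dispatch(self, event):
        for pair in list(self._waiters):
            predicate, future = pair
            # The predicate is a plain synchronous callable.
            if not future.done() and (predicate is None or predicate(event)):
                future.set_result(event)
                self._waiters.remove(pair)

    async def wait_for(self, timeout, predicate=None):
        future = asyncio.get_running_loop().create_future()
        pair = (predicate, future)
        self._waiters.append(pair)
        try:
            # Raises asyncio.TimeoutError if nothing matches in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always unregister so timed-out waiters do not accumulate.
            if pair in self._waiters:
                self._waiters.remove(pair)


async def main():
    waiter = ToyWaiter()
    task = asyncio.ensure_future(waiter.wait_for(1.0, lambda number: number > 2))
    await asyncio.sleep(0)  # Let the waiter register itself.
    for number in (1, 2, 3, 4):
        waiter.dispatch(number)
    matched = await task

    try:
        await waiter.wait_for(0.01)  # Nothing is dispatched, so this times out.
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return matched, timed_out


matched, timed_out = asyncio.run(main())
```

The finally block reflects the leak concern noted for timeout=None: a waiter that is never resolved and never times out would stay registered indefinitely.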