hikari.impl.bot#

Basic implementation the components for a single-process bot.

Module Contents#

class hikari.impl.bot.GatewayBot(token, *, allow_color=True, banner='hikari', executor=None, force_color=False, cache_settings=None, http_settings=None, intents=intents_.Intents.ALL_UNPRIVILEGED, auto_chunk_members=True, logs='INFO', max_rate_limit=300.0, max_retries=3, proxy_settings=None, rest_url=None)[source]#

Bases: hikari.traits.GatewayBotAware

Basic auto-sharding bot implementation.

This is the class you will want to use to start, control, and build a bot with.

Note

Settings that control the gateway session are provided to the GatewayBot.run and GatewayBot.start functions in this class. This is done to allow you to contextually customise details such as sharding configuration without having to re-initialize the entire application each time.

Parameters:
tokenstr

The bot token to sign in with.

Other Parameters:
allow_colorbool

Defaulting to True, this will enable coloured console logs on any platform that is a TTY. Setting a "CLICOLOR" environment variable to any non `0` value will override this setting.

Users should consider this an advice to the application on whether it is safe to show colours if possible or not. Since some terminals can be awkward or not support features in a standard way, the option to explicitly disable this is provided. See force_color for an alternative.

bannertyping.Optional[str]

The package to search for a banner.txt in. Defaults to "hikari" for the "hikari/banner.txt" banner. Setting this to None will disable the banner being shown.

executortyping.Optional[concurrent.futures.Executor]

Defaults to None. If non-None, then this executor is used instead of the concurrent.futures.ThreadPoolExecutor attached to the asyncio.AbstractEventLoop that the bot will run on. This executor is used primarily for file-IO.

While mainly supporting the concurrent.futures.ThreadPoolExecutor implementation in the standard lib, Hikari’s file handling systems should also work with concurrent.futures.ProcessPoolExecutor, which relies on all objects used in IPC to be pickleable. Many third-party libraries will not support this fully though, so your mileage may vary on using ProcessPoolExecutor implementations with this parameter.

force_colorbool

Defaults to False. If True, then this application will __force__ colour to be used in console-based output. Specifying a "CLICOLOR_FORCE" environment variable with a non-"0" value will override this setting.

This will take precedence over allow_color if both are specified.

cache_settingstyping.Optional[hikari.impl.config.CacheSettings]

Optional cache settings. If unspecified, will use the defaults.

http_settingstyping.Optional[hikari.impl.config.HTTPSettings]

Optional custom HTTP configuration settings to use. Allows you to customise functionality such as whether SSL-verification is enabled, what timeouts aiohttp should expect to use for requests, and behavior regarding HTTP-redirects.

intentshikari.intents.Intents

Defaults to hikari.intents.Intents.ALL_UNPRIVILEGED. This allows you to change which intents your application will use on the gateway. This can be used to control and change the types of events you will receive.

auto_chunk_membersbool

Defaults to True. If False, then no member chunks will be requested automatically, even if there are reasons to do so.

All following statements must be true to automatically request chunks:

  1. auto_chunk_members is True.

  2. The members intent is enabled.

  3. The server is marked as “large” or the presences intent is not enabled (since Discord only sends other members when presences are declared, we should also chunk small guilds if the presences are not declared).

  4. The members cache is enabled or there are listeners for the MemberChunkEvent.

logstyping.Union[None, LoggerLevel, typing.Dict[str, typing.Any]]

Defaults to "INFO".

If None, then the Python logging system is left uninitialized on startup, and you will need to configure it manually to view most logs that are output by components of this library.

If one of the valid values in a LoggerLevel, then this will match a call to colorlog.basicConfig (a facade for logging.basicConfig with additional conduit for enabling coloured logging levels) with the level kwarg matching this value.

If a typing.Dict[str, typing.Any] equivalent, then this value is passed to logging.config.dictConfig to allow the user to provide a specialized logging configuration of their choice. If any handlers are defined in the dict, default handlers will not be setup.

As a side note, you can always opt to leave this on the default value and then use an incremental logging.config.dictConfig that applies any additional changes on top of the base configuration, if you prefer. An example of can be found in the Example section.

Note that "TRACE_HIKARI" is a library-specific logging level which is expected to be more verbose than "DEBUG".

max_rate_limitfloat

The max number of seconds to backoff for when rate limited. Anything greater than this will instead raise an error.

This defaults to five minutes if left to the default value. This is to stop potentially indefinitely waiting on an endpoint, which is almost never what you want to do if giving a response to a user.

You can set this to float("inf") to disable this check entirely.

Note that this only applies to the REST API component that communicates with Discord, and will not affect sharding or third party HTTP endpoints that may be in use.

max_retriestyping.Optional[int]

Maximum number of times a request will be retried if it fails with a 5xx status. Defaults to 3 if set to None.

proxy_settingstyping.Optional[hikari.impl.config.ProxySettings]

Custom proxy settings to use with network-layer logic in your application to get through an HTTP-proxy.

rest_urltyping.Optional[str]

Defaults to the Discord REST API URL if None. Can be overridden if you are attempting to point to an unofficial endpoint, or if you are attempting to mock/stub the Discord API for any reason. Generally you do not want to change this.

Examples

Setting up logging using a dictionary configuration:

import os

import hikari

# We want to make gateway logs output as DEBUG, and TRACE for all ratelimit content.
bot = hikari.GatewayBot(
    token=os.environ["BOT_TOKEN"],
    logs={
        "version": 1,
        "incremental": True,
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    },
)
shards: Mapping[int, hikari.api.shard.GatewayShard][source]#

Mapping of shards in this application instance.

Each shard ID is mapped to the corresponding shard instance.

If the application has not started, it is acceptable to assume the result of this call will be an empty mapping.

dispatch(event)[source]#

Dispatch an event.

Parameters:
eventhikari.events.base_events.Event

The event to dispatch.

Returns:
asyncio.Future[typing.Any]

A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.

Examples

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake

@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

from hikari.events.messages import MessageCreateEvent

@bot.listen(MessageCreateEvent)
async def on_message(event):
    if "@everyone" in event.content or "@here" in event.content:
        event = EveryoneMentionedEvent(
            author=event.author,
            content=event.content,
            message_id=event.id,
            channel_id=event.channel_id,
        )

        bot.dispatch(event)

This event can be listened to elsewhere by subscribing to it with hikari.impl.event_manager_base.EventManager.subscribe.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.user, "just pinged everyone in", event.channel_id)
get_listeners(event_type, /, *, polymorphic=True)[source]#

Get the listeners for a given event type, if there are any.

Parameters:
event_typetyping.Type[EventT]

The event type to look for. EventT must be a subclass of hikari.events.base_events.Event.

polymorphicbool

If True, this will also return the listeners of the subclasses of the given event type. If False, then only listeners for this class specifically are returned. The default is True.

Returns:
typing.Collection[typing.Callable[[EventT], typing.Coroutine[typing.Any, typing.Any, None]]]

A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.

listen(*event_types)[source]#

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

Parameters:
*event_typestyping.Optional[typing.Type[EventT]]

The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.

EventT must be a subclass of hikari.events.base_events.Event.

Returns:
typing.Callable[[EventT], EventT]

A decorator for a coroutine function that passes it to EventManager.subscribe before returning the function reference.

static print_banner(banner, allow_color, force_color, extra_args=None)[source]#

Print the banner.

This allows library vendors to override this behaviour, or choose to inject their own “branding” on top of what hikari provides by default.

Normal users should not need to invoke this function, and can simply change the banner argument passed to the constructor to manipulate what is displayed.

Parameters:
bannertyping.Optional[str]

The package to find a banner.txt in.

allow_colorbool

A flag that allows advising whether to allow color if supported or not. Can be overridden by setting a "CLICOLOR" environment variable to a non-"0" string.

force_colorbool

A flag that allows forcing color to always be output, even if the terminal device may not support it. Setting the "CLICOLOR_FORCE" environment variable to a non-"0" string will override this.

This will take precedence over allow_color if both are specified.

extra_argstyping.Optional[typing.Dict[str, str]]

If provided, extra $-substitutions to use when printing the banner. Default substitutions can not be overwritten.

Raises:
ValueError

If extra_args contains a default $-substitution.

run(*, activity=None, afk=False, asyncio_debug=None, check_for_updates=True, close_passed_executor=False, close_loop=True, coroutine_tracking_depth=None, enable_signal_handlers=None, idle_since=None, ignore_session_start_limit=False, large_threshold=250, propagate_interrupts=False, status=presences.Status.ONLINE, shard_ids=None, shard_count=None)[source]#

Start the application and block until it’s finished running.

Other Parameters:
activitytyping.Optional[hikari.presences.Activity]

The initial activity to display in the bot user presence, or None (default) to not show any.

afkbool

The initial AFK state to display in the bot user presence, or False (default) to not show any.

asyncio_debugbool

Defaults to False. If True, then debugging is enabled for the asyncio event loop in use.

check_for_updatesbool

Defaults to True. If True, will check for newer versions of hikari on PyPI and notify if available.

close_passed_executorbool

Defaults to False. If True, any custom concurrent.futures.Executor passed to the constructor will be shut down when the application terminates. This does not affect the default executor associated with the event loop, and will not do anything if you do not provide a custom executor to the constructor.

close_loopbool

Defaults to True. If True, then once the bot enters a state where all components have shut down permanently during application shut down, then all asyncgens and background tasks will be destroyed, and the event loop will be shut down.

This will wait until all hikari-owned aiohttp connectors have had time to attempt to shut down correctly (around 250ms), and on Python 3.9 and newer, will also shut down the default event loop executor too.

coroutine_tracking_depthtyping.Optional[int]

Defaults to None. If an integer value and supported by the interpreter, then this many nested coroutine calls will be tracked with their call origin state. This allows you to determine where non-awaited coroutines may originate from, but generally you do not want to leave this enabled for performance reasons.

enable_signal_handlerstyping.Optional[bool]

Defaults to True if this is started in the main thread.

If on a non-Windows OS with builtin support for kernel-level POSIX signals, then setting this to True will allow treating keyboard interrupts and other OS signals to safely shut down the application as calls to shut down the application properly rather than just killing the process in a dirty state immediately. You should leave this enabled unless you plan to implement your own signal handling yourself.

idle_sincetyping.Optional[datetime.datetime]

The datetime.datetime the user should be marked as being idle since, or None (default) to not show this.

ignore_session_start_limitbool

Defaults to False. If False, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True disables this behavior.

large_thresholdint

Threshold for members in a guild before it is treated as being “large” and no longer sending member details in the GUILD CREATE event. Defaults to 250.

propagate_interruptsbool

Defaults to False. If set to True, then any internal hikari.errors.HikariInterrupt that is raises as a result of catching an OS level signal will result in the exception being rethrown once the application has closed. This can allow you to use hikari signal handlers and still be able to determine what kind of interrupt the application received after it closes. When False, nothing is raised and the call will terminate cleanly and silently where possible instead.

shard_idstyping.Optional[typing.Sequence[int]]

The shard IDs to create shards for. If not None, then a non-None shard_count must ALSO be provided. Defaults to None, which means the Discord-recommended count is used for your application instead.

Note that the sequence will be de-duplicated.

shard_counttyping.Optional[int]

The number of shards to use in the entire distributed application. Defaults to None which results in the count being determined dynamically on startup.

statushikari.presences.Status

The initial status to show for the user presence on startup. Defaults to hikari.presences.Status.ONLINE.

Raises:
hikari.errors.ComponentStateConflictError

If bot is already running.

TypeError

If shard_ids is passed without shard_count.

async start(*, activity=None, afk=False, check_for_updates=True, idle_since=None, ignore_session_start_limit=False, large_threshold=250, shard_ids=None, shard_count=None, status=presences.Status.ONLINE)[source]#

Start the bot, wait for all shards to become ready, and then return.

Other Parameters:
activitytyping.Optional[hikari.presences.Activity]

The initial activity to display in the bot user presence, or None (default) to not show any.

afkbool

The initial AFK state to display in the bot user presence, or False (default) to not show any.

check_for_updatesbool

Defaults to True. If True, will check for newer versions of hikari on PyPI and notify if available.

idle_sincetyping.Optional[datetime.datetime]

The datetime.datetime the user should be marked as being idle since, or None (default) to not show this.

ignore_session_start_limitbool

Defaults to False. If False, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True disables this behavior.

large_thresholdint

Threshold for members in a guild before it is treated as being “large” and no longer sending member details in the GUILD CREATE event. Defaults to 250.

shard_idstyping.Optional[typing.Sequence[int]]

The shard IDs to create shards for. If not None, then a non-None shard_count must ALSO be provided. Defaults to None, which means the Discord-recommended count is used for your application instead.

Note that the sequence will be de-duplicated.

shard_counttyping.Optional[int]

The number of shards to use in the entire distributed application. Defaults to None which results in the count being determined dynamically on startup.

statushikari.presences.Status

The initial status to show for the user presence on startup. Defaults to hikari.presences.Status.ONLINE.

Raises:
TypeError

If shard_ids is passed without shard_count.

hikari.errors.ComponentStateConflictError

If bot is already running.

stream(event_type, /, timeout, limit=None)[source]#

Return a stream iterator for the given event and sub-events.

Warning

If you use stream.open() to start the stream then you must also close it with stream.close() otherwise it may queue events in memory indefinitely.

Parameters:
event_typetyping.Type[hikari.events.base_events.Event]

The event type to listen for. This will listen for subclasses of this type additionally.

timeouttyping.Optional[int, float]

How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.

limittyping.Optional[int]

The limit for how many events this should queue at one time before dropping extra incoming events, leave this as None for the cache size to be unlimited.

Returns:
EventStream[hikari.events.base_events.Event]

The async iterator to handle streamed events. This must be started with with stream: or stream.open() before asynchronously iterating over it.

Examples

with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50)
    ...

stream.close()
subscribe(event_type, callback)[source]#

Subscribe a given callback to a given event type.

Parameters:
event_typetyping.Type[T]

The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.

callback

Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.

Examples

The following demonstrates subscribing a callback to message creation events.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.subscribe(MessageCreateEvent, on_message)
unsubscribe(event_type, callback)[source]#

Unsubscribe a given callback from a given event type, if present.

Parameters:
event_typetyping.Type[T]

The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. T must derive from hikari.events.base_events.Event.

callback

The callback to unsubscribe.

Examples

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.messages import MessageCreateEvent

async def on_message(event):
    ...

bot.unsubscribe(MessageCreateEvent, on_message)
async wait_for(event_type, /, timeout, predicate=None)[source]#

Wait for a given event to occur once, then return the event.

Warning

Async predicates are not supported.

Parameters:
event_typetyping.Type[hikari.events.base_events.Event]

The event type to listen for. This will listen for subclasses of this type additionally.

predicate

A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.

timeouttyping.Union[float, int, None]

The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in “leaking” of coroutines that never complete if called in an uncontrolled way, so is not recommended).

Returns:
hikari.events.base_events.Event

The event that was provided.

Raises:
asyncio.TimeoutError

If the timeout is not None and is reached before an event is received that the predicate returns True for.