hikari.impl.gateway_bot#

Basic implementation of the components for a single-process bot.

GatewayBot #

GatewayBot(token: str, *, allow_color: bool = True, banner: Optional[str] = 'hikari', suppress_optimization_warning: bool = False, executor: Optional[Executor] = None, force_color: bool = False, cache_settings: Optional[CacheSettings] = None, http_settings: Optional[HTTPSettings] = None, dumps: JSONEncoder = data_binding.default_json_dumps, loads: JSONDecoder = data_binding.default_json_loads, intents: Intents = intents_.Intents.ALL_UNPRIVILEGED, auto_chunk_members: bool = True, logs: Union[None, str, int, dict[str, Any], PathLike[str]] = 'INFO', max_rate_limit: float = 300.0, max_retries: int = 3, proxy_settings: Optional[ProxySettings] = None, rest_url: Optional[str] = None)

Bases: GatewayBotAware

Basic auto-sharding bot implementation.

This is the class you will want to use to start, control, and build a bot with.

Note

Settings that control the gateway session are provided to the hikari.impl.gateway_bot.GatewayBot.run and hikari.impl.gateway_bot.GatewayBot.start functions in this class. This is done to allow you to contextually customise details such as sharding configuration without having to re-initialize the entire application each time.

PARAMETER DESCRIPTION
token

The bot token to sign in with.

TYPE: str

allow_color

Whether coloured console logs will be enabled on any platform that is a TTY. Setting a "CLICOLOR" environment variable to any non-"0" value will override this setting.

Users should consider this as advice to the application on whether it is safe to show colours if possible or not. Since some terminals can be awkward or not support features in a standard way, the option to explicitly disable this is provided. See force_color for an alternative.

TYPE: bool DEFAULT: True

banner

The package to search for a banner.txt in.

Setting this to None will disable the banner being shown.

TYPE: Optional[str] DEFAULT: 'hikari'

suppress_optimization_warning

By default, hikari warns you if you are not running your bot using optimizations (-O or -OO). If this is True, you won't receive these warnings, even if you are not running using optimizations.

TYPE: bool DEFAULT: False

executor

If non-None, then this executor is used instead of the concurrent.futures.ThreadPoolExecutor attached to the asyncio.AbstractEventLoop that the bot will run on. This executor is used primarily for file-IO.

While mainly supporting the concurrent.futures.ThreadPoolExecutor implementation in the standard lib, Hikari's file handling systems should also work with concurrent.futures.ProcessPoolExecutor, which relies on all objects used in IPC being pickleable. Many third-party libraries will not support this fully though, so your mileage may vary when using ProcessPoolExecutor implementations with this parameter.

TYPE: Optional[Executor] DEFAULT: None
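
As a rough illustration of what offloading file IO to an executor looks like, the sketch below uses only the standard asyncio API. It is not hikari code: read_bytes and read_in_executor are hypothetical helpers. Passing None as the executor makes asyncio fall back to the loop's default thread pool, which mirrors the default described above.

```python
import asyncio
import concurrent.futures
import pathlib
from typing import Optional


def read_bytes(path: str) -> bytes:
    # Blocking file read; intended to run in a worker thread.
    return pathlib.Path(path).read_bytes()


async def read_in_executor(
    path: str, executor: Optional[concurrent.futures.Executor] = None
) -> bytes:
    # executor=None tells asyncio to use the loop's default executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, read_bytes, path)
```

A ProcessPoolExecutor could be passed the same way, provided the function and its arguments are pickleable.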

force_color

If True, then this application will force colour to be used in console-based output. Specifying a "CLICOLOR_FORCE" environment variable with a non-"0" value will override this setting.

This will take precedence over allow_color if both are specified.

TYPE: bool DEFAULT: False

cache_settings

Optional cache settings. If unspecified, will use the defaults.

TYPE: Optional[CacheSettings] DEFAULT: None

http_settings

Optional custom HTTP configuration settings to use. Allows you to customise functionality such as whether SSL-verification is enabled, what timeouts aiohttp should expect to use for requests, and behavior regarding HTTP-redirects.

TYPE: Optional[HTTPSettings] DEFAULT: None

intents

This allows you to change which intents your application will use on the gateway. This can be used to control and change the types of events you will receive.

TYPE: Intents DEFAULT: ALL_UNPRIVILEGED

auto_chunk_members

If False, then no member chunks will be requested automatically, even if there are reasons to do so.

We only want to chunk if we both are allowed and need to:

  • Allowed? All of the following must be true:
      1. auto_chunk_members is True (the user wants us to).
      2. We have the necessary intents (hikari.intents.Intents.GUILD_MEMBERS).
      3. The guild is marked as "large", or we do not have the hikari.intents.Intents.GUILD_PRESENCES intent. Discord will only send all the other member objects in the GUILD_CREATE payload if presence intents are also declared, so if this isn't the case then we also want to chunk small guilds.

  • Needed? One of the following must be true:
      1. We have a cache, and it requires it (it is enabled for hikari.api.CacheComponents.MEMBERS), but we are not limited to only our own member (which is included in the GUILD_CREATE payload).
      2. The user is waiting for the member chunks (there is an event listener for it).

TYPE: bool DEFAULT: True
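
The "allowed and needed" rule above can be written down as a small predicate. This is a stdlib-only sketch of the documented conditions, not the library's implementation: the Intents flag, ChunkContext and should_chunk are hypothetical stand-ins, and the flag values are arbitrary.

```python
import enum
from dataclasses import dataclass


class Intents(enum.Flag):
    # Hypothetical stand-in for hikari.intents.Intents; values are arbitrary.
    GUILD_MEMBERS = 1
    GUILD_PRESENCES = 2


@dataclass
class ChunkContext:
    auto_chunk_members: bool
    intents: Intents
    guild_is_large: bool
    members_cache_enabled: bool
    cache_only_own_member: bool
    has_chunk_listener: bool


def should_chunk(ctx: ChunkContext) -> bool:
    # Allowed: the user opted in, the members intent is present, and either
    # the guild is large or presences are not declared.
    allowed = (
        ctx.auto_chunk_members
        and bool(ctx.intents & Intents.GUILD_MEMBERS)
        and (ctx.guild_is_large or not ctx.intents & Intents.GUILD_PRESENCES)
    )
    # Needed: the member cache wants more than our own member, or a listener
    # is waiting for member chunks.
    needed = (
        ctx.members_cache_enabled and not ctx.cache_only_own_member
    ) or ctx.has_chunk_listener
    return bool(allowed and needed)
```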

logs

The flavour to set the logging to.

This can be None to not enable logging automatically.

If you pass a str or an int, it is interpreted as the global logging level to use, and should match one of "DEBUG", "INFO", "WARNING", "ERROR" or "CRITICAL". The configuration will be set up to use a colorlog coloured logger, and to use a sane logging format strategy. The output will be written to sys.stdout using this configuration.

If you pass a dict, it is treated as the mapping to pass to logging.config.dictConfig. If the dict defines any handlers, default handlers will not be set up if incremental is not specified.

If you pass a str path to an existing file or an os.PathLike, it is interpreted as the file to load config from using logging.config.fileConfig.

Note that "TRACE_HIKARI" is a library-specific logging level which is expected to be more verbose than "DEBUG".

TYPE: Union[None, str, int, dict[str, Any], PathLike[str]] DEFAULT: 'INFO'

max_rate_limit

The max number of seconds to backoff for when rate limited. Anything greater than this will instead raise an error.

This defaults to five minutes if left to the default value. This is to stop potentially indefinitely waiting on an endpoint, which is almost never what you want to do if giving a response to a user.

You can set this to float("inf") to disable this check entirely.

Note that this only applies to the REST API component that communicates with Discord, and will not affect sharding or third party HTTP endpoints that may be in use.

TYPE: float DEFAULT: 300.0
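
To make the cut-off concrete, here is a minimal sketch of the described check, assuming the server reports how long to wait in seconds. BackoffTooLongError and wait_for_rate_limit are hypothetical names used for illustration only; the library raises its own error type.

```python
import asyncio


class BackoffTooLongError(Exception):
    """Hypothetical error type used only in this sketch."""


async def wait_for_rate_limit(retry_after: float, max_rate_limit: float = 300.0) -> None:
    # Refuse to wait when the requested backoff exceeds the configured maximum.
    if retry_after > max_rate_limit:
        raise BackoffTooLongError(
            f"retry after {retry_after}s exceeds the {max_rate_limit}s limit"
        )
    await asyncio.sleep(retry_after)
```

With max_rate_limit=float("inf") the comparison can never be true for a finite wait, which is why that value disables the check.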

max_retries

Maximum number of times a request will be retried if it fails with a 5xx status.

Will default to 3 if set to None.

TYPE: int DEFAULT: 3
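
One way to read this setting is "one initial attempt plus up to max_retries further attempts while the response is a 5xx". The sketch below models that reading only; send is a hypothetical callable that performs one request and returns its status code, and any delay between attempts is left out.

```python
from typing import Callable


def send_with_retries(send: Callable[[], int], max_retries: int = 3) -> int:
    """Call send() and retry while it returns a 5xx status.

    Returns the last status code seen.
    """
    status = send()
    retries = 0
    while 500 <= status < 600 and retries < max_retries:
        retries += 1
        status = send()
    return status
```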

proxy_settings

Custom proxy settings to use with network-layer logic in your application to get through an HTTP-proxy.

TYPE: Optional[ProxySettings] DEFAULT: None

dumps

The JSON encoder this application should use.

TYPE: JSONEncoder DEFAULT: default_json_dumps

loads

The JSON decoder this application should use.

TYPE: JSONDecoder DEFAULT: default_json_loads

rest_url

Defaults to the Discord REST API URL if None. Can be overridden if you are attempting to point to an unofficial endpoint, or if you are attempting to mock/stub the Discord API for any reason. Generally you do not want to change this.

TYPE: Optional[str] DEFAULT: None

Examples:

Simple logging setup:

hikari.GatewayBot("TOKEN", logs="INFO")  # Registered logging level
# or
hikari.GatewayBot("TOKEN", logs=20)  # Logging level as an int

File config:

# See https://docs.python.org/3/library/logging.config.html#configuration-file-format for more info
hikari.GatewayBot("TOKEN", logs="path/to/file.ini")

Setting up logging through a dict config:

# See https://docs.python.org/3/library/logging.config.html#dictionary-schema-details for more info
hikari.GatewayBot(
    "TOKEN",
    logs={
        "version": 1,
        "incremental": True,  # In incremental setups, the default stream handler will be set up
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    },
)
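
For comparison, this is roughly what the dict form amounts to when handed straight to the standard library. It is a sketch that runs without hikari installed: registering "TRACE_HIKARI" by hand, and the numeric value 5 chosen for it, are assumptions made for this example only.

```python
import logging
import logging.config

# Assumption: register the custom level name ourselves so that the level
# lookup succeeds without hikari; the value 5 is illustrative only.
logging.addLevelName(5, "TRACE_HIKARI")

logging.config.dictConfig(
    {
        "version": 1,
        "incremental": True,  # only adjust the existing configuration
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    }
)
```

A level name that has not been registered makes dictConfig raise a ValueError, so custom names only work once the level exists.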

cache property #

cache: Cache

Immutable cache implementation for this object.

entity_factory property #

entity_factory: EntityFactory

Entity factory implementation for this object.

event_factory property #

event_factory: EventFactory

Event factory component.

event_manager property #

event_manager: EventManager

Event manager for this object.

executor property #

executor: Optional[Executor]

Executor to use for blocking operations.

This may return None if the default asyncio thread pool should be used instead.

heartbeat_latencies property #

heartbeat_latencies: Mapping[int, float]

Mapping of shard ID to heartbeat latency.

Any shards that are not yet started will be float('nan').

heartbeat_latency property #

heartbeat_latency: float

Average heartbeat latency of all started shards.

If no shards are started, this will return float('nan').
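
The relationship between the two latency properties can be sketched as follows, assuming that shards which have not started report float('nan') as described above. average_latency is a hypothetical helper, not part of the library.

```python
import math
from typing import Mapping


def average_latency(latencies: Mapping[int, float]) -> float:
    # Ignore shards that have not started yet (reported as NaN).
    started = [value for value in latencies.values() if not math.isnan(value)]
    return sum(started) / len(started) if started else float("nan")
```

Because NaN never compares equal to anything, use math.isnan rather than == when checking the result.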

http_settings property #

http_settings: HTTPSettings

HTTP settings in use by this component.

intents property #

intents: Intents

Intents registered for the application.

is_alive property #

is_alive: bool

Whether the application is running or not.

This is useful as some functions might raise hikari.errors.ComponentStateConflictError if this is False.

proxy_settings property #

proxy_settings: ProxySettings

Proxy settings in use by this component.

rest property #

rest: RESTClient

REST client to use for HTTP requests.

shard_count property #

shard_count: int

Number of shards in the total application.

This may not be the same as the size of shards. If the application is auto-sharded, this may be 0 until the shards are started.

shards instance-attribute #

Mapping of shards in this application instance.

Each shard ID is mapped to the corresponding shard instance.

If the application has not started, it is acceptable to assume the result of this call will be an empty mapping.

voice property #

Voice connection manager component for this application.

close async #

close() -> None

Kill the application by shutting all components down.

dispatch #

dispatch(event: Event) -> Future[Any]

Dispatch an event.

PARAMETER DESCRIPTION
event

The event to dispatch.

TYPE: Event

Examples:

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

import attrs

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake


@attrs.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attrs.field()

    author: User = attrs.field()
    '''The user who mentioned everyone.'''

    content: str = attrs.field()
    '''The message that was sent.'''

    message_id: Snowflake = attrs.field()
    '''The message ID.'''

    channel_id: Snowflake = attrs.field()
    '''The channel ID.'''

We can then dispatch our event as we see fit.

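from hikari.events.message_events import MessageCreateEvent


@bot.listen(MessageCreateEvent)
async def on_message(event):
    # content is None for messages without text, so guard before searching it.
    content = event.content or ""
    if "@everyone" in content or "@here" in content:
        custom_event = EveryoneMentionedEvent(
            app=bot,  # the event class above defines a required app field
            author=event.author,
            content=content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )

        bot.dispatch(custom_event)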

This event can be listened to elsewhere by subscribing to it with hikari.impl.event_manager_base.EventManagerBase.subscribe.

@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)
RETURNS DESCRIPTION
Future[Any]

A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.

See Also

Listen : hikari.impl.gateway_bot.GatewayBot.listen. Stream : hikari.impl.gateway_bot.GatewayBot.stream. Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe. Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe. Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.

get_listeners #

get_listeners(event_type: type[EventT], /, *, polymorphic: bool = True) -> Collection[CallbackT[EventT]]

Get the listeners for a given event type, if there are any.

PARAMETER DESCRIPTION
event_type

The event type to look for. EventT must be a subclass of hikari.events.base_events.Event.

TYPE: type[EventT]

polymorphic

If True, this will also return the listeners of the subclasses of the given event type. If False, then only listeners for this class specifically are returned.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Collection[Callable[[EventT], Coroutine[Any, Any, None]]]

A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.

get_me #

get_me() -> Optional[OwnUser]

Return the bot user, if known.

This should be available as soon as the bot has fired the hikari.events.lifetime_events.StartingEvent.

Until then, this may or may not be None.

RETURNS DESCRIPTION
Optional[OwnUser]

The bot user, if known, otherwise None.

join async #

join() -> None

Wait indefinitely until the application closes.

This can be placed in a task and cancelled without affecting the application runtime itself. Any exceptions raised by shards will be propagated to here.

listen #

listen(*event_types: type[EventT]) -> Callable[[CallbackT[EventT]], CallbackT[EventT]]

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

PARAMETER DESCRIPTION
*event_types

The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.

EventT must be a subclass of hikari.events.base_events.Event.

TYPE: type[EventT] DEFAULT: ()

RETURNS DESCRIPTION
Callable[[CallbackT[EventT]], CallbackT[EventT]]

A decorator for a coroutine function that passes it to hikari.impl.event_manager.EventManagerImpl.subscribe before returning the function reference.

See Also

Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch. Stream : hikari.impl.gateway_bot.GatewayBot.stream. Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe. Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe. Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.

print_banner staticmethod #

print_banner(banner: Optional[str], allow_color: bool, force_color: bool, extra_args: Optional[dict[str, str]] = None) -> None

Print the banner.

This allows library vendors to override this behaviour, or choose to inject their own "branding" on top of what hikari provides by default.

Normal users should not need to invoke this function, and can simply change the banner argument passed to the constructor to manipulate what is displayed.

PARAMETER DESCRIPTION
banner

The package to find a banner.txt in.

TYPE: Optional[str]

allow_color

A flag that allows advising whether to allow color if supported or not. Can be overridden by setting a CLICOLOR environment variable to a non-"0" string.

TYPE: bool

force_color

A flag that allows forcing color to always be output, even if the terminal device may not support it. Setting the CLICOLOR_FORCE environment variable to a non-"0" string will override this.

This will take precedence over allow_color if both are specified.

TYPE: bool

extra_args

If provided, extra $-substitutions to use when printing the banner. Default substitutions can not be overwritten.

TYPE: Optional[dict[str, str]] DEFAULT: None

RAISES DESCRIPTION
ValueError

If extra_args contains a default $-substitution.
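
The $-substitution rule can be illustrated with string.Template from the standard library. This is a sketch only: DEFAULT_ARGS is a made-up set of default keys and render_banner is a hypothetical helper, so the real default substitutions may differ.

```python
import string
from typing import Mapping, Optional

# Hypothetical default substitutions; the real set is defined by the library.
DEFAULT_ARGS = {"library_name": "example", "library_version": "0.0.0"}


def render_banner(template: str, extra_args: Optional[Mapping[str, str]] = None) -> str:
    args = dict(DEFAULT_ARGS)
    if extra_args:
        # Default substitutions cannot be overwritten.
        clashes = sorted(set(extra_args) & set(DEFAULT_ARGS))
        if clashes:
            raise ValueError(f"Cannot overwrite default substitutions: {clashes}")
        args.update(extra_args)
    # safe_substitute leaves unknown $-placeholders untouched instead of raising.
    return string.Template(template).safe_substitute(args)
```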

request_guild_members async #

request_guild_members(guild: SnowflakeishOr[PartialGuild], *, include_presences: UndefinedOr[bool] = undefined.UNDEFINED, query: str = '', limit: int = 0, users: UndefinedOr[SnowflakeishSequence[User]] = undefined.UNDEFINED, nonce: UndefinedOr[str] = undefined.UNDEFINED) -> None

Request for a guild chunk.

Note

To request the full list of members, set query to "" (empty string) and limit to 0.

PARAMETER DESCRIPTION
guild

The guild to request chunk for.

TYPE: SnowflakeishOr[PartialGuild]

include_presences

If provided, whether to request presences.

TYPE: UndefinedOr[bool] DEFAULT: UNDEFINED

query

If not "", request the members whose username starts with the string.

TYPE: str DEFAULT: ''

limit

Maximum number of members to send matching the query.

TYPE: int DEFAULT: 0

users

If provided, the users to request for.

TYPE: UndefinedOr[SnowflakeishSequence[User]] DEFAULT: UNDEFINED

nonce

If provided, the nonce to be sent with guild chunks.

TYPE: UndefinedOr[str] DEFAULT: UNDEFINED

RAISES DESCRIPTION
ValueError

If users is specified together with query or limit, if limit is not between 0 and 100 (both inclusive), or if users contains more than 100 entries.

MissingIntentError

When trying to request presences without the hikari.intents.Intents.GUILD_PRESENCES intent, or when trying to request the full list of members without the hikari.intents.Intents.GUILD_MEMBERS intent.

RuntimeError

If the guild passed isn't covered by any of the shards in this sharded client.
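
The ValueError conditions listed above can be sketched as a standalone validator. It covers only those argument checks (not the intent or shard checks), validate_chunk_request is a hypothetical name, and None stands in for the library's UNDEFINED sentinel.

```python
from typing import Optional, Sequence


def validate_chunk_request(
    query: str = "", limit: int = 0, users: Optional[Sequence[int]] = None
) -> None:
    # users cannot be combined with a query or a limit.
    if users is not None and (query or limit):
        raise ValueError("Cannot specify users together with query or limit")
    if not 0 <= limit <= 100:
        raise ValueError("limit must be between 0 and 100, both inclusive")
    if users is not None and len(users) > 100:
        raise ValueError("users is limited to 100 entries")
```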

run #

run(*, activity: Optional[Activity] = None, afk: bool = False, asyncio_debug: Optional[bool] = None, check_for_updates: bool = True, close_passed_executor: bool = False, close_loop: bool = True, coroutine_tracking_depth: Optional[int] = None, enable_signal_handlers: Optional[bool] = None, idle_since: Optional[datetime] = None, ignore_session_start_limit: bool = False, large_threshold: int = 250, propagate_interrupts: bool = False, status: Status = presences.Status.ONLINE, shard_ids: Optional[Sequence[int]] = None, shard_count: Optional[int] = None) -> None

Start the application and block until it's finished running.

PARAMETER DESCRIPTION
activity

The initial activity to display in the bot user presence, or None (default) to not show any.

TYPE: Optional[Activity] DEFAULT: None

afk

The initial AFK state to display in the bot user presence, or False (default) to not show any.

TYPE: bool DEFAULT: False

asyncio_debug

If True, then debugging is enabled for the asyncio event loop in use.

TYPE: Optional[bool] DEFAULT: None

check_for_updates

If True, will check for newer versions of hikari on PyPI and notify if available.

TYPE: bool DEFAULT: True

close_passed_executor

If True, any custom concurrent.futures.Executor passed to the constructor will be shut down when the application terminates. This does not affect the default executor associated with the event loop, and will not do anything if you do not provide a custom executor to the constructor.

TYPE: bool DEFAULT: False

close_loop

If True, then once the bot enters a state where all components have shut down permanently during application shut down, then all asyncgens and background tasks will be destroyed, and the event loop will be shut down.

This will wait until all hikari-owned aiohttp connectors have had time to attempt to shut down correctly (around 250ms), and on Python 3.9 and newer, will also shut down the default event loop executor.

TYPE: bool DEFAULT: True

coroutine_tracking_depth

If an integer value and supported by the interpreter, then this many nested coroutine calls will be tracked with their call origin state. This allows you to determine where non-awaited coroutines may originate from, but generally you do not want to leave this enabled for performance reasons.

TYPE: Optional[int] DEFAULT: None

enable_signal_handlers

Defaults to True if this is called in the main thread.

If on a non-Windows OS with builtin support for kernel-level POSIX signals, then setting this to True will treat keyboard interrupts and other OS signals as requests to shut down the application properly, rather than killing the process in a dirty state immediately. You should leave this enabled unless you plan to implement your own signal handling.

TYPE: Optional[bool] DEFAULT: None

idle_since

The datetime.datetime the user should be marked as being idle since, or None to not show this.

TYPE: Optional[datetime] DEFAULT: None

ignore_session_start_limit

If False, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True disables this behavior.

TYPE: bool DEFAULT: False

large_threshold

Threshold for members in a guild before it is treated as being "large" and member details are no longer sent in the GUILD_CREATE event.

TYPE: int DEFAULT: 250

propagate_interrupts

If True, then any internal hikari.errors.HikariInterrupt that is raised as a result of catching an OS level signal will result in the exception being rethrown once the application has closed. This can allow you to use hikari signal handlers and still be able to determine what kind of interrupt the application received after it closes. When False, nothing is raised and the call will terminate cleanly and silently where possible instead.

TYPE: bool DEFAULT: False

shard_ids

The shard IDs to create shards for. If not None, then a non-None shard_count must ALSO be provided.

Defaults to None, which means the Discord-recommended count is used for your application instead.

Note that the sequence will be de-duplicated.

TYPE: Optional[Sequence[int]] DEFAULT: None

shard_count

The number of shards to use in the entire distributed application.

Defaults to None which results in the count being determined dynamically on startup.

TYPE: Optional[int] DEFAULT: None

status

The initial status to show for the user presence on startup.

TYPE: Status DEFAULT: ONLINE

RAISES DESCRIPTION
ComponentStateConflictError

If bot is already running.

TypeError

If shard_ids is passed without shard_count.
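
The shard argument rules above (shard_ids requires shard_count, and duplicates are removed) can be sketched like this. resolve_shard_ids is a hypothetical helper; what happens when shard_ids is None is decided at startup and is not modelled here.

```python
from typing import Optional, Sequence, Set


def resolve_shard_ids(
    shard_ids: Optional[Sequence[int]], shard_count: Optional[int]
) -> Optional[Set[int]]:
    if shard_ids is not None and shard_count is None:
        raise TypeError("'shard_ids' requires 'shard_count' to be passed as well")
    if shard_ids is None:
        # Left to be determined at startup; not modelled in this sketch.
        return None
    # A set removes duplicate shard IDs.
    return set(shard_ids)
```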

start async #

start(*, activity: Optional[Activity] = None, afk: bool = False, check_for_updates: bool = True, idle_since: Optional[datetime] = None, ignore_session_start_limit: bool = False, large_threshold: int = 250, shard_ids: Optional[Sequence[int]] = None, shard_count: Optional[int] = None, status: Status = presences.Status.ONLINE) -> None

Start the bot, wait for all shards to become ready, and then return.

PARAMETER DESCRIPTION
activity

The initial activity to display in the bot user presence, or None (default) to not show any.

TYPE: Optional[Activity] DEFAULT: None

afk

The initial AFK state to display in the bot user presence, or False (default) to not show any.

TYPE: bool DEFAULT: False

check_for_updates

If True, will check for newer versions of hikari on PyPI and notify if available.

TYPE: bool DEFAULT: True

idle_since

The datetime.datetime the user should be marked as being idle since, or None (default) to not show this.

TYPE: Optional[datetime] DEFAULT: None

ignore_session_start_limit

If False, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True disables this behavior.

TYPE: bool DEFAULT: False

large_threshold

Threshold for members in a guild before it is treated as being "large" and member details are no longer sent in the GUILD_CREATE event.

TYPE: int DEFAULT: 250

shard_ids

The shard IDs to create shards for. If not None, then a non-None shard_count must ALSO be provided. Defaults to None, which means the Discord-recommended count is used for your application instead.

Note that the sequence will be de-duplicated.

TYPE: Optional[Sequence[int]] DEFAULT: None

shard_count

The number of shards to use in the entire distributed application.

Defaults to None which results in the count being determined dynamically on startup.

TYPE: Optional[int] DEFAULT: None

status

The initial status to show for the user presence on startup.

TYPE: Status DEFAULT: ONLINE

RAISES DESCRIPTION
TypeError

If shard_ids is passed without shard_count.

ComponentStateConflictError

If bot is already running.

stream #

stream(event_type: type[EventT], /, timeout: Union[float, int, None], limit: Optional[int] = None) -> EventStream[EventT]

Return a stream iterator for the given event and sub-events.

Warning

If you use stream.open() to start the stream, then you must also close it with stream.close(); otherwise, it may queue events in memory indefinitely.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will listen for subclasses of this type additionally.

TYPE: type[EventT]

timeout

How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.

TYPE: Union[float, int, None]

limit

The limit for how many events this should queue at one time before dropping extra incoming events. Leave this as None for the queue size to be unlimited.

TYPE: Optional[int] DEFAULT: None
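
The effect of limit can be pictured with a small bounded buffer that discards incoming items once it is full. BoundedEventBuffer is a hypothetical class written for this illustration; it is not how the stream is implemented.

```python
from collections import deque
from typing import Any, Deque, Optional


class BoundedEventBuffer:
    """Hypothetical stand-in for the queue behind an event stream."""

    def __init__(self, limit: Optional[int] = None) -> None:
        self._events: Deque[Any] = deque()
        self._limit = limit  # None means unlimited

    def push(self, event: Any) -> bool:
        # Returns False when the event was dropped because the buffer is full.
        if self._limit is not None and len(self._events) >= self._limit:
            return False
        self._events.append(event)
        return True

    def pop(self) -> Any:
        # Events leave in the order they arrived.
        return self._events.popleft()

    def __len__(self) -> int:
        return len(self._events)
```

Note that it is the newest event that is discarded here, matching the "dropping extra incoming events" wording, rather than the oldest.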

RETURNS DESCRIPTION
EventStream[Event]

The async iterator to handle streamed events. This must be started with a "with stream:" block or stream.open() before asynchronously iterating over it.

Examples:

with bot.stream(events.ReactionAddEvent, timeout=30).filter(
    ("message_id", message.id)
) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...

or using open() and close()

stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()

async for user_id in stream.map("user_id").limit(50):
    ...

stream.close()
See Also

Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch. Listen : hikari.impl.gateway_bot.GatewayBot.listen. Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe. Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe. Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.

subscribe #

subscribe(event_type: type[Any], callback: CallbackT[Any]) -> None

Subscribe a given callback to a given event type.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will also listen for any subclasses of the given type. The type must be a subclass of hikari.events.base_events.Event.

TYPE: type[Any]

callback

Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.

TYPE: CallbackT[Any]

Examples:

The following demonstrates subscribing a callback to message creation events.

from hikari.events.message_events import MessageCreateEvent


async def on_message(event): ...


bot.subscribe(MessageCreateEvent, on_message)
See Also

Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch. Listen : hikari.impl.gateway_bot.GatewayBot.listen. Stream : hikari.impl.gateway_bot.GatewayBot.stream. Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe. Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.

unsubscribe #

unsubscribe(event_type: type[Any], callback: CallbackT[Any]) -> None

Unsubscribe a given callback from a given event type, if present.

PARAMETER DESCRIPTION
event_type

The event type to unsubscribe from. This must be exactly the same type that was originally subscribed with for the callback to be removed correctly. The type must derive from hikari.events.base_events.Event.

TYPE: type[Any]

callback

The callback to unsubscribe.

TYPE: CallbackT[Any]

Examples:

The following demonstrates unsubscribing a callback from a message creation event.

from hikari.events.message_events import MessageCreateEvent


async def on_message(event): ...


bot.unsubscribe(MessageCreateEvent, on_message)
See Also

Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch. Listen : hikari.impl.gateway_bot.GatewayBot.listen. Stream : hikari.impl.gateway_bot.GatewayBot.stream. Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe. Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.

update_presence async #

update_presence(*, status: UndefinedOr[Status] = undefined.UNDEFINED, idle_since: UndefinedNoneOr[datetime] = undefined.UNDEFINED, activity: UndefinedNoneOr[Activity] = undefined.UNDEFINED, afk: UndefinedOr[bool] = undefined.UNDEFINED) -> None

Update the presence on all shards.

This call will patch the presence on each shard: unless you explicitly specify a parameter, its previous value is retained, so you do not have to track the global presence in your code.

Note

This will only send the update payloads to shards that are alive. Any shards that are not alive will cache the new presence for when they do start.

Note

If you want to set presences per shard, access the shard you wish to update (e.g. by using hikari.GatewayBot.shards), and call hikari.api.shard.GatewayShard.update_presence on that shard. This method is simply a facade to make performing this in bulk simpler.

PARAMETER DESCRIPTION
idle_since

The datetime that the user started being idle. If undefined, this will not be changed.

TYPE: UndefinedNoneOr[datetime] DEFAULT: UNDEFINED

afk

Whether to be marked as AFK. If undefined, this will not be changed.

TYPE: UndefinedOr[bool] DEFAULT: UNDEFINED

activity

The activity to appear to be playing. If undefined, this will not be changed.

TYPE: UndefinedNoneOr[Activity] DEFAULT: UNDEFINED

status

The web status to show. If undefined, this will not be changed.

TYPE: UndefinedOr[Status] DEFAULT: UNDEFINED
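
The patch semantics, where an undefined argument keeps the previous value while an explicit None clears it, can be sketched with a sentinel object. Everything here is a hypothetical stand-in: UNDEFINED imitates the library's sentinel, and Presence and patch_presence exist only for this example.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


class _Undefined:
    def __repr__(self) -> str:
        return "UNDEFINED"


UNDEFINED = _Undefined()  # hypothetical stand-in for the library's sentinel


@dataclass(frozen=True)
class Presence:
    status: str = "online"
    activity: Optional[str] = None
    afk: bool = False


def patch_presence(
    current: Presence,
    *,
    status: Any = UNDEFINED,
    activity: Any = UNDEFINED,
    afk: Any = UNDEFINED,
) -> Presence:
    # Only fields that were explicitly passed are changed.
    changes = {
        name: value
        for name, value in (("status", status), ("activity", activity), ("afk", afk))
        if value is not UNDEFINED
    }
    return replace(current, **changes)
```

A dedicated sentinel is needed because None is itself a meaningful value for fields such as the activity.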

update_voice_state async #

update_voice_state(guild: SnowflakeishOr[PartialGuild], channel: Optional[SnowflakeishOr[GuildVoiceChannel]], *, self_mute: UndefinedOr[bool] = undefined.UNDEFINED, self_deaf: UndefinedOr[bool] = undefined.UNDEFINED) -> None

Update the voice state for this bot in a given guild.

PARAMETER DESCRIPTION
guild

The guild or guild ID to update the voice state for.

TYPE: SnowflakeishOr[PartialGuild]

channel

The channel or channel ID to update the voice state for. If None then the bot will leave the voice channel that it is in for the given guild.

TYPE: Optional[SnowflakeishOr[GuildVoiceChannel]]

self_mute

If specified and True, the bot will mute itself in that voice channel. If False, then it will unmute itself.

TYPE: UndefinedOr[bool] DEFAULT: UNDEFINED

self_deaf

If specified and True, the bot will deafen itself in that voice channel. If False, then it will undeafen itself.

TYPE: UndefinedOr[bool] DEFAULT: UNDEFINED

RAISES DESCRIPTION
RuntimeError

If the guild passed isn't covered by any of the shards in this sharded client.

wait_for async #

wait_for(event_type: type[EventT], /, timeout: Union[float, int, None], predicate: Optional[PredicateT[EventT]] = None) -> EventT

Wait for a given event to occur once, then return the event.

Warning

Async predicates are not supported.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will listen for subclasses of this type additionally.

TYPE: type[EventT]

predicate

A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.

TYPE: Optional[PredicateT[EventT]] DEFAULT: None

timeout

The amount of time to wait before raising an asyncio.TimeoutError and giving up instead. This is measured in seconds. If None, then no timeout will be waited for (no timeout can result in "leaking" of coroutines that never complete if called in an uncontrolled way, so is not recommended).

TYPE: Union[float, int, None]

RETURNS DESCRIPTION
Event

The event that was provided.

RAISES DESCRIPTION
TimeoutError

If the timeout is not None and is reached before an event is received that the predicate returns True for.
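
The interplay of event type, predicate and timeout can be modelled with a future and asyncio.wait_for. TinyDispatcher below is a hypothetical, minimal event hub written for this illustration and does not reflect the library's event manager.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple, Type


class TinyDispatcher:
    """Hypothetical, minimal event hub used only to illustrate wait_for."""

    def __init__(self) -> None:
        self._waiters: List[Tuple[type, Optional[Callable[[Any], bool]], asyncio.Future]] = []

    def dispatch(self, event: Any) -> None:
        for entry in list(self._waiters):
            event_type, predicate, future = entry
            # isinstance also matches subclasses of the awaited type.
            if isinstance(event, event_type) and (predicate is None or predicate(event)):
                if not future.done():
                    future.set_result(event)
                self._waiters.remove(entry)

    async def wait_for(
        self,
        event_type: Type[Any],
        timeout: Optional[float],
        predicate: Optional[Callable[[Any], bool]] = None,
    ) -> Any:
        future = asyncio.get_running_loop().create_future()
        entry = (event_type, predicate, future)
        self._waiters.append(entry)
        try:
            # Raises asyncio.TimeoutError if nothing matches in time.
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Always unregister, so timed-out waiters do not accumulate.
            if entry in self._waiters:
                self._waiters.remove(entry)
```

The predicate is called synchronously inside dispatch, which shows why a plain function is expected rather than a coroutine.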

See Also

Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch. Listen : hikari.impl.gateway_bot.GatewayBot.listen. Stream : hikari.impl.gateway_bot.GatewayBot.stream. Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe. Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.