hikari.impl.gateway_bot
#
Basic implementation of the components for a single-process bot.
GatewayBot #
GatewayBot(token: str, *, allow_color: bool = True, banner: Optional[str] = 'hikari', suppress_optimization_warning: bool = False, executor: Optional[Executor] = None, force_color: bool = False, cache_settings: Optional[CacheSettings] = None, http_settings: Optional[HTTPSettings] = None, dumps: JSONEncoder = data_binding.default_json_dumps, loads: JSONDecoder = data_binding.default_json_loads, intents: Intents = intents_.Intents.ALL_UNPRIVILEGED, auto_chunk_members: bool = True, logs: Union[None, str, int, dict[str, Any], PathLike[str]] = 'INFO', max_rate_limit: float = 300.0, max_retries: int = 3, proxy_settings: Optional[ProxySettings] = None, rest_url: Optional[str] = None)
Bases: GatewayBotAware
Basic auto-sharding bot implementation.
This is the class you will want to use to start, control, and build a bot with.
Note
Settings that control the gateway session are provided to the hikari.impl.gateway_bot.GatewayBot.run and hikari.impl.gateway_bot.GatewayBot.start functions in this class. This is done to allow you to contextually customise details such as sharding configuration without having to re-initialize the entire application each time.
PARAMETER | DESCRIPTION |
---|---|
token | The bot token to sign in with. TYPE: |
allow_color | Whether coloured console logs will be enabled on any platform that is a TTY. Setting a Users should consider this advice to the application on whether it is safe to show colours if possible or not. Since some terminals can be awkward or not support features in a standard way, the option to explicitly disable this is provided. See TYPE: |
banner | The package to search for a Setting this to |
suppress_optimization_warning | By default, hikari warns you if you are not running your bot using optimizations ( TYPE: |
executor | If non- While mainly supporting the |
force_color | If This will take precedence over TYPE: |
cache_settings | Optional cache settings. If unspecified, will use the defaults. TYPE: |
http_settings | Optional custom HTTP configuration settings to use. Allows you to customise functionality such as whether SSL-verification is enabled, what timeouts TYPE: |
intents | This allows you to change which intents your application will use on the gateway. This can be used to control and change the types of events you will receive. TYPE: |
auto_chunk_members | If We only want to chunk if we both are allowed and need to:
TYPE: |
logs | The flavour to set the logging to. This can be If you pass a If you pass a If you pass a Note that TYPE: |
max_rate_limit | The maximum number of seconds to back off for when rate limited. Anything greater than this will instead raise an error. This defaults to five minutes if left to the default value. This is to stop potentially indefinitely waiting on an endpoint, which is almost never what you want to do if giving a response to a user. You can set this to Note that this only applies to the REST API component that communicates with Discord, and will not affect sharding or third party HTTP endpoints that may be in use. TYPE: |
max_retries | Maximum number of times a request will be retried if it fails with a Will default to 3 if set to TYPE: |
proxy_settings | Custom proxy settings to use with network-layer logic in your application to get through an HTTP-proxy. TYPE: |
dumps | The JSON encoder this application should use. TYPE: |
loads | The JSON decoder this application should use. TYPE: |
rest_url | Defaults to the Discord REST API URL if |
Examples:
Simple logging setup:
hikari.GatewayBot("TOKEN", logs="INFO") # Registered logging level
# or
hikari.GatewayBot("TOKEN", logs=20) # Logging level as an int
File config:
# See https://docs.python.org/3/library/logging.config.html#configuration-file-format for more info
hikari.GatewayBot("TOKEN", logs="path/to/file.ini")
Setting up logging through a dict config:
# See https://docs.python.org/3/library/logging.config.html#dictionary-schema-details for more info
hikari.GatewayBot(
    "TOKEN",
    logs={
        "version": 1,
        "incremental": True,  # In incremental setups, the default stream handler will be set up
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    },
)
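As a rough sketch of what an incremental dict config does, the snippet below hands a similar mapping straight to the standard library's logging.config.dictConfig. That hikari forwards the dict to that function unchanged is an assumption here, the "hikari.rest" logger name is only illustrative, and the TRACE_HIKARI level is left out because it is a hikari-specific level name.

```python
import logging
import logging.config

# Incremental config: only levels are adjusted, existing handlers stay in place.
config = {
    "version": 1,
    "incremental": True,
    "loggers": {
        "hikari.gateway": {"level": "DEBUG"},
        "hikari.rest": {"level": "WARNING"},  # illustrative logger name
    },
}

logging.config.dictConfig(config)

gateway_level = logging.getLogger("hikari.gateway").level
rest_level = logging.getLogger("hikari.rest").level
print(logging.getLevelName(gateway_level), logging.getLevelName(rest_level))
```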
entity_factory property
#
entity_factory: EntityFactory
Entity factory implementation for this object.
executor property
#
heartbeat_latencies property
#
Mapping of shard ID to heartbeat latency.
Any shards that are not yet started will be float('nan').
heartbeat_latency property
#
heartbeat_latency: float
Average heartbeat latency of all started shards.
If no shards are started, this will return float('nan').
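The relationship between the two latency properties can be sketched in plain Python. This is an illustration of the described behaviour only, not hikari's implementation; the helper name average_latency is hypothetical.

```python
import math
from typing import Mapping


def average_latency(latencies: Mapping[int, float]) -> float:
    """Average the latencies of started shards; NaN entries mark unstarted shards."""
    started = [value for value in latencies.values() if not math.isnan(value)]
    if not started:
        # Nothing has started yet, so there is nothing to average.
        return float("nan")
    return sum(started) / len(started)


# Shard 2 has not started yet, so it is excluded from the average.
latencies = {0: 0.040, 1: 0.060, 2: float("nan")}
average = average_latency(latencies)
print(f"{average * 1000:.0f} ms")
```

Because NaN never compares equal to itself, checks on either property should use math.isnan rather than ==.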
is_alive property
#
is_alive: bool
Whether the application is running or not.
This is useful as some functions might raise hikari.errors.ComponentStateConflictError if this is False.
shard_count property
#
shard_count: int
Number of shards in the total application.
This may not be the same as the size of shards. If the application is auto-sharded, this may be 0 until the shards are started.
shards instance-attribute
#
shards: Mapping[int, GatewayShard] = MappingProxyType(_shards)
Mapping of shards in this application instance.
Each shard ID is mapped to the corresponding shard instance.
If the application has not started, it is acceptable to assume the result of this call will be an empty mapping.
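The MappingProxyType wrapper in the signature above gives callers a read-only, live view over a private dict. A minimal standard-library sketch of that pattern follows; the string values are stand-ins for real shard objects.

```python
from types import MappingProxyType

_shards = {}                         # private, mutable registry
shards = MappingProxyType(_shards)   # public, read-only view of the same data

print(len(shards))  # nothing registered yet, so the view is empty

_shards[0] = "shard-0"  # stand-in for a real shard object
view_sees_update = shards[0] == "shard-0"

try:
    shards[1] = "shard-1"  # the view itself rejects writes
    write_rejected = False
except TypeError:
    write_rejected = True
```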
dispatch #
Dispatch an event.
PARAMETER | DESCRIPTION |
---|---|
event | The event to dispatch. TYPE: |
Examples:
We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.
import attrs

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake


@attrs.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attrs.field()

    author: User = attrs.field()
    '''The user who mentioned everyone.'''

    content: str = attrs.field()
    '''The message that was sent.'''

    message_id: Snowflake = attrs.field()
    '''The message ID.'''

    channel_id: Snowflake = attrs.field()
    '''The channel ID.'''
We can then dispatch our event as we see fit.
from hikari.events.messages import MessageCreateEvent


@bot.listen(MessageCreateEvent)
async def on_message(event):
    # Message content may be None, so fall back to an empty string.
    content = event.content or ""
    if "@everyone" in content or "@here" in content:
        bot.dispatch(
            EveryoneMentionedEvent(
                app=event.app,  # required field on the custom event
                author=event.author,
                content=content,
                message_id=event.message_id,
                channel_id=event.channel_id,
            )
        )
This event can be listened to elsewhere by subscribing to it with hikari.impl.event_manager_base.EventManagerBase.subscribe.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    # The custom event defines `author`, not `user`.
    print(event.author, "just pinged everyone in", event.channel_id)
RETURNS | DESCRIPTION |
---|---|
Future[Any] | A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later. |
See Also
Listen: hikari.impl.gateway_bot.GatewayBot.listen
Stream: hikari.impl.gateway_bot.GatewayBot.stream
Subscribe: hikari.impl.gateway_bot.GatewayBot.subscribe
Unsubscribe: hikari.impl.gateway_bot.GatewayBot.unsubscribe
Wait for: hikari.impl.gateway_bot.GatewayBot.wait_for
get_listeners #
get_listeners(event_type: type[EventT], /, *, polymorphic: bool = True) -> Collection[CallbackT[EventT]]
Get the listeners for a given event type, if there are any.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to look for. TYPE: |
polymorphic | If TYPE: |
RETURNS | DESCRIPTION |
---|---|
Collection[Callable[[EventT], Coroutine[Any, Any, None]]] | A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered. |
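To illustrate what a polymorphic lookup could look like, here is a small stand-alone sketch. It assumes polymorphic=True means that listeners registered for parent event types are returned too. The registry, the event classes and both functions are hypothetical stand-ins, not hikari's implementation.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class Event: ...
class MessageEvent(Event): ...
class MessageCreateEvent(MessageEvent): ...


# Hypothetical registry: event type -> callbacks subscribed to exactly that type.
_listeners: DefaultDict[type, List[Callable]] = defaultdict(list)


def subscribe(event_type, callback):
    _listeners[event_type].append(callback)


def get_listeners(event_type, *, polymorphic=True):
    if not polymorphic:
        return tuple(_listeners.get(event_type, ()))
    found = []
    # Walk the MRO so callbacks registered on parent event types are included.
    for cls in event_type.__mro__:
        found.extend(_listeners.get(cls, ()))
    return tuple(found)  # a copy, so callers cannot mutate the registry


async def on_any_message(event): ...
async def on_create(event): ...

subscribe(MessageEvent, on_any_message)
subscribe(MessageCreateEvent, on_create)

with_parents = get_listeners(MessageCreateEvent)
exact_only = get_listeners(MessageCreateEvent, polymorphic=False)
```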
get_me #
Return the bot user, if known.
This should be available as soon as the bot has fired the hikari.events.lifetime_events.StartingEvent.
Until then, this may or may not be None.
RETURNS | DESCRIPTION |
---|---|
Optional[OwnUser] | The bot user, if known, otherwise |
join async
#
join() -> None
Wait indefinitely until the application closes.
This can be placed in a task and cancelled without affecting the application runtime itself. Any exceptions raised by shards will be propagated to here.
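The task-and-cancel pattern described above can be sketched with plain asyncio. FakeApp is a hypothetical stand-in that only mimics join() and is_alive; a real bot would be used in its place.

```python
import asyncio


class FakeApp:
    """Hypothetical stand-in exposing only is_alive and join()."""

    def __init__(self):
        self._closed = asyncio.Event()
        self.is_alive = True

    async def join(self):
        # Blocks until the application signals that it has closed.
        await self._closed.wait()


async def main():
    app = FakeApp()
    waiter = asyncio.create_task(app.join())
    await asyncio.sleep(0)  # give the waiter a chance to start
    waiter.cancel()         # stop waiting on the application
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    # Only the waiting task was cancelled; the app itself is untouched.
    return waiter.cancelled(), app.is_alive


result = asyncio.run(main())
```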
listen #
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
PARAMETER | DESCRIPTION |
---|---|
*event_types | The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.
TYPE: |
RETURNS | DESCRIPTION |
---|---|
Callable[[EventT], EventT] | A decorator for a coroutine function that passes it to |
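How an event type might be inferred from a callback's type hints can be sketched with the typing module. This is an illustration of the idea only: infer_event_type and the two event classes are hypothetical, and hikari's own inference may differ in its details.

```python
import typing


class Event: ...
class MessageCreateEvent(Event): ...


def infer_event_type(callback):
    """Return the annotation of the callback's first annotated parameter."""
    hints = typing.get_type_hints(callback)
    hints.pop("return", None)  # the return annotation is not an event type
    if not hints:
        raise TypeError("no event type given and no type hint to infer it from")
    # Dicts preserve insertion order, so this is the first annotated parameter.
    return next(iter(hints.values()))


async def on_message(event: MessageCreateEvent) -> None: ...
async def untyped(event): ...

inferred = infer_event_type(on_message)
```

In practice this corresponds to decorating an annotated callback with the decorator called without arguments, where the implementation supports it.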
See Also
Dispatch: hikari.impl.gateway_bot.GatewayBot.dispatch
Stream: hikari.impl.gateway_bot.GatewayBot.stream
Subscribe: hikari.impl.gateway_bot.GatewayBot.subscribe
Unsubscribe: hikari.impl.gateway_bot.GatewayBot.unsubscribe
Wait for: hikari.impl.gateway_bot.GatewayBot.wait_for
print_banner staticmethod
#
print_banner(banner: Optional[str], allow_color: bool, force_color: bool, extra_args: Optional[dict[str, str]] = None) -> None
Print the banner.
This allows library vendors to override this behaviour, or choose to inject their own "branding" on top of what hikari provides by default.
Normal users should not need to invoke this function, and can simply change the banner argument passed to the constructor to manipulate what is displayed.
PARAMETER | DESCRIPTION |
---|---|
banner | The package to find a |
allow_color | A flag that allows advising whether to allow color if supported or not. Can be overridden by setting a TYPE: |
force_color | A flag that allows forcing color to always be output, even if the terminal device may not support it. Setting the This will take precedence over TYPE: |
extra_args | If provided, extra $-substitutions to use when printing the banner. Default substitutions can not be overwritten. |
RAISES | DESCRIPTION |
---|---|
ValueError | If |
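A small sketch of $-substitution with protected defaults, using string.Template from the standard library. The default keys, the template text and the choice to raise ValueError on a clash are all assumptions made for illustration; the real default substitutions are not listed in this section.

```python
import string

# Hypothetical defaults; placeholders only.
DEFAULTS = {"library": "hikari", "version": "x.y.z"}


def render_banner(template_text, extra_args=None):
    extra_args = extra_args or {}
    clashes = DEFAULTS.keys() & extra_args.keys()
    if clashes:
        # Assumption: clashing with a default substitution is treated as an error.
        raise ValueError(f"cannot overwrite default substitutions: {sorted(clashes)}")
    merged = {**DEFAULTS, **extra_args}
    # safe_substitute leaves unknown $-names in place instead of raising.
    return string.Template(template_text).safe_substitute(merged)


banner = render_banner("$library $version, powered by $vendor", {"vendor": "acme"})
print(banner)
```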
request_guild_members async
#
request_guild_members(guild: SnowflakeishOr[PartialGuild], *, include_presences: UndefinedOr[bool] = undefined.UNDEFINED, query: str = '', limit: int = 0, users: UndefinedOr[SnowflakeishSequence[User]] = undefined.UNDEFINED, nonce: UndefinedOr[str] = undefined.UNDEFINED) -> None
Request a guild chunk.
Note
To request the full list of members, set query to "" (empty string) and limit to 0.
PARAMETER | DESCRIPTION |
---|---|
guild | The guild to request chunk for. TYPE: |
include_presences | If provided, whether to request presences. TYPE: |
query | If not TYPE: |
limit | Maximum number of members to send matching the query. TYPE: |
users | If provided, the users to request for. TYPE: |
nonce | If provided, the nonce to be sent with guild chunks. TYPE: |
RAISES | DESCRIPTION |
---|---|
ValueError | If trying to specify |
MissingIntentError | When trying to request presences without the |
RuntimeError | If the guild passed isn't covered by any of the shards in this sharded client. |
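The note about requesting the full member list can be wrapped in a small helper. In this sketch request_all_members is a hypothetical convenience function, and RecordingBot is a stub used so the example runs without a gateway connection; with a real bot, the bot instance would be passed instead.

```python
import asyncio


async def request_all_members(bot, guild_id, *, nonce=None):
    """Ask the gateway for every member of a guild (empty query, no limit)."""
    kwargs = {"query": "", "limit": 0}
    if nonce is not None:
        kwargs["nonce"] = nonce  # sent along with the resulting guild chunks
    await bot.request_guild_members(guild_id, **kwargs)


class RecordingBot:
    """Hypothetical stub that records the call instead of talking to Discord."""

    def __init__(self):
        self.calls = []

    async def request_guild_members(self, guild, **kwargs):
        self.calls.append((guild, kwargs))


stub = RecordingBot()
asyncio.run(request_all_members(stub, 1234, nonce="full-sync"))
```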
run #
run(*, activity: Optional[Activity] = None, afk: bool = False, asyncio_debug: Optional[bool] = None, check_for_updates: bool = True, close_passed_executor: bool = False, close_loop: bool = True, coroutine_tracking_depth: Optional[int] = None, enable_signal_handlers: Optional[bool] = None, idle_since: Optional[datetime] = None, ignore_session_start_limit: bool = False, large_threshold: int = 250, propagate_interrupts: bool = False, status: Status = presences.Status.ONLINE, shard_ids: Optional[Sequence[int]] = None, shard_count: Optional[int] = None) -> None
Start the application and block until it's finished running.
PARAMETER | DESCRIPTION |
---|---|
activity | The initial activity to display in the bot user presence, or |
afk | The initial AFK state to display in the bot user presence, or TYPE: |
asyncio_debug | If |
check_for_updates | If TYPE: |
close_passed_executor | If TYPE: |
close_loop | If This will wait until all hikari-owned TYPE: |
coroutine_tracking_depth | If an integer value and supported by the interpreter, then this many nested coroutine calls will be tracked with their call origin state. This allows you to determine where non-awaited coroutines may originate from, but generally you do not want to leave this enabled for performance reasons. |
enable_signal_handlers | Defaults to If on a non-Windows OS with builtin support for kernel-level POSIX signals, then setting this to |
idle_since | The |
ignore_session_start_limit | If TYPE: |
large_threshold | Threshold for members in a guild before it is treated as being "large" and member details are no longer sent in the GUILD CREATE event. TYPE: |
propagate_interrupts | If TYPE: |
shard_ids | |
shard_count | The number of shards to use in the entire distributed application. Defaults to |
status | The initial status to show for the user presence on startup. |
RAISES | DESCRIPTION |
---|---|
ComponentStateConflictError | If bot is already running. |
TypeError | If |
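When several processes share one application, each process needs its own shard_ids together with the common shard_count. One possible way to split the IDs is sketched below; the round-robin scheme and the helper name shard_ids_for_process are illustrative choices, not something this class prescribes.

```python
from typing import List


def shard_ids_for_process(process_index: int, process_count: int, shard_count: int) -> List[int]:
    """Deal shard IDs out round-robin across processes."""
    if not 0 <= process_index < process_count:
        raise ValueError("process_index must be in range(process_count)")
    return list(range(process_index, shard_count, process_count))


# Three processes sharing eight shards.
assignments = [shard_ids_for_process(index, 3, 8) for index in range(3)]
print(assignments)
```

Each process would then pass its own list as shard_ids and the shared total as shard_count when calling run or start.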
start async
#
start(*, activity: Optional[Activity] = None, afk: bool = False, check_for_updates: bool = True, idle_since: Optional[datetime] = None, ignore_session_start_limit: bool = False, large_threshold: int = 250, shard_ids: Optional[Sequence[int]] = None, shard_count: Optional[int] = None, status: Status = presences.Status.ONLINE) -> None
Start the bot, wait for all shards to become ready, and then return.
PARAMETER | DESCRIPTION |
---|---|
activity | The initial activity to display in the bot user presence, or |
afk | The initial AFK state to display in the bot user presence, or TYPE: |
check_for_updates | If TYPE: |
idle_since | The |
ignore_session_start_limit | If TYPE: |
large_threshold | Threshold for members in a guild before it is treated as being "large" and member details are no longer sent in the GUILD CREATE event. TYPE: |
shard_ids | |
shard_count | The number of shards to use in the entire distributed application. Defaults to |
status | The initial status to show for the user presence on startup. |
RAISES | DESCRIPTION |
---|---|
TypeError | If |
ComponentStateConflictError | If bot is already running. |
stream #
stream(event_type: type[EventT], /, timeout: Union[float, int, None], limit: Optional[int] = None) -> EventStream[EventT]
Return a stream iterator for the given event and sub-events.
Warning
If you use stream.open() to start the stream then you must also close it with stream.close(), otherwise it may queue events in memory indefinitely.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will listen for subclasses of this type additionally. TYPE: |
timeout | How long this streamer should wait for the next event before ending the iteration. If |
limit | The limit for how many events this should queue at one time before dropping extra incoming events, leave this as |
RETURNS | DESCRIPTION |
---|---|
EventStream[Event] | The async iterator to handle streamed events. This must be started with |
Examples:
with bot.stream(events.ReactionAddEvent, timeout=30).filter(
    ("message_id", message.id)
) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...
or using open() and close()
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
try:
    async for user_id in stream.map("user_id").limit(50):
        ...
finally:
    # Always close the stream so it stops queueing events in memory.
    stream.close()
See Also
Dispatch: hikari.impl.gateway_bot.GatewayBot.dispatch
Listen: hikari.impl.gateway_bot.GatewayBot.listen
Subscribe: hikari.impl.gateway_bot.GatewayBot.subscribe
Unsubscribe: hikari.impl.gateway_bot.GatewayBot.unsubscribe
Wait for: hikari.impl.gateway_bot.GatewayBot.wait_for
subscribe #
Subscribe a given callback to a given event type.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will also listen for any subclasses of the given type. |
callback | Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded. TYPE: |
Examples:
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent
async def on_message(event): ...
bot.subscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.impl.gateway_bot.GatewayBot.dispatch
Listen: hikari.impl.gateway_bot.GatewayBot.listen
Stream: hikari.impl.gateway_bot.GatewayBot.stream
Unsubscribe: hikari.impl.gateway_bot.GatewayBot.unsubscribe
Wait for: hikari.impl.gateway_bot.GatewayBot.wait_for
unsubscribe #
Unsubscribe a given callback from a given event type, if present.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly. |
callback | The callback to unsubscribe. TYPE: |
Examples:
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent
async def on_message(event): ...
bot.unsubscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.impl.gateway_bot.GatewayBot.dispatch
Listen: hikari.impl.gateway_bot.GatewayBot.listen
Stream: hikari.impl.gateway_bot.GatewayBot.stream
Subscribe: hikari.impl.gateway_bot.GatewayBot.subscribe
Wait for: hikari.impl.gateway_bot.GatewayBot.wait_for
update_presence async
#
update_presence(*, status: UndefinedOr[Status] = undefined.UNDEFINED, idle_since: UndefinedNoneOr[datetime] = undefined.UNDEFINED, activity: UndefinedNoneOr[Activity] = undefined.UNDEFINED, afk: UndefinedOr[bool] = undefined.UNDEFINED) -> None
Update the presence on all shards.
This call will patch the presence on each shard. This means that unless you explicitly specify a parameter, the previous value will be retained. This means you do not have to track the global presence in your code.
Note
This will only send the update payloads to shards that are alive. Any shards that are not alive will cache the new presence for when they do start.
Note
If you want to set presences per shard, access the shard you wish to update (e.g. by using hikari.GatewayBot.shards), and call hikari.api.shard.GatewayShard.update_presence on that shard. This method is simply a facade to make performing this in bulk simpler.
PARAMETER | DESCRIPTION |
---|---|
idle_since | The datetime that the user started being idle. If undefined, this will not be changed. TYPE: |
afk | Whether to be marked as AFK. If undefined, this will not be changed. TYPE: |
activity | The activity to appear to be playing. If undefined, this will not be changed. TYPE: |
status | The web status to show. If undefined, this will not be changed. TYPE: |
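The "patch" behaviour, where omitted parameters keep their previous value, relies on telling "not passed" apart from None. The sketch below models that with a sentinel. The UNDEFINED sentinel, the dict-based presence and patch_presence are simplified stand-ins rather than hikari's own types.

```python
import enum


class _Undefined(enum.Enum):
    UNDEFINED = enum.auto()


UNDEFINED = _Undefined.UNDEFINED  # stand-in for "parameter was not passed"


def patch_presence(current, *, status=UNDEFINED, idle_since=UNDEFINED,
                   activity=UNDEFINED, afk=UNDEFINED):
    """Return a copy of `current` with only the explicitly passed fields replaced."""
    updates = {"status": status, "idle_since": idle_since, "activity": activity, "afk": afk}
    patched = dict(current)
    for key, value in updates.items():
        if value is not UNDEFINED:  # None is a real value: it clears the field
            patched[key] = value
    return patched


presence = {"status": "online", "idle_since": None, "activity": "Playing chess", "afk": False}
after_status = patch_presence(presence, status="idle")       # activity is retained
after_clear = patch_presence(after_status, activity=None)    # activity is cleared
```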
update_voice_state async
#
update_voice_state(guild: SnowflakeishOr[PartialGuild], channel: Optional[SnowflakeishOr[GuildVoiceChannel]], *, self_mute: UndefinedOr[bool] = undefined.UNDEFINED, self_deaf: UndefinedOr[bool] = undefined.UNDEFINED) -> None
Update the voice state for this bot in a given guild.
PARAMETER | DESCRIPTION |
---|---|
guild | The guild or guild ID to update the voice state for. TYPE: |
channel | The channel or channel ID to update the voice state for. If TYPE: |
self_mute | If specified and TYPE: |
self_deaf | If specified and TYPE: |
RAISES | DESCRIPTION |
---|---|
RuntimeError | If the guild passed isn't covered by any of the shards in this sharded client. |
wait_for async
#
wait_for(event_type: type[EventT], /, timeout: Union[float, int, None], predicate: Optional[PredicateT[EventT]] = None) -> EventT
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
PARAMETER | DESCRIPTION |
---|---|
event_type | The event type to listen for. This will listen for subclasses of this type additionally. TYPE: |
predicate | A function taking the event as the single parameter. This should return TYPE: |
timeout | The amount of time to wait before raising an |
RETURNS | DESCRIPTION |
---|---|
Event | The event that was provided. |
RAISES | DESCRIPTION |
---|---|
TimeoutError |
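A common way to consume wait_for is to turn the timeout into a None result. The helper below is a sketch: wait_for_or_none is a hypothetical name, and NeverFiresBot is a stub that always times out so the example runs without a gateway connection. It catches asyncio.TimeoutError, which is the same class as the built-in TimeoutError on Python 3.11 and later.

```python
import asyncio


async def wait_for_or_none(bot, event_type, *, timeout, predicate=None):
    """Return the first matching event, or None if the wait times out."""
    try:
        # The predicate must be a plain (non-async) function.
        return await bot.wait_for(event_type, timeout=timeout, predicate=predicate)
    except asyncio.TimeoutError:
        return None


class NeverFiresBot:
    """Hypothetical stub whose wait_for always times out."""

    async def wait_for(self, event_type, /, timeout, predicate=None):
        # Waits on an event that is never set, so the timeout always triggers.
        await asyncio.wait_for(asyncio.Event().wait(), timeout)


outcome = asyncio.run(wait_for_or_none(NeverFiresBot(), object, timeout=0.01))
```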
See Also
Dispatch: hikari.impl.gateway_bot.GatewayBot.dispatch
Listen: hikari.impl.gateway_bot.GatewayBot.listen
Stream: hikari.impl.gateway_bot.GatewayBot.stream
Subscribe: hikari.impl.gateway_bot.GatewayBot.subscribe
Unsubscribe: hikari.impl.gateway_bot.GatewayBot.unsubscribe