hikari.impl.gateway_bot
#
Basic implementation of the components for a single-process bot.
GatewayBot
#
GatewayBot(
token: str,
*,
allow_color: bool = True,
banner: Optional[str] = "hikari",
suppress_optimization_warning: bool = False,
executor: Optional[Executor] = None,
force_color: bool = False,
cache_settings: Optional[CacheSettings] = None,
http_settings: Optional[HTTPSettings] = None,
dumps: JSONEncoder = data_binding.default_json_dumps,
loads: JSONDecoder = data_binding.default_json_loads,
intents: Intents = intents_.Intents.ALL_UNPRIVILEGED,
auto_chunk_members: bool = True,
logs: Union[None, str, int, Dict[str, Any], PathLike[str]] = "INFO",
max_rate_limit: float = 300.0,
max_retries: int = 3,
proxy_settings: Optional[ProxySettings] = None,
rest_url: Optional[str] = None
)
Bases: GatewayBotAware
Basic auto-sharding bot implementation.
This is the class you will want to use to start, control, and build a bot with.
Note
Settings that control the gateway session are provided to the
hikari.impl.gateway_bot.GatewayBot.run
and hikari.impl.gateway_bot.GatewayBot.start
functions in this class. This is done to allow you to contextually
customise details such as sharding configuration without having to
re-initialize the entire application each time.
PARAMETER | DESCRIPTION |
---|---|
token |
The bot token to sign in with.
TYPE:
|
PARAMETER | DESCRIPTION |
---|---|
allow_color |
Whether coloured console logs will be enabled on any platform that is a TTY.
Setting a Users should consider this an advice to the application on whether it is
safe to show colours if possible or not. Since some terminals can be
awkward or not support features in a standard way, the option to
explicitly disable this is provided. See
TYPE:
|
banner |
The package to search for a Setting this to |
suppress_optimization_warning |
By default, hikari warns you if you are not running
your bot using optimizations (
TYPE:
|
executor |
If non- While mainly supporting the |
force_color |
If This will take precedence over
TYPE:
|
cache_settings |
Optional cache settings. If unspecified, will use the defaults.
TYPE:
|
http_settings |
Optional custom HTTP configuration settings to use. Allows you to
customise functionality such as whether SSL-verification is enabled,
what timeouts
TYPE:
|
intents |
This allows you to change which intents your application will use on the gateway. This can be used to control and change the types of events you will receive.
TYPE:
|
auto_chunk_members |
If We only want to chunk if we both are allowed and need to:
TYPE:
|
logs |
The flavour to set the logging to. This can be If you pass a If you pass a If you pass a Note that |
max_rate_limit |
The max number of seconds to backoff for when rate limited. Anything greater than this will instead raise an error. This defaults to five minutes if left to the default value. This is to stop potentially indefinitely waiting on an endpoint, which is almost never what you want to do if giving a response to a user. You can set this to Note that this only applies to the REST API component that communicates with Discord, and will not affect sharding or third party HTTP endpoints that may be in use.
TYPE:
|
max_retries |
Maximum number of times a request will be retried if
it fails with a Will default to 3 if set to |
proxy_settings |
Custom proxy settings to use with network-layer logic in your application to get through an HTTP-proxy.
TYPE:
|
dumps |
The JSON encoder this application should use.
TYPE:
|
loads |
The JSON decoder this application should use.
TYPE:
|
rest_url |
Defaults to the Discord REST API URL if |
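The max_rate_limit description above boils down to a cap on how long a client is willing to back off. The following is a minimal standard-library sketch of that decision, not hikari's implementation: `BackoffTooLongError` and `backoff_or_raise` are hypothetical names introduced only for this illustration.

```python
import asyncio
from typing import Tuple


class BackoffTooLongError(Exception):
    """Hypothetical error raised when a required backoff exceeds the cap."""


async def backoff_or_raise(retry_after: float, max_rate_limit: float = 300.0) -> float:
    """Sleep for `retry_after` seconds unless it is greater than `max_rate_limit`."""
    if retry_after > max_rate_limit:
        # Refuse to wait rather than block the caller for an excessive time.
        raise BackoffTooLongError(
            f"rate limited for {retry_after}s, which exceeds the {max_rate_limit}s cap"
        )
    await asyncio.sleep(retry_after)
    return retry_after


async def demo() -> Tuple[float, bool]:
    # A short backoff is simply waited out.
    waited = await backoff_or_raise(0.01, max_rate_limit=300.0)
    # A backoff above the cap raises instead of sleeping.
    try:
        await backoff_or_raise(600.0, max_rate_limit=300.0)
    except BackoffTooLongError:
        raised = True
    else:
        raised = False
    return waited, raised


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Raising early keeps a command handler from hanging for minutes while a user waits for a response, which is the motivation the parameter description gives.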
Examples:
Simple logging setup:
hikari.GatewayBot("TOKEN", logs="INFO") # Registered logging level
# or
hikari.GatewayBot("TOKEN", logs=20) # Logging level as an int
File config:
# See https://docs.python.org/3/library/logging.config.html#configuration-file-format for more info
hikari.GatewayBot("TOKEN", logs="path/to/file.ini")
Setting up logging through a dict config:
# See https://docs.python.org/3/library/logging.config.html#dictionary-schema-details for more info
hikari.GatewayBot(
    "TOKEN",
    logs={
        "version": 1,
        "incremental": True,  # In incremental setups, the default stream handler will be set up
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    },
)
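To see what an incremental dict config does on its own, the sketch below feeds a similar dictionary straight to the standard library's `logging.config.dictConfig`. It assumes nothing about how hikari applies the `logs` argument internally. The `TRACE_HIKARI` level is left out because the standard library by itself does not define a level with that name.

```python
import logging
import logging.config

config = {
    "version": 1,
    # Incremental configs only adjust existing settings (such as logger levels);
    # they do not replace handlers or formatters.
    "incremental": True,
    "loggers": {
        "hikari.gateway": {"level": "DEBUG"},
    },
}

logging.config.dictConfig(config)

# The named logger now has an explicit DEBUG level.
gateway_logger = logging.getLogger("hikari.gateway")
print(logging.getLevelName(gateway_logger.level))
```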
entity_factory
property
#
entity_factory: EntityFactory
Entity factory implementation for this object.
executor
property
#
heartbeat_latencies
property
#
Mapping of shard ID to heartbeat latency.
Any shards that are not yet started will be float('nan').
heartbeat_latency
property
#
heartbeat_latency: float
Average heartbeat latency of all started shards.
If no shards are started, this will return float('nan').
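Because NaN never compares equal to itself and propagates through arithmetic, code consuming these latency values should test them with `math.isnan` rather than `==`. The helper below is a hypothetical sketch of averaging only the started shards. It is one way to get the documented behaviour, not necessarily how hikari computes it.

```python
import math
from typing import Mapping


def average_latency(latencies: Mapping[int, float]) -> float:
    """Average the non-NaN latencies, or return NaN if there are none."""
    started = [value for value in latencies.values() if not math.isnan(value)]
    if not started:
        return float("nan")
    return sum(started) / len(started)


# Shard 2 has not started yet, so its latency is NaN and is skipped.
latencies = {0: 0.25, 1: 0.75, 2: float("nan")}
print(average_latency(latencies))

# With no started shards the result is NaN, so check it with math.isnan.
print(math.isnan(average_latency({})))
```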
is_alive
property
#
is_alive: bool
Whether the application is running or not.
This is useful as some functions might raise hikari.errors.ComponentStateConflictError if this is False.
shard_count
property
#
shard_count: int
Number of shards in the total application.
This may not be the same as the size of shards. If the application is auto-sharded, this may be 0 until the shards are started.
shards
instance-attribute
#
shards: Mapping[int, GatewayShard] = MappingProxyType(_shards)
Mapping of shards in this application instance.
Each shard ID is mapped to the corresponding shard instance.
If the application has not started, it is acceptable to assume the result of this call will be an empty mapping.
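The signature above shows shards as a MappingProxyType wrapped around a private dictionary. The standard-library sketch below shows what that implies for callers: the view is read-only, but it reflects later changes to the underlying dictionary. The placeholder strings stand in for real shard objects and are assumptions of this example.

```python
from types import MappingProxyType

_shards = {}  # private, mutable mapping (stand-in for the bot's internal state)
shards = MappingProxyType(_shards)  # public, read-only view of the same data

# Before anything is started, the view is empty.
empty_before_start = len(shards) == 0

# Changes made through the private dict are visible through the view.
_shards[0] = "shard-0"  # hypothetical placeholder for a shard object

# Writing through the view is rejected with a TypeError.
try:
    shards[1] = "shard-1"
except TypeError:
    write_rejected = True
else:
    write_rejected = False

print(empty_before_start, dict(shards), write_rejected)
```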
dispatch
#
Dispatch an event.
PARAMETER | DESCRIPTION |
---|---|
event |
The event to dispatch.
TYPE:
|
Examples:
We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.
import attrs
from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake
@attrs.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attrs.field()
    author: User = attrs.field()
    '''The user who mentioned everyone.'''
    content: str = attrs.field()
    '''The message that was sent.'''
    message_id: Snowflake = attrs.field()
    '''The message ID.'''
    channel_id: Snowflake = attrs.field()
    '''The channel ID.'''
We can then dispatch our event as we see fit.
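from hikari.events.messages import MessageCreateEvent
@bot.listen(MessageCreateEvent)
async def on_message(event):
    # The content may be None for messages without text, so check it first.
    if event.content and ("@everyone" in event.content or "@here" in event.content):
        everyone_event = EveryoneMentionedEvent(
            app=event.app,  # required by the event class defined above
            author=event.author,
            content=event.content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )
        bot.dispatch(everyone_event)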
This event can be listened to elsewhere by subscribing to it with hikari.impl.event_manager_base.EventManagerBase.subscribe.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)
RETURNS | DESCRIPTION |
---|---|
Future[Any]
|
A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later. |
See Also
Listen : hikari.impl.gateway_bot.GatewayBot.listen.
Stream : hikari.impl.gateway_bot.GatewayBot.stream.
Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe.
Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.
Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.
get_listeners
#
get_listeners(
event_type: Type[EventT], /, *, polymorphic: bool = True
) -> Collection[CallbackT[EventT]]
Get the listeners for a given event type, if there are any.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to look for.
TYPE:
|
polymorphic |
If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Collection[Callable[[EventT], Coroutine[Any, Any, None]]]
|
A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered. |
get_me
#
Return the bot user, if known.
This should be available as soon as the bot has fired the hikari.events.lifetime_events.StartingEvent. Until then, this may or may not be None.
RETURNS | DESCRIPTION |
---|---|
Optional[OwnUser]
|
The bot user, if known, otherwise |
join
async
#
Wait indefinitely until the application closes.
This can be placed in a task and cancelled without affecting the application runtime itself. Any exceptions raised by shards will be propagated to here.
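The "place it in a task and cancel it" pattern can be illustrated with plain asyncio. In this sketch, `join` is a hypothetical stand-in that waits on an `asyncio.Event`; it only demonstrates that cancelling the waiting task leaves the thing being waited on untouched, and makes no claim about hikari's internals.

```python
import asyncio
from typing import Tuple


async def main() -> Tuple[bool, bool]:
    closed = asyncio.Event()  # stand-in for "the application has closed"

    async def join() -> None:
        # Wait indefinitely until the stand-in application closes.
        await closed.wait()

    waiter = asyncio.create_task(join())
    await asyncio.sleep(0)  # let the task start waiting

    # Cancelling the waiter only stops the waiting; `closed` is not affected.
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass

    return waiter.cancelled(), closed.is_set()


if __name__ == "__main__":
    print(asyncio.run(main()))
```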
listen
#
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
PARAMETER | DESCRIPTION |
---|---|
*event_types |
The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.
|
RETURNS | DESCRIPTION |
---|---|
Callable[[EventT], EventT]
|
A decorator for a coroutine function that passes it to
|
See Also
Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch.
Stream : hikari.impl.gateway_bot.GatewayBot.stream.
Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe.
Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.
Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.
print_banner
staticmethod
#
print_banner(
banner: Optional[str],
allow_color: bool,
force_color: bool,
extra_args: Optional[Dict[str, str]] = None,
) -> None
Print the banner.
This allows library vendors to override this behaviour, or choose to inject their own "branding" on top of what hikari provides by default.
Normal users should not need to invoke this function, and can simply change the banner argument passed to the constructor to manipulate what is displayed.
PARAMETER | DESCRIPTION |
---|---|
banner |
The package to find a |
allow_color |
A flag that allows advising whether to allow color if supported or
not. Can be overridden by setting a
TYPE:
|
force_color |
A flag that allows forcing color to always be output, even if the
terminal device may not support it. Setting the This will take precedence over
TYPE:
|
extra_args |
If provided, extra $-substitutions to use when printing the banner. Default substitutions cannot be overwritten. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If |
request_guild_members
async
#
request_guild_members(
guild: SnowflakeishOr[PartialGuild],
*,
include_presences: UndefinedOr[bool] = undefined.UNDEFINED,
query: str = "",
limit: int = 0,
users: UndefinedOr[SnowflakeishSequence[User]] = undefined.UNDEFINED,
nonce: UndefinedOr[str] = undefined.UNDEFINED
) -> None
Request a guild chunk.
Note
To request the full list of members, set query to "" (empty string) and limit to 0.
PARAMETER | DESCRIPTION |
---|---|
guild |
The guild to request chunk for.
TYPE:
|
PARAMETER | DESCRIPTION |
---|---|
include_presences |
If provided, whether to request presences.
TYPE:
|
query |
If not
TYPE:
|
limit |
Maximum number of members to send matching the query.
TYPE:
|
users |
If provided, the users to request for.
TYPE:
|
nonce |
If provided, the nonce to be sent with guild chunks.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ValueError
|
If trying to specify |
MissingIntentError
|
When trying to request presences without the |
RuntimeError
|
If the guild passed isn't covered by any of the shards in this sharded client. |
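For context on the RuntimeError above: Discord documents that a guild is routed to the shard with ID `(guild_id >> 22) % shard_count`. The sketch below uses that formula to check whether a guild falls on one of the shards a process runs. The helper names are hypothetical, and whether hikari performs its check in exactly this way is an assumption.

```python
from typing import Sequence


def shard_id_for_guild(guild_id: int, shard_count: int) -> int:
    """Return the shard ID that Discord routes the given guild to."""
    return (guild_id >> 22) % shard_count


def is_covered(guild_id: int, shard_ids: Sequence[int], shard_count: int) -> bool:
    """Check whether this process runs the shard responsible for the guild."""
    return shard_id_for_guild(guild_id, shard_count) in shard_ids


# Synthetic guild ID whose upper (timestamp) bits equal 5, so with
# 4 shards it belongs to shard 5 % 4 == 1.
guild_id = (5 << 22) | 123

print(shard_id_for_guild(guild_id, 4))
print(is_covered(guild_id, [0, 1], 4))
print(is_covered(guild_id, [2, 3], 4))
```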
run
#
run(
*,
activity: Optional[Activity] = None,
afk: bool = False,
asyncio_debug: Optional[bool] = None,
check_for_updates: bool = True,
close_passed_executor: bool = False,
close_loop: bool = True,
coroutine_tracking_depth: Optional[int] = None,
enable_signal_handlers: Optional[bool] = None,
idle_since: Optional[datetime] = None,
ignore_session_start_limit: bool = False,
large_threshold: int = 250,
propagate_interrupts: bool = False,
status: Status = presences.Status.ONLINE,
shard_ids: Optional[Sequence[int]] = None,
shard_count: Optional[int] = None
) -> None
Start the application and block until it's finished running.
PARAMETER | DESCRIPTION |
---|---|
activity |
The initial activity to display in the bot user presence, or
|
afk |
The initial AFK state to display in the bot user presence, or
TYPE:
|
asyncio_debug |
If
TYPE:
|
check_for_updates |
If
TYPE:
|
close_passed_executor |
If
TYPE:
|
close_loop |
If This will wait until all hikari-owned
TYPE:
|
coroutine_tracking_depth |
If an integer value and supported by the interpreter, then this many nested coroutine calls will be tracked with their call origin state. This allows you to determine where non-awaited coroutines may originate from, but generally you do not want to leave this enabled for performance reasons. |
enable_signal_handlers |
Defaults to If on a non-Windows OS with builtin support for kernel-level
POSIX signals, then setting this to |
idle_since |
The |
ignore_session_start_limit |
If
TYPE:
|
large_threshold |
Threshold for members in a guild before it is treated as being "large" and member details are no longer sent in the GUILD CREATE event.
TYPE:
|
propagate_interrupts |
If
TYPE:
|
shard_ids |
|
shard_count |
The number of shards to use in the entire distributed application. Defaults to |
status |
The initial status to show for the user presence on startup.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
ComponentStateConflictError
|
If bot is already running. |
TypeError
|
If |
start
async
#
start(
*,
activity: Optional[Activity] = None,
afk: bool = False,
check_for_updates: bool = True,
idle_since: Optional[datetime] = None,
ignore_session_start_limit: bool = False,
large_threshold: int = 250,
shard_ids: Optional[Sequence[int]] = None,
shard_count: Optional[int] = None,
status: Status = presences.Status.ONLINE
) -> None
Start the bot, wait for all shards to become ready, and then return.
PARAMETER | DESCRIPTION |
---|---|
activity |
The initial activity to display in the bot user presence, or
|
afk |
The initial AFK state to display in the bot user presence, or
TYPE:
|
check_for_updates |
If
TYPE:
|
idle_since |
The |
ignore_session_start_limit |
If
TYPE:
|
large_threshold |
Threshold for members in a guild before it is treated as being
"large" and no longer sending member details in the
TYPE:
|
shard_ids |
|
shard_count |
The number of shards to use in the entire distributed application. Defaults to |
status |
The initial status to show for the user presence on startup.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
TypeError
|
If |
ComponentStateConflictError
|
If bot is already running. |
stream
#
stream(
event_type: Type[EventT],
/,
timeout: Union[float, int, None],
limit: Optional[int] = None,
) -> EventStream[EventT]
Return a stream iterator for the given event and sub-events.
Warning
If you use stream.open() to start the stream then you must also close it with stream.close(), otherwise it may queue events in memory indefinitely.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to listen for. This will listen for subclasses of this type additionally. |
timeout |
How long this streamer should wait for the next event before
ending the iteration. If |
limit |
The limit for how many events this should queue at one time before
dropping extra incoming events, leave this as |
RETURNS | DESCRIPTION |
---|---|
EventStream[Event]
|
The async iterator to handle streamed events. This must be started
with |
Examples:
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...
or using open() and close():
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
    ...
stream.close()
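The limit parameter describes a bounded buffer that drops extra incoming events once it is full. A rough standard-library picture of that behaviour is sketched below using `asyncio.Queue`. The `offer` helper is hypothetical, and the real stream may buffer events differently.

```python
import asyncio
from typing import List, Tuple


def offer(queue: asyncio.Queue, event: object) -> bool:
    """Try to enqueue an event; drop it and return False if the queue is full."""
    try:
        queue.put_nowait(event)
    except asyncio.QueueFull:
        return False
    return True


async def demo() -> Tuple[List[bool], List[object]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # plays the role of limit=2
    # Four events arrive before the consumer reads anything.
    accepted = [offer(queue, number) for number in range(4)]
    # Only the events that fit in the buffer can be read back.
    drained = [queue.get_nowait() for _ in range(queue.qsize())]
    return accepted, drained


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An unbounded buffer that nobody drains is also why an opened stream that is never closed can keep accumulating events in memory.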
See Also
Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch.
Listen : hikari.impl.gateway_bot.GatewayBot.listen.
Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe.
Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.
Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.
subscribe
#
Subscribe a given callback to a given event type.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to listen for. This will also listen for any
subclasses of the given type.
TYPE:
|
callback |
Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
TYPE:
|
Examples:
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
    ...
bot.subscribe(MessageCreateEvent, on_message)
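To make the "also listens for subclasses" behaviour concrete, here is a toy event bus written with the standard library only. `TinyEventBus` and the three event classes are hypothetical stand-ins, not hikari types; the sketch shows one way subclass-aware delivery can work by walking the event's class hierarchy.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List


class Event:
    """Hypothetical base event."""


class MessageEvent(Event):
    """Hypothetical event derived from Event."""


class GuildMessageEvent(MessageEvent):
    """Hypothetical event derived from MessageEvent."""


Callback = Callable[[Any], Awaitable[None]]


class TinyEventBus:
    def __init__(self) -> None:
        self._listeners: DefaultDict[type, List[Callback]] = defaultdict(list)

    def subscribe(self, event_type: type, callback: Callback) -> None:
        self._listeners[event_type].append(callback)

    def unsubscribe(self, event_type: type, callback: Callback) -> None:
        # Only the exact type used when subscribing holds the callback.
        listeners = self._listeners.get(event_type, [])
        if callback in listeners:
            listeners.remove(callback)

    async def dispatch(self, event: Event) -> None:
        # Walk the event's class hierarchy so listeners on base types also run.
        callbacks = [
            callback
            for cls in type(event).__mro__
            for callback in self._listeners.get(cls, ())
        ]
        await asyncio.gather(*(callback(event) for callback in callbacks))


async def demo() -> List[str]:
    seen: List[str] = []

    async def on_message(event: MessageEvent) -> None:
        seen.append(type(event).__name__)

    bus = TinyEventBus()
    bus.subscribe(MessageEvent, on_message)
    await bus.dispatch(GuildMessageEvent())  # subclass of MessageEvent: delivered
    await bus.dispatch(Event())  # parent of MessageEvent: not delivered
    bus.unsubscribe(MessageEvent, on_message)
    await bus.dispatch(MessageEvent())  # no longer subscribed: not delivered
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```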
See Also
Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch.
Listen : hikari.impl.gateway_bot.GatewayBot.listen.
Stream : hikari.impl.gateway_bot.GatewayBot.stream.
Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.
Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.
unsubscribe
#
Unsubscribe a given callback from a given event type, if present.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to unsubscribe from. This must be the same exact
type as was originally subscribed with to be removed correctly.
TYPE:
|
callback |
The callback to unsubscribe.
TYPE:
|
Examples:
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent
async def on_message(event):
    ...
bot.unsubscribe(MessageCreateEvent, on_message)
See Also
Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch.
Listen : hikari.impl.gateway_bot.GatewayBot.listen.
Stream : hikari.impl.gateway_bot.GatewayBot.stream.
Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe.
Wait_for : hikari.impl.gateway_bot.GatewayBot.wait_for.
update_presence
async
#
update_presence(
*,
status: UndefinedOr[Status] = undefined.UNDEFINED,
idle_since: UndefinedNoneOr[datetime] = undefined.UNDEFINED,
activity: UndefinedNoneOr[Activity] = undefined.UNDEFINED,
afk: UndefinedOr[bool] = undefined.UNDEFINED
) -> None
Update the presence on all shards.
This call will patch the presence on each shard: unless you explicitly specify a parameter, the previous value will be retained, so you do not have to track the global presence in your code.
Note
This will only send the update payloads to shards that are alive. Any shards that are not alive will cache the new presence for when they do start.
Note
If you want to set presences per shard, access the shard you wish to update (e.g. by using hikari.GatewayBot.shards), and call hikari.api.shard.GatewayShard.update_presence on that shard. This method is simply a facade to make performing this in bulk simpler.
PARAMETER | DESCRIPTION |
---|---|
idle_since |
The datetime that the user started being idle. If undefined, this will not be changed.
TYPE:
|
afk |
Whether to be marked as AFK. If undefined, this will not be changed.
TYPE:
|
activity |
The activity to appear to be playing. If undefined, this will not be changed.
TYPE:
|
status |
The web status to show. If undefined, this will not be changed.
TYPE:
|
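The patch semantics described for update_presence rely on telling "not passed" apart from an explicit None. The sketch below shows that idea with a hypothetical sentinel standing in for undefined.UNDEFINED. Plain strings stand in for status and activity objects, and `patch_presence` is not a hikari function.

```python
import enum
from typing import Dict


class Undefined(enum.Enum):
    """Hypothetical sentinel type meaning "leave this field unchanged"."""

    UNDEFINED = enum.auto()


UNDEFINED = Undefined.UNDEFINED


def patch_presence(
    current: Dict[str, object],
    *,
    status: object = UNDEFINED,
    activity: object = UNDEFINED,
    afk: object = UNDEFINED,
) -> Dict[str, object]:
    """Return a copy of `current` with only the explicitly passed fields replaced."""
    updated = dict(current)
    for key, value in (("status", status), ("activity", activity), ("afk", afk)):
        if value is not UNDEFINED:
            updated[key] = value
    return updated


presence: Dict[str, object] = {"status": "online", "activity": "Playing chess", "afk": False}

# Only the status changes; activity and afk keep their previous values.
presence = patch_presence(presence, status="idle")

# Passing None is different from passing nothing: it clears the activity.
presence = patch_presence(presence, activity=None)

print(presence)
```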
update_voice_state
async
#
update_voice_state(
guild: SnowflakeishOr[PartialGuild],
channel: Optional[SnowflakeishOr[GuildVoiceChannel]],
*,
self_mute: UndefinedOr[bool] = undefined.UNDEFINED,
self_deaf: UndefinedOr[bool] = undefined.UNDEFINED
) -> None
Update the voice state for this bot in a given guild.
PARAMETER | DESCRIPTION |
---|---|
guild |
The guild or guild ID to update the voice state for.
TYPE:
|
channel |
The channel or channel ID to update the voice state for. If
TYPE:
|
self_mute |
|
self_deaf |
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
If the guild passed isn't covered by any of the shards in this sharded client. |
wait_for
async
#
wait_for(
event_type: Type[EventT],
/,
timeout: Union[float, int, None],
predicate: Optional[PredicateT[EventT]] = None,
) -> EventT
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to listen for. This will listen for subclasses of this type additionally. |
predicate |
A function taking the event as the single parameter.
This should return
TYPE:
|
timeout |
The amount of time to wait before raising an |
RETURNS | DESCRIPTION |
---|---|
Event
|
The event that was provided. |
RAISES | DESCRIPTION |
---|---|
TimeoutError
|
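The combination of a synchronous predicate and a timeout can be pictured with a small asyncio sketch. `TinyWaiter` is a hypothetical toy, not hikari's event manager. It registers a future, resolves it with the first event that passes the predicate, and lets `asyncio.wait_for` raise `asyncio.TimeoutError` when nothing matches in time.

```python
import asyncio
from typing import Callable, List, Optional, Tuple

Predicate = Callable[[object], bool]


class TinyWaiter:
    def __init__(self) -> None:
        self._waiters: List[Tuple[Optional[Predicate], asyncio.Future]] = []

    async def wait_for(
        self, timeout: Optional[float], predicate: Optional[Predicate] = None
    ) -> object:
        future = asyncio.get_running_loop().create_future()
        entry = (predicate, future)
        self._waiters.append(entry)
        try:
            # Raises asyncio.TimeoutError if no matching event arrives in time.
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            self._waiters.remove(entry)

    def dispatch(self, event: object) -> None:
        for predicate, future in list(self._waiters):
            # The predicate is a plain function, called synchronously.
            if not future.done() and (predicate is None or predicate(event)):
                future.set_result(event)


async def demo() -> Tuple[object, bool]:
    waiter = TinyWaiter()
    task = asyncio.create_task(
        waiter.wait_for(timeout=1.0, predicate=lambda event: event == "ping")
    )
    await asyncio.sleep(0)  # let the task register its future
    waiter.dispatch("noise")  # rejected by the predicate
    waiter.dispatch("ping")  # accepted, resolves the future
    matched = await task

    try:
        await waiter.wait_for(timeout=0.01)  # nothing is dispatched this time
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return matched, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```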
See Also
Dispatch : hikari.impl.gateway_bot.GatewayBot.dispatch.
Listen : hikari.impl.gateway_bot.GatewayBot.listen.
Stream : hikari.impl.gateway_bot.GatewayBot.stream.
Subscribe : hikari.impl.gateway_bot.GatewayBot.subscribe.
Unsubscribe : hikari.impl.gateway_bot.GatewayBot.unsubscribe.