Source code for hikari.impl.cache

# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Basic implementation of a cache for general bots and gateway apps."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("CacheImpl",)

import copy
import logging
import typing

from hikari import channels as channels_
from hikari import emojis
from hikari import messages
from hikari import snowflakes
from hikari import undefined
from hikari.api import cache
from hikari.api import config as config_api
from hikari.impl import config as config_impl
from hikari.internal import cache as cache_utility
from hikari.internal import collections

if typing.TYPE_CHECKING:
    from hikari import guilds
    from hikari import invites
    from hikari import presences
    from hikari import stickers
    from hikari import traits
    from hikari import users
    from hikari import voices

_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.cache")


# TODO: do we want to hide entities that are marked as "deleted" and being kept alive by references?
[docs]class CacheImpl(cache.MutableCache): """In-memory cache implementation. Parameters ---------- app : hikari.traits.RESTAware The object of the REST aware app this is bound to. settings : hikari.impl.config.CacheSettings The cache settings to use. """ __slots__: typing.Sequence[str] = ( "_app", "_dm_channel_entries", "_emoji_entries", "_guild_channel_entries", "_guild_thread_entries", "_guild_entries", "_intents", "_invite_entries", "_me", "_role_entries", "_sticker_entries", "_unknown_custom_emoji_entries", "_user_entries", "_message_entries", "_referenced_messages", "_settings", ) # For the sake of keeping things clean, the annotations are being kept separate from the assignment here. _me: typing.Optional[users.OwnUser] _emoji_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, cache_utility.KnownCustomEmojiData] _dm_channel_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, snowflakes.Snowflake] _guild_channel_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, channels_.PermissibleGuildChannel] _guild_thread_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, channels_.GuildThreadChannel] _guild_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, cache_utility.GuildRecord] _invite_entries: collections.ExtendedMutableMapping[str, cache_utility.InviteData] _role_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, guilds.Role] _sticker_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, cache_utility.GuildStickerData] _unknown_custom_emoji_entries: collections.ExtendedMutableMapping[ snowflakes.Snowflake, cache_utility.RefCell[emojis.CustomEmoji], ] _user_entries: collections.ExtendedMutableMapping[snowflakes.Snowflake, cache_utility.RefCell[users.User]] _message_entries: collections.ExtendedMutableMapping[ snowflakes.Snowflake, cache_utility.RefCell[cache_utility.MessageData] ] _referenced_messages: collections.ExtendedMutableMapping[ snowflakes.Snowflake, cache_utility.RefCell[cache_utility.MessageData] ] def __init__(self, app: traits.RESTAware, settings: config_impl.CacheSettings) -> None: self._app = app self._settings = settings self._create_cache() @property
[docs] def settings(self) -> config_impl.CacheSettings: return self._settings
def _create_cache(self) -> None: self._me = None self._dm_channel_entries = collections.LimitedCapacityCacheMap(limit=self._settings.max_dm_channel_ids) self._emoji_entries = collections.FreezableDict() self._guild_channel_entries = collections.FreezableDict() self._guild_thread_entries = collections.FreezableDict() self._guild_entries = collections.FreezableDict() self._invite_entries = collections.FreezableDict() self._role_entries = collections.FreezableDict() self._sticker_entries = collections.FreezableDict() # This is a purely internal cache used for handling the caching and de-duplicating of the unknown custom emojis # found attached to cached presence activities. self._unknown_custom_emoji_entries = collections.FreezableDict() self._user_entries = collections.FreezableDict() self._message_entries = collections.LimitedCapacityCacheMap( limit=self._settings.max_messages, on_expire=self._on_message_expire ) self._referenced_messages = collections.FreezableDict() def _is_cache_enabled_for(self, required_flag: config_api.CacheComponents) -> bool: return (self._settings.components & required_flag) == required_flag @staticmethod def _increment_ref_count(obj: cache_utility.RefCell[typing.Any], increment: int = 1) -> None: obj.ref_count += increment
[docs] def clear(self) -> None: if self._settings.components == config_api.CacheComponents.NONE: return None self._create_cache()
[docs] def clear_dm_channel_ids(self) -> cache.CacheView[snowflakes.Snowflake, snowflakes.Snowflake]: if not self._is_cache_enabled_for(config_api.CacheComponents.DM_CHANNEL_IDS): return cache_utility.EmptyCacheView() result = self._dm_channel_entries self._dm_channel_entries = collections.LimitedCapacityCacheMap(limit=self._settings.max_dm_channel_ids) return cache_utility.CacheMappingView(result)
[docs] def delete_dm_channel_id( self, user: snowflakes.SnowflakeishOr[users.PartialUser], / ) -> typing.Optional[snowflakes.Snowflake]: if not self._is_cache_enabled_for(config_api.CacheComponents.DM_CHANNEL_IDS): return None return self._dm_channel_entries.pop(snowflakes.Snowflake(user), None)
[docs] def get_dm_channel_id( self, user: snowflakes.SnowflakeishOr[users.PartialUser], / ) -> typing.Optional[snowflakes.Snowflake]: if not self._is_cache_enabled_for(config_api.CacheComponents.DM_CHANNEL_IDS): return None return self._dm_channel_entries.get(snowflakes.Snowflake(user))
[docs] def get_dm_channel_ids_view(self) -> cache.CacheView[snowflakes.Snowflake, snowflakes.Snowflake]: if not self._is_cache_enabled_for(config_api.CacheComponents.DM_CHANNEL_IDS): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(self._dm_channel_entries.freeze())
[docs] def set_dm_channel_id( self, user: snowflakes.SnowflakeishOr[users.PartialUser], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.DM_CHANNEL_IDS): return None self._dm_channel_entries[snowflakes.Snowflake(user)] = snowflakes.Snowflake(channel)
def _build_emoji( self, emoji_data: cache_utility.KnownCustomEmojiData, ) -> emojis.KnownCustomEmoji: return emoji_data.build_entity(self._app)
[docs] def clear_emojis(self) -> cache.CacheView[snowflakes.Snowflake, emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return cache_utility.EmptyCacheView() cached_emojis = self._emoji_entries self._emoji_entries = collections.FreezableDict() for emoji_data in cached_emojis.values(): if emoji_data.user: self._garbage_collect_user(emoji_data.user, decrement=1) for guild_id, guild_record in self._guild_entries.freeze().items(): guild_record.emojis = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_emojis, builder=self._build_emoji)
[docs] def clear_emojis_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.emojis: return cache_utility.EmptyCacheView() cached_emojis = {emoji_id: self._emoji_entries.pop(emoji_id) for emoji_id in guild_record.emojis} guild_record.emojis = None self._remove_guild_record_if_empty(guild_id, guild_record) for emoji_data in cached_emojis.values(): if emoji_data.user: self._garbage_collect_user(emoji_data.user, decrement=1) return cache_utility.CacheMappingView(cached_emojis, builder=self._build_emoji)
[docs] def delete_emoji( self, emoji: snowflakes.SnowflakeishOr[emojis.CustomEmoji], / ) -> typing.Optional[emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return None emoji_id = snowflakes.Snowflake(emoji) emoji_data = self._emoji_entries.pop(emoji_id, None) if not emoji_data: return None if emoji_data.user: self._garbage_collect_user(emoji_data.user, decrement=1) guild_record = self._guild_entries.get(emoji_data.guild_id) if guild_record and guild_record.emojis: guild_record.emojis.remove(emoji_id) if not guild_record.emojis: guild_record.emojis = None self._remove_guild_record_if_empty(emoji_data.guild_id, guild_record) return self._build_emoji(emoji_data)
[docs] def get_emoji( self, emoji: snowflakes.SnowflakeishOr[emojis.CustomEmoji], / ) -> typing.Optional[emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return None emoji_data = self._emoji_entries.get(snowflakes.Snowflake(emoji)) return self._build_emoji(emoji_data) if emoji_data else None
[docs] def get_emojis_view(self) -> cache.CacheView[snowflakes.Snowflake, emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(self._emoji_entries.freeze(), builder=self._build_emoji)
[docs] def get_emojis_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, emojis.KnownCustomEmoji]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.emojis: return cache_utility.EmptyCacheView() cached_emojis = {emoji_id: self._emoji_entries[emoji_id] for emoji_id in guild_record.emojis} return cache_utility.CacheMappingView(cached_emojis, builder=self._build_emoji)
[docs] def set_emoji(self, emoji: emojis.KnownCustomEmoji, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return None user: typing.Optional[cache_utility.RefCell[users.User]] = None if emoji.user: user = self._set_user(emoji.user) if emoji.id not in self._emoji_entries: self._increment_ref_count(user) emoji_data = cache_utility.KnownCustomEmojiData.build_from_entity(emoji, user=user) self._emoji_entries[emoji.id] = emoji_data guild_record = self._get_or_create_guild_record(emoji.guild_id) if guild_record.emojis is None: # TODO: add test cases when it is not None? guild_record.emojis = collections.SnowflakeSet() guild_record.emojis.add(emoji.id)
[docs] def update_emoji( self, emoji: emojis.KnownCustomEmoji, / ) -> typing.Tuple[typing.Optional[emojis.KnownCustomEmoji], typing.Optional[emojis.KnownCustomEmoji]]: if not self._is_cache_enabled_for(config_api.CacheComponents.EMOJIS): return None, None cached_emoji = self.get_emoji(emoji.id) self.set_emoji(emoji) return cached_emoji, self.get_emoji(emoji.id)
def _build_sticker( self, sticker_data: cache_utility.GuildStickerData, ) -> stickers.GuildSticker: return sticker_data.build_entity(self._app)
[docs] def clear_stickers(self) -> cache.CacheView[snowflakes.Snowflake, stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return cache_utility.EmptyCacheView() cached_stickers = self._sticker_entries self._sticker_entries = collections.FreezableDict() for sticker_data in cached_stickers.values(): if sticker_data.user: self._garbage_collect_user(sticker_data.user, decrement=1) for guild_id, guild_record in self._guild_entries.freeze().items(): guild_record.stickers = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_stickers, builder=self._build_sticker)
[docs] def clear_stickers_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.stickers: return cache_utility.EmptyCacheView() cached_stickers = {sticker_id: self._sticker_entries.pop(sticker_id) for sticker_id in guild_record.stickers} guild_record.stickers = None self._remove_guild_record_if_empty(guild_id, guild_record) for sticker_data in cached_stickers.values(): if sticker_data.user: self._garbage_collect_user(sticker_data.user, decrement=1) return cache_utility.CacheMappingView(cached_stickers, builder=self._build_sticker)
[docs] def get_sticker( self, sticker: snowflakes.SnowflakeishOr[stickers.GuildSticker], / ) -> typing.Optional[stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return None sticker_data = self._sticker_entries.get(snowflakes.Snowflake(sticker)) return self._build_sticker(sticker_data) if sticker_data else None
[docs] def get_stickers_view(self) -> cache.CacheView[snowflakes.Snowflake, stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(self._sticker_entries.freeze(), builder=self._build_sticker)
[docs] def get_stickers_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.stickers: return cache_utility.EmptyCacheView() cached_stickers = {sticker_id: self._sticker_entries[sticker_id] for sticker_id in guild_record.stickers} return cache_utility.CacheMappingView(cached_stickers, builder=self._build_sticker)
[docs] def delete_sticker( self, sticker: snowflakes.SnowflakeishOr[stickers.GuildSticker], / ) -> typing.Optional[stickers.GuildSticker]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return None sticker_id = snowflakes.Snowflake(sticker) sticker_data = self._sticker_entries.pop(sticker_id, None) if not sticker_data: return None if sticker_data.user: self._garbage_collect_user(sticker_data.user, decrement=1) guild_record = self._guild_entries.get(sticker_data.guild_id) if guild_record and guild_record.stickers: guild_record.stickers.remove(sticker_id) if not guild_record.stickers: guild_record.stickers = None return self._build_sticker(sticker_data)
[docs] def set_sticker(self, sticker: stickers.GuildSticker, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_STICKERS): return None user: typing.Optional[cache_utility.RefCell[users.User]] = None if sticker.user: user = self._set_user(sticker.user) if sticker.id not in self._sticker_entries: self._increment_ref_count(user) sticker_data = cache_utility.GuildStickerData.build_from_entity(sticker, user=user) self._sticker_entries[sticker.id] = sticker_data guild_record = self._get_or_create_guild_record(sticker.guild_id) if guild_record.stickers is None: # TODO: add test cases when it is not None? guild_record.stickers = collections.SnowflakeSet() guild_record.stickers.add(sticker.id)
def _remove_guild_record_if_empty( self, guild_id: snowflakes.Snowflake, record: cache_utility.GuildRecord, / ) -> None: if guild_id in self._guild_entries and record.empty(): del self._guild_entries[guild_id] def _get_or_create_guild_record(self, guild_id: snowflakes.Snowflake) -> cache_utility.GuildRecord: if guild_id not in self._guild_entries: self._guild_entries[guild_id] = cache_utility.GuildRecord() return self._guild_entries[guild_id]
[docs] def clear_guilds(self) -> cache.CacheView[snowflakes.Snowflake, guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return cache_utility.EmptyCacheView() cached_guilds = {} for guild_id, guild_record in self._guild_entries.freeze().items(): if guild_record.guild: cached_guilds[guild_id] = guild_record.guild guild_record.guild = None guild_record.is_available = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_guilds) if cached_guilds else cache_utility.EmptyCacheView()
[docs] def delete_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> typing.Optional[guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record: return None guild = guild_record.guild if guild: guild_record.guild = None guild_record.is_available = None self._remove_guild_record_if_empty(guild_id, guild_record) return guild
def _get_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], /, *, availability: bool ) -> typing.Optional[guilds.GatewayGuild]: guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.guild or guild_record.is_available is not availability: return None return copy.copy(guild_record.guild)
[docs] def get_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> typing.Optional[guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) return copy.copy(guild_record.guild) if guild_record and guild_record.guild else None
[docs] def get_available_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> typing.Optional[guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None return self._get_guild(guild, availability=True)
[docs] def get_unavailable_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> typing.Optional[guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None return self._get_guild(guild, availability=False)
def _get_guilds_view(self, *, availability: bool) -> cache.CacheView[snowflakes.Snowflake, guilds.GatewayGuild]: # We may have a guild record without a guild object in cases where we're caching other entities that belong to # the guild therefore we want to make sure record.guild isn't None. results = { sf: guild_record.guild for sf, guild_record in self._guild_entries.items() if guild_record.guild and guild_record.is_available is availability } return cache_utility.CacheMappingView(results) if results else cache_utility.EmptyCacheView()
[docs] def get_guilds_view(self) -> cache.CacheView[snowflakes.Snowflake, guilds.GatewayGuild]: return cache_utility.CacheMappingView( {guild_id: record.guild for guild_id, record in self._guild_entries.items() if record.guild} )
[docs] def get_available_guilds_view(self) -> cache.CacheView[snowflakes.Snowflake, guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return cache_utility.EmptyCacheView() return self._get_guilds_view(availability=True)
[docs] def get_unavailable_guilds_view(self) -> cache.CacheView[snowflakes.Snowflake, guilds.GatewayGuild]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return cache_utility.EmptyCacheView() return self._get_guilds_view(availability=False)
[docs] def set_guild(self, guild: guilds.GatewayGuild, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None guild_record = self._get_or_create_guild_record(guild.id) guild_record.guild = copy.copy(guild) guild_record.is_available = True
[docs] def set_guild_availability( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], is_available: bool, / ) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if guild_record and guild_record.guild: guild_record.is_available = is_available
[docs] def update_guild( self, guild: guilds.GatewayGuild, / ) -> typing.Tuple[typing.Optional[guilds.GatewayGuild], typing.Optional[guilds.GatewayGuild]]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILDS): return None, None guild = copy.copy(guild) cached_guild = self.get_guild(guild.id) # We have to manually update these because Inconsistency is Discord's middle name. if cached_guild: guild.member_count = cached_guild.member_count guild.joined_at = cached_guild.joined_at guild.is_large = cached_guild.is_large self.set_guild(guild) return cached_guild, self.get_guild(guild.id)
[docs] def clear_threads(self) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return cache_utility.EmptyCacheView() cached_threads = self._guild_thread_entries self._guild_thread_entries = collections.FreezableDict() for guild_id, guild_record in self._guild_entries.freeze().items(): if guild_record.threads: guild_record.threads = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_threads)
[docs] def clear_threads_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.threads: return cache_utility.EmptyCacheView() cached_threads = {sf: self._guild_thread_entries.pop(sf) for sf in guild_record.threads} guild_record.threads = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_threads)
[docs] def clear_threads_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return cache_utility.EmptyCacheView() channel_id = snowflakes.Snowflake(channel) guild = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild) if not guild_record or not guild_record.threads: return cache_utility.EmptyCacheView() threads: typing.Dict[snowflakes.Snowflake, channels_.GuildThreadChannel] = {} for thread in map(self._guild_thread_entries.__getitem__, tuple(guild_record.threads)): if thread.parent_id == channel_id: del self._guild_thread_entries[thread.id] guild_record.threads.remove(thread.id) if not guild_record.threads: guild_record.threads = None self._remove_guild_record_if_empty(guild, guild_record) return cache_utility.CacheMappingView(threads)
[docs] def delete_thread( self, thread: snowflakes.SnowflakeishOr[channels_.PartialChannel], / ) -> typing.Optional[channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return None thread_id = snowflakes.Snowflake(thread) thread = self._guild_thread_entries.pop(thread_id, None) if not thread: return None guild_record = self._guild_entries.get(thread.guild_id) if guild_record and guild_record.threads: guild_record.threads.remove(thread_id) if not guild_record.threads: guild_record.threads = None self._remove_guild_record_if_empty(thread.guild_id, guild_record) return thread
[docs] def get_thread( self, thread: snowflakes.SnowflakeishOr[channels_.PartialChannel], / ) -> typing.Optional[channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return None thread = self._guild_thread_entries.get(snowflakes.Snowflake(thread)) return copy.copy(thread) if thread else None
[docs] def get_threads_view(self) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: return cache_utility.CacheMappingView(self._guild_thread_entries.freeze())
[docs] def get_threads_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.threads: return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView( {sf: self._guild_thread_entries[sf] for sf in guild_record.threads}, )
[docs] def get_threads_view_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[snowflakes.Snowflake, channels_.GuildThreadChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return cache_utility.EmptyCacheView() record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not record or not record.threads: return cache_utility.EmptyCacheView() threads = map(self._guild_thread_entries.__getitem__, record.threads) channel = snowflakes.Snowflake(channel) return cache_utility.CacheMappingView({thread.id: thread for thread in threads if thread.parent_id == channel})
[docs] def set_thread(self, thread: channels_.GuildThreadChannel, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return self._guild_thread_entries[thread.id] = copy.copy(thread) guild_record = self._get_or_create_guild_record(thread.guild_id) if guild_record.threads is None: guild_record.threads = collections.SnowflakeSet() guild_record.threads.add(thread.id)
[docs] def update_thread( self, thread: channels_.GuildThreadChannel, / ) -> typing.Tuple[typing.Optional[channels_.GuildThreadChannel], typing.Optional[channels_.GuildThreadChannel]]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_THREADS): return None, None cached_thread = self.get_thread(thread.id) self.set_thread(thread) return cached_thread, self.get_thread(thread.id)
[docs] def clear_guild_channels(self) -> cache.CacheView[snowflakes.Snowflake, channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return cache_utility.EmptyCacheView() cached_channels = self._guild_channel_entries self._guild_channel_entries = collections.FreezableDict() for guild_id, guild_record in self._guild_entries.freeze().items(): if guild_record.channels: guild_record.channels = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_channels)
[docs] def clear_guild_channels_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.channels: return cache_utility.EmptyCacheView() cached_channels = {sf: self._guild_channel_entries.pop(sf) for sf in guild_record.channels} guild_record.channels = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_channels)
[docs] def delete_guild_channel( self, channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], / ) -> typing.Optional[channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return None channel_id = snowflakes.Snowflake(channel) channel = self._guild_channel_entries.pop(channel_id, None) if not channel: return None guild_record = self._guild_entries.get(channel.guild_id) if guild_record and guild_record.channels: guild_record.channels.remove(channel_id) if not guild_record.channels: guild_record.channels = None self._remove_guild_record_if_empty(channel.guild_id, guild_record) return channel
[docs] def get_guild_channel( self, channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], / ) -> typing.Optional[channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return None channel = self._guild_channel_entries.get(snowflakes.Snowflake(channel)) return cache_utility.copy_guild_channel(channel) if channel else None
[docs] def get_guild_channels_view(self) -> cache.CacheView[snowflakes.Snowflake, channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView( self._guild_channel_entries.freeze(), builder=cache_utility.copy_guild_channel # type: ignore[type-var] )
[docs] def get_guild_channels_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, channels_.PermissibleGuildChannel]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.channels: return cache_utility.EmptyCacheView() cached_channels = {sf: self._guild_channel_entries[sf] for sf in guild_record.channels} def sorter( args: typing.Tuple[snowflakes.Snowflake, channels_.PermissibleGuildChannel] ) -> typing.Tuple[int, int, int]: channel = args[1] if isinstance(channel, channels_.GuildCategory): return channel.position, -1, 0 parent_position = -1 if channel.parent_id is None else cached_channels[channel.parent_id].position if not isinstance(channel, channels_.GuildVoiceChannel): return parent_position, 0, channel.position return parent_position, 1, channel.position cached_channels = dict(sorted(cached_channels.items(), key=sorter)) return cache_utility.CacheMappingView( cached_channels, builder=cache_utility.copy_guild_channel # type: ignore[type-var] )
[docs] def set_guild_channel(self, channel: channels_.PermissibleGuildChannel, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return None self._guild_channel_entries[channel.id] = cache_utility.copy_guild_channel(channel) guild_record = self._get_or_create_guild_record(channel.guild_id) if guild_record.channels is None: guild_record.channels = collections.SnowflakeSet() guild_record.channels.add(channel.id)
[docs] def update_guild_channel( self, channel: channels_.PermissibleGuildChannel, / ) -> typing.Tuple[ typing.Optional[channels_.PermissibleGuildChannel], typing.Optional[channels_.PermissibleGuildChannel] ]: if not self._is_cache_enabled_for(config_api.CacheComponents.GUILD_CHANNELS): return None, None cached_channel = self.get_guild_channel(channel.id) self.set_guild_channel(channel) return cached_channel, self.get_guild_channel(channel.id)
def _build_invite( self, invite_data: cache_utility.InviteData, ) -> invites.InviteWithMetadata: return invite_data.build_entity(self._app) def _remove_invite_users(self, invite: cache_utility.InviteData) -> None: if invite.inviter: self._garbage_collect_user(invite.inviter, decrement=1) if invite.target_user: self._garbage_collect_user(invite.target_user, decrement=1)
[docs] def clear_invites(self) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() cached_invites = self._invite_entries self._invite_entries = collections.FreezableDict() for invite_data in cached_invites.values(): self._remove_invite_users(invite_data) for guild_id, guild_record in self._guild_entries.freeze().items(): if guild_record.invites: guild_record.invites = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_invites, builder=self._build_invite)
[docs] def clear_invites_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.invites: return cache_utility.EmptyCacheView() cached_invites = {invite_code: self._invite_entries.pop(invite_code) for invite_code in guild_record.invites} guild_record.invites = None self._remove_guild_record_if_empty(guild_id, guild_record) for invite_data in cached_invites.values(): self._remove_invite_users(invite_data) return cache_utility.CacheMappingView(cached_invites, builder=self._build_invite)
[docs] def clear_invites_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) channel_id = snowflakes.Snowflake(channel) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.invites: return cache_utility.EmptyCacheView() cached_invites = {} for code in tuple(guild_record.invites): invite_data = self._invite_entries[code] if invite_data.channel_id != channel_id: continue cached_invites[code] = invite_data del self._invite_entries[code] guild_record.invites.remove(code) self._remove_invite_users(invite_data) if not guild_record.invites: guild_record.invites = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_invites, builder=self._build_invite)
[docs] def delete_invite( self, code: typing.Union[invites.InviteCode, str], / ) -> typing.Optional[invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return None code = code if isinstance(code, str) else code.code invite_data = self._invite_entries.pop(code, None) if not invite_data: return None self._remove_invite_users(invite_data) if invite_data.guild_id is not None: # TODO: test case when this is None? guild_record = self._guild_entries.get(invite_data.guild_id) if guild_record and guild_record.invites: guild_record.invites.remove(code) if not guild_record.invites: guild_record.invites = None # TODO: test when this is set to None self._remove_guild_record_if_empty(invite_data.guild_id, guild_record) return self._build_invite(invite_data)
[docs] def get_invite(self, code: typing.Union[invites.InviteCode, str], /) -> typing.Optional[invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return None code = code if isinstance(code, str) else code.code invite_data = self._invite_entries.get(code) return self._build_invite(invite_data) if invite_data else None
[docs] def get_invites_view(self) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(self._invite_entries.freeze(), builder=self._build_invite)
[docs] def get_invites_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_entry = self._guild_entries.get(guild_id) if not guild_entry or not guild_entry.invites: return cache_utility.EmptyCacheView() cached_invites = {code: self._invite_entries[code] for code in guild_entry.invites} return cache_utility.CacheMappingView(cached_invites, builder=self._build_invite)
[docs] def get_invites_view_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[str, invites.InviteWithMetadata]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) channel_id = snowflakes.Snowflake(channel) guild_entry = self._guild_entries.get(guild_id) if not guild_entry or not guild_entry.invites: return cache_utility.EmptyCacheView() cached_invites = { invite.code: invite for invite in map(self._invite_entries.get, guild_entry.invites) if invite and invite.channel_id == channel_id } return cache_utility.CacheMappingView(cached_invites, builder=self._build_invite)
[docs] def set_invite(self, invite: invites.InviteWithMetadata, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return None inviter: typing.Optional[cache_utility.RefCell[users.User]] = None if invite.inviter: inviter = self._set_user(invite.inviter) if invite.code not in self._invite_entries: self._increment_ref_count(inviter) target_user: typing.Optional[cache_utility.RefCell[users.User]] = None if invite.target_user: target_user = self._set_user(invite.target_user) if invite.code not in self._invite_entries: self._increment_ref_count(target_user) self._invite_entries[invite.code] = cache_utility.InviteData.build_from_entity( invite, inviter=inviter, target_user=target_user ) if invite.guild_id: guild_entry = self._get_or_create_guild_record(invite.guild_id) if guild_entry.invites is None: guild_entry.invites = [] guild_entry.invites.append(invite.code)
[docs] def update_invite( self, invite: invites.InviteWithMetadata, / ) -> typing.Tuple[typing.Optional[invites.InviteWithMetadata], typing.Optional[invites.InviteWithMetadata]]: if not self._is_cache_enabled_for(config_api.CacheComponents.INVITES): return None, None cached_invite = self.get_invite(invite.code) self.set_invite(invite) return cached_invite, self.get_invite(invite.code)
[docs] def delete_me(self) -> typing.Optional[users.OwnUser]: cached_user = self._me self._me = None return cached_user
[docs] def get_me(self) -> typing.Optional[users.OwnUser]: return copy.copy(self._me)
[docs] def set_me(self, user: users.OwnUser, /) -> None: if self._is_cache_enabled_for(config_api.CacheComponents.ME): _LOGGER.debug("setting my user to %s", user) self._me = copy.copy(user)
[docs] def update_me( self, user: users.OwnUser, / ) -> typing.Tuple[typing.Optional[users.OwnUser], typing.Optional[users.OwnUser]]: if not self._is_cache_enabled_for(config_api.CacheComponents.ME): return None, None cached_user = self.get_me() self.set_me(user) return cached_user, self.get_me()
def _build_member( self, member_data: cache_utility.RefCell[cache_utility.MemberData], ) -> guilds.Member: return member_data.object.build_entity(self._app) @staticmethod def _can_remove_member( member: cache_utility.RefCell[cache_utility.MemberData], ) -> bool: return member.ref_count < 1 and member.object.has_been_deleted def _garbage_collect_member( self, guild_record: cache_utility.GuildRecord, member: cache_utility.RefCell[cache_utility.MemberData], *, decrement: typing.Optional[int] = None, deleting: bool = False, ) -> typing.Optional[cache_utility.RefCell[cache_utility.MemberData]]: if deleting: member.object.has_been_deleted = True if decrement is not None: self._increment_ref_count(member, -decrement) user_id = member.object.user.object.id if not guild_record.members or user_id not in guild_record.members: return None if not self._can_remove_member(member): return None del guild_record.members[user_id] self._garbage_collect_user(member.object.user, decrement=1) if not guild_record.members: guild_record.members = None self._remove_guild_record_if_empty(member.object.guild_id, guild_record) return member
[docs] def clear_members( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, guilds.Member]]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return cache_utility.EmptyCacheView() views = ((guild_id, self.clear_members_for_guild(guild_id)) for guild_id in self._guild_entries.freeze().keys()) return cache_utility.CacheMappingView({guild_id: view for guild_id, view in views if view})
[docs] def clear_members_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, guilds.Member]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.members: return cache_utility.EmptyCacheView() cached_members = guild_record.members.freeze() members_gen = (self._garbage_collect_member(guild_record, m, deleting=True) for m in cached_members.values()) # _garbage_collect_member will only return the member data object if they could be removed, else None. cached_members = {member.object.user.object.id: member for member in members_gen if member} self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_members, builder=self._build_member) # type: ignore[type-var]
[docs] def delete_member( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[guilds.Member]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.members: return None member_data = guild_record.members.get(user_id) if not member_data: return None if not guild_record.members: guild_record.members = None self._remove_guild_record_if_empty(guild_id, guild_record) # _garbage_collect_member will only return the member data object if they could be removed, else None. garbage_collected = self._garbage_collect_member(guild_record, member_data, deleting=True) return self._build_member(member_data) if garbage_collected else None
[docs] def get_member( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[guilds.Member]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.members: return None member = guild_record.members.get(user_id) return self._build_member(member) if member else None
[docs] def get_members_view( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, guilds.Member]]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return cache_utility.EmptyCacheView() views: typing.Mapping[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, guilds.Member]] = { guild_id: cache_utility.CacheMappingView(view.members.freeze(), builder=self._build_member) # type: ignore[type-var] for guild_id, view in self._guild_entries.items() if view.members } return cache_utility.Cache3DMappingView(views)
[docs] def get_members_view_for_guild( self, guild_id: snowflakes.Snowflakeish, / ) -> cache.CacheView[snowflakes.Snowflake, guilds.Member]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild_id) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.members: return cache_utility.EmptyCacheView() cached_members = { user_id: member for user_id, member in guild_record.members.items() if not member.object.has_been_deleted } return cache_utility.CacheMappingView(cached_members, builder=self._build_member) # type: ignore[type-var]
[docs] def set_member(self, member: guilds.Member, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return None self._set_member(member, is_reference=False)
def _set_member( self, member: guilds.Member, /, *, is_reference: bool = True ) -> cache_utility.RefCell[cache_utility.MemberData]: guild_record = self._get_or_create_guild_record(member.guild_id) user = self._set_user(member.user) member_data = cache_utility.MemberData.build_from_entity(member, user=user) if guild_record.members is None: # TODO: test when this is not None guild_record.members = collections.FreezableDict() if member.user.id not in guild_record.members: self._increment_ref_count(member_data.user) try: member_data.has_been_deleted = False if is_reference: member_data.has_been_deleted = guild_record.members[member.id].object.has_been_deleted guild_record.members[member.id].object = member_data except KeyError: member_data.has_been_deleted = is_reference guild_record.members[member.id] = cache_utility.RefCell(member_data) return guild_record.members[member.id]
[docs] def update_member( self, member: guilds.Member, / ) -> typing.Tuple[typing.Optional[guilds.Member], typing.Optional[guilds.Member]]: if not self._is_cache_enabled_for(config_api.CacheComponents.MEMBERS): return None, None cached_member = self.get_member(member.guild_id, member.user.id) self.set_member(member) return cached_member, self.get_member(member.guild_id, member.user.id)
def _build_presence( self, presence_data: cache_utility.MemberPresenceData, ) -> presences.MemberPresence: return presence_data.build_entity(self._app) def _garbage_collect_unknown_custom_emoji( self, emoji: cache_utility.RefCell[emojis.CustomEmoji], *, decrement: typing.Optional[int] = None ) -> None: if decrement is not None: self._increment_ref_count(emoji, -decrement) if emoji.ref_count < 1 and emoji.object.id in self._unknown_custom_emoji_entries: del self._unknown_custom_emoji_entries[emoji.object.id] def _remove_presence_assets( self, presence_data: cache_utility.MemberPresenceData, ) -> None: for activity_data in presence_data.activities: if isinstance(activity_data.emoji, cache_utility.RefCell): self._garbage_collect_unknown_custom_emoji(activity_data.emoji, decrement=1)
[docs] def clear_presences( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, presences.MemberPresence]]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return cache_utility.EmptyCacheView() views = ( (guild_id, self.clear_presences_for_guild(guild_id)) for guild_id in self._guild_entries.freeze().keys() ) return cache_utility.CacheMappingView({guild_id: view for guild_id, view in views if view})
[docs] def clear_presences_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, presences.MemberPresence]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.presences: return cache_utility.EmptyCacheView() cached_presences = guild_record.presences guild_record.presences = None for presence in cached_presences.values(): self._remove_presence_assets(presence) self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_presences, builder=self._build_presence)
[docs] def delete_presence( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[presences.MemberPresence]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.presences: return None presence_data = guild_record.presences.pop(user_id, None) if not presence_data: return None self._remove_presence_assets(presence_data) if not guild_record.presences: guild_record.presences = None self._remove_guild_record_if_empty(guild_id, guild_record) return self._build_presence(presence_data)
[docs] def get_presence( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[presences.MemberPresence]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.presences: return None return self._build_presence(guild_record.presences[user_id]) if user_id in guild_record.presences else None
[docs] def get_presences_view( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, presences.MemberPresence]]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return cache_utility.EmptyCacheView() views = { guild_id: cache_utility.CacheMappingView(guild_record.presences.freeze(), builder=self._build_presence) for guild_id, guild_record in self._guild_entries.items() if guild_record.presences } return cache_utility.Cache3DMappingView(views)
[docs] def get_presences_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, presences.MemberPresence]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.presences: return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(guild_record.presences.freeze(), builder=self._build_presence)
[docs] def set_presence(self, presence: presences.MemberPresence, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return None presence_data = cache_utility.MemberPresenceData.build_from_entity(presence) for activity, activity_data in zip(presence.activities, presence_data.activities): emoji = activity.emoji if not isinstance(emoji, emojis.CustomEmoji): continue if emoji.id in self._unknown_custom_emoji_entries: self._unknown_custom_emoji_entries[emoji.id].object = copy.copy(emoji) emoji_data = self._unknown_custom_emoji_entries[emoji.id] else: emoji_data = cache_utility.RefCell(copy.copy(emoji)) self._unknown_custom_emoji_entries[emoji.id] = emoji_data self._increment_ref_count(emoji_data) activity_data.emoji = emoji_data guild_record = self._get_or_create_guild_record(presence.guild_id) if guild_record.presences is None: guild_record.presences = collections.FreezableDict() guild_record.presences[presence.user_id] = presence_data
[docs] def update_presence( self, presence: presences.MemberPresence, / ) -> typing.Tuple[typing.Optional[presences.MemberPresence], typing.Optional[presences.MemberPresence]]: if not self._is_cache_enabled_for(config_api.CacheComponents.PRESENCES): return None, None cached_presence = self.get_presence(presence.guild_id, presence.user_id) self.set_presence(presence) return cached_presence, self.get_presence(presence.guild_id, presence.user_id)
[docs] def clear_roles(self) -> cache.CacheView[snowflakes.Snowflake, guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES) or not self._role_entries: return cache_utility.EmptyCacheView() roles = self._role_entries self._role_entries = collections.FreezableDict() for guild_id, guild_record in self._guild_entries.freeze().items(): if guild_record.roles: # TODO: test coverage for when not this guild_record.roles = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(roles)
[docs] def clear_roles_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.roles: return cache_utility.EmptyCacheView() view = cache_utility.CacheMappingView( {role_id: self._role_entries.pop(role_id) for role_id in guild_record.roles} ) guild_record.roles = None self._remove_guild_record_if_empty(guild_id, guild_record) return view
[docs] def delete_role(self, role: snowflakes.SnowflakeishOr[guilds.PartialRole], /) -> typing.Optional[guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return None role_id = snowflakes.Snowflake(role) role = self._role_entries.pop(role_id, None) if not role: return None guild_record = self._guild_entries.get(role.guild_id) if guild_record and guild_record.roles: guild_record.roles.remove(role_id) if not guild_record.roles: guild_record.roles = None self._remove_guild_record_if_empty(role.guild_id, guild_record) return role
[docs] def get_role(self, role: snowflakes.SnowflakeishOr[guilds.PartialRole], /) -> typing.Optional[guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return None role = self._role_entries.get(snowflakes.Snowflake(role)) return copy.copy(role) if role else None
[docs] def get_roles_view(self) -> cache.CacheView[snowflakes.Snowflake, guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(self._role_entries.freeze())
[docs] def get_roles_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, guilds.Role]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.roles: return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView({role_id: self._role_entries[role_id] for role_id in guild_record.roles})
[docs] def set_role(self, role: guilds.Role, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return None self._role_entries[role.id] = role guild_record = self._get_or_create_guild_record(role.guild_id) if guild_record.roles is None: # TODO: test when this is not None guild_record.roles = collections.SnowflakeSet() guild_record.roles.add(role.id)
[docs] def update_role( self, role: guilds.Role, / ) -> typing.Tuple[typing.Optional[guilds.Role], typing.Optional[guilds.Role]]: if not self._is_cache_enabled_for(config_api.CacheComponents.ROLES): return None, None cached_role = self.get_role(role.id) self.set_role(role) return cached_role, self.get_role(role.id)
@staticmethod def _can_remove_user(user_data: cache_utility.RefCell[users.User]) -> bool: return user_data.ref_count < 1 def _garbage_collect_user( self, user: cache_utility.RefCell[users.User], *, decrement: typing.Optional[int] = None ) -> None: if decrement is not None: self._increment_ref_count(user, -decrement) if self._can_remove_user(user) and user.object.id in self._user_entries: del self._user_entries[user.object.id] self._dm_channel_entries.pop(user.object.id, None)
[docs] def get_user(self, user: snowflakes.SnowflakeishOr[users.PartialUser], /) -> typing.Optional[users.User]: user = self._user_entries.get(snowflakes.Snowflake(user)) return user.copy() if user else None
[docs] def get_users_view(self) -> cache.CacheView[snowflakes.Snowflake, users.User]: if not self._user_entries: return cache_utility.EmptyCacheView() cached_users = self._user_entries.freeze() unwrapper = typing.cast( "typing.Callable[[cache_utility.RefCell[users.User]], users.User]", cache_utility.unwrap_ref_cell ) return cache_utility.CacheMappingView(cached_users, builder=unwrapper) # type: ignore[type-var]
def _set_user(self, user: users.User, /) -> cache_utility.RefCell[users.User]: try: self._user_entries[user.id].object = copy.copy(user) cell = self._user_entries[user.id] except KeyError: cell = cache_utility.RefCell(copy.copy(user)) self._user_entries[user.id] = cell return cell def _build_voice_state( self, voice_data: cache_utility.VoiceStateData, ) -> voices.VoiceState: return voice_data.build_entity(self._app)
[docs] def clear_voice_states( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, voices.VoiceState]]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() views = ( (guild_id, self.clear_voice_states_for_guild(guild_id)) for guild_id in self._guild_entries.freeze().keys() ) return cache_utility.CacheMappingView({guild_id: view for guild_id, view in views if view})
[docs] def clear_voice_states_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[snowflakes.Snowflake, voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) channel_id = snowflakes.Snowflake(channel) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.voice_states: return cache_utility.EmptyCacheView() cached_voice_states = {} for user_id, voice_state in guild_record.voice_states.items(): if voice_state.channel_id == channel_id: cached_voice_states[user_id] = voice_state self._garbage_collect_member(guild_record, voice_state.member, decrement=1) if not guild_record.voice_states: guild_record.voice_states = None self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_voice_states, builder=self._build_voice_state)
[docs] def clear_voice_states_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.voice_states: return cache_utility.EmptyCacheView() cached_voice_states = guild_record.voice_states guild_record.voice_states = None for voice_state in cached_voice_states.values(): self._garbage_collect_member(guild_record, voice_state.member, decrement=1) self._remove_guild_record_if_empty(guild_id, guild_record) return cache_utility.CacheMappingView(cached_voice_states, builder=self._build_voice_state)
[docs] def delete_voice_state( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.voice_states: return None voice_state_data = guild_record.voice_states.pop(user_id, None) if not voice_state_data: return None if not guild_record.voice_states: guild_record.voice_states = None self._garbage_collect_member(guild_record, voice_state_data.member, decrement=1) self._remove_guild_record_if_empty(guild_id, guild_record) return self._build_voice_state(voice_state_data)
[docs] def get_voice_state( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], user: snowflakes.SnowflakeishOr[users.PartialUser], /, ) -> typing.Optional[voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return None guild_id = snowflakes.Snowflake(guild) user_id = snowflakes.Snowflake(user) guild_record = self._guild_entries.get(guild_id) voice_data = guild_record.voice_states.get(user_id) if guild_record and guild_record.voice_states else None return self._build_voice_state(voice_data) if voice_data else None
[docs] def get_voice_states_view( self, ) -> cache.CacheView[snowflakes.Snowflake, cache.CacheView[snowflakes.Snowflake, voices.VoiceState]]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() views = { guild_id: cache_utility.CacheMappingView( guild_record.voice_states.freeze(), builder=self._build_voice_state ) for guild_id, guild_record in self._guild_entries.items() if guild_record.voice_states } return cache_utility.Cache3DMappingView(views)
[docs] def get_voice_states_view_for_channel( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], channel: snowflakes.SnowflakeishOr[channels_.PartialChannel], /, ) -> cache.CacheView[snowflakes.Snowflake, voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() guild_id = snowflakes.Snowflake(guild) channel_id = snowflakes.Snowflake(channel) guild_record = self._guild_entries.get(guild_id) if not guild_record or not guild_record.voice_states: return cache_utility.EmptyCacheView() cached_voice_states = { user_id: voice_state for user_id, voice_state in guild_record.voice_states.items() if voice_state.channel_id == channel_id } return cache_utility.CacheMappingView(cached_voice_states, builder=self._build_voice_state)
[docs] def get_voice_states_view_for_guild( self, guild: snowflakes.SnowflakeishOr[guilds.PartialGuild], / ) -> cache.CacheView[snowflakes.Snowflake, voices.VoiceState]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return cache_utility.EmptyCacheView() guild_record = self._guild_entries.get(snowflakes.Snowflake(guild)) if not guild_record or not guild_record.voice_states: return cache_utility.EmptyCacheView() return cache_utility.CacheMappingView(guild_record.voice_states.freeze(), builder=self._build_voice_state)
[docs] def set_voice_state(self, voice_state: voices.VoiceState, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return None guild_record = self._get_or_create_guild_record(voice_state.guild_id) if guild_record.voice_states is None: # TODO: test when this is not None guild_record.voice_states = collections.FreezableDict() member = self._set_member(voice_state.member) voice_state_data = cache_utility.VoiceStateData.build_from_entity(voice_state, member=member) if voice_state.user_id not in guild_record.voice_states: self._increment_ref_count(member) guild_record.voice_states[voice_state.user_id] = voice_state_data
[docs] def update_voice_state( self, voice_state: voices.VoiceState, / ) -> typing.Tuple[typing.Optional[voices.VoiceState], typing.Optional[voices.VoiceState]]: if not self._is_cache_enabled_for(config_api.CacheComponents.VOICE_STATES): return None, None cached_voice_state = self.get_voice_state(voice_state.guild_id, voice_state.user_id) self.set_voice_state(voice_state) return cached_voice_state, self.get_voice_state(voice_state.guild_id, voice_state.user_id)
def _build_message(self, message_data: cache_utility.RefCell[cache_utility.MessageData]) -> messages.Message: return message_data.object.build_entity(self._app) def _can_remove_message(self, message: cache_utility.RefCell[cache_utility.MessageData]) -> bool: return message.object.id not in self._message_entries and message.ref_count < 1 def _garbage_collect_message( self, message: cache_utility.RefCell[cache_utility.MessageData], *, decrement: typing.Optional[int] = None, override_ref: bool = False, ) -> typing.Optional[cache_utility.RefCell[cache_utility.MessageData]]: if decrement is not None: self._increment_ref_count(message, -decrement) if not self._can_remove_message(message) or override_ref: return None self._garbage_collect_user(message.object.author, decrement=1) if message.object.member: guild_record = self._guild_entries.get(message.object.member.object.guild_id) if guild_record: self._garbage_collect_member(guild_record, message.object.member, decrement=1) if message.object.referenced_message: self._garbage_collect_message(message.object.referenced_message, decrement=1) if message.object.user_mentions: for user in message.object.user_mentions.values(): self._garbage_collect_user(user, decrement=1) # If we got this far the message won't be in _message_entries as that'd infer that it hasn't been marked as # deleted yet. if message.object.id in self._referenced_messages: del self._referenced_messages[message.object.id] return message def _on_message_expire(self, message: cache_utility.RefCell[cache_utility.MessageData], /) -> None: if not self._garbage_collect_message(message): self._referenced_messages[message.object.id] = message
[docs] def clear_messages(self) -> cache.CacheView[snowflakes.Snowflake, messages.Message]: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES) or not self._message_entries: return cache_utility.EmptyCacheView() # As the only entry which references messages is other messages, this is enough for now. cached_messages = self._message_entries.freeze() self._message_entries.clear() cached_messages.update(self._referenced_messages) self._referenced_messages.clear() for message in cached_messages.values(): self._garbage_collect_message(message, override_ref=True) return cache_utility.CacheMappingView(cached_messages, builder=self._build_message) # type: ignore[type-var]
[docs] def delete_message( self, message: snowflakes.SnowflakeishOr[messages.PartialMessage], / ) -> typing.Optional[messages.Message]: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES): return None message_id = snowflakes.Snowflake(message) message_data = self._message_entries.pop(message_id, None) if not message_data: return None if not self._garbage_collect_message(message_data): self._referenced_messages[message_id] = message_data return None return self._build_message(message_data)
[docs] def get_message( self, message: snowflakes.SnowflakeishOr[messages.PartialMessage], / ) -> typing.Optional[messages.Message]: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES): return None message_id = snowflakes.Snowflake(message) message_data = self._message_entries.get(message_id) or self._referenced_messages.get(message_id) return self._build_message(message_data) if message_data else None
[docs] def get_messages_view(self) -> cache.CacheView[snowflakes.Snowflake, messages.Message]: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES): return cache_utility.EmptyCacheView() cached_messages = self._message_entries.freeze() cached_messages.update(self._referenced_messages) return cache_utility.CacheMappingView(cached_messages, builder=self._build_message) # type: ignore[type-var]
def _set_message( self, message: messages.Message, /, *, is_reference: bool = True ) -> cache_utility.RefCell[cache_utility.MessageData]: author = self._set_user(message.author) member = self._set_member(message.member) if message.member else None user_mentions: undefined.UndefinedOr[ typing.Mapping[snowflakes.Snowflake, cache_utility.RefCell[users.User]] ] = undefined.UNDEFINED if message.user_mentions is not undefined.UNDEFINED: user_mentions = {user_id: self._set_user(user) for user_id, user in message.user_mentions.items()} interaction_user: typing.Optional[cache_utility.RefCell[users.User]] = None if message.interaction: interaction_user = self._set_user(message.interaction.user) referenced_message: typing.Optional[cache_utility.RefCell[cache_utility.MessageData]] = None if message.referenced_message: reference_id = message.referenced_message.id referenced_message = self._message_entries.get(reference_id) or self._referenced_messages.get(reference_id) if referenced_message: # Since the message is partial, if we don't have it cached, there is nothing we can do about it referenced_message.object.update(message.referenced_message) # Only increment ref counts if this wasn't previously cached. if message.id not in self._referenced_messages and message.id not in self._message_entries: if member: self._increment_ref_count(member) if referenced_message: self._increment_ref_count(referenced_message) if user_mentions is not undefined.UNDEFINED: for user in user_mentions.values(): self._increment_ref_count(user) if interaction_user: self._increment_ref_count(interaction_user) message_data = cache_utility.MessageData.build_from_entity( message, author=author, member=member, user_mentions=user_mentions, referenced_message=referenced_message, interaction_user=interaction_user, ) # Ensure any previously set message ref cell is in the right place before updating the cache. if not is_reference and message.id in self._referenced_messages: self._message_entries[message.id] = self._referenced_messages.pop(message.id) if message.id in self._message_entries: self._message_entries[message.id].object = message_data elif not is_reference: self._message_entries[message.id] = cache_utility.RefCell(message_data) elif message.id in self._referenced_messages: self._referenced_messages[message.id].object = message_data else: self._referenced_messages[message.id] = cache_utility.RefCell(message_data) return self._message_entries.get(message.id) or self._referenced_messages[message.id]
[docs] def set_message(self, message: messages.Message, /) -> None: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES): return None self._set_message(message, is_reference=False)
[docs] def update_message( self, message: typing.Union[messages.PartialMessage, messages.Message], / ) -> typing.Tuple[typing.Optional[messages.Message], typing.Optional[messages.Message]]: if not self._is_cache_enabled_for(config_api.CacheComponents.MESSAGES): return None, None cached_message = self.get_message(message.id) if isinstance(message, messages.Message): self.set_message(message) elif cached_message_data := self._message_entries.get(message.id) or self._referenced_messages.get(message.id): user_mentions: undefined.UndefinedOr[ typing.Mapping[snowflakes.Snowflake, cache_utility.RefCell[users.User]] ] = undefined.UNDEFINED if message.user_mentions is not undefined.UNDEFINED: user_mentions = {user_id: self._set_user(user) for user_id, user in message.user_mentions.items()} # We want to ensure that any previously mentioned users are garbage collected if they're no longer # being mentioned. if cached_message_data.object.user_mentions is not undefined.UNDEFINED: for user_id, user in cached_message_data.object.user_mentions.items(): if user_id not in user_mentions: self._garbage_collect_user(user, decrement=1) cached_message_data.object.update(message, user_mentions=user_mentions) return cached_message, self.get_message(message.id)