hikari.impl.rate_limits#

Basic lazy ratelimit systems for asyncio.

See hikari.impl.buckets for HTTP-specific rate-limiting logic.

BaseRateLimiter #

Bases: ABC

Base for any asyncio-based rate limiter being used.

acquire abstractmethod async #

acquire() -> None

Acquire permission to perform a task that needs to have rate limit management enforced.

Calling this function will cause it to block until you are no longer being rate limited.

close abstractmethod #

close() -> None

Close the rate limiter, cancelling any internal tasks that are executing.
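
As a rough usage sketch, any concrete implementation of this interface is driven the same way: await acquire before the rate-limited operation, and call close on shutdown. The WindowedBurstRateLimiter used here is documented further down this page; the surrounding code is illustrative only.

import asyncio

from hikari.impl import rate_limits


async def do_rate_limited_work(limiter: rate_limits.BaseRateLimiter) -> None:
    # Block until the limiter grants permission, then perform the task.
    await limiter.acquire()
    ...  # the rate-limited operation goes here


async def main() -> None:
    # Any BaseRateLimiter implementation works here.
    limiter = rate_limits.WindowedBurstRateLimiter("example", period=1.0, limit=5)
    try:
        await do_rate_limited_work(limiter)
    finally:
        # Cancel any internal tasks that are still executing.
        limiter.close()


asyncio.run(main())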

BurstRateLimiter #

BurstRateLimiter(name: str)

Bases: BaseRateLimiter, ABC

Base implementation for a burst-based rate limiter.

This provides an internal queue and throttling placeholder, as well as complete logic for safely aborting any pending tasks when being shut down.

is_empty property #

is_empty: bool

Return True if no futures are on the queue being rate limited.

name instance-attribute #

name: str = name

The name of the rate limiter.

queue instance-attribute #

queue: List[Future[Any]] = []

The queue of any futures under a rate limit.

throttle_task instance-attribute #

throttle_task: Optional[Task[Any]] = None

The throttling task, or None if it is not running.

acquire abstractmethod async #

acquire() -> None

Acquire time on this rate limiter.

Calling this function will cause it to block until you are no longer being rate limited.

close #

close() -> None

Close the rate limiter, and shut down any pending tasks.
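
The shutdown logic described above amounts to roughly the following; this is a conceptual sketch of the behaviour, not the class's actual source.

import asyncio
from typing import Any, List, Optional


def close_sketch(
    queue: List[asyncio.Future[Any]],
    throttle_task: Optional[asyncio.Task[Any]],
) -> None:
    # Stop the throttle task first so it cannot complete futures that are
    # about to be aborted.
    if throttle_task is not None:
        throttle_task.cancel()

    # Abort every future still waiting on the rate limit, then empty the queue.
    for future in queue:
        if not future.done():
            future.cancel()
    queue.clear()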

ExponentialBackOff #

ExponentialBackOff(
    base: float = 2.0,
    maximum: float = 64.0,
    jitter_multiplier: float = 1.0,
    initial_increment: int = 0,
)

Implementation of an asyncio-compatible exponential back-off algorithm with random jitter.

Each backoff is calculated by raising the base to the increment (the number of invocations since the last reset), then adding the jitter, calculated as jitter_multiplier times a random number between 0 and 1.
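
Expressed as code, this works out to roughly the following; an illustrative sketch of the calculation described here and in the parameter documentation below, not the class's actual source.

import random


def next_backoff_sketch(
    base: float, increment: int, maximum: float, jitter_multiplier: float
) -> float:
    # The base raised to the increment, capped at the configured maximum...
    value = min(base ** increment, maximum)
    # ...plus jitter: jitter_multiplier times a random number between 0 and 1.
    return value + jitter_multiplier * random.random()


# With base=2.0 and jitter disabled this yields 1, 2, 4, 8, ... capped at maximum.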

PARAMETER DESCRIPTION
base

The base to use.

TYPE: float DEFAULT: 2.0

maximum

The max value the backoff can be in a single iteration.

All values will be capped at this maximum before the random jitter is added.

TYPE: float DEFAULT: 64.0

jitter_multiplier

The multiplier for the random jitter.

Set to 0 to disable jitter.

TYPE: float DEFAULT: 1.0

initial_increment

The initial increment to start at.

TYPE: int DEFAULT: 0

RAISES DESCRIPTION
ValueError

If an int too large to be represented as a float, or a non-finite value, is passed for a parameter annotated as float.

base instance-attribute #

base: Final[float] = float(base)

The base to use.

increment instance-attribute #

increment: int = initial_increment

The current increment.

jitter_multiplier instance-attribute #

jitter_multiplier: Final[float] = float(jitter_multiplier)

The multiplier for the random jitter.

maximum instance-attribute #

maximum: float = float(maximum)

This is the max value the backoff can be in a single iteration before an asyncio.TimeoutError is raised.

reset #

reset() -> None

Reset the exponential back-off.

ManualRateLimiter #

ManualRateLimiter()

Bases: BurstRateLimiter

Rate limit handler for the global HTTP rate limit.

This is a non-preemptive rate limiting algorithm that will always return completed futures until hikari.impl.rate_limits.ManualRateLimiter.throttle is invoked. Once this is invoked, any subsequent calls to hikari.impl.rate_limits.ManualRateLimiter.acquire will return incomplete futures that are placed on an internal queue. A task is then spun up to wait for the period of time given to hikari.impl.rate_limits.ManualRateLimiter.throttle. Once that has passed, the lock will re-consume the incomplete futures on the queue, completing them.

Triggering a throttle when it is already set will cancel the current throttle task that is sleeping and replace it.

This is used to enforce the global HTTP rate limit that will occur "randomly" during HTTP API interaction.

Expect random occurrences.
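
A rough usage sketch, assuming the caller is the component that detects a global rate limit response; the retry_after value and the function names are illustrative.

from hikari.impl import rate_limits

global_limit = rate_limits.ManualRateLimiter()


async def perform_global_call() -> None:
    # Completes immediately unless a throttle is currently active.
    await global_limit.acquire()
    ...  # make the HTTP call here


def on_global_rate_limit(retry_after: float) -> None:
    # Called from within the running event loop: queue all subsequent
    # acquire() calls until retry_after seconds have passed.
    global_limit.throttle(retry_after)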

is_empty property #

is_empty: bool

Return True if no futures are on the queue being rate limited.

name instance-attribute #

name: str = name

The name of the rate limiter.

queue instance-attribute #

queue: List[Future[Any]] = []

The queue of any futures under a rate limit.

reset_at instance-attribute #

reset_at: Optional[float] = None

The time.monotonic timestamp at which the rate limit gets lifted.

throttle_task instance-attribute #

throttle_task: Optional[Task[Any]]

The throttling task, or None if it is not running.

acquire async #

acquire() -> None

Acquire time on this rate limiter.

Calling this function will cause it to block until you are no longer being rate limited.

close #

close() -> None

Close the rate limiter, and shut down any pending tasks.

get_time_until_reset #

get_time_until_reset(now: float) -> float

Determine how long until the current rate limit is reset.

PARAMETER DESCRIPTION
now

The time.monotonic timestamp.

TYPE: float

RETURNS DESCRIPTION
float

The time left to sleep before the rate limit is reset. If no rate limit is in effect, then this will return 0.0 instead.

throttle #

throttle(retry_after: float) -> None

Perform the throttling rate limiter logic.

Iterates repeatedly while the queue is not empty, adhering to any rate limits that occur in the meantime.

Note

This will invoke hikari.impl.rate_limits.ManualRateLimiter.unlock_later as a scheduled task in the future (it will not await it to finish).

When the hikari.impl.rate_limits.ManualRateLimiter.unlock_later coroutine function completes, it should be expected to set the throttle_task to None. This means you can check if throttling is occurring by checking if throttle_task is not None.

If this is invoked while another throttle is in progress, that one is cancelled and a new one is started. This enables new rate limits to override existing ones.

PARAMETER DESCRIPTION
retry_after

How long to sleep for before unlocking and releasing any futures in the queue.

TYPE: float
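
For example, following the note above, a caller can tell whether a throttle is in progress by inspecting throttle_task; a small sketch:

import asyncio

from hikari.impl import rate_limits


async def main() -> None:
    limiter = rate_limits.ManualRateLimiter()

    # Start a 5-second throttle; this schedules unlock_later as a task.
    limiter.throttle(5.0)

    if limiter.throttle_task is not None:
        # Throttling is occurring; acquire() will now return incomplete
        # futures until unlock_later finishes and drains the queue.
        print(f"{limiter.name} is currently throttled")

    limiter.close()


asyncio.run(main())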

unlock_later async #

unlock_later(retry_after: float) -> None

Sleep for a while, then remove the lock.

Warning

You should not need to invoke this directly. Call hikari.impl.rate_limits.ManualRateLimiter.throttle instead.

When the hikari.impl.rate_limits.ManualRateLimiter.unlock_later coroutine function completes, it should be expected to set the throttle_task to None. This means you can check if throttling is occurring by checking if throttle_task is not None.

PARAMETER DESCRIPTION
retry_after

How long to sleep for before unlocking and releasing any futures in the queue.

TYPE: float

WindowedBurstRateLimiter #

WindowedBurstRateLimiter(name: str, period: float, limit: int)

Bases: BurstRateLimiter

Windowed burst rate limiter.

Rate limiter for rate limits that last fixed periods of time with a fixed number of times it can be used in that time frame.

To use this, you should call hikari.impl.rate_limits.WindowedBurstRateLimiter.acquire and await the result immediately before performing your rate-limited task.

If the rate limit has been hit, acquiring time will return an incomplete future that is placed on the internal queue. A throttle task is then spun up, if not already running; it is expected to back off and sleep for a given period of time until the limit has passed, and then to consume futures from the queue while adhering to those rate limits.

If the throttle task is already running, the acquired future will always be incomplete and enqueued regardless of whether the rate limit is actively reached or not.

Acquiring a future from this limiter when no throttling task is running and when the rate limit is not reached will always result in the task invoking a drip and a completed future being returned.

Dripping is left to the implementation of this class, but it is expected to provide some mechanism for updating the internal statistics to represent that a unit has been placed into the bucket.
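
A minimal usage sketch of the pattern described above; the name, period, and limit values are illustrative.

import asyncio

from hikari.impl import rate_limits


async def main() -> None:
    # Allow at most 5 acquisitions per 10-second window.
    limiter = rate_limits.WindowedBurstRateLimiter("example", period=10.0, limit=5)

    try:
        for i in range(7):
            # The first 5 calls complete immediately; the rest are queued
            # until the window resets.
            await limiter.acquire()
            print(f"performing task {i}")
    finally:
        limiter.close()


asyncio.run(main())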

is_empty property #

is_empty: bool

Return True if no futures are on the queue being rate limited.

limit instance-attribute #

limit: int = limit

The maximum number of hikari.impl.rate_limits.WindowedBurstRateLimiter.acquire calls allowed in this time window.

name instance-attribute #

name: str = name

The name of the rate limiter.

period instance-attribute #

period: float = period

How long the window lasts for from the start in seconds.

queue instance-attribute #

queue: List[Future[Any]] = []

The queue of any futures under a rate limit.

remaining instance-attribute #

remaining: int = 0

The number of hikari.impl.rate_limits.WindowedBurstRateLimiter.acquire calls left in this window before you will be rate limited.

reset_at instance-attribute #

reset_at: float = 0.0

The time.monotonic timestamp at which the limit window ends.

throttle_task instance-attribute #

throttle_task: Optional[Task[Any]]

The throttling task, or None if it is not running.

acquire async #

acquire() -> None

Acquire time on this rate limiter.

Calling this function will cause it to block until you are no longer being rate limited.

close #

close() -> None

Close the rate limiter, and shut down any pending tasks.

drip #

drip() -> None

Decrement the remaining counter.

get_time_until_reset #

get_time_until_reset(now: float) -> float

Determine how long until the current rate limit is reset.

Warning

Invoking this method will update the internal state if we were previously rate limited, but at the given time are no longer under that limit. This makes it imperative that you only pass the current timestamp to this function, and not past or future timestamps. The effects of doing the latter are undefined behaviour.

PARAMETER DESCRIPTION
now

The time.monotonic timestamp.

TYPE: float

RETURNS DESCRIPTION
float

The time left to sleep before the rate limit is reset. If no rate limit is in effect, then this will return 0.0 instead.

is_rate_limited #

is_rate_limited(now: float) -> bool

Determine if we are under a rate limit at the given time.

Warning

Invoking this method will update the internal state if we were previously rate limited, but at the given time are no longer under that limit. This makes it imperative that you only pass the current timestamp to this function, and not past or future timestamps. The effects of doing the latter are undefined behaviour.

PARAMETER DESCRIPTION
now

The time.monotonic timestamp.

TYPE: float

RETURNS DESCRIPTION
bool

Whether the bucket is rate limited.
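
Taken together with get_time_until_reset and drip, the window bookkeeping described by these methods can be modelled roughly as follows; a conceptual sketch using the documented attributes, not hikari's actual implementation.

import time


class WindowSketch:
    def __init__(self, period: float, limit: int) -> None:
        self.period = period
        self.limit = limit
        self.remaining = 0
        self.reset_at = 0.0

    def drip(self) -> None:
        # Consume one unit of the window's quota.
        self.remaining -= 1

    def is_rate_limited(self, now: float) -> bool:
        # If the previous window has lapsed, start a new one with a full quota.
        if self.reset_at <= now:
            self.remaining = self.limit
            self.reset_at = now + self.period
            return False
        # Otherwise we are limited only once the quota is exhausted.
        return self.remaining <= 0

    def get_time_until_reset(self, now: float) -> float:
        # 0.0 when no rate limit is in effect, else the time left in the window.
        if not self.is_rate_limited(now):
            return 0.0
        return self.reset_at - now


sketch = WindowSketch(period=10.0, limit=5)
print(sketch.is_rate_limited(time.monotonic()))  # False: a fresh window opens.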

throttle async #

throttle() -> None

Perform the throttling rate limiter logic.

Iterates repeatedly while the queue is not empty, adhering to any rate limits that occur in the meantime.

Note

You should usually not need to invoke this directly, but if you do, ensure to call it using asyncio.create_task, and store the task immediately in hikari.impl.rate_limits.WindowedBurstRateLimiter.throttle_task.

When this coroutine function completes, it will set the hikari.impl.rate_limits.WindowedBurstRateLimiter.throttle_task to None. This means you can check if throttling is occurring by checking if it is not None.
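
Following the note above, manually starting the throttle would look roughly like this; a sketch of the described pattern rather than something you normally need to do.

import asyncio

from hikari.impl import rate_limits


def start_throttle(limiter: rate_limits.WindowedBurstRateLimiter) -> None:
    # Schedule the coroutine as a task and store it immediately, per the note,
    # so other callers can see that throttling is in progress.
    if limiter.throttle_task is None:
        limiter.throttle_task = asyncio.create_task(limiter.throttle())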