# Source code for hikari.internal.net

# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""General bits and pieces that are reused between components."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("generate_error_response", "create_client_session")

import http
import typing

import aiohttp

from hikari import errors
from hikari.internal import data_binding

if typing.TYPE_CHECKING:
    from hikari.impl import config


async def generate_error_response(response: aiohttp.ClientResponse) -> errors.HTTPError:
    """Given an erroneous HTTP response, return a corresponding exception.

    Parameters
    ----------
    response : aiohttp.ClientResponse
        The erroneous response to derive an exception from. The body of the
        response is read as part of this call.

    Returns
    -------
    hikari.errors.HTTPError
        `hikari.errors.BadRequestError`, `hikari.errors.UnauthorizedError`,
        `hikari.errors.ForbiddenError` or `hikari.errors.NotFoundError` for
        the 400, 401, 403 and 404 statuses respectively. Otherwise a
        `hikari.errors.ClientHTTPResponseError` for any other 4xx status, a
        `hikari.errors.InternalServerError` for a 5xx status, and a
        `hikari.errors.HTTPResponseError` for anything else.
    """
    real_url = str(response.real_url)
    raw_body = await response.read()

    # Little hack to stop mypy from complaining when using `*args`
    args: typing.List[typing.Any] = [real_url, response.headers, raw_body]
    raw_error_array: typing.Optional[data_binding.JSONObject] = None

    try:
        # Decode the body that was already read instead of reading it again.
        json_body = data_binding.default_json_loads(raw_body)
    except ValueError:
        # The body is not valid JSON, so there are no structured error
        # details to attach to the exception.
        json_body = None

    # Only a JSON object can carry the message/code/errors fields. This is
    # checked explicitly rather than asserted: an `assert` is stripped when
    # running with `-O`, and a valid-but-unexpected body (e.g. a JSON array)
    # should still produce an HTTP error rather than an `AssertionError`.
    if isinstance(json_body, dict):
        args.append(json_body.get("message", ""))
        args.append(json_body.get("code", 0))
        raw_error_array = json_body.get("errors")

    if response.status == http.HTTPStatus.BAD_REQUEST:
        return errors.BadRequestError(*args, errors=raw_error_array)
    if response.status == http.HTTPStatus.UNAUTHORIZED:
        return errors.UnauthorizedError(*args)
    if response.status == http.HTTPStatus.FORBIDDEN:
        return errors.ForbiddenError(*args)
    if response.status == http.HTTPStatus.NOT_FOUND:
        return errors.NotFoundError(*args)

    # Prefer the enum member when the status code is a known one, falling
    # back to the raw integer for non-standard codes.
    try:
        status: typing.Union[http.HTTPStatus, int] = http.HTTPStatus(response.status)
    except ValueError:
        status = response.status

    if 400 <= status < 500:
        return errors.ClientHTTPResponseError(real_url, status, response.headers, raw_body)
    if 500 <= status < 600:
        return errors.InternalServerError(real_url, status, response.headers, raw_body)
    return errors.HTTPResponseError(real_url, status, response.headers, raw_body)
def create_tcp_connector(
    http_settings: config.HTTPSettings,
    *,
    dns_cache: typing.Union[None, bool, int] = True,
    limit: int = 100,
) -> aiohttp.TCPConnector:
    """Create a TCP connector and return it.

    Parameters
    ----------
    http_settings : config.HTTPSettings
        HTTP settings to use for the connector.

    Optional Parameters
    -------------------
    dns_cache : typing.Union[None, bool, int]
        If `True`, DNS caching is used with a default TTL of 10 seconds.
        If `False`, DNS caching is disabled. If an `int` is given, then DNS
        caching is enabled with an explicit TTL set. If `None`, the cache will
        be enabled and never invalidate.
    limit : int
        Number of connections to allow in the pool at any given time.

    Returns
    -------
    aiohttp.TCPConnector
        TCP connector to use.
    """
    # `bool` is a subclass of `int`, so booleans have to be singled out
    # explicitly: they select the default TTL, while an explicit `int` (or
    # `None`) is forwarded to aiohttp untouched.
    ttl_dns_cache = 10 if isinstance(dns_cache, bool) else dns_cache

    return aiohttp.TCPConnector(
        enable_cleanup_closed=http_settings.enable_cleanup_closed,
        force_close=http_settings.force_close_transports,
        limit=limit,
        ssl=http_settings.ssl,
        ttl_dns_cache=ttl_dns_cache,
        # Only an explicit `False` turns the cache off; `None` and `0` keep it on.
        use_dns_cache=dns_cache is not False,
    )
def create_client_session(
    connector: aiohttp.BaseConnector,
    connector_owner: bool,
    http_settings: config.HTTPSettings,
    raise_for_status: bool,
    trust_env: bool,
) -> aiohttp.ClientSession:
    """Build an HTTP/1.1 client session from the given settings.

    .. warning::
        You must invoke this from within a running event loop.

    .. note::
        If you pass an explicit connector, then the session that is created
        will not own the connector. You will be expected to manually close
        it **after** the returned client session is closed to prevent
        leaking resources.

    Parameters
    ----------
    connector : aiohttp.BaseConnector
        The connector the session should use.
    connector_owner : bool
        If `True`, then the client session will close the connector on
        shutdown. Otherwise, you must do it manually.
    http_settings : hikari.impl.config.HTTPSettings
        HTTP settings to use; the session timeouts are taken from its
        `timeouts` attribute.
    raise_for_status : bool
        `True` to default to throwing exceptions if a request fails, or
        `False` to default to not.
    trust_env : bool
        `True` to trust anything in environment variables and the `netrc`
        file, `False` to ignore it.

    Returns
    -------
    aiohttp.ClientSession
        The client session to use.
    """
    timeouts = http_settings.timeouts

    # Translate the configured timeouts into their aiohttp counterparts.
    session_timeout = aiohttp.ClientTimeout(
        connect=timeouts.acquire_and_connect,
        sock_connect=timeouts.request_socket_connect,
        sock_read=timeouts.request_socket_read,
        total=timeouts.total,
    )

    return aiohttp.ClientSession(
        connector=connector,
        connector_owner=connector_owner,
        raise_for_status=raise_for_status,
        timeout=session_timeout,
        trust_env=trust_env,
        version=aiohttp.HttpVersion11,
    )