# Source code for hikari.internal.signals

# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Signal handling utilities."""

from __future__ import annotations

__all__: typing.Sequence[str] = ("handle_interrupts",)

import asyncio
import contextlib
import logging
import signal
import threading
import traceback
import types
import typing

from hikari import errors
from hikari.internal import ux

# Names of the signals hooked by `handle_interrupts`. They are kept as strings and
# resolved with `getattr(signal, name)` at use, so a name that does not exist on the
# current platform can be skipped instead of failing at import time.
_INTERRUPT_SIGNALS: typing.Final[typing.Tuple[str, ...]] = ("SIGINT", "SIGTERM")
# Logger used by this module for interrupt-related messages.
_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.signals")


def _raise_interrupt(signum: int) -> typing.NoReturn:
    """Raise a `hikari.errors.HikariInterrupt` for the given signal number.

    Parameters
    ----------
    signum : int
        The number of the signal that was received.

    Raises
    ------
    hikari.errors.HikariInterrupt
        Always; it carries `signum` and the description of the signal
        returned by `signal.strsignal`.
    """
    description = signal.strsignal(signum)
    # `signal.strsignal` is typed as returning an optional string, so narrow it
    # before handing it to the exception.
    assert description is not None

    interrupt = errors.HikariInterrupt(signum, description)
    raise interrupt


def _interrupt_handler(
    loop: asyncio.AbstractEventLoop,
) -> typing.Callable[[int, typing.Optional[types.FrameType]], None]:
    """Build a signal handler that schedules `_raise_interrupt` on `loop`.

    Parameters
    ----------
    loop : asyncio.AbstractEventLoop
        The event loop on which the interrupt will be scheduled.

    Returns
    -------
    typing.Callable[[int, typing.Optional[types.FrameType]], None]
        A callable with the `(signum, frame)` signature expected by
        `signal.signal`.
    """
    # Native ID of the thread that created the handler; only used in the trace log.
    owner_thread_id = threading.get_native_id()

    def handler(signum: int, frame: typing.Optional[types.FrameType]) -> None:
        # Thread IDs and the stack are only gathered when TRACE logging is on,
        # since formatting a stack is wasted work otherwise.
        if _LOGGER.isEnabledFor(ux.TRACE):
            message = (
                "interrupt %s occurred on thread %s, process on thread %s will be notified shortly\n"
                "Stacktrace for developer sanity:\n%s"
            )
            current_thread_id = threading.get_native_id()
            stack_text = "".join(traceback.format_stack(frame))
            _LOGGER.log(ux.TRACE, message, signum, current_thread_id, owner_thread_id, stack_text)

        # The handler does not raise by itself: it hands the raise over to the
        # loop through its thread-safe scheduling entry point.
        loop.call_soon_threadsafe(_raise_interrupt, signum)

    return handler


@contextlib.contextmanager
def handle_interrupts(
    enabled: typing.Optional[bool],
    loop: asyncio.AbstractEventLoop,
    propagate_interrupts: bool,
) -> typing.Generator[None, None, None]:
    """Context manager which cleanly exits on signal interrupts.

    While the context is active, every signal named in `_INTERRUPT_SIGNALS`
    that exists on this platform is routed to a handler which schedules a
    `hikari.errors.HikariInterrupt` on `loop`. When the context exits, the
    handlers that were installed before entering are put back.

    Parameters
    ----------
    enabled : typing.Optional[bool]
        Whether to enable the signal interrupts. If set to `None`, then
        it will be enabled or not based on whether the running thread is the
        main one or not.
    loop : asyncio.AbstractEventLoop
        The event loop the interrupt will be raised in.
    propagate_interrupts : bool
        Whether to propagate interrupts. If `False`, a
        `hikari.errors.HikariInterrupt` raised inside the managed block is
        suppressed.
    """
    if enabled is None:
        enabled = threading.current_thread() is threading.main_thread()

    if not enabled:
        # NOOP context manager
        yield
        return

    interrupt_handler = _interrupt_handler(loop)
    # Maps each hooked signal number to the handler that was active before us.
    previous_handlers: typing.Dict[int, typing.Any] = {}

    try:
        # Installation happens inside the `try` so that a failure part-way
        # through still restores whatever was already hooked.
        for sig in _INTERRUPT_SIGNALS:
            signum = getattr(signal, sig, None)
            if signum is None:
                _LOGGER.log(ux.TRACE, "signal %s is not implemented on your platform; skipping", sig)
                continue

            previous_handlers[signum] = signal.signal(signum, interrupt_handler)

        try:
            yield
        except errors.HikariInterrupt:
            if propagate_interrupts:
                raise
    finally:
        # Put back what was there before instead of forcing `SIG_DFL`: for
        # SIGINT the interpreter normally installs `signal.default_int_handler`,
        # and callers may have registered handlers of their own.
        for signum, previous in previous_handlers.items():
            # `signal.signal` reports `None` when the previous handler was not
            # installed from Python; that value cannot be passed back in, so
            # fall back to the default disposition.
            signal.signal(signum, signal.SIG_DFL if previous is None else previous)