# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2020 Nekokatt
# Copyright (c) 2021-present davfsa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Signal handling utilities."""
from __future__ import annotations
__all__: typing.Sequence[str] = ("handle_interrupts",)
import asyncio
import contextlib
import logging
import signal
import threading
import traceback
import types
import typing
from hikari import errors
from hikari.internal import ux
_INTERRUPT_SIGNALS: typing.Tuple[str, ...] = ("SIGINT", "SIGTERM")
_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.signals")
def _raise_interrupt(signum: int) -> typing.NoReturn:
signame = signal.strsignal(signum)
assert signame is not None
raise errors.HikariInterrupt(signum, signame)
def _interrupt_handler(
loop: asyncio.AbstractEventLoop,
) -> typing.Callable[[int, typing.Optional[types.FrameType]], None]:
loop_thread_id = threading.get_native_id()
def handler(signum: int, frame: typing.Optional[types.FrameType]) -> None:
# The loop may or may not be running, depending on the state of the application when this occurs.
# Signals on POSIX only occur on the main thread usually, too, so we need to ensure this is
# threadsafe.
# We log native thread IDs purely for debugging purposes.
if _LOGGER.isEnabledFor(ux.TRACE):
_LOGGER.log(
ux.TRACE,
"interrupt %s occurred on thread %s, process on thread %s will be notified shortly\n"
"Stacktrace for developer sanity:\n%s",
signum,
threading.get_native_id(),
loop_thread_id,
"".join(traceback.format_stack(frame)),
)
loop.call_soon_threadsafe(_raise_interrupt, signum)
return handler
@contextlib.contextmanager
[docs]def handle_interrupts(
enabled: typing.Optional[bool], loop: asyncio.AbstractEventLoop, propagate_interrupts: bool
) -> typing.Generator[None, None, None]:
"""Context manager which cleanly exits on signal interrupts.
Parameters
----------
enabled : typing.Optional[bool]
Whether to enable the signal interrupts.
If set to `None`, then it will be enabled or not based on whether the running
thread is the main one or not.
loop : asyncio.AbstractEventLoop
The event loop the interrupt will be raised in.
propagate_interrupts : bool
Whether to propagate interrupts.
"""
if enabled is None:
enabled = threading.current_thread() is threading.main_thread()
if not enabled:
# NOOP context manager
yield
return
interrupt_handler = _interrupt_handler(loop)
for sig in _INTERRUPT_SIGNALS:
try:
signum = getattr(signal, sig)
signal.signal(signum, interrupt_handler)
except AttributeError:
_LOGGER.log(ux.TRACE, "signal %s is not implemented on your platform; skipping", sig)
try:
yield
except errors.HikariInterrupt:
if propagate_interrupts:
raise
finally:
for sig in _INTERRUPT_SIGNALS:
try:
signum = getattr(signal, sig)
signal.signal(signum, signal.SIG_DFL)
except AttributeError:
# Signal not implemented. We already logged this earlier.
pass