aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/blinker/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.8/site-packages/blinker/base.py')
-rw-r--r--venv/lib/python3.8/site-packages/blinker/base.py621
1 files changed, 621 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/blinker/base.py b/venv/lib/python3.8/site-packages/blinker/base.py
new file mode 100644
index 0000000..ec494b1
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/blinker/base.py
@@ -0,0 +1,621 @@
+from __future__ import annotations
+
+import collections.abc as c
+import typing as t
+import warnings
+import weakref
+from collections import defaultdict
+from contextlib import AbstractContextManager
+from contextlib import contextmanager
+from functools import cached_property
+from inspect import iscoroutinefunction
+from weakref import WeakValueDictionary
+
+from ._utilities import make_id
+from ._utilities import make_ref
+from ._utilities import Symbol
+
+if t.TYPE_CHECKING:
+ F = t.TypeVar("F", bound=c.Callable[..., t.Any])
+
+ANY = Symbol("ANY")
+"""Symbol for "any sender"."""
+
+ANY_ID = 0
+
+
+class Signal:
+ """A notification emitter.
+
+ :param doc: The docstring for the signal.
+ """
+
+ ANY = ANY
+ """An alias for the :data:`~blinker.ANY` sender symbol."""
+
+ set_class: type[set[t.Any]] = set
+ """The set class to use for tracking connected receivers and senders.
+ Python's ``set`` is unordered. If receivers must be dispatched in the order
+ they were connected, an ordered set implementation can be used.
+
+ .. versionadded:: 1.7
+ """
+
+ @cached_property
+ def receiver_connected(self) -> Signal:
+ """Emitted at the end of each :meth:`connect` call.
+
+ The signal sender is the signal instance, and the :meth:`connect`
+ arguments are passed through: ``receiver``, ``sender``, and ``weak``.
+
+ .. versionadded:: 1.2
+ """
+ return Signal(doc="Emitted after a receiver connects.")
+
+ @cached_property
+ def receiver_disconnected(self) -> Signal:
+ """Emitted at the end of each :meth:`disconnect` call.
+
+ The sender is the signal instance, and the :meth:`disconnect` arguments
+ are passed through: ``receiver`` and ``sender``.
+
+ This signal is emitted **only** when :meth:`disconnect` is called
+ explicitly. This signal cannot be emitted by an automatic disconnect
+ when a weakly referenced receiver or sender goes out of scope, as the
+ instance is no longer be available to be used as the sender for this
+ signal.
+
+ An alternative approach is available by subscribing to
+ :attr:`receiver_connected` and setting up a custom weakref cleanup
+ callback on weak receivers and senders.
+
+ .. versionadded:: 1.2
+ """
+ return Signal(doc="Emitted after a receiver disconnects.")
+
+ def __init__(self, doc: str | None = None) -> None:
+ if doc:
+ self.__doc__ = doc
+
+ self.receivers: dict[
+ t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
+ ] = {}
+ """The map of connected receivers. Useful to quickly check if any
+ receivers are connected to the signal: ``if s.receivers:``. The
+ structure and data is not part of the public API, but checking its
+ boolean value is.
+ """
+
+ self.is_muted: bool = False
+ self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
+ self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
+ self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}
+
+ def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
+ """Connect ``receiver`` to be called when the signal is sent by
+ ``sender``.
+
+ :param receiver: The callable to call when :meth:`send` is called with
+ the given ``sender``, passing ``sender`` as a positional argument
+ along with any extra keyword arguments.
+ :param sender: Any object or :data:`ANY`. ``receiver`` will only be
+ called when :meth:`send` is called with this sender. If ``ANY``, the
+ receiver will be called for any sender. A receiver may be connected
+ to multiple senders by calling :meth:`connect` multiple times.
+ :param weak: Track the receiver with a :mod:`weakref`. The receiver will
+ be automatically disconnected when it is garbage collected. When
+ connecting a receiver defined within a function, set to ``False``,
+ otherwise it will be disconnected when the function scope ends.
+ """
+ receiver_id = make_id(receiver)
+ sender_id = ANY_ID if sender is ANY else make_id(sender)
+
+ if weak:
+ self.receivers[receiver_id] = make_ref(
+ receiver, self._make_cleanup_receiver(receiver_id)
+ )
+ else:
+ self.receivers[receiver_id] = receiver
+
+ self._by_sender[sender_id].add(receiver_id)
+ self._by_receiver[receiver_id].add(sender_id)
+
+ if sender is not ANY and sender_id not in self._weak_senders:
+ # store a cleanup for weakref-able senders
+ try:
+ self._weak_senders[sender_id] = make_ref(
+ sender, self._make_cleanup_sender(sender_id)
+ )
+ except TypeError:
+ pass
+
+ if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
+ try:
+ self.receiver_connected.send(
+ self, receiver=receiver, sender=sender, weak=weak
+ )
+ except TypeError:
+ # TODO no explanation or test for this
+ self.disconnect(receiver, sender)
+ raise
+
+ if _receiver_connected.receivers and self is not _receiver_connected:
+ try:
+ _receiver_connected.send(
+ self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
+ )
+ except TypeError:
+ self.disconnect(receiver, sender)
+ raise
+
+ return receiver
+
+ def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
+ """Connect the decorated function to be called when the signal is sent
+ by ``sender``.
+
+ The decorated function will be called when :meth:`send` is called with
+ the given ``sender``, passing ``sender`` as a positional argument along
+ with any extra keyword arguments.
+
+ :param sender: Any object or :data:`ANY`. ``receiver`` will only be
+ called when :meth:`send` is called with this sender. If ``ANY``, the
+ receiver will be called for any sender. A receiver may be connected
+ to multiple senders by calling :meth:`connect` multiple times.
+ :param weak: Track the receiver with a :mod:`weakref`. The receiver will
+ be automatically disconnected when it is garbage collected. When
+ connecting a receiver defined within a function, set to ``False``,
+ otherwise it will be disconnected when the function scope ends.=
+
+ .. versionadded:: 1.1
+ """
+
+ def decorator(fn: F) -> F:
+ self.connect(fn, sender, weak)
+ return fn
+
+ return decorator
+
+ @contextmanager
+ def connected_to(
+ self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
+ ) -> c.Generator[None, None, None]:
+ """A context manager that temporarily connects ``receiver`` to the
+ signal while a ``with`` block executes. When the block exits, the
+ receiver is disconnected. Useful for tests.
+
+ :param receiver: The callable to call when :meth:`send` is called with
+ the given ``sender``, passing ``sender`` as a positional argument
+ along with any extra keyword arguments.
+ :param sender: Any object or :data:`ANY`. ``receiver`` will only be
+ called when :meth:`send` is called with this sender. If ``ANY``, the
+ receiver will be called for any sender.
+
+ .. versionadded:: 1.1
+ """
+ self.connect(receiver, sender=sender, weak=False)
+
+ try:
+ yield None
+ finally:
+ self.disconnect(receiver)
+
+ @contextmanager
+ def muted(self) -> c.Generator[None, None, None]:
+ """A context manager that temporarily disables the signal. No receivers
+ will be called if the signal is sent, until the ``with`` block exits.
+ Useful for tests.
+ """
+ self.is_muted = True
+
+ try:
+ yield None
+ finally:
+ self.is_muted = False
+
+ def temporarily_connected_to(
+ self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
+ ) -> AbstractContextManager[None]:
+ """Deprecated alias for :meth:`connected_to`.
+
+ .. deprecated:: 1.1
+ Renamed to ``connected_to``. Will be removed in Blinker 1.9.
+
+ .. versionadded:: 0.9
+ """
+ warnings.warn(
+ "'temporarily_connected_to' is renamed to 'connected_to'. The old name is"
+ " deprecated and will be removed in Blinker 1.9.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return self.connected_to(receiver, sender)
+
+ def send(
+ self,
+ sender: t.Any | None = None,
+ /,
+ *,
+ _async_wrapper: c.Callable[
+ [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
+ ]
+ | None = None,
+ **kwargs: t.Any,
+ ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
+ """Call all receivers that are connected to the given ``sender``
+ or :data:`ANY`. Each receiver is called with ``sender`` as a positional
+ argument along with any extra keyword arguments. Return a list of
+ ``(receiver, return value)`` tuples.
+
+ The order receivers are called is undefined, but can be influenced by
+ setting :attr:`set_class`.
+
+ If a receiver raises an exception, that exception will propagate up.
+ This makes debugging straightforward, with an assumption that correctly
+ implemented receivers will not raise.
+
+ :param sender: Call receivers connected to this sender, in addition to
+ those connected to :data:`ANY`.
+ :param _async_wrapper: Will be called on any receivers that are async
+ coroutines to turn them into sync callables. For example, could run
+ the receiver with an event loop.
+ :param kwargs: Extra keyword arguments to pass to each receiver.
+
+ .. versionchanged:: 1.7
+ Added the ``_async_wrapper`` argument.
+ """
+ if self.is_muted:
+ return []
+
+ results = []
+
+ for receiver in self.receivers_for(sender):
+ if iscoroutinefunction(receiver):
+ if _async_wrapper is None:
+ raise RuntimeError("Cannot send to a coroutine function.")
+
+ result = _async_wrapper(receiver)(sender, **kwargs)
+ else:
+ result = receiver(sender, **kwargs)
+
+ results.append((receiver, result))
+
+ return results
+
+ async def send_async(
+ self,
+ sender: t.Any | None = None,
+ /,
+ *,
+ _sync_wrapper: c.Callable[
+ [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
+ ]
+ | None = None,
+ **kwargs: t.Any,
+ ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
+ """Await all receivers that are connected to the given ``sender``
+ or :data:`ANY`. Each receiver is called with ``sender`` as a positional
+ argument along with any extra keyword arguments. Return a list of
+ ``(receiver, return value)`` tuples.
+
+ The order receivers are called is undefined, but can be influenced by
+ setting :attr:`set_class`.
+
+ If a receiver raises an exception, that exception will propagate up.
+ This makes debugging straightforward, with an assumption that correctly
+ implemented receivers will not raise.
+
+ :param sender: Call receivers connected to this sender, in addition to
+ those connected to :data:`ANY`.
+ :param _sync_wrapper: Will be called on any receivers that are sync
+ callables to turn them into async coroutines. For example,
+ could call the receiver in a thread.
+ :param kwargs: Extra keyword arguments to pass to each receiver.
+
+ .. versionadded:: 1.7
+ """
+ if self.is_muted:
+ return []
+
+ results = []
+
+ for receiver in self.receivers_for(sender):
+ if not iscoroutinefunction(receiver):
+ if _sync_wrapper is None:
+ raise RuntimeError("Cannot send to a non-coroutine function.")
+
+ result = await _sync_wrapper(receiver)(sender, **kwargs)
+ else:
+ result = await receiver(sender, **kwargs)
+
+ results.append((receiver, result))
+
+ return results
+
+ def has_receivers_for(self, sender: t.Any) -> bool:
+ """Check if there is at least one receiver that will be called with the
+ given ``sender``. A receiver connected to :data:`ANY` will always be
+ called, regardless of sender. Does not check if weakly referenced
+ receivers are still live. See :meth:`receivers_for` for a stronger
+ search.
+
+ :param sender: Check for receivers connected to this sender, in addition
+ to those connected to :data:`ANY`.
+ """
+ if not self.receivers:
+ return False
+
+ if self._by_sender[ANY_ID]:
+ return True
+
+ if sender is ANY:
+ return False
+
+ return make_id(sender) in self._by_sender
+
+ def receivers_for(
+ self, sender: t.Any
+ ) -> c.Generator[c.Callable[..., t.Any], None, None]:
+ """Yield each receiver to be called for ``sender``, in addition to those
+ to be called for :data:`ANY`. Weakly referenced receivers that are not
+ live will be disconnected and skipped.
+
+ :param sender: Yield receivers connected to this sender, in addition
+ to those connected to :data:`ANY`.
+ """
+ # TODO: test receivers_for(ANY)
+ if not self.receivers:
+ return
+
+ sender_id = make_id(sender)
+
+ if sender_id in self._by_sender:
+ ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
+ else:
+ ids = self._by_sender[ANY_ID].copy()
+
+ for receiver_id in ids:
+ receiver = self.receivers.get(receiver_id)
+
+ if receiver is None:
+ continue
+
+ if isinstance(receiver, weakref.ref):
+ strong = receiver()
+
+ if strong is None:
+ self._disconnect(receiver_id, ANY_ID)
+ continue
+
+ yield strong
+ else:
+ yield receiver
+
+ def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
+ """Disconnect ``receiver`` from being called when the signal is sent by
+ ``sender``.
+
+ :param receiver: A connected receiver callable.
+ :param sender: Disconnect from only this sender. By default, disconnect
+ from all senders.
+ """
+ sender_id: c.Hashable
+
+ if sender is ANY:
+ sender_id = ANY_ID
+ else:
+ sender_id = make_id(sender)
+
+ receiver_id = make_id(receiver)
+ self._disconnect(receiver_id, sender_id)
+
+ if (
+ "receiver_disconnected" in self.__dict__
+ and self.receiver_disconnected.receivers
+ ):
+ self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
+
+ def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
+ if sender_id == ANY_ID:
+ if self._by_receiver.pop(receiver_id, None) is not None:
+ for bucket in self._by_sender.values():
+ bucket.discard(receiver_id)
+
+ self.receivers.pop(receiver_id, None)
+ else:
+ self._by_sender[sender_id].discard(receiver_id)
+ self._by_receiver[receiver_id].discard(sender_id)
+
+ def _make_cleanup_receiver(
+ self, receiver_id: c.Hashable
+ ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
+ """Create a callback function to disconnect a weakly referenced
+ receiver when it is garbage collected.
+ """
+
+ def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
+ self._disconnect(receiver_id, ANY_ID)
+
+ return cleanup
+
+ def _make_cleanup_sender(
+ self, sender_id: c.Hashable
+ ) -> c.Callable[[weakref.ref[t.Any]], None]:
+ """Create a callback function to disconnect all receivers for a weakly
+ referenced sender when it is garbage collected.
+ """
+ assert sender_id != ANY_ID
+
+ def cleanup(ref: weakref.ref[t.Any]) -> None:
+ self._weak_senders.pop(sender_id, None)
+
+ for receiver_id in self._by_sender.pop(sender_id, ()):
+ self._by_receiver[receiver_id].discard(sender_id)
+
+ return cleanup
+
+ def _cleanup_bookkeeping(self) -> None:
+ """Prune unused sender/receiver bookkeeping. Not threadsafe.
+
+ Connecting & disconnecting leaves behind a small amount of bookkeeping
+ data. Typical workloads using Blinker, for example in most web apps,
+ Flask, CLI scripts, etc., are not adversely affected by this
+ bookkeeping.
+
+ With a long-running process performing dynamic signal routing with high
+ volume, e.g. connecting to function closures, senders are all unique
+ object instances. Doing all of this over and over may cause memory usage
+ to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
+ sender/receiver pair.)
+
+ This method will prune that bookkeeping away, with the caveat that such
+ pruning is not threadsafe. The risk is that cleanup of a fully
+ disconnected receiver/sender pair occurs while another thread is
+ connecting that same pair. If you are in the highly dynamic, unique
+ receiver/sender situation that has lead you to this method, that failure
+ mode is perhaps not a big deal for you.
+ """
+ for mapping in (self._by_sender, self._by_receiver):
+ for ident, bucket in list(mapping.items()):
+ if not bucket:
+ mapping.pop(ident, None)
+
+ def _clear_state(self) -> None:
+ """Disconnect all receivers and senders. Useful for tests."""
+ self._weak_senders.clear()
+ self.receivers.clear()
+ self._by_sender.clear()
+ self._by_receiver.clear()
+
+
+_receiver_connected = Signal(
+ """\
+Sent by a :class:`Signal` after a receiver connects.
+
+:argument: the Signal that was connected to
+:keyword receiver_arg: the connected receiver
+:keyword sender_arg: the sender to connect to
+:keyword weak_arg: true if the connection to receiver_arg is a weak reference
+
+.. deprecated:: 1.2
+ Individual signals have their own :attr:`~Signal.receiver_connected` and
+ :attr:`~Signal.receiver_disconnected` signals with a slightly simplified
+ call signature. This global signal will be removed in Blinker 1.9.
+"""
+)
+
+
+class NamedSignal(Signal):
+ """A named generic notification emitter. The name is not used by the signal
+ itself, but matches the key in the :class:`Namespace` that it belongs to.
+
+ :param name: The name of the signal within the namespace.
+ :param doc: The docstring for the signal.
+ """
+
+ def __init__(self, name: str, doc: str | None = None) -> None:
+ super().__init__(doc)
+
+ #: The name of this signal.
+ self.name: str = name
+
+ def __repr__(self) -> str:
+ base = super().__repr__()
+ return f"{base[:-1]}; {self.name!r}>" # noqa: E702
+
+
+if t.TYPE_CHECKING:
+
+ class PNamespaceSignal(t.Protocol):
+ def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ...
+
+ # Python < 3.9
+ _NamespaceBase = dict[str, NamedSignal] # type: ignore[misc]
+else:
+ _NamespaceBase = dict
+
+
+class Namespace(_NamespaceBase):
+ """A dict mapping names to signals."""
+
+ def signal(self, name: str, doc: str | None = None) -> NamedSignal:
+ """Return the :class:`NamedSignal` for the given ``name``, creating it
+ if required. Repeated calls with the same name return the same signal.
+
+ :param name: The name of the signal.
+ :param doc: The docstring of the signal.
+ """
+ if name not in self:
+ self[name] = NamedSignal(name, doc)
+
+ return self[name]
+
+
+class _WeakNamespace(WeakValueDictionary): # type: ignore[type-arg]
+ """A weak mapping of names to signals.
+
+ Automatically cleans up unused signals when the last reference goes out
+ of scope. This namespace implementation provides similar behavior to Blinker
+ <= 1.2.
+
+ .. deprecated:: 1.3
+ Will be removed in Blinker 1.9.
+
+ .. versionadded:: 1.3
+ """
+
+ def __init__(self) -> None:
+ warnings.warn(
+ "'WeakNamespace' is deprecated and will be removed in Blinker 1.9."
+ " Use 'Namespace' instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ super().__init__()
+
+ def signal(self, name: str, doc: str | None = None) -> NamedSignal:
+ """Return the :class:`NamedSignal` for the given ``name``, creating it
+ if required. Repeated calls with the same name return the same signal.
+
+ :param name: The name of the signal.
+ :param doc: The docstring of the signal.
+ """
+ if name not in self:
+ self[name] = NamedSignal(name, doc)
+
+ return self[name] # type: ignore[no-any-return]
+
+
+default_namespace: Namespace = Namespace()
+"""A default :class:`Namespace` for creating named signals. :func:`signal`
+creates a :class:`NamedSignal` in this namespace.
+"""
+
+signal: PNamespaceSignal = default_namespace.signal
+"""Return a :class:`NamedSignal` in :data:`default_namespace` with the given
+``name``, creating it if required. Repeated calls with the same name return the
+same signal.
+"""
+
+
+def __getattr__(name: str) -> t.Any:
+ if name == "receiver_connected":
+ warnings.warn(
+ "The global 'receiver_connected' signal is deprecated and will be"
+ " removed in Blinker 1.9. Use 'Signal.receiver_connected' and"
+ " 'Signal.receiver_disconnected' instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return _receiver_connected
+
+ if name == "WeakNamespace":
+ warnings.warn(
+ "'WeakNamespace' is deprecated and will be removed in Blinker 1.9."
+ " Use 'Namespace' instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return _WeakNamespace
+
+ raise AttributeError(name)