aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/werkzeug/_internal.py
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/werkzeug/_internal.py
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/_internal.py')
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/_internal.py211
1 files changed, 211 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/_internal.py b/venv/lib/python3.8/site-packages/werkzeug/_internal.py
new file mode 100644
index 0000000..7dd2fbc
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/_internal.py
@@ -0,0 +1,211 @@
+from __future__ import annotations
+
+import logging
+import re
+import sys
+import typing as t
+from datetime import datetime
+from datetime import timezone
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import WSGIEnvironment
+
+ from .wrappers.request import Request
+
+_logger: logging.Logger | None = None
+
+
+class _Missing:
+ def __repr__(self) -> str:
+ return "no value"
+
+ def __reduce__(self) -> str:
+ return "_missing"
+
+
+_missing = _Missing()
+
+
+def _wsgi_decoding_dance(s: str) -> str:
+ return s.encode("latin1").decode(errors="replace")
+
+
+def _wsgi_encoding_dance(s: str) -> str:
+ return s.encode().decode("latin1")
+
+
+def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
+ env = getattr(obj, "environ", obj)
+ assert isinstance(
+ env, dict
+ ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
+ return env
+
+
+def _has_level_handler(logger: logging.Logger) -> bool:
+ """Check if there is a handler in the logging chain that will handle
+ the given logger's effective level.
+ """
+ level = logger.getEffectiveLevel()
+ current = logger
+
+ while current:
+ if any(handler.level <= level for handler in current.handlers):
+ return True
+
+ if not current.propagate:
+ break
+
+ current = current.parent # type: ignore
+
+ return False
+
+
+class _ColorStreamHandler(logging.StreamHandler): # type: ignore[type-arg]
+ """On Windows, wrap stream with Colorama for ANSI style support."""
+
+ def __init__(self) -> None:
+ try:
+ import colorama
+ except ImportError:
+ stream = None
+ else:
+ stream = colorama.AnsiToWin32(sys.stderr)
+
+ super().__init__(stream)
+
+
+def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
+ """Log a message to the 'werkzeug' logger.
+
+ The logger is created the first time it is needed. If there is no
+ level set, it is set to :data:`logging.INFO`. If there is no handler
+ for the logger's effective level, a :class:`logging.StreamHandler`
+ is added.
+ """
+ global _logger
+
+ if _logger is None:
+ _logger = logging.getLogger("werkzeug")
+
+ if _logger.level == logging.NOTSET:
+ _logger.setLevel(logging.INFO)
+
+ if not _has_level_handler(_logger):
+ _logger.addHandler(_ColorStreamHandler())
+
+ getattr(_logger, type)(message.rstrip(), *args, **kwargs)
+
+
+@t.overload
+def _dt_as_utc(dt: None) -> None: ...
+
+
+@t.overload
+def _dt_as_utc(dt: datetime) -> datetime: ...
+
+
+def _dt_as_utc(dt: datetime | None) -> datetime | None:
+ if dt is None:
+ return dt
+
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=timezone.utc)
+ elif dt.tzinfo != timezone.utc:
+ return dt.astimezone(timezone.utc)
+
+ return dt
+
+
+_TAccessorValue = t.TypeVar("_TAccessorValue")
+
+
+class _DictAccessorProperty(t.Generic[_TAccessorValue]):
+ """Baseclass for `environ_property` and `header_property`."""
+
+ read_only = False
+
+ def __init__(
+ self,
+ name: str,
+ default: _TAccessorValue | None = None,
+ load_func: t.Callable[[str], _TAccessorValue] | None = None,
+ dump_func: t.Callable[[_TAccessorValue], str] | None = None,
+ read_only: bool | None = None,
+ doc: str | None = None,
+ ) -> None:
+ self.name = name
+ self.default = default
+ self.load_func = load_func
+ self.dump_func = dump_func
+ if read_only is not None:
+ self.read_only = read_only
+ self.__doc__ = doc
+
+ def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
+ raise NotImplementedError
+
+ @t.overload
+ def __get__(
+ self, instance: None, owner: type
+ ) -> _DictAccessorProperty[_TAccessorValue]: ...
+
+ @t.overload
+ def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: ...
+
+ def __get__(
+ self, instance: t.Any | None, owner: type
+ ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
+ if instance is None:
+ return self
+
+ storage = self.lookup(instance)
+
+ if self.name not in storage:
+ return self.default # type: ignore
+
+ value = storage[self.name]
+
+ if self.load_func is not None:
+ try:
+ return self.load_func(value)
+ except (ValueError, TypeError):
+ return self.default # type: ignore
+
+ return value # type: ignore
+
+ def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
+ if self.read_only:
+ raise AttributeError("read only property")
+
+ if self.dump_func is not None:
+ self.lookup(instance)[self.name] = self.dump_func(value)
+ else:
+ self.lookup(instance)[self.name] = value
+
+ def __delete__(self, instance: t.Any) -> None:
+ if self.read_only:
+ raise AttributeError("read only property")
+
+ self.lookup(instance).pop(self.name, None)
+
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.name}>"
+
+
+_plain_int_re = re.compile(r"-?\d+", re.ASCII)
+
+
+def _plain_int(value: str) -> int:
+ """Parse an int only if it is only ASCII digits and ``-``.
+
+ This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but
+ are not allowed in HTTP header values.
+
+ Any leading or trailing whitespace is stripped
+ """
+ value = value.strip()
+ if _plain_int_re.fullmatch(value) is None:
+ raise ValueError
+
+ return int(value)