diff options
author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/werkzeug/http.py | |
parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/http.py')
-rw-r--r-- | venv/lib/python3.8/site-packages/werkzeug/http.py | 1391 |
1 files changed, 1391 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/http.py b/venv/lib/python3.8/site-packages/werkzeug/http.py new file mode 100644 index 0000000..cb8bc25 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/http.py @@ -0,0 +1,1391 @@ +from __future__ import annotations + +import email.utils +import re +import typing as t +import warnings +from datetime import date +from datetime import datetime +from datetime import time +from datetime import timedelta +from datetime import timezone +from enum import Enum +from hashlib import sha1 +from time import mktime +from time import struct_time +from urllib.parse import quote +from urllib.parse import unquote +from urllib.request import parse_http_list as _parse_list_header + +from ._internal import _dt_as_utc +from ._internal import _plain_int + +if t.TYPE_CHECKING: + from _typeshed.wsgi import WSGIEnvironment + +_token_chars = frozenset( + "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~" +) +_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)') +_entity_headers = frozenset( + [ + "allow", + "content-encoding", + "content-language", + "content-length", + "content-location", + "content-md5", + "content-range", + "content-type", + "expires", + "last-modified", + ] +) +_hop_by_hop_headers = frozenset( + [ + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + ] +) +HTTP_STATUS_CODES = { + 100: "Continue", + 101: "Switching Protocols", + 102: "Processing", + 103: "Early Hints", # see RFC 8297 + 200: "OK", + 201: "Created", + 202: "Accepted", + 203: "Non Authoritative Information", + 204: "No Content", + 205: "Reset Content", + 206: "Partial Content", + 207: "Multi Status", + 208: "Already Reported", # see RFC 5842 + 226: "IM Used", # see RFC 3229 + 300: "Multiple Choices", + 301: "Moved Permanently", + 302: "Found", + 303: "See Other", + 304: "Not Modified", + 305: "Use Proxy", + 306: "Switch Proxy", # unused + 307: "Temporary Redirect", + 308: "Permanent Redirect", + 400: "Bad Request", + 401: "Unauthorized", + 402: "Payment Required", # unused + 403: "Forbidden", + 404: "Not Found", + 405: "Method Not Allowed", + 406: "Not Acceptable", + 407: "Proxy Authentication Required", + 408: "Request Timeout", + 409: "Conflict", + 410: "Gone", + 411: "Length Required", + 412: "Precondition Failed", + 413: "Request Entity Too Large", + 414: "Request URI Too Long", + 415: "Unsupported Media Type", + 416: "Requested Range Not Satisfiable", + 417: "Expectation Failed", + 418: "I'm a teapot", # see RFC 2324 + 421: "Misdirected Request", # see RFC 7540 + 422: "Unprocessable Entity", + 423: "Locked", + 424: "Failed Dependency", + 425: "Too Early", # see RFC 8470 + 426: "Upgrade Required", + 428: "Precondition Required", # see RFC 6585 + 429: "Too Many Requests", + 431: "Request Header Fields Too Large", + 449: "Retry With", # proprietary MS extension + 451: "Unavailable For Legal Reasons", + 500: "Internal Server Error", + 501: "Not Implemented", + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 505: "HTTP Version Not Supported", + 506: "Variant Also Negotiates", # see RFC 2295 + 507: "Insufficient Storage", + 508: "Loop Detected", # see RFC 5842 + 510: "Not Extended", + 511: "Network Authentication Failed", +} + + +class COEP(Enum): + """Cross Origin Embedder Policies""" + + UNSAFE_NONE = "unsafe-none" + REQUIRE_CORP = "require-corp" + + +class COOP(Enum): + """Cross Origin Opener Policies""" + + UNSAFE_NONE = "unsafe-none" + SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups" + SAME_ORIGIN = "same-origin" + + +def quote_header_value(value: t.Any, allow_token: bool = True) -> str: + """Add double quotes around a header value. If the header contains only ASCII token + characters, it will be returned unchanged. If the header contains ``"`` or ``\\`` + characters, they will be escaped with an additional ``\\`` character. + + This is the reverse of :func:`unquote_header_value`. + + :param value: The value to quote. Will be converted to a string. + :param allow_token: Disable to quote the value even if it only has token characters. + + .. versionchanged:: 3.0 + Passing bytes is not supported. + + .. versionchanged:: 3.0 + The ``extra_chars`` parameter is removed. + + .. versionchanged:: 2.3 + The value is quoted if it is the empty string. + + .. versionadded:: 0.5 + """ + value_str = str(value) + + if not value_str: + return '""' + + if allow_token: + token_chars = _token_chars + + if token_chars.issuperset(value_str): + return value_str + + value_str = value_str.replace("\\", "\\\\").replace('"', '\\"') + return f'"{value_str}"' + + +def unquote_header_value(value: str) -> str: + """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a + header value. + + This is the reverse of :func:`quote_header_value`. + + :param value: The header value to unquote. + + .. versionchanged:: 3.0 + The ``is_filename`` parameter is removed. + """ + if len(value) >= 2 and value[0] == value[-1] == '"': + value = value[1:-1] + return value.replace("\\\\", "\\").replace('\\"', '"') + + return value + + +def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str: + """Produce a header value and ``key=value`` parameters separated by semicolons + ``;``. For example, the ``Content-Type`` header. + + .. code-block:: python + + dump_options_header("text/html", {"charset": "UTF-8"}) + 'text/html; charset=UTF-8' + + This is the reverse of :func:`parse_options_header`. + + If a value contains non-token characters, it will be quoted. + + If a value is ``None``, the parameter is skipped. + + In some keys for some headers, a UTF-8 value can be encoded using a special + ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will + not produce that format automatically, but if a given key ends with an asterisk + ``*``, the value is assumed to have that form and will not be quoted further. + + :param header: The primary header value. + :param options: Parameters to encode as ``key=value`` pairs. + + .. versionchanged:: 2.3 + Keys with ``None`` values are skipped rather than treated as a bare key. + + .. versionchanged:: 2.2.3 + If a key ends with ``*``, its value will not be quoted. + """ + segments = [] + + if header is not None: + segments.append(header) + + for key, value in options.items(): + if value is None: + continue + + if key[-1] == "*": + segments.append(f"{key}={value}") + else: + segments.append(f"{key}={quote_header_value(value)}") + + return "; ".join(segments) + + +def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str: + """Produce a header value from a list of items or ``key=value`` pairs, separated by + commas ``,``. + + This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and + :func:`parse_set_header`. + + If a value contains non-token characters, it will be quoted. + + If a value is ``None``, the key is output alone. + + In some keys for some headers, a UTF-8 value can be encoded using a special + ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will + not produce that format automatically, but if a given key ends with an asterisk + ``*``, the value is assumed to have that form and will not be quoted further. + + .. code-block:: python + + dump_header(["foo", "bar baz"]) + 'foo, "bar baz"' + + dump_header({"foo": "bar baz"}) + 'foo="bar baz"' + + :param iterable: The items to create a header from. + + .. versionchanged:: 3.0 + The ``allow_token`` parameter is removed. + + .. versionchanged:: 2.2.3 + If a key ends with ``*``, its value will not be quoted. + """ + if isinstance(iterable, dict): + items = [] + + for key, value in iterable.items(): + if value is None: + items.append(key) + elif key[-1] == "*": + items.append(f"{key}={value}") + else: + items.append(f"{key}={quote_header_value(value)}") + else: + items = [quote_header_value(x) for x in iterable] + + return ", ".join(items) + + +def dump_csp_header(header: ds.ContentSecurityPolicy) -> str: + """Dump a Content Security Policy header. + + These are structured into policies such as "default-src 'self'; + script-src 'self'". + + .. versionadded:: 1.0.0 + Support for Content Security Policy headers was added. + + """ + return "; ".join(f"{key} {value}" for key, value in header.items()) + + +def parse_list_header(value: str) -> list[str]: + """Parse a header value that consists of a list of comma separated items according + to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__. + + This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes + from values. + + .. code-block:: python + + parse_list_header('token, "quoted value"') + ['token', 'quoted value'] + + This is the reverse of :func:`dump_header`. + + :param value: The header value to parse. + """ + result = [] + + for item in _parse_list_header(value): + if len(item) >= 2 and item[0] == item[-1] == '"': + item = item[1:-1] + + result.append(item) + + return result + + +def parse_dict_header(value: str) -> dict[str, str | None]: + """Parse a list header using :func:`parse_list_header`, then parse each item as a + ``key=value`` pair. + + .. code-block:: python + + parse_dict_header('a=b, c="d, e", f') + {"a": "b", "c": "d, e", "f": None} + + This is the reverse of :func:`dump_header`. + + If a key does not have a value, it is ``None``. + + This handles charsets for values as described in + `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8, + and ISO-8859-1 charsets are accepted, otherwise the value remains quoted. + + :param value: The header value to parse. + + .. versionchanged:: 3.0 + Passing bytes is not supported. + + .. versionchanged:: 3.0 + The ``cls`` argument is removed. + + .. versionchanged:: 2.3 + Added support for ``key*=charset''value`` encoded items. + + .. versionchanged:: 0.9 + The ``cls`` argument was added. + """ + result: dict[str, str | None] = {} + + for item in parse_list_header(value): + key, has_value, value = item.partition("=") + key = key.strip() + + if not key: + # =value is not valid + continue + + if not has_value: + result[key] = None + continue + + value = value.strip() + encoding: str | None = None + + if key[-1] == "*": + # key*=charset''value becomes key=value, where value is percent encoded + # adapted from parse_options_header, without the continuation handling + key = key[:-1] + match = _charset_value_re.match(value) + + if match: + # If there is a charset marker in the value, split it off. + encoding, value = match.groups() + encoding = encoding.lower() + + # A safe list of encodings. Modern clients should only send ASCII or UTF-8. + # This list will not be extended further. An invalid encoding will leave the + # value quoted. + if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: + # invalid bytes are replaced during unquoting + value = unquote(value, encoding=encoding) + + if len(value) >= 2 and value[0] == value[-1] == '"': + value = value[1:-1] + + result[key] = value + + return result + + +# https://httpwg.org/specs/rfc9110.html#parameter +_parameter_key_re = re.compile(r"([\w!#$%&'*+\-.^`|~]+)=", flags=re.ASCII) +_parameter_token_value_re = re.compile(r"[\w!#$%&'*+\-.^`|~]+", flags=re.ASCII) +# https://www.rfc-editor.org/rfc/rfc2231#section-4 +_charset_value_re = re.compile( + r""" + ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty + [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty + ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding + """, + re.ASCII | re.VERBOSE, +) +# https://www.rfc-editor.org/rfc/rfc2231#section-3 +_continuation_re = re.compile(r"\*(\d+)$", re.ASCII) + + +def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]: + """Parse a header that consists of a value with ``key=value`` parameters separated + by semicolons ``;``. For example, the ``Content-Type`` header. + + .. code-block:: python + + parse_options_header("text/html; charset=UTF-8") + ('text/html', {'charset': 'UTF-8'}) + + parse_options_header("") + ("", {}) + + This is the reverse of :func:`dump_options_header`. + + This parses valid parameter parts as described in + `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are + skipped. + + This handles continuations and charsets as described in + `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as + strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted, + otherwise the value remains quoted. + + Clients may not be consistent in how they handle a quote character within a quoted + value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__ + replaces it with ``%22`` in multipart form data. + `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash + escapes in HTTP headers. Both are decoded to the ``"`` character. + + Clients may not be consistent in how they handle non-ASCII characters. HTML + documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with + HTML character references, which can be decoded using :func:`html.unescape`. + + :param value: The header value to parse. + :return: ``(value, options)``, where ``options`` is a dict + + .. versionchanged:: 2.3 + Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted + values, are discarded instead of treating as ``None``. + + .. versionchanged:: 2.3 + Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values. + + .. versionchanged:: 2.3 + Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled. + + .. versionchanged:: 2.2 + Option names are always converted to lowercase. + + .. versionchanged:: 2.2 + The ``multiple`` parameter was removed. + + .. versionchanged:: 0.15 + :rfc:`2231` parameter continuations are handled. + + .. versionadded:: 0.5 + """ + if value is None: + return "", {} + + value, _, rest = value.partition(";") + value = value.strip() + rest = rest.strip() + + if not value or not rest: + # empty (invalid) value, or value without options + return value, {} + + # Collect all valid key=value parts without processing the value. + parts: list[tuple[str, str]] = [] + + while True: + if (m := _parameter_key_re.match(rest)) is not None: + pk = m.group(1).lower() + rest = rest[m.end() :] + + # Value may be a token. + if (m := _parameter_token_value_re.match(rest)) is not None: + parts.append((pk, m.group())) + + # Value may be a quoted string, find the closing quote. + elif rest[:1] == '"': + pos = 1 + length = len(rest) + + while pos < length: + if rest[pos : pos + 2] in {"\\\\", '\\"'}: + # Consume escaped slashes and quotes. + pos += 2 + elif rest[pos] == '"': + # Stop at an unescaped quote. + parts.append((pk, rest[: pos + 1])) + rest = rest[pos + 1 :] + break + else: + # Consume any other character. + pos += 1 + + # Find the next section delimited by `;`, if any. + if (end := rest.find(";")) == -1: + break + + rest = rest[end + 1 :].lstrip() + + options: dict[str, str] = {} + encoding: str | None = None + continued_encoding: str | None = None + + # For each collected part, process optional charset and continuation, + # unquote quoted values. + for pk, pv in parts: + if pk[-1] == "*": + # key*=charset''value becomes key=value, where value is percent encoded + pk = pk[:-1] + match = _charset_value_re.match(pv) + + if match: + # If there is a valid charset marker in the value, split it off. + encoding, pv = match.groups() + # This might be the empty string, handled next. + encoding = encoding.lower() + + # No charset marker, or marker with empty charset value. + if not encoding: + encoding = continued_encoding + + # A safe list of encodings. Modern clients should only send ASCII or UTF-8. + # This list will not be extended further. An invalid encoding will leave the + # value quoted. + if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: + # Continuation parts don't require their own charset marker. This is + # looser than the RFC, it will persist across different keys and allows + # changing the charset during a continuation. But this implementation is + # much simpler than tracking the full state. + continued_encoding = encoding + # invalid bytes are replaced during unquoting + pv = unquote(pv, encoding=encoding) + + # Remove quotes. At this point the value cannot be empty or a single quote. + if pv[0] == pv[-1] == '"': + # HTTP headers use slash, multipart form data uses percent + pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"') + + match = _continuation_re.search(pk) + + if match: + # key*0=a; key*1=b becomes key=ab + pk = pk[: match.start()] + options[pk] = options.get(pk, "") + pv + else: + options[pk] = pv + + return value, options + + +_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII) +_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept") + + +@t.overload +def parse_accept_header(value: str | None) -> ds.Accept: ... + + +@t.overload +def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ... + + +def parse_accept_header( + value: str | None, cls: type[_TAnyAccept] | None = None +) -> _TAnyAccept: + """Parse an ``Accept`` header according to + `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__. + + Returns an :class:`.Accept` instance, which can sort and inspect items based on + their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or + ``Accept-Language``, pass the appropriate :class:`.Accept` subclass. + + :param value: The header value to parse. + :param cls: The :class:`.Accept` class to wrap the result in. + :return: An instance of ``cls``. + + .. versionchanged:: 2.3 + Parse according to RFC 9110. Items with invalid ``q`` values are skipped. + """ + if cls is None: + cls = t.cast(t.Type[_TAnyAccept], ds.Accept) + + if not value: + return cls(None) + + result = [] + + for item in parse_list_header(value): + item, options = parse_options_header(item) + + if "q" in options: + # pop q, remaining options are reconstructed + q_str = options.pop("q").strip() + + if _q_value_re.fullmatch(q_str) is None: + # ignore an invalid q + continue + + q = float(q_str) + + if q < 0 or q > 1: + # ignore an invalid q + continue + else: + q = 1 + + if options: + # reconstruct the media type with any options + item = dump_options_header(item, options) + + result.append((item, q)) + + return cls(result) + + +_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl") + + +@t.overload +def parse_cache_control_header( + value: str | None, + on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, +) -> ds.RequestCacheControl: ... + + +@t.overload +def parse_cache_control_header( + value: str | None, + on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, + cls: type[_TAnyCC] = ..., +) -> _TAnyCC: ... + + +def parse_cache_control_header( + value: str | None, + on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, + cls: type[_TAnyCC] | None = None, +) -> _TAnyCC: + """Parse a cache control header. The RFC differs between response and + request cache control, this method does not. It's your responsibility + to not use the wrong control statements. + + .. versionadded:: 0.5 + The `cls` was added. If not specified an immutable + :class:`~werkzeug.datastructures.RequestCacheControl` is returned. + + :param value: a cache control header to be parsed. + :param on_update: an optional callable that is called every time a value + on the :class:`~werkzeug.datastructures.CacheControl` + object is changed. + :param cls: the class for the returned object. By default + :class:`~werkzeug.datastructures.RequestCacheControl` is used. + :return: a `cls` object. + """ + if cls is None: + cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl) + + if not value: + return cls((), on_update) + + return cls(parse_dict_header(value), on_update) + + +_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy") + + +@t.overload +def parse_csp_header( + value: str | None, + on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, +) -> ds.ContentSecurityPolicy: ... + + +@t.overload +def parse_csp_header( + value: str | None, + on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, + cls: type[_TAnyCSP] = ..., +) -> _TAnyCSP: ... + + +def parse_csp_header( + value: str | None, + on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, + cls: type[_TAnyCSP] | None = None, +) -> _TAnyCSP: + """Parse a Content Security Policy header. + + .. versionadded:: 1.0.0 + Support for Content Security Policy headers was added. + + :param value: a csp header to be parsed. + :param on_update: an optional callable that is called every time a value + on the object is changed. + :param cls: the class for the returned object. By default + :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used. + :return: a `cls` object. + """ + if cls is None: + cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy) + + if value is None: + return cls((), on_update) + + items = [] + + for policy in value.split(";"): + policy = policy.strip() + + # Ignore badly formatted policies (no space) + if " " in policy: + directive, value = policy.strip().split(" ", 1) + items.append((directive.strip(), value.strip())) + + return cls(items, on_update) + + +def parse_set_header( + value: str | None, + on_update: t.Callable[[ds.HeaderSet], None] | None = None, +) -> ds.HeaderSet: + """Parse a set-like header and return a + :class:`~werkzeug.datastructures.HeaderSet` object: + + >>> hs = parse_set_header('token, "quoted value"') + + The return value is an object that treats the items case-insensitively + and keeps the order of the items: + + >>> 'TOKEN' in hs + True + >>> hs.index('quoted value') + 1 + >>> hs + HeaderSet(['token', 'quoted value']) + + To create a header from the :class:`HeaderSet` again, use the + :func:`dump_header` function. + + :param value: a set header to be parsed. + :param on_update: an optional callable that is called every time a + value on the :class:`~werkzeug.datastructures.HeaderSet` + object is changed. + :return: a :class:`~werkzeug.datastructures.HeaderSet` + """ + if not value: + return ds.HeaderSet(None, on_update) + return ds.HeaderSet(parse_list_header(value), on_update) + + +def parse_if_range_header(value: str | None) -> ds.IfRange: + """Parses an if-range header which can be an etag or a date. Returns + a :class:`~werkzeug.datastructures.IfRange` object. + + .. versionchanged:: 2.0 + If the value represents a datetime, it is timezone-aware. + + .. versionadded:: 0.7 + """ + if not value: + return ds.IfRange() + date = parse_date(value) + if date is not None: + return ds.IfRange(date=date) + # drop weakness information + return ds.IfRange(unquote_etag(value)[0]) + + +def parse_range_header( + value: str | None, make_inclusive: bool = True +) -> ds.Range | None: + """Parses a range header into a :class:`~werkzeug.datastructures.Range` + object. If the header is missing or malformed `None` is returned. + `ranges` is a list of ``(start, stop)`` tuples where the ranges are + non-inclusive. + + .. versionadded:: 0.7 + """ + if not value or "=" not in value: + return None + + ranges = [] + last_end = 0 + units, rng = value.split("=", 1) + units = units.strip().lower() + + for item in rng.split(","): + item = item.strip() + if "-" not in item: + return None + if item.startswith("-"): + if last_end < 0: + return None + try: + begin = _plain_int(item) + except ValueError: + return None + end = None + last_end = -1 + elif "-" in item: + begin_str, end_str = item.split("-", 1) + begin_str = begin_str.strip() + end_str = end_str.strip() + + try: + begin = _plain_int(begin_str) + except ValueError: + return None + + if begin < last_end or last_end < 0: + return None + if end_str: + try: + end = _plain_int(end_str) + 1 + except ValueError: + return None + + if begin >= end: + return None + else: + end = None + last_end = end if end is not None else -1 + ranges.append((begin, end)) + + return ds.Range(units, ranges) + + +def parse_content_range_header( + value: str | None, + on_update: t.Callable[[ds.ContentRange], None] | None = None, +) -> ds.ContentRange | None: + """Parses a range header into a + :class:`~werkzeug.datastructures.ContentRange` object or `None` if + parsing is not possible. + + .. versionadded:: 0.7 + + :param value: a content range header to be parsed. + :param on_update: an optional callable that is called every time a value + on the :class:`~werkzeug.datastructures.ContentRange` + object is changed. + """ + if value is None: + return None + try: + units, rangedef = (value or "").strip().split(None, 1) + except ValueError: + return None + + if "/" not in rangedef: + return None + rng, length_str = rangedef.split("/", 1) + if length_str == "*": + length = None + else: + try: + length = _plain_int(length_str) + except ValueError: + return None + + if rng == "*": + if not is_byte_range_valid(None, None, length): + return None + + return ds.ContentRange(units, None, None, length, on_update=on_update) + elif "-" not in rng: + return None + + start_str, stop_str = rng.split("-", 1) + try: + start = _plain_int(start_str) + stop = _plain_int(stop_str) + 1 + except ValueError: + return None + + if is_byte_range_valid(start, stop, length): + return ds.ContentRange(units, start, stop, length, on_update=on_update) + + return None + + +def quote_etag(etag: str, weak: bool = False) -> str: + """Quote an etag. + + :param etag: the etag to quote. + :param weak: set to `True` to tag it "weak". + """ + if '"' in etag: + raise ValueError("invalid etag") + etag = f'"{etag}"' + if weak: + etag = f"W/{etag}" + return etag + + +def unquote_etag( + etag: str | None, +) -> tuple[str, bool] | tuple[None, None]: + """Unquote a single etag: + + >>> unquote_etag('W/"bar"') + ('bar', True) + >>> unquote_etag('"bar"') + ('bar', False) + + :param etag: the etag identifier to unquote. + :return: a ``(etag, weak)`` tuple. + """ + if not etag: + return None, None + etag = etag.strip() + weak = False + if etag.startswith(("W/", "w/")): + weak = True + etag = etag[2:] + if etag[:1] == etag[-1:] == '"': + etag = etag[1:-1] + return etag, weak + + +def parse_etags(value: str | None) -> ds.ETags: + """Parse an etag header. + + :param value: the tag header to parse + :return: an :class:`~werkzeug.datastructures.ETags` object. + """ + if not value: + return ds.ETags() + strong = [] + weak = [] + end = len(value) + pos = 0 + while pos < end: + match = _etag_re.match(value, pos) + if match is None: + break + is_weak, quoted, raw = match.groups() + if raw == "*": + return ds.ETags(star_tag=True) + elif quoted: + raw = quoted + if is_weak: + weak.append(raw) + else: + strong.append(raw) + pos = match.end() + return ds.ETags(strong, weak) + + +def generate_etag(data: bytes) -> str: + """Generate an etag for some data. + + .. versionchanged:: 2.0 + Use SHA-1. MD5 may not be available in some environments. + """ + return sha1(data).hexdigest() + + +def parse_date(value: str | None) -> datetime | None: + """Parse an :rfc:`2822` date into a timezone-aware + :class:`datetime.datetime` object, or ``None`` if parsing fails. + + This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It + returns ``None`` if parsing fails instead of raising an exception, + and always returns a timezone-aware datetime object. If the string + doesn't have timezone information, it is assumed to be UTC. + + :param value: A string with a supported date format. + + .. versionchanged:: 2.0 + Return a timezone-aware datetime object. Use + ``email.utils.parsedate_to_datetime``. + """ + if value is None: + return None + + try: + dt = email.utils.parsedate_to_datetime(value) + except (TypeError, ValueError): + return None + + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + + return dt + + +def http_date( + timestamp: datetime | date | int | float | struct_time | None = None, +) -> str: + """Format a datetime object or timestamp into an :rfc:`2822` date + string. + + This is a wrapper for :func:`email.utils.format_datetime`. It + assumes naive datetime objects are in UTC instead of raising an + exception. + + :param timestamp: The datetime or timestamp to format. Defaults to + the current time. + + .. versionchanged:: 2.0 + Use ``email.utils.format_datetime``. Accept ``date`` objects. + """ + if isinstance(timestamp, date): + if not isinstance(timestamp, datetime): + # Assume plain date is midnight UTC. + timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc) + else: + # Ensure datetime is timezone-aware. + timestamp = _dt_as_utc(timestamp) + + return email.utils.format_datetime(timestamp, usegmt=True) + + if isinstance(timestamp, struct_time): + timestamp = mktime(timestamp) + + return email.utils.formatdate(timestamp, usegmt=True) + + +def parse_age(value: str | None = None) -> timedelta | None: + """Parses a base-10 integer count of seconds into a timedelta. + + If parsing fails, the return value is `None`. + + :param value: a string consisting of an integer represented in base-10 + :return: a :class:`datetime.timedelta` object or `None`. + """ + if not value: + return None + try: + seconds = int(value) + except ValueError: + return None + if seconds < 0: + return None + try: + return timedelta(seconds=seconds) + except OverflowError: + return None + + +def dump_age(age: timedelta | int | None = None) -> str | None: + """Formats the duration as a base-10 integer. + + :param age: should be an integer number of seconds, + a :class:`datetime.timedelta` object, or, + if the age is unknown, `None` (default). + """ + if age is None: + return None + if isinstance(age, timedelta): + age = int(age.total_seconds()) + else: + age = int(age) + + if age < 0: + raise ValueError("age cannot be negative") + + return str(age) + + +def is_resource_modified( + environ: WSGIEnvironment, + etag: str | None = None, + data: bytes | None = None, + last_modified: datetime | str | None = None, + ignore_if_range: bool = True, +) -> bool: + """Convenience method for conditional requests. + + :param environ: the WSGI environment of the request to be checked. + :param etag: the etag for the response for comparison. + :param data: or alternatively the data of the response to automatically + generate an etag using :func:`generate_etag`. + :param last_modified: an optional date of the last modification. + :param ignore_if_range: If `False`, `If-Range` header will be taken into + account. + :return: `True` if the resource was modified, otherwise `False`. + + .. versionchanged:: 2.0 + SHA-1 is used to generate an etag value for the data. MD5 may + not be available in some environments. + + .. versionchanged:: 1.0.0 + The check is run for methods other than ``GET`` and ``HEAD``. + """ + return _sansio_http.is_resource_modified( + http_range=environ.get("HTTP_RANGE"), + http_if_range=environ.get("HTTP_IF_RANGE"), + http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"), + http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"), + http_if_match=environ.get("HTTP_IF_MATCH"), + etag=etag, + data=data, + last_modified=last_modified, + ignore_if_range=ignore_if_range, + ) + + +def remove_entity_headers( + headers: ds.Headers | list[tuple[str, str]], + allowed: t.Iterable[str] = ("expires", "content-location"), +) -> None: + """Remove all entity headers from a list or :class:`Headers` object. This + operation works in-place. `Expires` and `Content-Location` headers are + by default not removed. The reason for this is :rfc:`2616` section + 10.3.5 which specifies some entity headers that should be sent. + + .. versionchanged:: 0.5 + added `allowed` parameter. + + :param headers: a list or :class:`Headers` object. + :param allowed: a list of headers that should still be allowed even though + they are entity headers. + """ + allowed = {x.lower() for x in allowed} + headers[:] = [ + (key, value) + for key, value in headers + if not is_entity_header(key) or key.lower() in allowed + ] + + +def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None: + """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or + :class:`Headers` object. This operation works in-place. + + .. versionadded:: 0.5 + + :param headers: a list or :class:`Headers` object. + """ + headers[:] = [ + (key, value) for key, value in headers if not is_hop_by_hop_header(key) + ] + + +def is_entity_header(header: str) -> bool: + """Check if a header is an entity header. + + .. versionadded:: 0.5 + + :param header: the header to test. + :return: `True` if it's an entity header, `False` otherwise. + """ + return header.lower() in _entity_headers + + +def is_hop_by_hop_header(header: str) -> bool: + """Check if a header is an HTTP/1.1 "Hop-by-Hop" header. + + .. versionadded:: 0.5 + + :param header: the header to test. + :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise. + """ + return header.lower() in _hop_by_hop_headers + + +def parse_cookie( + header: WSGIEnvironment | str | None, + cls: type[ds.MultiDict[str, str]] | None = None, +) -> ds.MultiDict[str, str]: + """Parse a cookie from a string or WSGI environ. + + The same key can be provided multiple times, the values are stored + in-order. The default :class:`MultiDict` will have the first value + first, and all values can be retrieved with + :meth:`MultiDict.getlist`. + + :param header: The cookie header as a string, or a WSGI environ dict + with a ``HTTP_COOKIE`` key. + :param cls: A dict-like class to store the parsed cookies in. + Defaults to :class:`MultiDict`. + + .. versionchanged:: 3.0 + Passing bytes, and the ``charset`` and ``errors`` parameters, were removed. + + .. versionchanged:: 1.0 + Returns a :class:`MultiDict` instead of a ``TypeConversionDict``. + + .. versionchanged:: 0.5 + Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls`` + parameter was added. + """ + if isinstance(header, dict): + cookie = header.get("HTTP_COOKIE") + else: + cookie = header + + if cookie: + cookie = cookie.encode("latin1").decode() + + return _sansio_http.parse_cookie(cookie=cookie, cls=cls) + + +_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A) +_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A) +_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"} +_cookie_slash_map.update( + (v.to_bytes(1, "big"), b"\\%03o" % v) + for v in [*range(0x20), *b",;", *range(0x7F, 256)] +) + + +def dump_cookie( + key: str, + value: str = "", + max_age: timedelta | int | None = None, + expires: str | datetime | int | float | None = None, + path: str | None = "/", + domain: str | None = None, + secure: bool = False, + httponly: bool = False, + sync_expires: bool = True, + max_size: int = 4093, + samesite: str | None = None, +) -> str: + """Create a Set-Cookie header without the ``Set-Cookie`` prefix. + + The return value is usually restricted to ascii as the vast majority + of values are properly escaped, but that is no guarantee. It's + tunneled through latin1 as required by :pep:`3333`. + + The return value is not ASCII safe if the key contains unicode + characters. This is technically against the specification but + happens in the wild. It's strongly recommended to not use + non-ASCII values for the keys. + + :param max_age: should be a number of seconds, or `None` (default) if + the cookie should last only as long as the client's + browser session. Additionally `timedelta` objects + are accepted, too. + :param expires: should be a `datetime` object or unix timestamp. + :param path: limits the cookie to a given path, per default it will + span the whole domain. + :param domain: Use this if you want to set a cross-domain cookie. For + example, ``domain="example.com"`` will set a cookie + that is readable by the domain ``www.example.com``, + ``foo.example.com`` etc. Otherwise, a cookie will only + be readable by the domain that set it. + :param secure: The cookie will only be available via HTTPS + :param httponly: disallow JavaScript to access the cookie. This is an + extension to the cookie standard and probably not + supported by all browsers. + :param charset: the encoding for string values. + :param sync_expires: automatically set expires if max_age is defined + but expires not. + :param max_size: Warn if the final header value exceeds this size. The + default, 4093, should be safely `supported by most browsers + <cookie_>`_. Set to 0 to disable this check. + :param samesite: Limits the scope of the cookie such that it will + only be attached to requests if those requests are same-site. + + .. _`cookie`: http://browsercookielimits.squawky.net/ + + .. versionchanged:: 3.0 + Passing bytes, and the ``charset`` parameter, were removed. + + .. versionchanged:: 2.3.3 + The ``path`` parameter is ``/`` by default. + + .. versionchanged:: 2.3.1 + The value allows more characters without quoting. + + .. versionchanged:: 2.3 + ``localhost`` and other names without a dot are allowed for the domain. A + leading dot is ignored. + + .. versionchanged:: 2.3 + The ``path`` parameter is ``None`` by default. + + .. versionchanged:: 1.0.0 + The string ``'None'`` is accepted for ``samesite``. + """ + if path is not None: + # safe = https://url.spec.whatwg.org/#url-path-segment-string + # as well as percent for things that are already quoted + # excluding semicolon since it's part of the header syntax + path = quote(path, safe="%!$&'()*+,/:=@") + + if domain: + domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii") + + if isinstance(max_age, timedelta): + max_age = int(max_age.total_seconds()) + + if expires is not None: + if not isinstance(expires, str): + expires = http_date(expires) + elif max_age is not None and sync_expires: + expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age) + + if samesite is not None: + samesite = samesite.title() + + if samesite not in {"Strict", "Lax", "None"}: + raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.") + + # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with + # three octal digits, which matches http.cookies, although the RFC suggests base64. + if not _cookie_no_quote_re.fullmatch(value): + # Work with bytes here, since a UTF-8 character could be multiple bytes. + value = _cookie_slash_re.sub( + lambda m: _cookie_slash_map[m.group()], value.encode() + ).decode("ascii") + value = f'"{value}"' + + # Send a non-ASCII key as mojibake. Everything else should already be ASCII. + # TODO Remove encoding dance, it seems like clients accept UTF-8 keys + buf = [f"{key.encode().decode('latin1')}={value}"] + + for k, v in ( + ("Domain", domain), + ("Expires", expires), + ("Max-Age", max_age), + ("Secure", secure), + ("HttpOnly", httponly), + ("Path", path), + ("SameSite", samesite), + ): + if v is None or v is False: + continue + + if v is True: + buf.append(k) + continue + + buf.append(f"{k}={v}") + + rv = "; ".join(buf) + + # Warn if the final value of the cookie is larger than the limit. If the cookie is + # too large, then it may be silently ignored by the browser, which can be quite hard + # to debug. + cookie_size = len(rv) + + if max_size and cookie_size > max_size: + value_size = len(value) + warnings.warn( + f"The '{key}' cookie is too large: the value was {value_size} bytes but the" + f" header required {cookie_size - value_size} extra bytes. The final size" + f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may" + " silently ignore cookies larger than this.", + stacklevel=2, + ) + + return rv + + +def is_byte_range_valid( + start: int | None, stop: int | None, length: int | None +) -> bool: + """Checks if a given byte content range is valid for the given length. + + .. versionadded:: 0.7 + """ + if (start is None) != (stop is None): + return False + elif start is None: + return length is None or length >= 0 + elif length is None: + return 0 <= start < stop # type: ignore + elif start >= stop: # type: ignore + return False + return 0 <= start < length + + +# circular dependencies +from . import datastructures as ds +from .sansio import http as _sansio_http |