aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/werkzeug/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/test.py')
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/test.py1464
1 files changed, 1464 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/test.py b/venv/lib/python3.8/site-packages/werkzeug/test.py
new file mode 100644
index 0000000..38f69bf
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/test.py
@@ -0,0 +1,1464 @@
+from __future__ import annotations
+
+import dataclasses
+import mimetypes
+import sys
+import typing as t
+from collections import defaultdict
+from datetime import datetime
+from io import BytesIO
+from itertools import chain
+from random import random
+from tempfile import TemporaryFile
+from time import time
+from urllib.parse import unquote
+from urllib.parse import urlsplit
+from urllib.parse import urlunsplit
+
+from ._internal import _get_environ
+from ._internal import _wsgi_decoding_dance
+from ._internal import _wsgi_encoding_dance
+from .datastructures import Authorization
+from .datastructures import CallbackDict
+from .datastructures import CombinedMultiDict
+from .datastructures import EnvironHeaders
+from .datastructures import FileMultiDict
+from .datastructures import Headers
+from .datastructures import MultiDict
+from .http import dump_cookie
+from .http import dump_options_header
+from .http import parse_cookie
+from .http import parse_date
+from .http import parse_options_header
+from .sansio.multipart import Data
+from .sansio.multipart import Epilogue
+from .sansio.multipart import Field
+from .sansio.multipart import File
+from .sansio.multipart import MultipartEncoder
+from .sansio.multipart import Preamble
+from .urls import _urlencode
+from .urls import iri_to_uri
+from .utils import cached_property
+from .utils import get_content_type
+from .wrappers.request import Request
+from .wrappers.response import Response
+from .wsgi import ClosingIterator
+from .wsgi import get_current_url
+
+if t.TYPE_CHECKING:
+ import typing_extensions as te
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+def stream_encode_multipart(
+ data: t.Mapping[str, t.Any],
+ use_tempfile: bool = True,
+ threshold: int = 1024 * 500,
+ boundary: str | None = None,
+) -> tuple[t.IO[bytes], int, str]:
+ """Encode a dict of values (either strings or file descriptors or
+ :class:`FileStorage` objects.) into a multipart encoded string stored
+ in a file descriptor.
+
+ .. versionchanged:: 3.0
+ The ``charset`` parameter was removed.
+ """
+ if boundary is None:
+ boundary = f"---------------WerkzeugFormPart_{time()}{random()}"
+
+ stream: t.IO[bytes] = BytesIO()
+ total_length = 0
+ on_disk = False
+ write_binary: t.Callable[[bytes], int]
+
+ if use_tempfile:
+
+ def write_binary(s: bytes) -> int:
+ nonlocal stream, total_length, on_disk
+
+ if on_disk:
+ return stream.write(s)
+ else:
+ length = len(s)
+
+ if length + total_length <= threshold:
+ stream.write(s)
+ else:
+ new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
+ new_stream.write(stream.getvalue()) # type: ignore
+ new_stream.write(s)
+ stream = new_stream
+ on_disk = True
+
+ total_length += length
+ return length
+
+ else:
+ write_binary = stream.write
+
+ encoder = MultipartEncoder(boundary.encode())
+ write_binary(encoder.send_event(Preamble(data=b"")))
+ for key, value in _iter_data(data):
+ reader = getattr(value, "read", None)
+ if reader is not None:
+ filename = getattr(value, "filename", getattr(value, "name", None))
+ content_type = getattr(value, "content_type", None)
+ if content_type is None:
+ content_type = (
+ filename
+ and mimetypes.guess_type(filename)[0]
+ or "application/octet-stream"
+ )
+ headers = value.headers
+ headers.update([("Content-Type", content_type)])
+ if filename is None:
+ write_binary(encoder.send_event(Field(name=key, headers=headers)))
+ else:
+ write_binary(
+ encoder.send_event(
+ File(name=key, filename=filename, headers=headers)
+ )
+ )
+ while True:
+ chunk = reader(16384)
+
+ if not chunk:
+ write_binary(encoder.send_event(Data(data=chunk, more_data=False)))
+ break
+
+ write_binary(encoder.send_event(Data(data=chunk, more_data=True)))
+ else:
+ if not isinstance(value, str):
+ value = str(value)
+ write_binary(encoder.send_event(Field(name=key, headers=Headers())))
+ write_binary(encoder.send_event(Data(data=value.encode(), more_data=False)))
+
+ write_binary(encoder.send_event(Epilogue(data=b"")))
+
+ length = stream.tell()
+ stream.seek(0)
+ return stream, length, boundary
+
+
+def encode_multipart(
+ values: t.Mapping[str, t.Any], boundary: str | None = None
+) -> tuple[str, bytes]:
+ """Like `stream_encode_multipart` but returns a tuple in the form
+ (``boundary``, ``data``) where data is bytes.
+
+ .. versionchanged:: 3.0
+ The ``charset`` parameter was removed.
+ """
+ stream, length, boundary = stream_encode_multipart(
+ values, use_tempfile=False, boundary=boundary
+ )
+ return boundary, stream.read()
+
+
+def _iter_data(data: t.Mapping[str, t.Any]) -> t.Iterator[tuple[str, t.Any]]:
+ """Iterate over a mapping that might have a list of values, yielding
+ all key, value pairs. Almost like iter_multi_items but only allows
+ lists, not tuples, of values so tuples can be used for files.
+ """
+ if isinstance(data, MultiDict):
+ yield from data.items(multi=True)
+ else:
+ for key, value in data.items():
+ if isinstance(value, list):
+ for v in value:
+ yield key, v
+ else:
+ yield key, value
+
+
+_TAnyMultiDict = t.TypeVar("_TAnyMultiDict", bound="MultiDict[t.Any, t.Any]")
+
+
+class EnvironBuilder:
+ """This class can be used to conveniently create a WSGI environment
+ for testing purposes. It can be used to quickly create WSGI environments
+ or request objects from arbitrary data.
+
+ The signature of this class is also used in some other places as of
+ Werkzeug 0.5 (:func:`create_environ`, :meth:`Response.from_values`,
+ :meth:`Client.open`). Because of this most of the functionality is
+ available through the constructor alone.
+
+ Files and regular form data can be manipulated independently of each
+ other with the :attr:`form` and :attr:`files` attributes, but are
+ passed with the same argument to the constructor: `data`.
+
+ `data` can be any of these values:
+
+ - a `str` or `bytes` object: The object is converted into an
+ :attr:`input_stream`, the :attr:`content_length` is set and you have to
+ provide a :attr:`content_type`.
+ - a `dict` or :class:`MultiDict`: The keys have to be strings. The values
+ have to be either any of the following objects, or a list of any of the
+ following objects:
+
+ - a :class:`file`-like object: These are converted into
+ :class:`FileStorage` objects automatically.
+ - a `tuple`: The :meth:`~FileMultiDict.add_file` method is called
+ with the key and the unpacked `tuple` items as positional
+ arguments.
+ - a `str`: The string is set as form data for the associated key.
+ - a file-like object: The object content is loaded in memory and then
+ handled like a regular `str` or a `bytes`.
+
+ :param path: the path of the request. In the WSGI environment this will
+ end up as `PATH_INFO`. If the `query_string` is not defined
+ and there is a question mark in the `path` everything after
+ it is used as query string.
+ :param base_url: the base URL is a URL that is used to extract the WSGI
+ URL scheme, host (server name + server port) and the
+ script root (`SCRIPT_NAME`).
+ :param query_string: an optional string or dict with URL parameters.
+ :param method: the HTTP method to use, defaults to `GET`.
+ :param input_stream: an optional input stream. Do not specify this and
+ `data`. As soon as an input stream is set you can't
+ modify :attr:`args` and :attr:`files` unless you
+ set the :attr:`input_stream` to `None` again.
+ :param content_type: The content type for the request. As of 0.5 you
+ don't have to provide this when specifying files
+ and form data via `data`.
+ :param content_length: The content length for the request. You don't
+ have to specify this when providing data via
+ `data`.
+ :param errors_stream: an optional error stream that is used for
+ `wsgi.errors`. Defaults to :data:`stderr`.
+ :param multithread: controls `wsgi.multithread`. Defaults to `False`.
+ :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
+ :param run_once: controls `wsgi.run_once`. Defaults to `False`.
+ :param headers: an optional list or :class:`Headers` object of headers.
+ :param data: a string or dict of form data or a file-object.
+ See explanation above.
+ :param json: An object to be serialized and assigned to ``data``.
+ Defaults the content type to ``"application/json"``.
+ Serialized with the function assigned to :attr:`json_dumps`.
+ :param environ_base: an optional dict of environment defaults.
+ :param environ_overrides: an optional dict of environment overrides.
+ :param auth: An authorization object to use for the
+ ``Authorization`` header value. A ``(username, password)`` tuple
+ is a shortcut for ``Basic`` authorization.
+
+ .. versionchanged:: 3.0
+ The ``charset`` parameter was removed.
+
+ .. versionchanged:: 2.1
+ ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are not duplicated as
+ header keys in the environ.
+
+ .. versionchanged:: 2.0
+ ``REQUEST_URI`` and ``RAW_URI`` is the full raw URI including
+ the query string, not only the path.
+
+ .. versionchanged:: 2.0
+ The default :attr:`request_class` is ``Request`` instead of
+ ``BaseRequest``.
+
+ .. versionadded:: 2.0
+ Added the ``auth`` parameter.
+
+ .. versionadded:: 0.15
+ The ``json`` param and :meth:`json_dumps` method.
+
+ .. versionadded:: 0.15
+ The environ has keys ``REQUEST_URI`` and ``RAW_URI`` containing
+ the path before percent-decoding. This is not part of the WSGI
+ PEP, but many WSGI servers include it.
+
+ .. versionchanged:: 0.6
+ ``path`` and ``base_url`` can now be unicode strings that are
+ encoded with :func:`iri_to_uri`.
+ """
+
+ #: the server protocol to use. defaults to HTTP/1.1
+ server_protocol = "HTTP/1.1"
+
+ #: the wsgi version to use. defaults to (1, 0)
+ wsgi_version = (1, 0)
+
+ #: The default request class used by :meth:`get_request`.
+ request_class = Request
+
+ import json
+
+ #: The serialization function used when ``json`` is passed.
+ json_dumps = staticmethod(json.dumps)
+ del json
+
+ _args: MultiDict[str, str] | None
+ _query_string: str | None
+ _input_stream: t.IO[bytes] | None
+ _form: MultiDict[str, str] | None
+ _files: FileMultiDict | None
+
+ def __init__(
+ self,
+ path: str = "/",
+ base_url: str | None = None,
+ query_string: t.Mapping[str, str] | str | None = None,
+ method: str = "GET",
+ input_stream: t.IO[bytes] | None = None,
+ content_type: str | None = None,
+ content_length: int | None = None,
+ errors_stream: t.IO[str] | None = None,
+ multithread: bool = False,
+ multiprocess: bool = False,
+ run_once: bool = False,
+ headers: Headers | t.Iterable[tuple[str, str]] | None = None,
+ data: None | (t.IO[bytes] | str | bytes | t.Mapping[str, t.Any]) = None,
+ environ_base: t.Mapping[str, t.Any] | None = None,
+ environ_overrides: t.Mapping[str, t.Any] | None = None,
+ mimetype: str | None = None,
+ json: t.Mapping[str, t.Any] | None = None,
+ auth: Authorization | tuple[str, str] | None = None,
+ ) -> None:
+ if query_string is not None and "?" in path:
+ raise ValueError("Query string is defined in the path and as an argument")
+ request_uri = urlsplit(path)
+ if query_string is None and "?" in path:
+ query_string = request_uri.query
+
+ self.path = iri_to_uri(request_uri.path)
+ self.request_uri = path
+ if base_url is not None:
+ base_url = iri_to_uri(base_url)
+ self.base_url = base_url # type: ignore
+ if isinstance(query_string, str):
+ self.query_string = query_string
+ else:
+ if query_string is None:
+ query_string = MultiDict()
+ elif not isinstance(query_string, MultiDict):
+ query_string = MultiDict(query_string)
+ self.args = query_string
+ self.method = method
+ if headers is None:
+ headers = Headers()
+ elif not isinstance(headers, Headers):
+ headers = Headers(headers)
+ self.headers = headers
+ if content_type is not None:
+ self.content_type = content_type
+ if errors_stream is None:
+ errors_stream = sys.stderr
+ self.errors_stream = errors_stream
+ self.multithread = multithread
+ self.multiprocess = multiprocess
+ self.run_once = run_once
+ self.environ_base = environ_base
+ self.environ_overrides = environ_overrides
+ self.input_stream = input_stream
+ self.content_length = content_length
+ self.closed = False
+
+ if auth is not None:
+ if isinstance(auth, tuple):
+ auth = Authorization(
+ "basic", {"username": auth[0], "password": auth[1]}
+ )
+
+ self.headers.set("Authorization", auth.to_header())
+
+ if json is not None:
+ if data is not None:
+ raise TypeError("can't provide both json and data")
+
+ data = self.json_dumps(json)
+
+ if self.content_type is None:
+ self.content_type = "application/json"
+
+ if data:
+ if input_stream is not None:
+ raise TypeError("can't provide input stream and data")
+ if hasattr(data, "read"):
+ data = data.read()
+ if isinstance(data, str):
+ data = data.encode()
+ if isinstance(data, bytes):
+ self.input_stream = BytesIO(data)
+ if self.content_length is None:
+ self.content_length = len(data)
+ else:
+ for key, value in _iter_data(data):
+ if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
+ self._add_file_from_data(key, value)
+ else:
+ self.form.setlistdefault(key).append(value)
+
+ if mimetype is not None:
+ self.mimetype = mimetype
+
+ @classmethod
+ def from_environ(cls, environ: WSGIEnvironment, **kwargs: t.Any) -> EnvironBuilder:
+ """Turn an environ dict back into a builder. Any extra kwargs
+ override the args extracted from the environ.
+
+ .. versionchanged:: 2.0
+ Path and query values are passed through the WSGI decoding
+ dance to avoid double encoding.
+
+ .. versionadded:: 0.15
+ """
+ headers = Headers(EnvironHeaders(environ))
+ out = {
+ "path": _wsgi_decoding_dance(environ["PATH_INFO"]),
+ "base_url": cls._make_base_url(
+ environ["wsgi.url_scheme"],
+ headers.pop("Host"),
+ _wsgi_decoding_dance(environ["SCRIPT_NAME"]),
+ ),
+ "query_string": _wsgi_decoding_dance(environ["QUERY_STRING"]),
+ "method": environ["REQUEST_METHOD"],
+ "input_stream": environ["wsgi.input"],
+ "content_type": headers.pop("Content-Type", None),
+ "content_length": headers.pop("Content-Length", None),
+ "errors_stream": environ["wsgi.errors"],
+ "multithread": environ["wsgi.multithread"],
+ "multiprocess": environ["wsgi.multiprocess"],
+ "run_once": environ["wsgi.run_once"],
+ "headers": headers,
+ }
+ out.update(kwargs)
+ return cls(**out)
+
+ def _add_file_from_data(
+ self,
+ key: str,
+ value: (t.IO[bytes] | tuple[t.IO[bytes], str] | tuple[t.IO[bytes], str, str]),
+ ) -> None:
+ """Called in the EnvironBuilder to add files from the data dict."""
+ if isinstance(value, tuple):
+ self.files.add_file(key, *value)
+ else:
+ self.files.add_file(key, value)
+
+ @staticmethod
+ def _make_base_url(scheme: str, host: str, script_root: str) -> str:
+ return urlunsplit((scheme, host, script_root, "", "")).rstrip("/") + "/"
+
+ @property
+ def base_url(self) -> str:
+ """The base URL is used to extract the URL scheme, host name,
+ port, and root path.
+ """
+ return self._make_base_url(self.url_scheme, self.host, self.script_root)
+
+ @base_url.setter
+ def base_url(self, value: str | None) -> None:
+ if value is None:
+ scheme = "http"
+ netloc = "localhost"
+ script_root = ""
+ else:
+ scheme, netloc, script_root, qs, anchor = urlsplit(value)
+ if qs or anchor:
+ raise ValueError("base url must not contain a query string or fragment")
+ self.script_root = script_root.rstrip("/")
+ self.host = netloc
+ self.url_scheme = scheme
+
+ @property
+ def content_type(self) -> str | None:
+ """The content type for the request. Reflected from and to
+ the :attr:`headers`. Do not set if you set :attr:`files` or
+ :attr:`form` for auto detection.
+ """
+ ct = self.headers.get("Content-Type")
+ if ct is None and not self._input_stream:
+ if self._files:
+ return "multipart/form-data"
+ if self._form:
+ return "application/x-www-form-urlencoded"
+ return None
+ return ct
+
+ @content_type.setter
+ def content_type(self, value: str | None) -> None:
+ if value is None:
+ self.headers.pop("Content-Type", None)
+ else:
+ self.headers["Content-Type"] = value
+
+ @property
+ def mimetype(self) -> str | None:
+ """The mimetype (content type without charset etc.)
+
+ .. versionadded:: 0.14
+ """
+ ct = self.content_type
+ return ct.split(";")[0].strip() if ct else None
+
+ @mimetype.setter
+ def mimetype(self, value: str) -> None:
+ self.content_type = get_content_type(value, "utf-8")
+
+ @property
+ def mimetype_params(self) -> t.Mapping[str, str]:
+ """The mimetype parameters as dict. For example if the
+ content type is ``text/html; charset=utf-8`` the params would be
+ ``{'charset': 'utf-8'}``.
+
+ .. versionadded:: 0.14
+ """
+
+ def on_update(d: CallbackDict[str, str]) -> None:
+ self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
+
+ d = parse_options_header(self.headers.get("content-type", ""))[1]
+ return CallbackDict(d, on_update)
+
+ @property
+ def content_length(self) -> int | None:
+ """The content length as integer. Reflected from and to the
+ :attr:`headers`. Do not set if you set :attr:`files` or
+ :attr:`form` for auto detection.
+ """
+ return self.headers.get("Content-Length", type=int)
+
+ @content_length.setter
+ def content_length(self, value: int | None) -> None:
+ if value is None:
+ self.headers.pop("Content-Length", None)
+ else:
+ self.headers["Content-Length"] = str(value)
+
+ def _get_form(self, name: str, storage: type[_TAnyMultiDict]) -> _TAnyMultiDict:
+ """Common behavior for getting the :attr:`form` and
+ :attr:`files` properties.
+
+ :param name: Name of the internal cached attribute.
+ :param storage: Storage class used for the data.
+ """
+ if self.input_stream is not None:
+ raise AttributeError("an input stream is defined")
+
+ rv = getattr(self, name)
+
+ if rv is None:
+ rv = storage()
+ setattr(self, name, rv)
+
+ return rv # type: ignore
+
+ def _set_form(self, name: str, value: MultiDict[str, t.Any]) -> None:
+ """Common behavior for setting the :attr:`form` and
+ :attr:`files` properties.
+
+ :param name: Name of the internal cached attribute.
+ :param value: Value to assign to the attribute.
+ """
+ self._input_stream = None
+ setattr(self, name, value)
+
+ @property
+ def form(self) -> MultiDict[str, str]:
+ """A :class:`MultiDict` of form values."""
+ return self._get_form("_form", MultiDict)
+
+ @form.setter
+ def form(self, value: MultiDict[str, str]) -> None:
+ self._set_form("_form", value)
+
+ @property
+ def files(self) -> FileMultiDict:
+ """A :class:`FileMultiDict` of uploaded files. Use
+ :meth:`~FileMultiDict.add_file` to add new files.
+ """
+ return self._get_form("_files", FileMultiDict)
+
+ @files.setter
+ def files(self, value: FileMultiDict) -> None:
+ self._set_form("_files", value)
+
+ @property
+ def input_stream(self) -> t.IO[bytes] | None:
+ """An optional input stream. This is mutually exclusive with
+ setting :attr:`form` and :attr:`files`, setting it will clear
+ those. Do not provide this if the method is not ``POST`` or
+ another method that has a body.
+ """
+ return self._input_stream
+
+ @input_stream.setter
+ def input_stream(self, value: t.IO[bytes] | None) -> None:
+ self._input_stream = value
+ self._form = None
+ self._files = None
+
+ @property
+ def query_string(self) -> str:
+ """The query string. If you set this to a string
+ :attr:`args` will no longer be available.
+ """
+ if self._query_string is None:
+ if self._args is not None:
+ return _urlencode(self._args)
+ return ""
+ return self._query_string
+
+ @query_string.setter
+ def query_string(self, value: str | None) -> None:
+ self._query_string = value
+ self._args = None
+
+ @property
+ def args(self) -> MultiDict[str, str]:
+ """The URL arguments as :class:`MultiDict`."""
+ if self._query_string is not None:
+ raise AttributeError("a query string is defined")
+ if self._args is None:
+ self._args = MultiDict()
+ return self._args
+
+ @args.setter
+ def args(self, value: MultiDict[str, str] | None) -> None:
+ self._query_string = None
+ self._args = value
+
+ @property
+ def server_name(self) -> str:
+ """The server name (read-only, use :attr:`host` to set)"""
+ return self.host.split(":", 1)[0]
+
+ @property
+ def server_port(self) -> int:
+ """The server port as integer (read-only, use :attr:`host` to set)"""
+ pieces = self.host.split(":", 1)
+
+ if len(pieces) == 2:
+ try:
+ return int(pieces[1])
+ except ValueError:
+ pass
+
+ if self.url_scheme == "https":
+ return 443
+ return 80
+
+ def __del__(self) -> None:
+ try:
+ self.close()
+ except Exception:
+ pass
+
+ def close(self) -> None:
+ """Closes all files. If you put real :class:`file` objects into the
+ :attr:`files` dict you can call this method to automatically close
+ them all in one go.
+ """
+ if self.closed:
+ return
+ try:
+ files = self.files.values()
+ except AttributeError:
+ files = () # type: ignore
+ for f in files:
+ try:
+ f.close()
+ except Exception:
+ pass
+ self.closed = True
+
+ def get_environ(self) -> WSGIEnvironment:
+ """Return the built environ.
+
+ .. versionchanged:: 0.15
+ The content type and length headers are set based on
+ input stream detection. Previously this only set the WSGI
+ keys.
+ """
+ input_stream = self.input_stream
+ content_length = self.content_length
+
+ mimetype = self.mimetype
+ content_type = self.content_type
+
+ if input_stream is not None:
+ start_pos = input_stream.tell()
+ input_stream.seek(0, 2)
+ end_pos = input_stream.tell()
+ input_stream.seek(start_pos)
+ content_length = end_pos - start_pos
+ elif mimetype == "multipart/form-data":
+ input_stream, content_length, boundary = stream_encode_multipart(
+ CombinedMultiDict([self.form, self.files])
+ )
+ content_type = f'{mimetype}; boundary="{boundary}"'
+ elif mimetype == "application/x-www-form-urlencoded":
+ form_encoded = _urlencode(self.form).encode("ascii")
+ content_length = len(form_encoded)
+ input_stream = BytesIO(form_encoded)
+ else:
+ input_stream = BytesIO()
+
+ result: WSGIEnvironment = {}
+ if self.environ_base:
+ result.update(self.environ_base)
+
+ def _path_encode(x: str) -> str:
+ return _wsgi_encoding_dance(unquote(x))
+
+ raw_uri = _wsgi_encoding_dance(self.request_uri)
+ result.update(
+ {
+ "REQUEST_METHOD": self.method,
+ "SCRIPT_NAME": _path_encode(self.script_root),
+ "PATH_INFO": _path_encode(self.path),
+ "QUERY_STRING": _wsgi_encoding_dance(self.query_string),
+ # Non-standard, added by mod_wsgi, uWSGI
+ "REQUEST_URI": raw_uri,
+ # Non-standard, added by gunicorn
+ "RAW_URI": raw_uri,
+ "SERVER_NAME": self.server_name,
+ "SERVER_PORT": str(self.server_port),
+ "HTTP_HOST": self.host,
+ "SERVER_PROTOCOL": self.server_protocol,
+ "wsgi.version": self.wsgi_version,
+ "wsgi.url_scheme": self.url_scheme,
+ "wsgi.input": input_stream,
+ "wsgi.errors": self.errors_stream,
+ "wsgi.multithread": self.multithread,
+ "wsgi.multiprocess": self.multiprocess,
+ "wsgi.run_once": self.run_once,
+ }
+ )
+
+ headers = self.headers.copy()
+ # Don't send these as headers, they're part of the environ.
+ headers.remove("Content-Type")
+ headers.remove("Content-Length")
+
+ if content_type is not None:
+ result["CONTENT_TYPE"] = content_type
+
+ if content_length is not None:
+ result["CONTENT_LENGTH"] = str(content_length)
+
+ combined_headers = defaultdict(list)
+
+ for key, value in headers.to_wsgi_list():
+ combined_headers[f"HTTP_{key.upper().replace('-', '_')}"].append(value)
+
+ for key, values in combined_headers.items():
+ result[key] = ", ".join(values)
+
+ if self.environ_overrides:
+ result.update(self.environ_overrides)
+
+ return result
+
+ def get_request(self, cls: type[Request] | None = None) -> Request:
+ """Returns a request with the data. If the request class is not
+ specified :attr:`request_class` is used.
+
+ :param cls: The request wrapper to use.
+ """
+ if cls is None:
+ cls = self.request_class
+
+ return cls(self.get_environ())
+
+
+class ClientRedirectError(Exception):
+ """If a redirect loop is detected when using follow_redirects=True with
+ the :cls:`Client`, then this exception is raised.
+ """
+
+
+class Client:
+ """Simulate sending requests to a WSGI application without running a WSGI or HTTP
+ server.
+
+ :param application: The WSGI application to make requests to.
+ :param response_wrapper: A :class:`.Response` class to wrap response data with.
+ Defaults to :class:`.TestResponse`. If it's not a subclass of ``TestResponse``,
+ one will be created.
+ :param use_cookies: Persist cookies from ``Set-Cookie`` response headers to the
+ ``Cookie`` header in subsequent requests. Domain and path matching is supported,
+ but other cookie parameters are ignored.
+ :param allow_subdomain_redirects: Allow requests to follow redirects to subdomains.
+ Enable this if the application handles subdomains and redirects between them.
+
+ .. versionchanged:: 2.3
+ Simplify cookie implementation, support domain and path matching.
+
+ .. versionchanged:: 2.1
+ All data is available as properties on the returned response object. The
+ response cannot be returned as a tuple.
+
+ .. versionchanged:: 2.0
+ ``response_wrapper`` is always a subclass of :class:``TestResponse``.
+
+ .. versionchanged:: 0.5
+ Added the ``use_cookies`` parameter.
+ """
+
+ def __init__(
+ self,
+ application: WSGIApplication,
+ response_wrapper: type[Response] | None = None,
+ use_cookies: bool = True,
+ allow_subdomain_redirects: bool = False,
+ ) -> None:
+ self.application = application
+
+ if response_wrapper in {None, Response}:
+ response_wrapper = TestResponse
+ elif response_wrapper is not None and not issubclass(
+ response_wrapper, TestResponse
+ ):
+ response_wrapper = type(
+ "WrapperTestResponse",
+ (TestResponse, response_wrapper),
+ {},
+ )
+
+ self.response_wrapper = t.cast(t.Type["TestResponse"], response_wrapper)
+
+ if use_cookies:
+ self._cookies: dict[tuple[str, str, str], Cookie] | None = {}
+ else:
+ self._cookies = None
+
+ self.allow_subdomain_redirects = allow_subdomain_redirects
+
+ def get_cookie(
+ self, key: str, domain: str = "localhost", path: str = "/"
+ ) -> Cookie | None:
+ """Return a :class:`.Cookie` if it exists. Cookies are uniquely identified by
+ ``(domain, path, key)``.
+
+ :param key: The decoded form of the key for the cookie.
+ :param domain: The domain the cookie was set for.
+ :param path: The path the cookie was set for.
+
+ .. versionadded:: 2.3
+ """
+ if self._cookies is None:
+ raise TypeError(
+ "Cookies are disabled. Create a client with 'use_cookies=True'."
+ )
+
+ return self._cookies.get((domain, path, key))
+
+ def set_cookie(
+ self,
+ key: str,
+ value: str = "",
+ *,
+ domain: str = "localhost",
+ origin_only: bool = True,
+ path: str = "/",
+ **kwargs: t.Any,
+ ) -> None:
+ """Set a cookie to be sent in subsequent requests.
+
+ This is a convenience to skip making a test request to a route that would set
+ the cookie. To test the cookie, make a test request to a route that uses the
+ cookie value.
+
+ The client uses ``domain``, ``origin_only``, and ``path`` to determine which
+ cookies to send with a request. It does not use other cookie parameters that
+ browsers use, since they're not applicable in tests.
+
+ :param key: The key part of the cookie.
+ :param value: The value part of the cookie.
+ :param domain: Send this cookie with requests that match this domain. If
+ ``origin_only`` is true, it must be an exact match, otherwise it may be a
+ suffix match.
+ :param origin_only: Whether the domain must be an exact match to the request.
+ :param path: Send this cookie with requests that match this path either exactly
+ or as a prefix.
+ :param kwargs: Passed to :func:`.dump_cookie`.
+
+ .. versionchanged:: 3.0
+ The parameter ``server_name`` is removed. The first parameter is
+ ``key``. Use the ``domain`` and ``origin_only`` parameters instead.
+
+ .. versionchanged:: 2.3
+ The ``origin_only`` parameter was added.
+
+ .. versionchanged:: 2.3
+ The ``domain`` parameter defaults to ``localhost``.
+ """
+ if self._cookies is None:
+ raise TypeError(
+ "Cookies are disabled. Create a client with 'use_cookies=True'."
+ )
+
+ cookie = Cookie._from_response_header(
+ domain, "/", dump_cookie(key, value, domain=domain, path=path, **kwargs)
+ )
+ cookie.origin_only = origin_only
+
+ if cookie._should_delete:
+ self._cookies.pop(cookie._storage_key, None)
+ else:
+ self._cookies[cookie._storage_key] = cookie
+
+ def delete_cookie(
+ self,
+ key: str,
+ *,
+ domain: str = "localhost",
+ path: str = "/",
+ ) -> None:
+ """Delete a cookie if it exists. Cookies are uniquely identified by
+ ``(domain, path, key)``.
+
+ :param key: The decoded form of the key for the cookie.
+ :param domain: The domain the cookie was set for.
+ :param path: The path the cookie was set for.
+
+ .. versionchanged:: 3.0
+ The ``server_name`` parameter is removed. The first parameter is
+ ``key``. Use the ``domain`` parameter instead.
+
+ .. versionchanged:: 3.0
+ The ``secure``, ``httponly`` and ``samesite`` parameters are removed.
+
+ .. versionchanged:: 2.3
+ The ``domain`` parameter defaults to ``localhost``.
+ """
+ if self._cookies is None:
+ raise TypeError(
+ "Cookies are disabled. Create a client with 'use_cookies=True'."
+ )
+
+ self._cookies.pop((domain, path, key), None)
+
+ def _add_cookies_to_wsgi(self, environ: WSGIEnvironment) -> None:
+ """If cookies are enabled, set the ``Cookie`` header in the environ to the
+ cookies that are applicable to the request host and path.
+
+ :meta private:
+
+ .. versionadded:: 2.3
+ """
+ if self._cookies is None:
+ return
+
+ url = urlsplit(get_current_url(environ))
+ server_name = url.hostname or "localhost"
+ value = "; ".join(
+ c._to_request_header()
+ for c in self._cookies.values()
+ if c._matches_request(server_name, url.path)
+ )
+
+ if value:
+ environ["HTTP_COOKIE"] = value
+ else:
+ environ.pop("HTTP_COOKIE", None)
+
+ def _update_cookies_from_response(
+ self, server_name: str, path: str, headers: list[str]
+ ) -> None:
+ """If cookies are enabled, update the stored cookies from any ``Set-Cookie``
+ headers in the response.
+
+ :meta private:
+
+ .. versionadded:: 2.3
+ """
+ if self._cookies is None:
+ return
+
+ for header in headers:
+ cookie = Cookie._from_response_header(server_name, path, header)
+
+ if cookie._should_delete:
+ self._cookies.pop(cookie._storage_key, None)
+ else:
+ self._cookies[cookie._storage_key] = cookie
+
+ def run_wsgi_app(
+ self, environ: WSGIEnvironment, buffered: bool = False
+ ) -> tuple[t.Iterable[bytes], str, Headers]:
+ """Runs the wrapped WSGI app with the given environment.
+
+ :meta private:
+ """
+ self._add_cookies_to_wsgi(environ)
+ rv = run_wsgi_app(self.application, environ, buffered=buffered)
+ url = urlsplit(get_current_url(environ))
+ self._update_cookies_from_response(
+ url.hostname or "localhost", url.path, rv[2].getlist("Set-Cookie")
+ )
+ return rv
+
+ def resolve_redirect(
+ self, response: TestResponse, buffered: bool = False
+ ) -> TestResponse:
+ """Perform a new request to the location given by the redirect
+ response to the previous request.
+
+ :meta private:
+ """
+ scheme, netloc, path, qs, anchor = urlsplit(response.location)
+ builder = EnvironBuilder.from_environ(
+ response.request.environ, path=path, query_string=qs
+ )
+
+ to_name_parts = netloc.split(":", 1)[0].split(".")
+ from_name_parts = builder.server_name.split(".")
+
+ if to_name_parts != [""]:
+ # The new location has a host, use it for the base URL.
+ builder.url_scheme = scheme
+ builder.host = netloc
+ else:
+ # A local redirect with autocorrect_location_header=False
+ # doesn't have a host, so use the request's host.
+ to_name_parts = from_name_parts
+
+ # Explain why a redirect to a different server name won't be followed.
+ if to_name_parts != from_name_parts:
+ if to_name_parts[-len(from_name_parts) :] == from_name_parts:
+ if not self.allow_subdomain_redirects:
+ raise RuntimeError("Following subdomain redirects is not enabled.")
+ else:
+ raise RuntimeError("Following external redirects is not supported.")
+
+ path_parts = path.split("/")
+ root_parts = builder.script_root.split("/")
+
+ if path_parts[: len(root_parts)] == root_parts:
+ # Strip the script root from the path.
+ builder.path = path[len(builder.script_root) :]
+ else:
+ # The new location is not under the script root, so use the
+ # whole path and clear the previous root.
+ builder.path = path
+ builder.script_root = ""
+
+ # Only 307 and 308 preserve all of the original request.
+ if response.status_code not in {307, 308}:
+ # HEAD is preserved, everything else becomes GET.
+ if builder.method != "HEAD":
+ builder.method = "GET"
+
+ # Clear the body and the headers that describe it.
+
+ if builder.input_stream is not None:
+ builder.input_stream.close()
+ builder.input_stream = None
+
+ builder.content_type = None
+ builder.content_length = None
+ builder.headers.pop("Transfer-Encoding", None)
+
+ return self.open(builder, buffered=buffered)
+
+ def open(
+ self,
+ *args: t.Any,
+ buffered: bool = False,
+ follow_redirects: bool = False,
+ **kwargs: t.Any,
+ ) -> TestResponse:
+ """Generate an environ dict from the given arguments, make a
+ request to the application using it, and return the response.
+
+ :param args: Passed to :class:`EnvironBuilder` to create the
+ environ for the request. If a single arg is passed, it can
+ be an existing :class:`EnvironBuilder` or an environ dict.
+ :param buffered: Convert the iterator returned by the app into
+ a list. If the iterator has a ``close()`` method, it is
+ called automatically.
+ :param follow_redirects: Make additional requests to follow HTTP
+ redirects until a non-redirect status is returned.
+ :attr:`TestResponse.history` lists the intermediate
+ responses.
+
+ .. versionchanged:: 2.1
+ Removed the ``as_tuple`` parameter.
+
+ .. versionchanged:: 2.0
+ The request input stream is closed when calling
+ ``response.close()``. Input streams for redirects are
+ automatically closed.
+
+ .. versionchanged:: 0.5
+ If a dict is provided as file in the dict for the ``data``
+ parameter the content type has to be called ``content_type``
+ instead of ``mimetype``. This change was made for
+ consistency with :class:`werkzeug.FileWrapper`.
+
+ .. versionchanged:: 0.5
+ Added the ``follow_redirects`` parameter.
+ """
+ request: Request | None = None
+
+ if not kwargs and len(args) == 1:
+ arg = args[0]
+
+ if isinstance(arg, EnvironBuilder):
+ request = arg.get_request()
+ elif isinstance(arg, dict):
+ request = EnvironBuilder.from_environ(arg).get_request()
+ elif isinstance(arg, Request):
+ request = arg
+
+ if request is None:
+ builder = EnvironBuilder(*args, **kwargs)
+
+ try:
+ request = builder.get_request()
+ finally:
+ builder.close()
+
+ response_parts = self.run_wsgi_app(request.environ, buffered=buffered)
+ response = self.response_wrapper(*response_parts, request=request)
+
+ redirects = set()
+ history: list[TestResponse] = []
+
+ if not follow_redirects:
+ return response
+
+ while response.status_code in {
+ 301,
+ 302,
+ 303,
+ 305,
+ 307,
+ 308,
+ }:
+ # Exhaust intermediate response bodies to ensure middleware
+ # that returns an iterator runs any cleanup code.
+ if not buffered:
+ response.make_sequence()
+ response.close()
+
+ new_redirect_entry = (response.location, response.status_code)
+
+ if new_redirect_entry in redirects:
+ raise ClientRedirectError(
+ f"Loop detected: A {response.status_code} redirect"
+ f" to {response.location} was already made."
+ )
+
+ redirects.add(new_redirect_entry)
+ response.history = tuple(history)
+ history.append(response)
+ response = self.resolve_redirect(response, buffered=buffered)
+ else:
+ # This is the final request after redirects.
+ response.history = tuple(history)
+ # Close the input stream when closing the response, in case
+ # the input is an open temporary file.
+ response.call_on_close(request.input_stream.close)
+ return response
+
+ def get(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``GET``."""
+ kw["method"] = "GET"
+ return self.open(*args, **kw)
+
+ def post(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``POST``."""
+ kw["method"] = "POST"
+ return self.open(*args, **kw)
+
+ def put(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``PUT``."""
+ kw["method"] = "PUT"
+ return self.open(*args, **kw)
+
+ def delete(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``DELETE``."""
+ kw["method"] = "DELETE"
+ return self.open(*args, **kw)
+
+ def patch(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``PATCH``."""
+ kw["method"] = "PATCH"
+ return self.open(*args, **kw)
+
+ def options(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``OPTIONS``."""
+ kw["method"] = "OPTIONS"
+ return self.open(*args, **kw)
+
+ def head(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``HEAD``."""
+ kw["method"] = "HEAD"
+ return self.open(*args, **kw)
+
+ def trace(self, *args: t.Any, **kw: t.Any) -> TestResponse:
+ """Call :meth:`open` with ``method`` set to ``TRACE``."""
+ kw["method"] = "TRACE"
+ return self.open(*args, **kw)
+
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.application!r}>"
+
+
+def create_environ(*args: t.Any, **kwargs: t.Any) -> WSGIEnvironment:
+ """Create a new WSGI environ dict based on the values passed. The first
+ parameter should be the path of the request which defaults to '/'. The
+ second one can either be an absolute path (in that case the host is
+ localhost:80) or a full path to the request with scheme, netloc port and
+ the path to the script.
+
+ This accepts the same arguments as the :class:`EnvironBuilder`
+ constructor.
+
+ .. versionchanged:: 0.5
+ This function is now a thin wrapper over :class:`EnvironBuilder` which
+ was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
+ and `charset` parameters were added.
+ """
+ builder = EnvironBuilder(*args, **kwargs)
+
+ try:
+ return builder.get_environ()
+ finally:
+ builder.close()
+
+
+def run_wsgi_app(
+ app: WSGIApplication, environ: WSGIEnvironment, buffered: bool = False
+) -> tuple[t.Iterable[bytes], str, Headers]:
+ """Return a tuple in the form (app_iter, status, headers) of the
+ application output. This works best if you pass it an application that
+ returns an iterator all the time.
+
+ Sometimes applications may use the `write()` callable returned
+ by the `start_response` function. This tries to resolve such edge
+ cases automatically. But if you don't get the expected output you
+ should set `buffered` to `True` which enforces buffering.
+
+ If passed an invalid WSGI application the behavior of this function is
+ undefined. Never pass non-conforming WSGI applications to this function.
+
+ :param app: the application to execute.
+ :param buffered: set to `True` to enforce buffering.
+ :return: tuple in the form ``(app_iter, status, headers)``
+ """
+ # Copy environ to ensure any mutations by the app (ProxyFix, for
+ # example) don't affect subsequent requests (such as redirects).
+ environ = _get_environ(environ).copy()
+ status: str
+ response: tuple[str, list[tuple[str, str]]] | None = None
+ buffer: list[bytes] = []
+
+ def start_response(status, headers, exc_info=None): # type: ignore
+ nonlocal response
+
+ if exc_info:
+ try:
+ raise exc_info[1].with_traceback(exc_info[2])
+ finally:
+ exc_info = None
+
+ response = (status, headers)
+ return buffer.append
+
+ app_rv = app(environ, start_response)
+ close_func = getattr(app_rv, "close", None)
+ app_iter: t.Iterable[bytes] = iter(app_rv)
+
+ # when buffering we emit the close call early and convert the
+ # application iterator into a regular list
+ if buffered:
+ try:
+ app_iter = list(app_iter)
+ finally:
+ if close_func is not None:
+ close_func()
+
+ # otherwise we iterate the application iter until we have a response, chain
+ # the already received data with the already collected data and wrap it in
+ # a new `ClosingIterator` if we need to restore a `close` callable from the
+ # original return value.
+ else:
+ for item in app_iter:
+ buffer.append(item)
+
+ if response is not None:
+ break
+
+ if buffer:
+ app_iter = chain(buffer, app_iter)
+
+ if close_func is not None and app_iter is not app_rv:
+ app_iter = ClosingIterator(app_iter, close_func)
+
+ status, headers = response # type: ignore
+ return app_iter, status, Headers(headers)
+
+
+class TestResponse(Response):
+ """:class:`~werkzeug.wrappers.Response` subclass that provides extra
+ information about requests made with the test :class:`Client`.
+
+ Test client requests will always return an instance of this class.
+ If a custom response class is passed to the client, it is
+ subclassed along with this to support test information.
+
+ If the test request included large files, or if the application is
+ serving a file, call :meth:`close` to close any open files and
+ prevent Python showing a ``ResourceWarning``.
+
+ .. versionchanged:: 2.2
+ Set the ``default_mimetype`` to None to prevent a mimetype being
+ assumed if missing.
+
+ .. versionchanged:: 2.1
+ Response instances cannot be treated as tuples.
+
+ .. versionadded:: 2.0
+ Test client methods always return instances of this class.
+ """
+
+ default_mimetype = None
+ # Don't assume a mimetype, instead use whatever the response provides
+
+ request: Request
+ """A request object with the environ used to make the request that
+ resulted in this response.
+ """
+
+ history: tuple[TestResponse, ...]
+ """A list of intermediate responses. Populated when the test request
+ is made with ``follow_redirects`` enabled.
+ """
+
+ # Tell Pytest to ignore this, it's not a test class.
+ __test__ = False
+
+ def __init__(
+ self,
+ response: t.Iterable[bytes],
+ status: str,
+ headers: Headers,
+ request: Request,
+ history: tuple[TestResponse] = (), # type: ignore
+ **kwargs: t.Any,
+ ) -> None:
+ super().__init__(response, status, headers, **kwargs)
+ self.request = request
+ self.history = history
+ self._compat_tuple = response, status, headers
+
+ @cached_property
+ def text(self) -> str:
+ """The response data as text. A shortcut for
+ ``response.get_data(as_text=True)``.
+
+ .. versionadded:: 2.1
+ """
+ return self.get_data(as_text=True)
+
+
+@dataclasses.dataclass
+class Cookie:
+ """A cookie key, value, and parameters.
+
+ The class itself is not a public API. Its attributes are documented for inspection
+ with :meth:`.Client.get_cookie` only.
+
+ .. versionadded:: 2.3
+ """
+
+ key: str
+ """The cookie key, encoded as a client would see it."""
+
+ value: str
+ """The cookie key, encoded as a client would see it."""
+
+ decoded_key: str
+ """The cookie key, decoded as the application would set and see it."""
+
+ decoded_value: str
+ """The cookie value, decoded as the application would set and see it."""
+
+ expires: datetime | None
+ """The time at which the cookie is no longer valid."""
+
+ max_age: int | None
+ """The number of seconds from when the cookie was set at which it is
+ no longer valid.
+ """
+
+ domain: str
+ """The domain that the cookie was set for, or the request domain if not set."""
+
+ origin_only: bool
+ """Whether the cookie will be sent for exact domain matches only. This is ``True``
+ if the ``Domain`` parameter was not present.
+ """
+
+ path: str
+ """The path that the cookie was set for."""
+
+ secure: bool | None
+ """The ``Secure`` parameter."""
+
+ http_only: bool | None
+ """The ``HttpOnly`` parameter."""
+
+ same_site: str | None
+ """The ``SameSite`` parameter."""
+
+ def _matches_request(self, server_name: str, path: str) -> bool:
+ return (
+ server_name == self.domain
+ or (
+ not self.origin_only
+ and server_name.endswith(self.domain)
+ and server_name[: -len(self.domain)].endswith(".")
+ )
+ ) and (
+ path == self.path
+ or (
+ path.startswith(self.path)
+ and path[len(self.path) - self.path.endswith("/") :].startswith("/")
+ )
+ )
+
+ def _to_request_header(self) -> str:
+ return f"{self.key}={self.value}"
+
+ @classmethod
+ def _from_response_header(cls, server_name: str, path: str, header: str) -> te.Self:
+ header, _, parameters_str = header.partition(";")
+ key, _, value = header.partition("=")
+ decoded_key, decoded_value = next(parse_cookie(header).items())
+ params = {}
+
+ for item in parameters_str.split(";"):
+ k, sep, v = item.partition("=")
+ params[k.strip().lower()] = v.strip() if sep else None
+
+ return cls(
+ key=key.strip(),
+ value=value.strip(),
+ decoded_key=decoded_key,
+ decoded_value=decoded_value,
+ expires=parse_date(params.get("expires")),
+ max_age=int(params["max-age"] or 0) if "max-age" in params else None,
+ domain=params.get("domain") or server_name,
+ origin_only="domain" not in params,
+ path=params.get("path") or path.rpartition("/")[0] or "/",
+ secure="secure" in params,
+ http_only="httponly" in params,
+ same_site=params.get("samesite"),
+ )
+
+ @property
+ def _storage_key(self) -> tuple[str, str, str]:
+ return self.domain, self.path, self.decoded_key
+
+ @property
+ def _should_delete(self) -> bool:
+ return self.max_age == 0 or (
+ self.expires is not None and self.expires.timestamp() == 0
+ )