aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/werkzeug/wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/wsgi.py')
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/wsgi.py595
1 files changed, 595 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/wsgi.py b/venv/lib/python3.8/site-packages/werkzeug/wsgi.py
new file mode 100644
index 0000000..01d40af
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/wsgi.py
@@ -0,0 +1,595 @@
+from __future__ import annotations
+
+import io
+import typing as t
+from functools import partial
+from functools import update_wrapper
+
+from .exceptions import ClientDisconnected
+from .exceptions import RequestEntityTooLarge
+from .sansio import utils as _sansio_utils
+from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+def responder(f: t.Callable[..., WSGIApplication]) -> WSGIApplication:
+ """Marks a function as responder. Decorate a function with it and it
+ will automatically call the return value as WSGI application.
+
+ Example::
+
+ @responder
+ def application(environ, start_response):
+ return Response('Hello World!')
+ """
+ return update_wrapper(lambda *a: f(*a)(*a[-2:]), f)
+
+
+def get_current_url(
+ environ: WSGIEnvironment,
+ root_only: bool = False,
+ strip_querystring: bool = False,
+ host_only: bool = False,
+ trusted_hosts: t.Iterable[str] | None = None,
+) -> str:
+ """Recreate the URL for a request from the parts in a WSGI
+ environment.
+
+ The URL is an IRI, not a URI, so it may contain Unicode characters.
+ Use :func:`~werkzeug.urls.iri_to_uri` to convert it to ASCII.
+
+ :param environ: The WSGI environment to get the URL parts from.
+ :param root_only: Only build the root path, don't include the
+ remaining path or query string.
+ :param strip_querystring: Don't include the query string.
+ :param host_only: Only build the scheme and host.
+ :param trusted_hosts: A list of trusted host names to validate the
+ host against.
+ """
+ parts = {
+ "scheme": environ["wsgi.url_scheme"],
+ "host": get_host(environ, trusted_hosts),
+ }
+
+ if not host_only:
+ parts["root_path"] = environ.get("SCRIPT_NAME", "")
+
+ if not root_only:
+ parts["path"] = environ.get("PATH_INFO", "")
+
+ if not strip_querystring:
+ parts["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
+
+ return _sansio_utils.get_current_url(**parts)
+
+
+def _get_server(
+ environ: WSGIEnvironment,
+) -> tuple[str, int | None] | None:
+ name = environ.get("SERVER_NAME")
+
+ if name is None:
+ return None
+
+ try:
+ port: int | None = int(environ.get("SERVER_PORT", None))
+ except (TypeError, ValueError):
+ # unix socket
+ port = None
+
+ return name, port
+
+
+def get_host(
+ environ: WSGIEnvironment, trusted_hosts: t.Iterable[str] | None = None
+) -> str:
+ """Return the host for the given WSGI environment.
+
+ The ``Host`` header is preferred, then ``SERVER_NAME`` if it's not
+ set. The returned host will only contain the port if it is different
+ than the standard port for the protocol.
+
+ Optionally, verify that the host is trusted using
+ :func:`host_is_trusted` and raise a
+ :exc:`~werkzeug.exceptions.SecurityError` if it is not.
+
+ :param environ: A WSGI environment dict.
+ :param trusted_hosts: A list of trusted host names.
+
+ :return: Host, with port if necessary.
+ :raise ~werkzeug.exceptions.SecurityError: If the host is not
+ trusted.
+ """
+ return _sansio_utils.get_host(
+ environ["wsgi.url_scheme"],
+ environ.get("HTTP_HOST"),
+ _get_server(environ),
+ trusted_hosts,
+ )
+
+
+def get_content_length(environ: WSGIEnvironment) -> int | None:
+ """Return the ``Content-Length`` header value as an int. If the header is not given
+ or the ``Transfer-Encoding`` header is ``chunked``, ``None`` is returned to indicate
+ a streaming request. If the value is not an integer, or negative, 0 is returned.
+
+ :param environ: The WSGI environ to get the content length from.
+
+ .. versionadded:: 0.9
+ """
+ return _sansio_utils.get_content_length(
+ http_content_length=environ.get("CONTENT_LENGTH"),
+ http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING"),
+ )
+
+
+def get_input_stream(
+ environ: WSGIEnvironment,
+ safe_fallback: bool = True,
+ max_content_length: int | None = None,
+) -> t.IO[bytes]:
+ """Return the WSGI input stream, wrapped so that it may be read safely without going
+ past the ``Content-Length`` header value or ``max_content_length``.
+
+ If ``Content-Length`` exceeds ``max_content_length``, a
+ :exc:`RequestEntityTooLarge`` ``413 Content Too Large`` error is raised.
+
+ If the WSGI server sets ``environ["wsgi.input_terminated"]``, it indicates that the
+ server handles terminating the stream, so it is safe to read directly. For example,
+ a server that knows how to handle chunked requests safely would set this.
+
+ If ``max_content_length`` is set, it can be enforced on streams if
+ ``wsgi.input_terminated`` is set. Otherwise, an empty stream is returned unless the
+ user explicitly disables this safe fallback.
+
+ If the limit is reached before the underlying stream is exhausted (such as a file
+ that is too large, or an infinite stream), the remaining contents of the stream
+ cannot be read safely. Depending on how the server handles this, clients may show a
+ "connection reset" failure instead of seeing the 413 response.
+
+ :param environ: The WSGI environ containing the stream.
+ :param safe_fallback: Return an empty stream when ``Content-Length`` is not set.
+ Disabling this allows infinite streams, which can be a denial-of-service risk.
+ :param max_content_length: The maximum length that content-length or streaming
+ requests may not exceed.
+
+ .. versionchanged:: 2.3.2
+ ``max_content_length`` is only applied to streaming requests if the server sets
+ ``wsgi.input_terminated``.
+
+ .. versionchanged:: 2.3
+ Check ``max_content_length`` and raise an error if it is exceeded.
+
+ .. versionadded:: 0.9
+ """
+ stream = t.cast(t.IO[bytes], environ["wsgi.input"])
+ content_length = get_content_length(environ)
+
+ if content_length is not None and max_content_length is not None:
+ if content_length > max_content_length:
+ raise RequestEntityTooLarge()
+
+ # A WSGI server can set this to indicate that it terminates the input stream. In
+ # that case the stream is safe without wrapping, or can enforce a max length.
+ if "wsgi.input_terminated" in environ:
+ if max_content_length is not None:
+ # If this is moved above, it can cause the stream to hang if a read attempt
+ # is made when the client sends no data. For example, the development server
+ # does not handle buffering except for chunked encoding.
+ return t.cast(
+ t.IO[bytes], LimitedStream(stream, max_content_length, is_max=True)
+ )
+
+ return stream
+
+ # No limit given, return an empty stream unless the user explicitly allows the
+ # potentially infinite stream. An infinite stream is dangerous if it's not expected,
+ # as it can tie up a worker indefinitely.
+ if content_length is None:
+ return io.BytesIO() if safe_fallback else stream
+
+ return t.cast(t.IO[bytes], LimitedStream(stream, content_length))
+
+
+def get_path_info(environ: WSGIEnvironment) -> str:
+ """Return ``PATH_INFO`` from the WSGI environment.
+
+ :param environ: WSGI environment to get the path from.
+
+ .. versionchanged:: 3.0
+ The ``charset`` and ``errors`` parameters were removed.
+
+ .. versionadded:: 0.9
+ """
+ path: bytes = environ.get("PATH_INFO", "").encode("latin1")
+ return path.decode(errors="replace")
+
+
+class ClosingIterator:
+ """The WSGI specification requires that all middlewares and gateways
+ respect the `close` callback of the iterable returned by the application.
+ Because it is useful to add another close action to a returned iterable
+ and adding a custom iterable is a boring task this class can be used for
+ that::
+
+ return ClosingIterator(app(environ, start_response), [cleanup_session,
+ cleanup_locals])
+
+ If there is just one close function it can be passed instead of the list.
+
+ A closing iterator is not needed if the application uses response objects
+ and finishes the processing if the response is started::
+
+ try:
+ return response(environ, start_response)
+ finally:
+ cleanup_session()
+ cleanup_locals()
+ """
+
+ def __init__(
+ self,
+ iterable: t.Iterable[bytes],
+ callbacks: None
+ | (t.Callable[[], None] | t.Iterable[t.Callable[[], None]]) = None,
+ ) -> None:
+ iterator = iter(iterable)
+ self._next = t.cast(t.Callable[[], bytes], partial(next, iterator))
+ if callbacks is None:
+ callbacks = []
+ elif callable(callbacks):
+ callbacks = [callbacks]
+ else:
+ callbacks = list(callbacks)
+ iterable_close = getattr(iterable, "close", None)
+ if iterable_close:
+ callbacks.insert(0, iterable_close)
+ self._callbacks = callbacks
+
+ def __iter__(self) -> ClosingIterator:
+ return self
+
+ def __next__(self) -> bytes:
+ return self._next()
+
+ def close(self) -> None:
+ for callback in self._callbacks:
+ callback()
+
+
+def wrap_file(
+ environ: WSGIEnvironment, file: t.IO[bytes], buffer_size: int = 8192
+) -> t.Iterable[bytes]:
+ """Wraps a file. This uses the WSGI server's file wrapper if available
+ or otherwise the generic :class:`FileWrapper`.
+
+ .. versionadded:: 0.5
+
+ If the file wrapper from the WSGI server is used it's important to not
+ iterate over it from inside the application but to pass it through
+ unchanged. If you want to pass out a file wrapper inside a response
+ object you have to set :attr:`Response.direct_passthrough` to `True`.
+
+ More information about file wrappers are available in :pep:`333`.
+
+ :param file: a :class:`file`-like object with a :meth:`~file.read` method.
+ :param buffer_size: number of bytes for one iteration.
+ """
+ return environ.get("wsgi.file_wrapper", FileWrapper)( # type: ignore
+ file, buffer_size
+ )
+
+
+class FileWrapper:
+ """This class can be used to convert a :class:`file`-like object into
+ an iterable. It yields `buffer_size` blocks until the file is fully
+ read.
+
+ You should not use this class directly but rather use the
+ :func:`wrap_file` function that uses the WSGI server's file wrapper
+ support if it's available.
+
+ .. versionadded:: 0.5
+
+ If you're using this object together with a :class:`Response` you have
+ to use the `direct_passthrough` mode.
+
+ :param file: a :class:`file`-like object with a :meth:`~file.read` method.
+ :param buffer_size: number of bytes for one iteration.
+ """
+
+ def __init__(self, file: t.IO[bytes], buffer_size: int = 8192) -> None:
+ self.file = file
+ self.buffer_size = buffer_size
+
+ def close(self) -> None:
+ if hasattr(self.file, "close"):
+ self.file.close()
+
+ def seekable(self) -> bool:
+ if hasattr(self.file, "seekable"):
+ return self.file.seekable()
+ if hasattr(self.file, "seek"):
+ return True
+ return False
+
+ def seek(self, *args: t.Any) -> None:
+ if hasattr(self.file, "seek"):
+ self.file.seek(*args)
+
+ def tell(self) -> int | None:
+ if hasattr(self.file, "tell"):
+ return self.file.tell()
+ return None
+
+ def __iter__(self) -> FileWrapper:
+ return self
+
+ def __next__(self) -> bytes:
+ data = self.file.read(self.buffer_size)
+ if data:
+ return data
+ raise StopIteration()
+
+
+class _RangeWrapper:
+ # private for now, but should we make it public in the future ?
+
+ """This class can be used to convert an iterable object into
+ an iterable that will only yield a piece of the underlying content.
+ It yields blocks until the underlying stream range is fully read.
+ The yielded blocks will have a size that can't exceed the original
+ iterator defined block size, but that can be smaller.
+
+ If you're using this object together with a :class:`Response` you have
+ to use the `direct_passthrough` mode.
+
+ :param iterable: an iterable object with a :meth:`__next__` method.
+ :param start_byte: byte from which read will start.
+ :param byte_range: how many bytes to read.
+ """
+
+ def __init__(
+ self,
+ iterable: t.Iterable[bytes] | t.IO[bytes],
+ start_byte: int = 0,
+ byte_range: int | None = None,
+ ):
+ self.iterable = iter(iterable)
+ self.byte_range = byte_range
+ self.start_byte = start_byte
+ self.end_byte = None
+
+ if byte_range is not None:
+ self.end_byte = start_byte + byte_range
+
+ self.read_length = 0
+ self.seekable = hasattr(iterable, "seekable") and iterable.seekable()
+ self.end_reached = False
+
+ def __iter__(self) -> _RangeWrapper:
+ return self
+
+ def _next_chunk(self) -> bytes:
+ try:
+ chunk = next(self.iterable)
+ self.read_length += len(chunk)
+ return chunk
+ except StopIteration:
+ self.end_reached = True
+ raise
+
+ def _first_iteration(self) -> tuple[bytes | None, int]:
+ chunk = None
+ if self.seekable:
+ self.iterable.seek(self.start_byte) # type: ignore
+ self.read_length = self.iterable.tell() # type: ignore
+ contextual_read_length = self.read_length
+ else:
+ while self.read_length <= self.start_byte:
+ chunk = self._next_chunk()
+ if chunk is not None:
+ chunk = chunk[self.start_byte - self.read_length :]
+ contextual_read_length = self.start_byte
+ return chunk, contextual_read_length
+
+ def _next(self) -> bytes:
+ if self.end_reached:
+ raise StopIteration()
+ chunk = None
+ contextual_read_length = self.read_length
+ if self.read_length == 0:
+ chunk, contextual_read_length = self._first_iteration()
+ if chunk is None:
+ chunk = self._next_chunk()
+ if self.end_byte is not None and self.read_length >= self.end_byte:
+ self.end_reached = True
+ return chunk[: self.end_byte - contextual_read_length]
+ return chunk
+
+ def __next__(self) -> bytes:
+ chunk = self._next()
+ if chunk:
+ return chunk
+ self.end_reached = True
+ raise StopIteration()
+
+ def close(self) -> None:
+ if hasattr(self.iterable, "close"):
+ self.iterable.close()
+
+
+class LimitedStream(io.RawIOBase):
+ """Wrap a stream so that it doesn't read more than a given limit. This is used to
+ limit ``wsgi.input`` to the ``Content-Length`` header value or
+ :attr:`.Request.max_content_length`.
+
+ When attempting to read after the limit has been reached, :meth:`on_exhausted` is
+ called. When the limit is a maximum, this raises :exc:`.RequestEntityTooLarge`.
+
+ If reading from the stream returns zero bytes or raises an error,
+ :meth:`on_disconnect` is called, which raises :exc:`.ClientDisconnected`. When the
+ limit is a maximum and zero bytes were read, no error is raised, since it may be the
+ end of the stream.
+
+ If the limit is reached before the underlying stream is exhausted (such as a file
+ that is too large, or an infinite stream), the remaining contents of the stream
+ cannot be read safely. Depending on how the server handles this, clients may show a
+ "connection reset" failure instead of seeing the 413 response.
+
+ :param stream: The stream to read from. Must be a readable binary IO object.
+ :param limit: The limit in bytes to not read past. Should be either the
+ ``Content-Length`` header value or ``request.max_content_length``.
+ :param is_max: Whether the given ``limit`` is ``request.max_content_length`` instead
+ of the ``Content-Length`` header value. This changes how exhausted and
+ disconnect events are handled.
+
+ .. versionchanged:: 2.3
+ Handle ``max_content_length`` differently than ``Content-Length``.
+
+ .. versionchanged:: 2.3
+ Implements ``io.RawIOBase`` rather than ``io.IOBase``.
+ """
+
+ def __init__(self, stream: t.IO[bytes], limit: int, is_max: bool = False) -> None:
+ self._stream = stream
+ self._pos = 0
+ self.limit = limit
+ self._limit_is_max = is_max
+
+ @property
+ def is_exhausted(self) -> bool:
+ """Whether the current stream position has reached the limit."""
+ return self._pos >= self.limit
+
+ def on_exhausted(self) -> None:
+ """Called when attempting to read after the limit has been reached.
+
+ The default behavior is to do nothing, unless the limit is a maximum, in which
+ case it raises :exc:`.RequestEntityTooLarge`.
+
+ .. versionchanged:: 2.3
+ Raises ``RequestEntityTooLarge`` if the limit is a maximum.
+
+ .. versionchanged:: 2.3
+ Any return value is ignored.
+ """
+ if self._limit_is_max:
+ raise RequestEntityTooLarge()
+
+ def on_disconnect(self, error: Exception | None = None) -> None:
+ """Called when an attempted read receives zero bytes before the limit was
+ reached. This indicates that the client disconnected before sending the full
+ request body.
+
+ The default behavior is to raise :exc:`.ClientDisconnected`, unless the limit is
+ a maximum and no error was raised.
+
+ .. versionchanged:: 2.3
+ Added the ``error`` parameter. Do nothing if the limit is a maximum and no
+ error was raised.
+
+ .. versionchanged:: 2.3
+ Any return value is ignored.
+ """
+ if not self._limit_is_max or error is not None:
+ raise ClientDisconnected()
+
+ # If the limit is a maximum, then we may have read zero bytes because the
+ # streaming body is complete. There's no way to distinguish that from the
+ # client disconnecting early.
+
+ def exhaust(self) -> bytes:
+ """Exhaust the stream by reading until the limit is reached or the client
+ disconnects, returning the remaining data.
+
+ .. versionchanged:: 2.3
+ Return the remaining data.
+
+ .. versionchanged:: 2.2.3
+ Handle case where wrapped stream returns fewer bytes than requested.
+ """
+ if not self.is_exhausted:
+ return self.readall()
+
+ return b""
+
+ def readinto(self, b: bytearray) -> int | None: # type: ignore[override]
+ size = len(b)
+ remaining = self.limit - self._pos
+
+ if remaining <= 0:
+ self.on_exhausted()
+ return 0
+
+ if hasattr(self._stream, "readinto"):
+ # Use stream.readinto if it's available.
+ if size <= remaining:
+ # The size fits in the remaining limit, use the buffer directly.
+ try:
+ out_size: int | None = self._stream.readinto(b)
+ except (OSError, ValueError) as e:
+ self.on_disconnect(error=e)
+ return 0
+ else:
+ # Use a temp buffer with the remaining limit as the size.
+ temp_b = bytearray(remaining)
+
+ try:
+ out_size = self._stream.readinto(temp_b)
+ except (OSError, ValueError) as e:
+ self.on_disconnect(error=e)
+ return 0
+
+ if out_size:
+ b[:out_size] = temp_b
+ else:
+ # WSGI requires that stream.read is available.
+ try:
+ data = self._stream.read(min(size, remaining))
+ except (OSError, ValueError) as e:
+ self.on_disconnect(error=e)
+ return 0
+
+ out_size = len(data)
+ b[:out_size] = data
+
+ if not out_size:
+ # Read zero bytes from the stream.
+ self.on_disconnect()
+ return 0
+
+ self._pos += out_size
+ return out_size
+
+ def readall(self) -> bytes:
+ if self.is_exhausted:
+ self.on_exhausted()
+ return b""
+
+ out = bytearray()
+
+ # The parent implementation uses "while True", which results in an extra read.
+ while not self.is_exhausted:
+ data = self.read(1024 * 64)
+
+ # Stream may return empty before a max limit is reached.
+ if not data:
+ break
+
+ out.extend(data)
+
+ return bytes(out)
+
+ def tell(self) -> int:
+ """Return the current stream position.
+
+ .. versionadded:: 0.9
+ """
+ return self._pos
+
+ def readable(self) -> bool:
+ return True