aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py')
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
new file mode 100644
index 0000000..de93b52
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
@@ -0,0 +1,439 @@
+"""
+WSGI Protocol Linter
+====================
+
+This module provides a middleware that performs sanity checks on the
+behavior of the WSGI server and application. It checks that the
+:pep:`3333` WSGI spec is properly implemented. It also warns on some
+common HTTP errors such as non-empty responses for 304 status codes.
+
+.. autoclass:: LintMiddleware
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import typing as t
+from types import TracebackType
+from urllib.parse import urlparse
+from warnings import warn
+
+from ..datastructures import Headers
+from ..http import is_entity_header
+from ..wsgi import FileWrapper
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class WSGIWarning(Warning):
+ """Warning class for WSGI warnings."""
+
+
+class HTTPWarning(Warning):
+ """Warning class for HTTP warnings."""
+
+
+def check_type(context: str, obj: object, need: type = str) -> None:
+ if type(obj) is not need:
+ warn(
+ f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+
+class InputStream:
+ def __init__(self, stream: t.IO[bytes]) -> None:
+ self._stream = stream
+
+ def read(self, *args: t.Any) -> bytes:
+ if len(args) == 0:
+ warn(
+ "WSGI does not guarantee an EOF marker on the input stream, thus making"
+ " calls to 'wsgi.input.read()' unsafe. Conforming servers may never"
+ " return from this call.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ elif len(args) != 1:
+ warn(
+ "Too many parameters passed to 'wsgi.input.read()'.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ return self._stream.read(*args)
+
+ def readline(self, *args: t.Any) -> bytes:
+ if len(args) == 0:
+ warn(
+ "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use"
+ " 'wsgi.input.read()' instead.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ elif len(args) == 1:
+ warn(
+ "'wsgi.input.readline()' was called with a size hint. WSGI does not"
+ " support this, although it's available on all major servers.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ else:
+ raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.")
+ return self._stream.readline(*args)
+
+ def __iter__(self) -> t.Iterator[bytes]:
+ try:
+ return iter(self._stream)
+ except TypeError:
+ warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2)
+ return iter(())
+
+ def close(self) -> None:
+ warn("The application closed the input stream!", WSGIWarning, stacklevel=2)
+ self._stream.close()
+
+
+class ErrorStream:
+ def __init__(self, stream: t.IO[str]) -> None:
+ self._stream = stream
+
+ def write(self, s: str) -> None:
+ check_type("wsgi.error.write()", s, str)
+ self._stream.write(s)
+
+ def flush(self) -> None:
+ self._stream.flush()
+
+ def writelines(self, seq: t.Iterable[str]) -> None:
+ for line in seq:
+ self.write(line)
+
+ def close(self) -> None:
+ warn("The application closed the error stream!", WSGIWarning, stacklevel=2)
+ self._stream.close()
+
+
+class GuardedWrite:
+ def __init__(self, write: t.Callable[[bytes], object], chunks: list[int]) -> None:
+ self._write = write
+ self._chunks = chunks
+
+ def __call__(self, s: bytes) -> None:
+ check_type("write()", s, bytes)
+ self._write(s)
+ self._chunks.append(len(s))
+
+
+class GuardedIterator:
+ def __init__(
+ self,
+ iterator: t.Iterable[bytes],
+ headers_set: tuple[int, Headers],
+ chunks: list[int],
+ ) -> None:
+ self._iterator = iterator
+ self._next = iter(iterator).__next__
+ self.closed = False
+ self.headers_set = headers_set
+ self.chunks = chunks
+
+ def __iter__(self) -> GuardedIterator:
+ return self
+
+ def __next__(self) -> bytes:
+ if self.closed:
+ warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2)
+
+ rv = self._next()
+
+ if not self.headers_set:
+ warn(
+ "The application returned before it started the response.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ check_type("application iterator items", rv, bytes)
+ self.chunks.append(len(rv))
+ return rv
+
+ def close(self) -> None:
+ self.closed = True
+
+ if hasattr(self._iterator, "close"):
+ self._iterator.close()
+
+ if self.headers_set:
+ status_code, headers = self.headers_set
+ bytes_sent = sum(self.chunks)
+ content_length = headers.get("content-length", type=int)
+
+ if status_code == 304:
+ for key, _value in headers:
+ key = key.lower()
+ if key not in ("expires", "content-location") and is_entity_header(
+ key
+ ):
+ warn(
+ f"Entity header {key!r} found in 304 response.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ if bytes_sent:
+ warn(
+ "304 responses must not have a body.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ elif 100 <= status_code < 200 or status_code == 204:
+ if content_length != 0:
+ warn(
+ f"{status_code} responses must have an empty content length.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ if bytes_sent:
+ warn(
+ f"{status_code} responses must not have a body.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ elif content_length is not None and content_length != bytes_sent:
+ warn(
+ "Content-Length and the number of bytes sent to the"
+ " client do not match.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ def __del__(self) -> None:
+ if not self.closed:
+ try:
+ warn(
+ "Iterator was garbage collected before it was closed.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ except Exception:
+ pass
+
+
+class LintMiddleware:
+ """Warns about common errors in the WSGI and HTTP behavior of the
+ server and wrapped application. Some of the issues it checks are:
+
+ - invalid status codes
+ - non-bytes sent to the WSGI server
+ - strings returned from the WSGI application
+ - non-empty conditional responses
+ - unquoted etags
+ - relative URLs in the Location header
+ - unsafe calls to wsgi.input
+ - unclosed iterators
+
+ Error information is emitted using the :mod:`warnings` module.
+
+ :param app: The WSGI application to wrap.
+
+ .. code-block:: python
+
+ from werkzeug.middleware.lint import LintMiddleware
+ app = LintMiddleware(app)
+ """
+
+ def __init__(self, app: WSGIApplication) -> None:
+ self.app = app
+
+ def check_environ(self, environ: WSGIEnvironment) -> None:
+ if type(environ) is not dict: # noqa: E721
+ warn(
+ "WSGI environment is not a standard Python dict.",
+ WSGIWarning,
+ stacklevel=4,
+ )
+ for key in (
+ "REQUEST_METHOD",
+ "SERVER_NAME",
+ "SERVER_PORT",
+ "wsgi.version",
+ "wsgi.input",
+ "wsgi.errors",
+ "wsgi.multithread",
+ "wsgi.multiprocess",
+ "wsgi.run_once",
+ ):
+ if key not in environ:
+ warn(
+ f"Required environment key {key!r} not found",
+ WSGIWarning,
+ stacklevel=3,
+ )
+ if environ["wsgi.version"] != (1, 0):
+ warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3)
+
+ script_name = environ.get("SCRIPT_NAME", "")
+ path_info = environ.get("PATH_INFO", "")
+
+ if script_name and script_name[0] != "/":
+ warn(
+ f"'SCRIPT_NAME' does not start with a slash: {script_name!r}",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ if path_info and path_info[0] != "/":
+ warn(
+ f"'PATH_INFO' does not start with a slash: {path_info!r}",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ def check_start_response(
+ self,
+ status: str,
+ headers: list[tuple[str, str]],
+ exc_info: None | (tuple[type[BaseException], BaseException, TracebackType]),
+ ) -> tuple[int, Headers]:
+ check_type("status", status, str)
+ status_code_str = status.split(None, 1)[0]
+
+ if len(status_code_str) != 3 or not status_code_str.isdecimal():
+ warn("Status code must be three digits.", WSGIWarning, stacklevel=3)
+
+ if len(status) < 4 or status[3] != " ":
+ warn(
+ f"Invalid value for status {status!r}. Valid status strings are three"
+ " digits, a space and a status explanation.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ status_code = int(status_code_str)
+
+ if status_code < 100:
+ warn("Status code < 100 detected.", WSGIWarning, stacklevel=3)
+
+ if type(headers) is not list: # noqa: E721
+ warn("Header list is not a list.", WSGIWarning, stacklevel=3)
+
+ for item in headers:
+ if type(item) is not tuple or len(item) != 2:
+ warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3)
+ name, value = item
+ if type(name) is not str or type(value) is not str: # noqa: E721
+ warn(
+ "Header keys and values must be strings.", WSGIWarning, stacklevel=3
+ )
+ if name.lower() == "status":
+ warn(
+ "The status header is not supported due to"
+ " conflicts with the CGI spec.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ if exc_info is not None and not isinstance(exc_info, tuple):
+ warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3)
+
+ headers_obj = Headers(headers)
+ self.check_headers(headers_obj)
+
+ return status_code, headers_obj
+
+ def check_headers(self, headers: Headers) -> None:
+ etag = headers.get("etag")
+
+ if etag is not None:
+ if etag.startswith(("W/", "w/")):
+ if etag.startswith("w/"):
+ warn(
+ "Weak etag indicator should be upper case.",
+ HTTPWarning,
+ stacklevel=4,
+ )
+
+ etag = etag[2:]
+
+ if not (etag[:1] == etag[-1:] == '"'):
+ warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4)
+
+ location = headers.get("location")
+
+ if location is not None:
+ if not urlparse(location).netloc:
+ warn(
+ "Absolute URLs required for location header.",
+ HTTPWarning,
+ stacklevel=4,
+ )
+
+ def check_iterator(self, app_iter: t.Iterable[bytes]) -> None:
+ if isinstance(app_iter, str):
+ warn(
+ "The application returned a string. The response will send one"
+ " character at a time to the client, which will kill performance."
+ " Return a list or iterable instead.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]:
+ if len(args) != 2:
+ warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2)
+
+ if kwargs:
+ warn(
+ "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2
+ )
+
+ environ: WSGIEnvironment = args[0]
+ start_response: StartResponse = args[1]
+
+ self.check_environ(environ)
+ environ["wsgi.input"] = InputStream(environ["wsgi.input"])
+ environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"])
+
+ # Hook our own file wrapper in so that applications will always
+ # iterate to the end and we can check the content length.
+ environ["wsgi.file_wrapper"] = FileWrapper
+
+ headers_set: list[t.Any] = []
+ chunks: list[int] = []
+
+ def checking_start_response(
+ *args: t.Any, **kwargs: t.Any
+ ) -> t.Callable[[bytes], None]:
+ if len(args) not in {2, 3}:
+ warn(
+ f"Invalid number of arguments: {len(args)}, expected 2 or 3.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ if kwargs:
+ warn(
+ "'start_response' does not take keyword arguments.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ status: str = args[0]
+ headers: list[tuple[str, str]] = args[1]
+ exc_info: (
+ None | (tuple[type[BaseException], BaseException, TracebackType])
+ ) = args[2] if len(args) == 3 else None
+
+ headers_set[:] = self.check_start_response(status, headers, exc_info)
+ return GuardedWrite(start_response(status, headers, exc_info), chunks)
+
+ app_iter = self.app(environ, t.cast("StartResponse", checking_start_response))
+ self.check_iterator(app_iter)
+ return GuardedIterator(
+ app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks
+ )