1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
"""
Application Profiler
====================
This module provides a middleware that profiles each request with the
:mod:`cProfile` module. This can help identify bottlenecks in your code
that may be slowing down your application.
.. autoclass:: ProfilerMiddleware
:copyright: 2007 Pallets
:license: BSD-3-Clause
"""
from __future__ import annotations
import os.path
import sys
import time
import typing as t
from pstats import Stats
try:
from cProfile import Profile
except ImportError:
from profile import Profile # type: ignore
if t.TYPE_CHECKING:
from _typeshed.wsgi import StartResponse
from _typeshed.wsgi import WSGIApplication
from _typeshed.wsgi import WSGIEnvironment
class ProfilerMiddleware:
    """Wrap a WSGI application and profile the execution of each
    request. Responses are buffered so that timings are more exact.

    If ``stream`` is given, :class:`pstats.Stats` are written to it
    after each request. If ``profile_dir`` is given, :mod:`cProfile`
    data files are saved to that directory, one file per request.

    The filename can be customized by passing ``filename_format``. If
    it is a string, it will be formatted using :meth:`str.format` with
    the following fields available:

    - ``{method}`` - The request method; GET, POST, etc.
    - ``{path}`` - The request path or 'root' should one not exist.
    - ``{elapsed}`` - The elapsed time of the request in milliseconds.
    - ``{time}`` - The time of the request.

    If it is a callable, it will be called with the WSGI ``environ`` and
    be expected to return a filename string. The ``environ`` dictionary
    will also have the ``"werkzeug.profiler"`` key populated with a
    dictionary containing the following fields (more may be added in the
    future):

    - ``{elapsed}`` - The elapsed time of the request in milliseconds.
    - ``{time}`` - The time of the request.

    :param app: The WSGI application to wrap.
    :param stream: Write stats to this stream. Disable with ``None``.
    :param sort_by: A tuple of columns to sort stats by. See
        :meth:`pstats.Stats.sort_stats`.
    :param restrictions: A tuple of restrictions to filter stats by. See
        :meth:`pstats.Stats.print_stats`.
    :param profile_dir: Save profile data files to this directory.
    :param filename_format: Format string for profile data file names,
        or a callable returning a name. See explanation above.

    .. code-block:: python

        from werkzeug.middleware.profiler import ProfilerMiddleware
        app = ProfilerMiddleware(app)

    .. versionchanged:: 3.0
        Added the ``"werkzeug.profiler"`` key to the ``filename_format(environ)``
        parameter with the ``elapsed`` and ``time`` fields.

    .. versionchanged:: 0.15
        Stats are written even if ``profile_dir`` is given, and can be
        disabled by passing ``stream=None``.

    .. versionadded:: 0.15
        Added ``filename_format``.

    .. versionadded:: 0.9
        Added ``restrictions`` and ``profile_dir``.
    """

    def __init__(
        self,
        app: WSGIApplication,
        stream: t.IO[str] | None = sys.stdout,
        sort_by: t.Iterable[str] = ("time", "calls"),
        restrictions: t.Iterable[str | int | float] = (),
        profile_dir: str | None = None,
        filename_format: str
        | t.Callable[[WSGIEnvironment], str] = (
            "{method}.{path}.{elapsed:.0f}ms.{time:.0f}.prof"
        ),
    ) -> None:
        """Store the wrapped application and the reporting options.

        See the class docstring for the meaning of each parameter.
        """
        self._app = app
        self._stream = stream
        self._sort_by = sort_by
        self._restrictions = restrictions
        self._profile_dir = profile_dir
        self._filename_format = filename_format

    def __call__(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterable[bytes]:
        """Run the wrapped application under the profiler.

        The whole response is consumed inside the profiled call, then the
        profile is optionally dumped to ``profile_dir`` and/or printed to
        ``stream``.

        :param environ: The WSGI environment of the request.
        :param start_response: The server's ``start_response`` callable;
            it is invoked with the application's status and headers.
        :return: A single-item list holding the fully buffered body.
        """
        response_body: list[bytes] = []

        def catching_start_response(status, headers, exc_info=None):  # type: ignore
            start_response(status, headers, exc_info)
            # Data passed to the legacy ``write`` callable is buffered
            # together with the iterated body.
            return response_body.append

        def runapp() -> None:
            app_iter = self._app(
                environ, t.cast("StartResponse", catching_start_response)
            )

            try:
                # Exhaust the iterable here so that lazily generated
                # responses are part of the profile.
                response_body.extend(app_iter)
            finally:
                # Release the application's resources even if iterating
                # the response raised.
                if hasattr(app_iter, "close"):
                    app_iter.close()

        profile = Profile()
        start = time.time()
        profile.runcall(runapp)
        body = b"".join(response_body)
        elapsed = time.time() - start

        if self._profile_dir is not None:
            if callable(self._filename_format):
                environ["werkzeug.profiler"] = {
                    "elapsed": elapsed * 1000.0,
                    "time": time.time(),
                }
                filename = self._filename_format(environ)
            else:
                # ``PATH_INFO`` may be missing from the environ when the
                # path is empty; treat that like an empty path ("root")
                # instead of failing with ``KeyError``.
                path = environ.get("PATH_INFO", "").strip("/").replace("/", ".")
                filename = self._filename_format.format(
                    method=environ["REQUEST_METHOD"],
                    path=path or "root",
                    elapsed=elapsed * 1000.0,
                    time=time.time(),
                )

            filename = os.path.join(self._profile_dir, filename)
            profile.dump_stats(filename)

        if self._stream is not None:
            stats = Stats(profile, stream=self._stream)
            stats.sort_stats(*self._sort_by)
            print("-" * 80, file=self._stream)
            path_info = environ.get("PATH_INFO", "")
            print(f"PATH: {path_info!r}", file=self._stream)
            stats.print_stats(*self._restrictions)
            print(f"{'-' * 80}\n", file=self._stream)

        return [body]
|