aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py')
-rw-r--r--venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py418
1 files changed, 418 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py b/venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py
new file mode 100644
index 0000000..8d197ea
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/urllib3/contrib/emscripten/fetch.py
@@ -0,0 +1,418 @@
+"""
+Support for streaming http requests in emscripten.
+
+A few caveats -
+
+Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
+Streaming only works if you're running pyodide in a web worker.
+
+Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
+operation, so it requires that you have crossOriginIsolation enabled, by serving over https
+(or from localhost) with the two headers below set:
+
+ Cross-Origin-Opener-Policy: same-origin
+ Cross-Origin-Embedder-Policy: require-corp
+
+You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
+javascript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole
+request into a buffer and then returning it. it shows a warning in the javascript console in this case.
+
+Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
+control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
+
+NB: in this code, there are a lot of javascript objects. They are named js_*
+to make it clear what type of object they are.
+"""
+from __future__ import annotations
+
+import io
+import json
+from email.parser import Parser
+from importlib.resources import files
+from typing import TYPE_CHECKING, Any
+
+import js # type: ignore[import-not-found]
+from pyodide.ffi import ( # type: ignore[import-not-found]
+ JsArray,
+ JsException,
+ JsProxy,
+ to_js,
+)
+
+if TYPE_CHECKING:
+ from typing_extensions import Buffer
+
+from .request import EmscriptenRequest
+from .response import EmscriptenResponse
+
+"""
+There are some headers that trigger unintended CORS preflight requests.
+See also https://github.com/koenvo/pyodide-http/issues/22
+"""
+HEADERS_TO_IGNORE = ("user-agent",)
+
+SUCCESS_HEADER = -1
+SUCCESS_EOF = -2
+ERROR_TIMEOUT = -3
+ERROR_EXCEPTION = -4
+
+_STREAMING_WORKER_CODE = (
+ files(__package__)
+ .joinpath("emscripten_fetch_worker.js")
+ .read_text(encoding="utf-8")
+)
+
+
+class _RequestError(Exception):
+ def __init__(
+ self,
+ message: str | None = None,
+ *,
+ request: EmscriptenRequest | None = None,
+ response: EmscriptenResponse | None = None,
+ ):
+ self.request = request
+ self.response = response
+ self.message = message
+ super().__init__(self.message)
+
+
+class _StreamingError(_RequestError):
+ pass
+
+
+class _TimeoutError(_RequestError):
+ pass
+
+
+def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
+ return to_js(dict_val, dict_converter=js.Object.fromEntries)
+
+
+class _ReadStream(io.RawIOBase):
+ def __init__(
+ self,
+ int_buffer: JsArray,
+ byte_buffer: JsArray,
+ timeout: float,
+ worker: JsProxy,
+ connection_id: int,
+ request: EmscriptenRequest,
+ ):
+ self.int_buffer = int_buffer
+ self.byte_buffer = byte_buffer
+ self.read_pos = 0
+ self.read_len = 0
+ self.connection_id = connection_id
+ self.worker = worker
+ self.timeout = int(1000 * timeout) if timeout > 0 else None
+ self.is_live = True
+ self._is_closed = False
+ self.request: EmscriptenRequest | None = request
+
+ def __del__(self) -> None:
+ self.close()
+
+ # this is compatible with _base_connection
+ def is_closed(self) -> bool:
+ return self._is_closed
+
+ # for compatibility with RawIOBase
+ @property
+ def closed(self) -> bool:
+ return self.is_closed()
+
+ def close(self) -> None:
+ if not self.is_closed():
+ self.read_len = 0
+ self.read_pos = 0
+ self.int_buffer = None
+ self.byte_buffer = None
+ self._is_closed = True
+ self.request = None
+ if self.is_live:
+ self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
+ self.is_live = False
+ super().close()
+
+ def readable(self) -> bool:
+ return True
+
+ def writable(self) -> bool:
+ return False
+
+ def seekable(self) -> bool:
+ return False
+
+ def readinto(self, byte_obj: Buffer) -> int:
+ if not self.int_buffer:
+ raise _StreamingError(
+ "No buffer for stream in _ReadStream.readinto",
+ request=self.request,
+ response=None,
+ )
+ if self.read_len == 0:
+ # wait for the worker to send something
+ js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
+ self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
+ if (
+ js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
+ == "timed-out"
+ ):
+ raise _TimeoutError
+ data_len = self.int_buffer[0]
+ if data_len > 0:
+ self.read_len = data_len
+ self.read_pos = 0
+ elif data_len == ERROR_EXCEPTION:
+ string_len = self.int_buffer[1]
+ # decode the error string
+ js_decoder = js.TextDecoder.new()
+ json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
+ raise _StreamingError(
+ f"Exception thrown in fetch: {json_str}",
+ request=self.request,
+ response=None,
+ )
+ else:
+ # EOF, free the buffers and return zero
+ # and free the request
+ self.is_live = False
+ self.close()
+ return 0
+ # copy from int32array to python bytes
+ ret_length = min(self.read_len, len(memoryview(byte_obj)))
+ subarray = self.byte_buffer.subarray(
+ self.read_pos, self.read_pos + ret_length
+ ).to_py()
+ memoryview(byte_obj)[0:ret_length] = subarray
+ self.read_len -= ret_length
+ self.read_pos += ret_length
+ return ret_length
+
+
+class _StreamingFetcher:
+ def __init__(self) -> None:
+ # make web-worker and data buffer on startup
+ self.streaming_ready = False
+
+ js_data_blob = js.Blob.new(
+ [_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
+ )
+
+ def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
+ def onMsg(e: JsProxy) -> None:
+ self.streaming_ready = True
+ js_resolve_fn(e)
+
+ def onErr(e: JsProxy) -> None:
+ js_reject_fn(e) # Defensive: never happens in ci
+
+ self.js_worker.onmessage = onMsg
+ self.js_worker.onerror = onErr
+
+ js_data_url = js.URL.createObjectURL(js_data_blob)
+ self.js_worker = js.globalThis.Worker.new(js_data_url)
+ self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)
+
+ def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
+ headers = {
+ k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
+ }
+
+ body = request.body
+ fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
+ # start the request off in the worker
+ timeout = int(1000 * request.timeout) if request.timeout > 0 else None
+ js_shared_buffer = js.SharedArrayBuffer.new(1048576)
+ js_int_buffer = js.Int32Array.new(js_shared_buffer)
+ js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)
+
+ js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
+ js.Atomics.notify(js_int_buffer, 0)
+ js_absolute_url = js.URL.new(request.url, js.location).href
+ self.js_worker.postMessage(
+ _obj_from_dict(
+ {
+ "buffer": js_shared_buffer,
+ "url": js_absolute_url,
+ "fetchParams": fetch_data,
+ }
+ )
+ )
+ # wait for the worker to send something
+ js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
+ if js_int_buffer[0] == ERROR_TIMEOUT:
+ raise _TimeoutError(
+ "Timeout connecting to streaming request",
+ request=request,
+ response=None,
+ )
+ elif js_int_buffer[0] == SUCCESS_HEADER:
+ # got response
+ # header length is in second int of intBuffer
+ string_len = js_int_buffer[1]
+ # decode the rest to a JSON string
+ js_decoder = js.TextDecoder.new()
+ # this does a copy (the slice) because decode can't work on shared array
+ # for some silly reason
+ json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
+ # get it as an object
+ response_obj = json.loads(json_str)
+ return EmscriptenResponse(
+ request=request,
+ status_code=response_obj["status"],
+ headers=response_obj["headers"],
+ body=_ReadStream(
+ js_int_buffer,
+ js_byte_buffer,
+ request.timeout,
+ self.js_worker,
+ response_obj["connectionID"],
+ request,
+ ),
+ )
+ elif js_int_buffer[0] == ERROR_EXCEPTION:
+ string_len = js_int_buffer[1]
+ # decode the error string
+ js_decoder = js.TextDecoder.new()
+ json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
+ raise _StreamingError(
+ f"Exception thrown in fetch: {json_str}", request=request, response=None
+ )
+ else:
+ raise _StreamingError(
+ f"Unknown status from worker in fetch: {js_int_buffer[0]}",
+ request=request,
+ response=None,
+ )
+
+
+# check if we are in a worker or not
+def is_in_browser_main_thread() -> bool:
+ return hasattr(js, "window") and hasattr(js, "self") and js.self == js.window
+
+
+def is_cross_origin_isolated() -> bool:
+ return hasattr(js, "crossOriginIsolated") and js.crossOriginIsolated
+
+
+def is_in_node() -> bool:
+ return (
+ hasattr(js, "process")
+ and hasattr(js.process, "release")
+ and hasattr(js.process.release, "name")
+ and js.process.release.name == "node"
+ )
+
+
+def is_worker_available() -> bool:
+ return hasattr(js, "Worker") and hasattr(js, "Blob")
+
+
+_fetcher: _StreamingFetcher | None = None
+
+if is_worker_available() and (
+ (is_cross_origin_isolated() and not is_in_browser_main_thread())
+ and (not is_in_node())
+):
+ _fetcher = _StreamingFetcher()
+else:
+ _fetcher = None
+
+
+def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
+ if _fetcher and streaming_ready():
+ return _fetcher.send(request)
+ else:
+ _show_streaming_warning()
+ return None
+
+
+_SHOWN_TIMEOUT_WARNING = False
+
+
+def _show_timeout_warning() -> None:
+ global _SHOWN_TIMEOUT_WARNING
+ if not _SHOWN_TIMEOUT_WARNING:
+ _SHOWN_TIMEOUT_WARNING = True
+ message = "Warning: Timeout is not available on main browser thread"
+ js.console.warn(message)
+
+
+_SHOWN_STREAMING_WARNING = False
+
+
+def _show_streaming_warning() -> None:
+ global _SHOWN_STREAMING_WARNING
+ if not _SHOWN_STREAMING_WARNING:
+ _SHOWN_STREAMING_WARNING = True
+ message = "Can't stream HTTP requests because: \n"
+ if not is_cross_origin_isolated():
+ message += " Page is not cross-origin isolated\n"
+ if is_in_browser_main_thread():
+ message += " Python is running in main browser thread\n"
+ if not is_worker_available():
+ message += " Worker or Blob classes are not available in this environment." # Defensive: this is always False in browsers that we test in
+ if streaming_ready() is False:
+ message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
+is working, you need to call: 'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
+ from js import console
+
+ console.warn(message)
+
+
+def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
+ try:
+ js_xhr = js.XMLHttpRequest.new()
+
+ if not is_in_browser_main_thread():
+ js_xhr.responseType = "arraybuffer"
+ if request.timeout:
+ js_xhr.timeout = int(request.timeout * 1000)
+ else:
+ js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
+ if request.timeout:
+ # timeout isn't available on the main thread - show a warning in console
+ # if it is set
+ _show_timeout_warning()
+
+ js_xhr.open(request.method, request.url, False)
+ for name, value in request.headers.items():
+ if name.lower() not in HEADERS_TO_IGNORE:
+ js_xhr.setRequestHeader(name, value)
+
+ js_xhr.send(to_js(request.body))
+
+ headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))
+
+ if not is_in_browser_main_thread():
+ body = js_xhr.response.to_py().tobytes()
+ else:
+ body = js_xhr.response.encode("ISO-8859-15")
+ return EmscriptenResponse(
+ status_code=js_xhr.status, headers=headers, body=body, request=request
+ )
+ except JsException as err:
+ if err.name == "TimeoutError":
+ raise _TimeoutError(err.message, request=request)
+ elif err.name == "NetworkError":
+ raise _RequestError(err.message, request=request)
+ else:
+ # general http error
+ raise _RequestError(err.message, request=request)
+
+
+def streaming_ready() -> bool | None:
+ if _fetcher:
+ return _fetcher.streaming_ready
+ else:
+ return None # no fetcher, return None to signify that
+
+
+async def wait_for_streaming_ready() -> bool:
+ if _fetcher:
+ await _fetcher.js_worker_ready_promise
+ return True
+ else:
+ return False