diff options
author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/dash/_callback.py | |
parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/dash/_callback.py')
-rw-r--r-- | venv/lib/python3.8/site-packages/dash/_callback.py | 860 |
1 files changed, 860 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/dash/_callback.py b/venv/lib/python3.8/site-packages/dash/_callback.py new file mode 100644 index 0000000..b0f7bda --- /dev/null +++ b/venv/lib/python3.8/site-packages/dash/_callback.py @@ -0,0 +1,860 @@ +import collections +import hashlib +from functools import wraps + +from typing import Callable, Optional, Any, List, Tuple, Union + + +import asyncio +import flask + +from .dependencies import ( + handle_callback_args, + handle_grouped_callback_args, + Output, + ClientsideFunction, + Input, +) +from .development.base_component import ComponentRegistry +from .exceptions import ( + InvalidCallbackReturnValue, + PreventUpdate, + WildcardInLongCallback, + MissingLongCallbackManagerError, + BackgroundCallbackError, + ImportedInsideCallbackError, +) + +from ._grouping import ( + flatten_grouping, + make_grouping_by_index, + grouping_len, +) +from ._utils import ( + create_callback_id, + stringify_id, + to_json, + coerce_to_list, + AttributeDict, + clean_property_name, +) + +from . import _validate +from .background_callback.managers import BaseBackgroundCallbackManager +from ._callback_context import context_value + + +async def _async_invoke_callback( + func, *args, **kwargs +): # used to mark the frame for the debugger + # Check if the function is a coroutine function + if asyncio.iscoroutinefunction(func): + return await func(*args, **kwargs) # %% callback invoked %% + # If the function is not a coroutine, call it directly + return func(*args, **kwargs) # %% callback invoked %% + + +def _invoke_callback(func, *args, **kwargs): # used to mark the frame for the debugger + return func(*args, **kwargs) # %% callback invoked %% + + +class NoUpdate: + def to_plotly_json(self): # pylint: disable=no-self-use + return {"_dash_no_update": "_dash_no_update"} + + @staticmethod + def is_no_update(obj): + return isinstance(obj, NoUpdate) or ( + isinstance(obj, dict) and obj == {"_dash_no_update": "_dash_no_update"} + ) + + +GLOBAL_CALLBACK_LIST = [] +GLOBAL_CALLBACK_MAP = {} +GLOBAL_INLINE_SCRIPTS = [] + + +# pylint: disable=too-many-locals +def callback( + *_args, + background: bool = False, + interval: int = 1000, + progress: Optional[Union[List[Output], Output]] = None, + progress_default: Any = None, + running: Optional[List[Tuple[Output, Any, Any]]] = None, + cancel: Optional[Union[List[Input], Input]] = None, + manager: Optional[BaseBackgroundCallbackManager] = None, + cache_args_to_ignore: Optional[list] = None, + cache_ignore_triggered=True, + on_error: Optional[Callable[[Exception], Any]] = None, + **_kwargs, +) -> Callable[..., Any]: + """ + Normally used as a decorator, `@dash.callback` provides a server-side + callback relating the values of one or more `Output` items to one or + more `Input` items which will trigger the callback when they change, + and optionally `State` items which provide additional information but + do not trigger the callback directly. + + `@dash.callback` is an alternative to `@app.callback` (where `app = dash.Dash()`) + introduced in Dash 2.0. + It allows you to register callbacks without defining or importing the `app` + object. The call signature is identical and it can be used instead of `app.callback` + in all cases. + + The last, optional argument `prevent_initial_call` causes the callback + not to fire when its outputs are first added to the page. Defaults to + `False` and unlike `app.callback` is not configurable at the app level. + + :Keyword Arguments: + :param background: + Mark the callback as a background callback to execute in a manager for + callbacks that take a long time without locking up the Dash app + or timing out. + :param manager: + A background callback manager instance. Currently, an instance of one of + `DiskcacheManager` or `CeleryManager`. + Defaults to the `background_callback_manager` instance provided to the + `dash.Dash constructor`. + - A diskcache manager (`DiskcacheManager`) that runs callback + logic in a separate process and stores the results to disk using the + diskcache library. This is the easiest backend to use for local + development. + - A Celery manager (`CeleryManager`) that runs callback logic + in a celery worker and returns results to the Dash app through a Celery + broker like RabbitMQ or Redis. + :param running: + A list of 3-element tuples. The first element of each tuple should be + an `Output` dependency object referencing a property of a component in + the app layout. The second element is the value that the property + should be set to while the callback is running, and the third element + is the value the property should be set to when the callback completes. + :param cancel: + A list of `Input` dependency objects that reference a property of a + component in the app's layout. When the value of this property changes + while a callback is running, the callback is canceled. + Note that the value of the property is not significant, any change in + value will result in the cancellation of the running job (if any). + This parameter only applies to background callbacks (`background=True`). + :param progress: + An `Output` dependency grouping that references properties of + components in the app's layout. When provided, the decorated function + will be called with an extra argument as the first argument to the + function. This argument, is a function handle that the decorated + function should call in order to provide updates to the app on its + current progress. This function accepts a single argument, which + correspond to the grouping of properties specified in the provided + `Output` dependency grouping. This parameter only applies to background + callbacks (`background=True`). + :param progress_default: + A grouping of values that should be assigned to the components + specified by the `progress` argument when the callback is not in + progress. If `progress_default` is not provided, all the dependency + properties specified in `progress` will be set to `None` when the + callback is not running. This parameter only applies to background + callbacks (`background=True`). + :param cache_args_to_ignore: + Arguments to ignore when caching is enabled. If callback is configured + with keyword arguments (Input/State provided in a dict), + this should be a list of argument names as strings. Otherwise, + this should be a list of argument indices as integers. + This parameter only applies to background callbacks (`background=True`). + :param cache_ignore_triggered: + Whether to ignore which inputs triggered the callback when creating + the cache. This parameter only applies to background callbacks + (`background=True`). + :param interval: + Time to wait between the background callback update requests. + :param on_error: + Function to call when the callback raises an exception. Receives the + exception object as first argument. The callback_context can be used + to access the original callback inputs, states and output. + """ + + background_spec = None + + config_prevent_initial_callbacks = _kwargs.pop( + "config_prevent_initial_callbacks", False + ) + callback_map = _kwargs.pop("callback_map", GLOBAL_CALLBACK_MAP) + callback_list = _kwargs.pop("callback_list", GLOBAL_CALLBACK_LIST) + + if background: + background_spec: Any = { + "interval": interval, + } + + if manager: + background_spec["manager"] = manager + + if progress: + background_spec["progress"] = coerce_to_list(progress) + validate_background_inputs(background_spec["progress"]) + + if progress_default: + background_spec["progressDefault"] = coerce_to_list(progress_default) + + if not len(background_spec["progress"]) == len( + background_spec["progressDefault"] + ): + raise Exception( + "Progress and progress default needs to be of same length" + ) + + if cancel: + cancel_inputs = coerce_to_list(cancel) + validate_background_inputs(cancel_inputs) + + background_spec["cancel"] = [c.to_dict() for c in cancel_inputs] + background_spec["cancel_inputs"] = cancel_inputs + + if cache_args_to_ignore: + background_spec["cache_args_to_ignore"] = cache_args_to_ignore + + background_spec["cache_ignore_triggered"] = cache_ignore_triggered + + return register_callback( + callback_list, + callback_map, + config_prevent_initial_callbacks, + *_args, + **_kwargs, + background=background_spec, + manager=manager, + running=running, + on_error=on_error, + ) + + +def validate_background_inputs(deps): + for dep in deps: + if dep.has_wildcard(): + raise WildcardInLongCallback( + f""" + background callbacks does not support dependencies with + pattern-matching ids + Received: {repr(dep)}\n""" + ) + + +ClientsideFuncType = Union[str, ClientsideFunction] + + +def clientside_callback(clientside_function: ClientsideFuncType, *args, **kwargs): + return register_clientside_callback( + GLOBAL_CALLBACK_LIST, + GLOBAL_CALLBACK_MAP, + False, + GLOBAL_INLINE_SCRIPTS, + clientside_function, + *args, + **kwargs, + ) + + +# pylint: disable=too-many-arguments +def insert_callback( + callback_list, + callback_map, + config_prevent_initial_callbacks, + output, + outputs_indices, + inputs, + state, + inputs_state_indices, + prevent_initial_call, + background=None, + manager=None, + running=None, + dynamic_creator: Optional[bool] = False, + no_output=False, +): + if prevent_initial_call is None: + prevent_initial_call = config_prevent_initial_callbacks + + _validate.validate_duplicate_output( + output, prevent_initial_call, config_prevent_initial_callbacks + ) + + callback_id = create_callback_id(output, inputs, no_output) + callback_spec = { + "output": callback_id, + "inputs": [c.to_dict() for c in inputs], + "state": [c.to_dict() for c in state], + "clientside_function": None, + # prevent_initial_call can be a string "initial_duplicates" + # which should not prevent the initial call. + "prevent_initial_call": prevent_initial_call is True, + "background": background + and { + "interval": background["interval"], + }, + "dynamic_creator": dynamic_creator, + "no_output": no_output, + } + if running: + callback_spec["running"] = running + + callback_map[callback_id] = { + "inputs": callback_spec["inputs"], + "state": callback_spec["state"], + "outputs_indices": outputs_indices, + "inputs_state_indices": inputs_state_indices, + "background": background, + "output": output, + "raw_inputs": inputs, + "manager": manager, + "allow_dynamic_callbacks": dynamic_creator, + "no_output": no_output, + } + callback_list.append(callback_spec) + + return callback_id + + +def _set_side_update(ctx, response) -> bool: + side_update = dict(ctx.updated_props) + if len(side_update) > 0: + response["sideUpdate"] = side_update + return True + return False + + +def _initialize_context(args, kwargs, inputs_state_indices, has_output, insert_output): + """Initialize context and validate output specifications.""" + app = kwargs.pop("app", None) + output_spec = kwargs.pop("outputs_list") + callback_ctx = kwargs.pop("callback_context", AttributeDict({"updated_props": {}})) + context_value.set(callback_ctx) + original_packages = set(ComponentRegistry.registry) + + if has_output: + _validate.validate_output_spec(insert_output, output_spec, Output) + + func_args, func_kwargs = _validate.validate_and_group_input_args( + args, inputs_state_indices + ) + return ( + output_spec, + callback_ctx, + func_args, + func_kwargs, + app, + original_packages, + False, + ) + + +def _get_callback_manager( + kwargs: dict, background: dict +) -> Union[BaseBackgroundCallbackManager, None]: + """Set up the background callback and manage jobs.""" + callback_manager = background.get( + "manager", kwargs.get("background_callback_manager", None) + ) + if background is not None: + if not callback_manager: + raise MissingLongCallbackManagerError( + "Running `background` callbacks requires a manager to be installed.\n" + "Available managers:\n" + "- Diskcache (`pip install dash[diskcache]`) to run callbacks in a separate Process" + " and store results on the local filesystem.\n" + "- Celery (`pip install dash[celery]`) to run callbacks in a celery worker" + " and store results on redis.\n" + ) + + old_job = flask.request.args.getlist("oldJob") + + if old_job: + for job in old_job: + callback_manager.terminate_job(job) + + return callback_manager + + +def _setup_background_callback( + kwargs, background, background_key, func, func_args, func_kwargs, callback_ctx +): + """Set up the background callback and manage jobs.""" + callback_manager = _get_callback_manager(kwargs, background) + + progress_outputs = background.get("progress") + + cache_ignore_triggered = background.get("cache_ignore_triggered", True) + + cache_key = callback_manager.build_cache_key( + func, + # Inputs provided as dict is kwargs. + func_args if func_args else func_kwargs, + background.get("cache_args_to_ignore", []), + None if cache_ignore_triggered else callback_ctx.get("triggered_inputs", []), + ) + + job_fn = callback_manager.func_registry.get(background_key) + + ctx_value = AttributeDict(**context_value.get()) + ctx_value.ignore_register_page = True + ctx_value.pop("background_callback_manager") + ctx_value.pop("dash_response") + + job = callback_manager.call_job_fn( + cache_key, + job_fn, + func_args if func_args else func_kwargs, + ctx_value, + ) + + data = { + "cacheKey": cache_key, + "job": job, + } + + cancel = background.get("cancel") + if cancel: + data["cancel"] = cancel + + progress_default = background.get("progressDefault") + if progress_default: + data["progressDefault"] = { + str(o): x for o, x in zip(progress_outputs, progress_default) + } + return to_json(data) + + +def _progress_background_callback(response, callback_manager, background): + progress_outputs = background.get("progress") + cache_key = flask.request.args.get("cacheKey") + + if progress_outputs: + # Get the progress before the result as it would be erased after the results. + progress = callback_manager.get_progress(cache_key) + if progress: + response["progress"] = { + str(x): progress[i] for i, x in enumerate(progress_outputs) + } + + +def _update_background_callback( + error_handler, callback_ctx, response, kwargs, background, multi +): + """Set up the background callback and manage jobs.""" + callback_manager = _get_callback_manager(kwargs, background) + + cache_key = flask.request.args.get("cacheKey") + job_id = flask.request.args.get("job") + + _progress_background_callback(response, callback_manager, background) + + output_value = callback_manager.get_result(cache_key, job_id) + + return _handle_rest_background_callback( + output_value, callback_manager, response, error_handler, callback_ctx, multi + ) + + +def _handle_rest_background_callback( + output_value, + callback_manager, + response, + error_handler, + callback_ctx, + multi, + has_update=False, +): + cache_key = flask.request.args.get("cacheKey") + job_id = flask.request.args.get("job") + # Must get job_running after get_result since get_results terminates it. + job_running = callback_manager.job_running(job_id) + if not job_running and output_value is callback_manager.UNDEFINED: + # Job canceled -> no output to close the loop. + output_value = NoUpdate() + + elif isinstance(output_value, dict) and "background_callback_error" in output_value: + error = output_value.get("background_callback_error", {}) + exc = BackgroundCallbackError( + f"An error occurred inside a background callback: {error['msg']}\n{error['tb']}" + ) + if error_handler: + output_value = error_handler(exc) + + if output_value is None: + output_value = NoUpdate() + # set_props from the error handler uses the original ctx + # instead of manager.get_updated_props since it runs in the + # request process. + has_update = ( + _set_side_update(callback_ctx, response) or output_value is not None + ) + else: + raise exc + + if job_running and output_value is not callback_manager.UNDEFINED: + # cached results. + callback_manager.terminate_job(job_id) + + if multi and isinstance(output_value, (list, tuple)): + output_value = [ + NoUpdate() if NoUpdate.is_no_update(r) else r for r in output_value + ] + updated_props = callback_manager.get_updated_props(cache_key) + if len(updated_props) > 0: + response["sideUpdate"] = updated_props + has_update = True + + if output_value is callback_manager.UNDEFINED: + return to_json(response), has_update, True + return output_value, has_update, False + + +# pylint: disable=too-many-branches +def _prepare_response( + output_value, + output_spec, + multi, + response, + callback_ctx, + app, + original_packages, + background, + has_update, + has_output, + output, + callback_id, + allow_dynamic_callbacks, +): + """Prepare the response object based on the callback output.""" + component_ids = collections.defaultdict(dict) + + if has_output: + if not multi: + output_value, output_spec = [output_value], [output_spec] + flat_output_values = output_value + else: + if isinstance(output_value, (list, tuple)): + # For multi-output, allow top-level collection to be + # list or tuple + output_value = list(output_value) + if NoUpdate.is_no_update(output_value): + flat_output_values = [output_value] + else: + # Flatten grouping and validate grouping structure + flat_output_values = flatten_grouping(output_value, output) + + if not NoUpdate.is_no_update(output_value): + _validate.validate_multi_return( + output_spec, flat_output_values, callback_id + ) + + for val, spec in zip(flat_output_values, output_spec): + if NoUpdate.is_no_update(val): + continue + for vali, speci in ( + zip(val, spec) if isinstance(spec, list) else [[val, spec]] # type: ignore[reportArgumentType] + ): + if not NoUpdate.is_no_update(vali): + has_update = True + id_str = stringify_id(speci["id"]) + prop = clean_property_name(speci["property"]) + component_ids[id_str][prop] = vali + + else: + if output_value is not None: + raise InvalidCallbackReturnValue( + f"No-output callback received return value: {output_value}" + ) + + if not background: + has_update = _set_side_update(callback_ctx, response) or has_output + + if not has_update: + raise PreventUpdate + + if len(ComponentRegistry.registry) != len(original_packages): + diff_packages = list( + set(ComponentRegistry.registry).difference(original_packages) + ) + if not allow_dynamic_callbacks: + raise ImportedInsideCallbackError( + f"Component librar{'y' if len(diff_packages) == 1 else 'ies'} was imported during callback.\n" + "You can set `_allow_dynamic_callbacks` to allow for development purpose only." + ) + dist = app.get_dist(diff_packages) + response["dist"] = dist + return response.update({"response": component_ids}) + + +# pylint: disable=too-many-branches,too-many-statements +def register_callback( + callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs +): + ( + output, + flat_inputs, + flat_state, + inputs_state_indices, + prevent_initial_call, + ) = handle_grouped_callback_args(_args, _kwargs) + if isinstance(output, Output): + # Insert callback with scalar (non-multi) Output + insert_output = output + multi = False + has_output = True + else: + # Insert callback as multi Output + insert_output = flatten_grouping(output) + multi = True + has_output = len(output) > 0 + + background = _kwargs.get("background") + manager = _kwargs.get("manager") + running = _kwargs.get("running") + on_error = _kwargs.get("on_error") + if running is not None: + if not isinstance(running[0], (list, tuple)): + running = [running] + running = { + "running": {str(r[0]): r[1] for r in running}, + "runningOff": {str(r[0]): r[2] for r in running}, + } + allow_dynamic_callbacks = _kwargs.get("_allow_dynamic_callbacks") + + output_indices = make_grouping_by_index(output, list(range(grouping_len(output)))) + callback_id = insert_callback( + callback_list, + callback_map, + config_prevent_initial_callbacks, + insert_output, + output_indices, + flat_inputs, + flat_state, + inputs_state_indices, + prevent_initial_call, + background=background, + manager=manager, + dynamic_creator=allow_dynamic_callbacks, + running=running, + no_output=not has_output, + ) + + # pylint: disable=too-many-locals + def wrap_func(func): + if background is None: + background_key = None + else: + background_key = BaseBackgroundCallbackManager.register_func( + func, + background.get("progress") is not None, + callback_id, + ) + + @wraps(func) + def add_context(*args, **kwargs): + """Handles synchronous callbacks with context management.""" + error_handler = on_error or kwargs.pop("app_on_error", None) + + ( + output_spec, + callback_ctx, + func_args, + func_kwargs, + app, + original_packages, + has_update, + ) = _initialize_context( + args, kwargs, inputs_state_indices, has_output, insert_output + ) + + response: dict = {"multi": True} + + jsonResponse = None + try: + if background is not None: + if not flask.request.args.get("cacheKey"): + return _setup_background_callback( + kwargs, + background, + background_key, + func, + func_args, + func_kwargs, + callback_ctx, + ) + + output_value, has_update, skip = _update_background_callback( + error_handler, callback_ctx, response, kwargs, background, multi + ) + if skip: + return output_value + else: + output_value = _invoke_callback(func, *func_args, **func_kwargs) # type: ignore[reportArgumentType] + except PreventUpdate: + raise + except Exception as err: # pylint: disable=broad-exception-caught + if error_handler: + output_value = error_handler(err) + if output_value is None and output_spec: + output_value = NoUpdate() + else: + raise err + + _prepare_response( + output_value, + output_spec, + multi, + response, + callback_ctx, + app, + original_packages, + background, + has_update, + has_output, + output, + callback_id, + allow_dynamic_callbacks, + ) + try: + jsonResponse = to_json(response) + except TypeError: + _validate.fail_callback_output(output_value, output) + + return jsonResponse + + @wraps(func) + async def async_add_context(*args, **kwargs): + """Handles async callbacks with context management.""" + error_handler = on_error or kwargs.pop("app_on_error", None) + + ( + output_spec, + callback_ctx, + func_args, + func_kwargs, + app, + original_packages, + has_update, + ) = _initialize_context( + args, kwargs, inputs_state_indices, has_output, insert_output + ) + + response: dict = {"multi": True} + + try: + if background is not None: + if not flask.request.args.get("cacheKey"): + return _setup_background_callback( + kwargs, + background, + background_key, + func, + func_args, + func_kwargs, + callback_ctx, + ) + output_value, has_update, skip = _update_background_callback( + error_handler, callback_ctx, response, kwargs, background, multi + ) + if skip: + return output_value + else: + output_value = await _async_invoke_callback( + func, *func_args, **func_kwargs + ) + except PreventUpdate: + raise + except Exception as err: # pylint: disable=broad-exception-caught + if error_handler: + output_value = error_handler(err) + if output_value is None and output_spec: + output_value = NoUpdate() + else: + raise err + + _prepare_response( + output_value, + output_spec, + multi, + response, + callback_ctx, + app, + original_packages, + background, + has_update, + has_output, + output, + callback_id, + allow_dynamic_callbacks, + ) + try: + jsonResponse = to_json(response) + except TypeError: + _validate.fail_callback_output(output_value, output) + + return jsonResponse + + if asyncio.iscoroutinefunction(func): + callback_map[callback_id]["callback"] = async_add_context + else: + callback_map[callback_id]["callback"] = add_context + + return func + + return wrap_func + + +_inline_clientside_template = """ +(function() {{ + var clientside = window.dash_clientside = window.dash_clientside || {{}}; + var ns = clientside["{namespace}"] = clientside["{namespace}"] || {{}}; + ns["{function_name}"] = {clientside_function}; +}})(); +""" + + +def register_clientside_callback( + callback_list, + callback_map, + config_prevent_initial_callbacks, + inline_scripts, + clientside_function: ClientsideFuncType, + *args, + **kwargs, +): + output, inputs, state, prevent_initial_call = handle_callback_args(args, kwargs) + no_output = isinstance(output, (list,)) and len(output) == 0 + insert_callback( + callback_list, + callback_map, + config_prevent_initial_callbacks, + output, + None, + inputs, + state, + None, + prevent_initial_call, + no_output=no_output, + ) + + # If JS source is explicitly given, create a namespace and function + # name, then inject the code. + if isinstance(clientside_function, str): + namespace = "_dashprivate_clientside_funcs" + # Create a hash from the function, it will be the same always + function_name = hashlib.sha256(clientside_function.encode("utf-8")).hexdigest() + + inline_scripts.append( + _inline_clientside_template.format( + namespace=namespace, + function_name=function_name, + clientside_function=clientside_function, + ) + ) + + # Callback is stored in an external asset. + else: + namespace = clientside_function.namespace + function_name = clientside_function.function_name + + callback_list[-1]["clientside_function"] = { + "namespace": namespace, + "function_name": function_name, + } |