aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py')
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py898
1 files changed, 898 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
new file mode 100644
index 0000000..b3d55f3
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py
@@ -0,0 +1,898 @@
+from __future__ import annotations
+
+import contextlib
+import operator
+from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, cast
+
+from duckdb import CoalesceOperator, FunctionExpression, StarExpression
+from duckdb.typing import DuckDBPyType
+
+from narwhals._compliant import LazyExpr
+from narwhals._compliant.window import WindowInputs
+from narwhals._duckdb.expr_dt import DuckDBExprDateTimeNamespace
+from narwhals._duckdb.expr_list import DuckDBExprListNamespace
+from narwhals._duckdb.expr_str import DuckDBExprStringNamespace
+from narwhals._duckdb.expr_struct import DuckDBExprStructNamespace
+from narwhals._duckdb.utils import (
+ col,
+ generate_order_by_sql,
+ generate_partition_by_sql,
+ lit,
+ narwhals_to_native_dtype,
+ when,
+)
+from narwhals._expression_parsing import ExprKind
+from narwhals._utils import Implementation, not_implemented, requires
+
+if TYPE_CHECKING:
+ from duckdb import Expression
+ from typing_extensions import Self
+
+ from narwhals._compliant.typing import (
+ AliasNames,
+ EvalNames,
+ EvalSeries,
+ WindowFunction,
+ )
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+ from narwhals._duckdb.namespace import DuckDBNamespace
+ from narwhals._expression_parsing import ExprMetadata
+ from narwhals._utils import Version, _FullContext
+ from narwhals.typing import (
+ FillNullStrategy,
+ IntoDType,
+ NonNestedLiteral,
+ NumericLiteral,
+ RankMethod,
+ RollingInterpolationMethod,
+ TemporalLiteral,
+ )
+
+ DuckDBWindowFunction = WindowFunction[DuckDBLazyFrame, Expression]
+ DuckDBWindowInputs = WindowInputs[Expression]
+
+
+with contextlib.suppress(ImportError): # requires duckdb>=1.3.0
+ from duckdb import SQLExpression
+
+
+class DuckDBExpr(LazyExpr["DuckDBLazyFrame", "Expression"]):
+ _implementation = Implementation.DUCKDB
+
+ def __init__(
+ self,
+ call: EvalSeries[DuckDBLazyFrame, Expression],
+ window_function: DuckDBWindowFunction | None = None,
+ *,
+ evaluate_output_names: EvalNames[DuckDBLazyFrame],
+ alias_output_names: AliasNames | None,
+ backend_version: tuple[int, ...],
+ version: Version,
+ ) -> None:
+ self._call = call
+ self._evaluate_output_names = evaluate_output_names
+ self._alias_output_names = alias_output_names
+ self._backend_version = backend_version
+ self._version = version
+ self._metadata: ExprMetadata | None = None
+ self._window_function: DuckDBWindowFunction | None = window_function
+
+ @property
+ def window_function(self) -> DuckDBWindowFunction:
+ def default_window_func(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ assert not window_inputs.order_by # noqa: S101
+ partition_by_sql = generate_partition_by_sql(*window_inputs.partition_by)
+ template = f"{{expr}} over ({partition_by_sql})"
+ return [SQLExpression(template.format(expr=expr)) for expr in self(df)]
+
+ return self._window_function or default_window_func
+
+ def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]:
+ return self._call(df)
+
+ def __narwhals_expr__(self) -> None: ...
+
+ def __narwhals_namespace__(self) -> DuckDBNamespace: # pragma: no cover
+ # Unused, just for compatibility with PandasLikeExpr
+ from narwhals._duckdb.namespace import DuckDBNamespace
+
+ return DuckDBNamespace(
+ backend_version=self._backend_version, version=self._version
+ )
+
+ def _cum_window_func(
+ self,
+ *,
+ reverse: bool,
+ func_name: Literal["sum", "max", "min", "count", "product"],
+ ) -> DuckDBWindowFunction:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=not reverse)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ sql = (
+ f"{func_name} ({{expr}}) over ({partition_by_sql} {order_by_sql} "
+ "rows between unbounded preceding and current row)"
+ )
+ return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]
+
+ return func
+
+ def _rolling_window_func(
+ self,
+ *,
+ func_name: Literal["sum", "mean", "std", "var"],
+ center: bool,
+ window_size: int,
+ min_samples: int,
+ ddof: int | None = None,
+ ) -> DuckDBWindowFunction:
+ supported_funcs = ["sum", "mean", "std", "var"]
+ if center:
+ half = (window_size - 1) // 2
+ remainder = (window_size - 1) % 2
+ start = f"{half + remainder} preceding"
+ end = f"{half} following"
+ else:
+ start = f"{window_size - 1} preceding"
+ end = "current row"
+
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ window = f"({partition_by_sql} {order_by_sql} rows between {start} and {end})"
+ if func_name in {"sum", "mean"}:
+ func_: str = func_name
+ elif func_name == "var" and ddof == 0:
+ func_ = "var_pop"
+ elif func_name in "var" and ddof == 1:
+ func_ = "var_samp"
+ elif func_name == "std" and ddof == 0:
+ func_ = "stddev_pop"
+ elif func_name == "std" and ddof == 1:
+ func_ = "stddev_samp"
+ elif func_name in {"var", "std"}: # pragma: no cover
+ msg = f"Only ddof=0 and ddof=1 are currently supported for rolling_{func_name}."
+ raise ValueError(msg)
+ else: # pragma: no cover
+ msg = f"Only the following functions are supported: {supported_funcs}.\nGot: {func_name}."
+ raise ValueError(msg)
+ condition_sql = f"count({{expr}}) over {window} >= {min_samples}"
+ value_sql = f"{func_}({{expr}}) over {window}"
+ return [
+ when(
+ SQLExpression(condition_sql.format(expr=expr)),
+ SQLExpression(value_sql.format(expr=expr)),
+ )
+ for expr in self(df)
+ ]
+
+ return func
+
+ def broadcast(self, kind: Literal[ExprKind.AGGREGATION, ExprKind.LITERAL]) -> Self:
+ if kind is ExprKind.LITERAL:
+ return self
+ if self._backend_version < (1, 3):
+ msg = "At least version 1.3 of DuckDB is required for binary operations between aggregates and columns."
+ raise NotImplementedError(msg)
+ return self.over([lit(1)], [])
+
+ @classmethod
+ def from_column_names(
+ cls,
+ evaluate_column_names: EvalNames[DuckDBLazyFrame],
+ /,
+ *,
+ context: _FullContext,
+ ) -> Self:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ return [col(name) for name in evaluate_column_names(df)]
+
+ return cls(
+ func,
+ evaluate_output_names=evaluate_column_names,
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ @classmethod
+ def from_column_indices(cls, *column_indices: int, context: _FullContext) -> Self:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ columns = df.columns
+ return [col(columns[i]) for i in column_indices]
+
+ return cls(
+ func,
+ evaluate_output_names=cls._eval_names_indices(column_indices),
+ alias_output_names=None,
+ backend_version=context._backend_version,
+ version=context._version,
+ )
+
+ def _callable_to_eval_series(
+ self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
+ ) -> EvalSeries[DuckDBLazyFrame, Expression]:
+ def func(df: DuckDBLazyFrame) -> list[Expression]:
+ native_series_list = self(df)
+ other_native_series = {
+ key: df._evaluate_expr(value) if self._is_expr(value) else lit(value)
+ for key, value in expressifiable_args.items()
+ }
+ return [
+ call(native_series, **other_native_series)
+ for native_series in native_series_list
+ ]
+
+ return func
+
+ def _push_down_window_function(
+ self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
+ ) -> DuckDBWindowFunction:
+ def window_f(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ # If a function `f` is elementwise, and `g` is another function, then
+ # - `f(g) over (window)`
+ # - `f(g over (window))
+ # are equivalent.
+ # Make sure to only use with if `call` is elementwise!
+ native_series_list = self.window_function(df, window_inputs)
+ other_native_series = {
+ key: df._evaluate_window_expr(value, window_inputs)
+ if self._is_expr(value)
+ else lit(value)
+ for key, value in expressifiable_args.items()
+ }
+ return [
+ call(native_series, **other_native_series)
+ for native_series in native_series_list
+ ]
+
+ return window_f
+
+ def _with_callable(
+ self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
+ ) -> Self:
+ """Create expression from callable.
+
+ Arguments:
+ call: Callable from compliant DataFrame to native Expression
+ expr_name: Expression name
+ expressifiable_args: arguments pass to expression which should be parsed
+ as expressions (e.g. in `nw.col('a').is_between('b', 'c')`)
+ """
+ return self.__class__(
+ self._callable_to_eval_series(call, **expressifiable_args),
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def _with_elementwise(
+ self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any
+ ) -> Self:
+ return self.__class__(
+ self._callable_to_eval_series(call, **expressifiable_args),
+ self._push_down_window_function(call, **expressifiable_args),
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def _with_binary(self, op: Callable[..., Expression], other: Self | Any) -> Self:
+ return self.__class__(
+ self._callable_to_eval_series(op, other=other),
+ self._push_down_window_function(op, other=other),
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def _with_alias_output_names(self, func: AliasNames | None, /) -> Self:
+ return type(self)(
+ self._call,
+ self._window_function,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=func,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def _with_window_function(self, window_function: DuckDBWindowFunction) -> Self:
+ return self.__class__(
+ self._call,
+ window_function,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ @classmethod
+ def _alias_native(cls, expr: Expression, name: str) -> Expression:
+ return expr.alias(name)
+
+ def __and__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr & other, other)
+
+ def __or__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr | other, other)
+
+ def __add__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr + other, other)
+
+ def __truediv__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr / other, other)
+
+ def __rtruediv__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(
+ lambda expr, other: other.__truediv__(expr), other
+ ).alias("literal")
+
+ def __floordiv__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr // other, other)
+
+ def __rfloordiv__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(
+ lambda expr, other: other.__floordiv__(expr), other
+ ).alias("literal")
+
+ def __mod__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr % other, other)
+
+ def __rmod__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: other.__mod__(expr), other).alias(
+ "literal"
+ )
+
+ def __sub__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr - other, other)
+
+ def __rsub__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: other.__sub__(expr), other).alias(
+ "literal"
+ )
+
+ def __mul__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr * other, other)
+
+ def __pow__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr**other, other)
+
+ def __rpow__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: other.__pow__(expr), other).alias(
+ "literal"
+ )
+
+ def __lt__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr < other, other)
+
+ def __gt__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr > other, other)
+
+ def __le__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr <= other, other)
+
+ def __ge__(self, other: DuckDBExpr) -> Self:
+ return self._with_binary(lambda expr, other: expr >= other, other)
+
+ def __eq__(self, other: DuckDBExpr) -> Self: # type: ignore[override]
+ return self._with_binary(lambda expr, other: expr == other, other)
+
+ def __ne__(self, other: DuckDBExpr) -> Self: # type: ignore[override]
+ return self._with_binary(lambda expr, other: expr != other, other)
+
+ def __invert__(self) -> Self:
+ invert = cast("Callable[..., Expression]", operator.invert)
+ return self._with_elementwise(invert)
+
+ def abs(self) -> Self:
+ return self._with_elementwise(lambda expr: FunctionExpression("abs", expr))
+
+ def mean(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("mean", expr))
+
+ def skew(self) -> Self:
+ def func(expr: Expression) -> Expression:
+ count = FunctionExpression("count", expr)
+ # Adjust population skewness by correction factor to get sample skewness
+ sample_skewness = (
+ FunctionExpression("skewness", expr)
+ * (count - lit(2))
+ / FunctionExpression("sqrt", count * (count - lit(1)))
+ )
+ return when(count == lit(0), lit(None)).otherwise(
+ when(count == lit(1), lit(float("nan"))).otherwise(
+ when(count == lit(2), lit(0.0)).otherwise(sample_skewness)
+ )
+ )
+
+ return self._with_callable(func)
+
+ def median(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("median", expr))
+
+ def all(self) -> Self:
+ def f(expr: Expression) -> Expression:
+ return CoalesceOperator(FunctionExpression("bool_and", expr), lit(True)) # noqa: FBT003
+
+ def window_f(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ pb = generate_partition_by_sql(*window_inputs.partition_by)
+ return [
+ CoalesceOperator(
+ SQLExpression(f"{FunctionExpression('bool_and', expr)} over ({pb})"),
+ lit(True), # noqa: FBT003
+ )
+ for expr in self(df)
+ ]
+
+ return self._with_callable(f)._with_window_function(window_f)
+
+ def any(self) -> Self:
+ def f(expr: Expression) -> Expression:
+ return CoalesceOperator(FunctionExpression("bool_or", expr), lit(False)) # noqa: FBT003
+
+ def window_f(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ pb = generate_partition_by_sql(*window_inputs.partition_by)
+ return [
+ CoalesceOperator(
+ SQLExpression(f"{FunctionExpression('bool_or', expr)} over ({pb})"),
+ lit(False), # noqa: FBT003
+ )
+ for expr in self(df)
+ ]
+
+ return self._with_callable(f)._with_window_function(window_f)
+
+ def quantile(
+ self, quantile: float, interpolation: RollingInterpolationMethod
+ ) -> Self:
+ def func(expr: Expression) -> Expression:
+ if interpolation == "linear":
+ return FunctionExpression("quantile_cont", expr, lit(quantile))
+ msg = "Only linear interpolation methods are supported for DuckDB quantile."
+ raise NotImplementedError(msg)
+
+ return self._with_callable(func)
+
+ def clip(
+ self,
+ lower_bound: Self | NumericLiteral | TemporalLiteral | None,
+ upper_bound: Self | NumericLiteral | TemporalLiteral | None,
+ ) -> Self:
+ def _clip_lower(expr: Expression, lower_bound: Any) -> Expression:
+ return FunctionExpression("greatest", expr, lower_bound)
+
+ def _clip_upper(expr: Expression, upper_bound: Any) -> Expression:
+ return FunctionExpression("least", expr, upper_bound)
+
+ def _clip_both(
+ expr: Expression, lower_bound: Any, upper_bound: Any
+ ) -> Expression:
+ return FunctionExpression(
+ "greatest", FunctionExpression("least", expr, upper_bound), lower_bound
+ )
+
+ if lower_bound is None:
+ return self._with_elementwise(_clip_upper, upper_bound=upper_bound)
+ if upper_bound is None:
+ return self._with_elementwise(_clip_lower, lower_bound=lower_bound)
+ return self._with_elementwise(
+ _clip_both, lower_bound=lower_bound, upper_bound=upper_bound
+ )
+
+ def sum(self) -> Self:
+ def f(expr: Expression) -> Expression:
+ return CoalesceOperator(FunctionExpression("sum", expr), lit(0))
+
+ def window_f(
+ df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs
+ ) -> list[Expression]:
+ pb = generate_partition_by_sql(*window_inputs.partition_by)
+ return [
+ CoalesceOperator(
+ SQLExpression(f"{FunctionExpression('sum', expr)} over ({pb})"),
+ lit(0),
+ )
+ for expr in self(df)
+ ]
+
+ return self._with_callable(f)._with_window_function(window_f)
+
+ def n_unique(self) -> Self:
+ def func(expr: Expression) -> Expression:
+ # https://stackoverflow.com/a/79338887/4451315
+ return FunctionExpression(
+ "array_unique", FunctionExpression("array_agg", expr)
+ ) + FunctionExpression(
+ "max", when(expr.isnotnull(), lit(0)).otherwise(lit(1))
+ )
+
+ return self._with_callable(func)
+
+ def count(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("count", expr))
+
+ def len(self) -> Self:
+ return self._with_callable(lambda _expr: FunctionExpression("count"))
+
+ def std(self, ddof: int) -> Self:
+ if ddof == 0:
+ return self._with_callable(
+ lambda expr: FunctionExpression("stddev_pop", expr)
+ )
+ if ddof == 1:
+ return self._with_callable(
+ lambda expr: FunctionExpression("stddev_samp", expr)
+ )
+
+ def _std(expr: Expression) -> Expression:
+ n_samples = FunctionExpression("count", expr)
+ return (
+ FunctionExpression("stddev_pop", expr)
+ * FunctionExpression("sqrt", n_samples)
+ / (FunctionExpression("sqrt", (n_samples - lit(ddof))))
+ )
+
+ return self._with_callable(_std)
+
+ def var(self, ddof: int) -> Self:
+ if ddof == 0:
+ return self._with_callable(lambda expr: FunctionExpression("var_pop", expr))
+ if ddof == 1:
+ return self._with_callable(lambda expr: FunctionExpression("var_samp", expr))
+
+ def _var(expr: Expression) -> Expression:
+ n_samples = FunctionExpression("count", expr)
+ return (
+ FunctionExpression("var_pop", expr) * n_samples / (n_samples - lit(ddof))
+ )
+
+ return self._with_callable(_var)
+
+ def max(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("max", expr))
+
+ def min(self) -> Self:
+ return self._with_callable(lambda expr: FunctionExpression("min", expr))
+
+ def null_count(self) -> Self:
+ return self._with_callable(
+ lambda expr: FunctionExpression("sum", expr.isnull().cast("int"))
+ )
+
+ @requires.backend_version((1, 3))
+ def over(
+ self, partition_by: Sequence[str | Expression], order_by: Sequence[str]
+ ) -> Self:
+ def func(df: DuckDBLazyFrame) -> Sequence[Expression]:
+ return self.window_function(df, WindowInputs(partition_by, order_by))
+
+ return self.__class__(
+ func,
+ evaluate_output_names=self._evaluate_output_names,
+ alias_output_names=self._alias_output_names,
+ backend_version=self._backend_version,
+ version=self._version,
+ )
+
+ def is_null(self) -> Self:
+ return self._with_elementwise(lambda expr: expr.isnull())
+
+ def is_nan(self) -> Self:
+ return self._with_elementwise(lambda expr: FunctionExpression("isnan", expr))
+
+ def is_finite(self) -> Self:
+ return self._with_elementwise(lambda expr: FunctionExpression("isfinite", expr))
+
+ def is_in(self, other: Sequence[Any]) -> Self:
+ return self._with_elementwise(
+ lambda expr: FunctionExpression("contains", lit(other), expr)
+ )
+
+ def round(self, decimals: int) -> Self:
+ return self._with_elementwise(
+ lambda expr: FunctionExpression("round", expr, lit(decimals))
+ )
+
+ @requires.backend_version((1, 3))
+ def shift(self, n: int) -> Self:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ sql = f"lag({{expr}}, {n}) over ({partition_by_sql} {order_by_sql})"
+ return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]
+
+ return self._with_window_function(func)
+
+ @requires.backend_version((1, 3))
+ def is_first_distinct(self) -> Self:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ if inputs.partition_by:
+ partition_by_sql = (
+ generate_partition_by_sql(*inputs.partition_by) + ", {expr}"
+ )
+ else:
+ partition_by_sql = "partition by {expr}"
+ sql = (
+ f"{FunctionExpression('row_number')} "
+ f"over({partition_by_sql} {order_by_sql})"
+ )
+ return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)]
+
+ return self._with_window_function(func)
+
+ @requires.backend_version((1, 3))
+ def is_last_distinct(self) -> Self:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=False)
+ if inputs.partition_by:
+ partition_by_sql = (
+ generate_partition_by_sql(*inputs.partition_by) + ", {expr}"
+ )
+ else:
+ partition_by_sql = "partition by {expr}"
+ sql = (
+ f"{FunctionExpression('row_number')} "
+ f"over({partition_by_sql} {order_by_sql})"
+ )
+ return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)]
+
+ return self._with_window_function(func)
+
+ @requires.backend_version((1, 3))
+ def diff(self) -> Self:
+ def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+ sql = f"lag({{expr}}) over ({partition_by_sql} {order_by_sql})"
+ return [expr - SQLExpression(sql.format(expr=expr)) for expr in self(df)]
+
+ return self._with_window_function(func)
+
+ @requires.backend_version((1, 3))
+ def cum_sum(self, *, reverse: bool) -> Self:
+ return self._with_window_function(
+ self._cum_window_func(reverse=reverse, func_name="sum")
+ )
+
+ @requires.backend_version((1, 3))
+ def cum_max(self, *, reverse: bool) -> Self:
+ return self._with_window_function(
+ self._cum_window_func(reverse=reverse, func_name="max")
+ )
+
+ @requires.backend_version((1, 3))
+ def cum_min(self, *, reverse: bool) -> Self:
+ return self._with_window_function(
+ self._cum_window_func(reverse=reverse, func_name="min")
+ )
+
+ @requires.backend_version((1, 3))
+ def cum_count(self, *, reverse: bool) -> Self:
+ return self._with_window_function(
+ self._cum_window_func(reverse=reverse, func_name="count")
+ )
+
+ @requires.backend_version((1, 3))
+ def cum_prod(self, *, reverse: bool) -> Self:
+ return self._with_window_function(
+ self._cum_window_func(reverse=reverse, func_name="product")
+ )
+
+ @requires.backend_version((1, 3))
+ def rolling_sum(self, window_size: int, *, min_samples: int, center: bool) -> Self:
+ return self._with_window_function(
+ self._rolling_window_func(
+ func_name="sum",
+ center=center,
+ window_size=window_size,
+ min_samples=min_samples,
+ )
+ )
+
+ @requires.backend_version((1, 3))
+ def rolling_mean(self, window_size: int, *, min_samples: int, center: bool) -> Self:
+ return self._with_window_function(
+ self._rolling_window_func(
+ func_name="mean",
+ center=center,
+ window_size=window_size,
+ min_samples=min_samples,
+ )
+ )
+
+ @requires.backend_version((1, 3))
+ def rolling_var(
+ self, window_size: int, *, min_samples: int, center: bool, ddof: int
+ ) -> Self:
+ return self._with_window_function(
+ self._rolling_window_func(
+ func_name="var",
+ center=center,
+ window_size=window_size,
+ min_samples=min_samples,
+ ddof=ddof,
+ )
+ )
+
+ @requires.backend_version((1, 3))
+ def rolling_std(
+ self, window_size: int, *, min_samples: int, center: bool, ddof: int
+ ) -> Self:
+ return self._with_window_function(
+ self._rolling_window_func(
+ func_name="std",
+ center=center,
+ window_size=window_size,
+ min_samples=min_samples,
+ ddof=ddof,
+ )
+ )
+
+ def fill_null(
+ self,
+ value: Self | NonNestedLiteral,
+ strategy: FillNullStrategy | None,
+ limit: int | None,
+ ) -> Self:
+ if strategy is not None:
+ if self._backend_version < (1, 3): # pragma: no cover
+ msg = f"`fill_null` with `strategy={strategy}` is only available in 'duckdb>=1.3.0'."
+ raise NotImplementedError(msg)
+
+ def _fill_with_strategy(
+ df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
+ ) -> Sequence[Expression]:
+ order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True)
+ partition_by_sql = generate_partition_by_sql(*inputs.partition_by)
+
+ fill_func = "last_value" if strategy == "forward" else "first_value"
+ _limit = "unbounded" if limit is None else limit
+ rows_between = (
+ f"{_limit} preceding and current row"
+ if strategy == "forward"
+ else f"current row and {_limit} following"
+ )
+ sql = (
+ f"{fill_func}({{expr}} ignore nulls) over "
+ f"({partition_by_sql} {order_by_sql} rows between {rows_between})"
+ )
+ return [SQLExpression(sql.format(expr=expr)) for expr in self(df)]
+
+ return self._with_window_function(_fill_with_strategy)
+
+ def _fill_constant(expr: Expression, value: Any) -> Expression:
+ return CoalesceOperator(expr, value)
+
+ return self._with_elementwise(_fill_constant, value=value)
+
+ def cast(self, dtype: IntoDType) -> Self:
+ def func(expr: Expression) -> Expression:
+ native_dtype = narwhals_to_native_dtype(dtype, self._version)
+ return expr.cast(DuckDBPyType(native_dtype))
+
+ return self._with_elementwise(func)
+
+ @requires.backend_version((1, 3))
+ def is_unique(self) -> Self:
+ def _is_unique(expr: Expression, *partition_by: str | Expression) -> Expression:
+ pb = generate_partition_by_sql(expr, *partition_by)
+ sql = f"{FunctionExpression('count', col('*'))} over ({pb})"
+ return SQLExpression(sql) == lit(1)
+
+ def _unpartitioned_is_unique(expr: Expression) -> Expression:
+ return _is_unique(expr)
+
+ def _partitioned_is_unique(
+ df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
+ ) -> Sequence[Expression]:
+ assert not inputs.order_by # noqa: S101
+ return [_is_unique(expr, *inputs.partition_by) for expr in self(df)]
+
+ return self._with_callable(_unpartitioned_is_unique)._with_window_function(
+ _partitioned_is_unique
+ )
+
+ @requires.backend_version((1, 3))
+ def rank(self, method: RankMethod, *, descending: bool) -> Self:
+ if method in {"min", "max", "average"}:
+ func = FunctionExpression("rank")
+ elif method == "dense":
+ func = FunctionExpression("dense_rank")
+ else: # method == "ordinal"
+ func = FunctionExpression("row_number")
+
+ def _rank(
+ expr: Expression,
+ *,
+ descending: bool,
+ partition_by: Sequence[str | Expression] | None = None,
+ ) -> Expression:
+ order_by_sql = (
+ f"order by {expr} desc nulls last"
+ if descending
+ else f"order by {expr} asc nulls last"
+ )
+ count_expr = FunctionExpression("count", StarExpression())
+ if partition_by is not None:
+ window = f"{generate_partition_by_sql(*partition_by)} {order_by_sql}"
+ count_window = f"{generate_partition_by_sql(*partition_by, expr)}"
+ else:
+ window = order_by_sql
+ count_window = generate_partition_by_sql(expr)
+ if method == "max":
+ rank_expr = (
+ SQLExpression(f"{func} OVER ({window})")
+ + SQLExpression(f"{count_expr} over ({count_window})")
+ - lit(1)
+ )
+ elif method == "average":
+ rank_expr = SQLExpression(f"{func} OVER ({window})") + (
+ SQLExpression(f"{count_expr} over ({count_window})") - lit(1)
+ ) / lit(2.0)
+ else:
+ rank_expr = SQLExpression(f"{func} OVER ({window})")
+ return when(expr.isnotnull(), rank_expr)
+
+ def _unpartitioned_rank(expr: Expression) -> Expression:
+ return _rank(expr, descending=descending)
+
+ def _partitioned_rank(
+ df: DuckDBLazyFrame, inputs: DuckDBWindowInputs
+ ) -> Sequence[Expression]:
+ assert not inputs.order_by # noqa: S101
+ return [
+ _rank(expr, descending=descending, partition_by=inputs.partition_by)
+ for expr in self(df)
+ ]
+
+ return self._with_callable(_unpartitioned_rank)._with_window_function(
+ _partitioned_rank
+ )
+
+ def log(self, base: float) -> Self:
+ def _log(expr: Expression) -> Expression:
+ log = FunctionExpression("log", expr)
+ return (
+ when(expr < lit(0), lit(float("nan")))
+ .when(expr == lit(0), lit(float("-inf")))
+ .otherwise(log / FunctionExpression("log", lit(base)))
+ )
+
+ return self._with_elementwise(_log)
+
+ def exp(self) -> Self:
+ def _exp(expr: Expression) -> Expression:
+ return FunctionExpression("exp", expr)
+
+ return self._with_elementwise(_exp)
+
+ @property
+ def str(self) -> DuckDBExprStringNamespace:
+ return DuckDBExprStringNamespace(self)
+
+ @property
+ def dt(self) -> DuckDBExprDateTimeNamespace:
+ return DuckDBExprDateTimeNamespace(self)
+
+ @property
+ def list(self) -> DuckDBExprListNamespace:
+ return DuckDBExprListNamespace(self)
+
+ @property
+ def struct(self) -> DuckDBExprStructNamespace:
+ return DuckDBExprStructNamespace(self)
+
+ drop_nulls = not_implemented()
+ unique = not_implemented()