diff options
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py')
-rw-r--r-- | venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py | 898 |
1 files changed, 898 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py new file mode 100644 index 0000000..b3d55f3 --- /dev/null +++ b/venv/lib/python3.8/site-packages/narwhals/_duckdb/expr.py @@ -0,0 +1,898 @@ +from __future__ import annotations + +import contextlib +import operator +from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, cast + +from duckdb import CoalesceOperator, FunctionExpression, StarExpression +from duckdb.typing import DuckDBPyType + +from narwhals._compliant import LazyExpr +from narwhals._compliant.window import WindowInputs +from narwhals._duckdb.expr_dt import DuckDBExprDateTimeNamespace +from narwhals._duckdb.expr_list import DuckDBExprListNamespace +from narwhals._duckdb.expr_str import DuckDBExprStringNamespace +from narwhals._duckdb.expr_struct import DuckDBExprStructNamespace +from narwhals._duckdb.utils import ( + col, + generate_order_by_sql, + generate_partition_by_sql, + lit, + narwhals_to_native_dtype, + when, +) +from narwhals._expression_parsing import ExprKind +from narwhals._utils import Implementation, not_implemented, requires + +if TYPE_CHECKING: + from duckdb import Expression + from typing_extensions import Self + + from narwhals._compliant.typing import ( + AliasNames, + EvalNames, + EvalSeries, + WindowFunction, + ) + from narwhals._duckdb.dataframe import DuckDBLazyFrame + from narwhals._duckdb.namespace import DuckDBNamespace + from narwhals._expression_parsing import ExprMetadata + from narwhals._utils import Version, _FullContext + from narwhals.typing import ( + FillNullStrategy, + IntoDType, + NonNestedLiteral, + NumericLiteral, + RankMethod, + RollingInterpolationMethod, + TemporalLiteral, + ) + + DuckDBWindowFunction = WindowFunction[DuckDBLazyFrame, Expression] + DuckDBWindowInputs = WindowInputs[Expression] + + +with contextlib.suppress(ImportError): # requires duckdb>=1.3.0 + from duckdb import SQLExpression + + +class DuckDBExpr(LazyExpr["DuckDBLazyFrame", "Expression"]): + _implementation = Implementation.DUCKDB + + def __init__( + self, + call: EvalSeries[DuckDBLazyFrame, Expression], + window_function: DuckDBWindowFunction | None = None, + *, + evaluate_output_names: EvalNames[DuckDBLazyFrame], + alias_output_names: AliasNames | None, + backend_version: tuple[int, ...], + version: Version, + ) -> None: + self._call = call + self._evaluate_output_names = evaluate_output_names + self._alias_output_names = alias_output_names + self._backend_version = backend_version + self._version = version + self._metadata: ExprMetadata | None = None + self._window_function: DuckDBWindowFunction | None = window_function + + @property + def window_function(self) -> DuckDBWindowFunction: + def default_window_func( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + assert not window_inputs.order_by # noqa: S101 + partition_by_sql = generate_partition_by_sql(*window_inputs.partition_by) + template = f"{{expr}} over ({partition_by_sql})" + return [SQLExpression(template.format(expr=expr)) for expr in self(df)] + + return self._window_function or default_window_func + + def __call__(self, df: DuckDBLazyFrame) -> Sequence[Expression]: + return self._call(df) + + def __narwhals_expr__(self) -> None: ... + + def __narwhals_namespace__(self) -> DuckDBNamespace: # pragma: no cover + # Unused, just for compatibility with PandasLikeExpr + from narwhals._duckdb.namespace import DuckDBNamespace + + return DuckDBNamespace( + backend_version=self._backend_version, version=self._version + ) + + def _cum_window_func( + self, + *, + reverse: bool, + func_name: Literal["sum", "max", "min", "count", "product"], + ) -> DuckDBWindowFunction: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=not reverse) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = ( + f"{func_name} ({{expr}}) over ({partition_by_sql} {order_by_sql} " + "rows between unbounded preceding and current row)" + ) + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return func + + def _rolling_window_func( + self, + *, + func_name: Literal["sum", "mean", "std", "var"], + center: bool, + window_size: int, + min_samples: int, + ddof: int | None = None, + ) -> DuckDBWindowFunction: + supported_funcs = ["sum", "mean", "std", "var"] + if center: + half = (window_size - 1) // 2 + remainder = (window_size - 1) % 2 + start = f"{half + remainder} preceding" + end = f"{half} following" + else: + start = f"{window_size - 1} preceding" + end = "current row" + + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + window = f"({partition_by_sql} {order_by_sql} rows between {start} and {end})" + if func_name in {"sum", "mean"}: + func_: str = func_name + elif func_name == "var" and ddof == 0: + func_ = "var_pop" + elif func_name in "var" and ddof == 1: + func_ = "var_samp" + elif func_name == "std" and ddof == 0: + func_ = "stddev_pop" + elif func_name == "std" and ddof == 1: + func_ = "stddev_samp" + elif func_name in {"var", "std"}: # pragma: no cover + msg = f"Only ddof=0 and ddof=1 are currently supported for rolling_{func_name}." + raise ValueError(msg) + else: # pragma: no cover + msg = f"Only the following functions are supported: {supported_funcs}.\nGot: {func_name}." + raise ValueError(msg) + condition_sql = f"count({{expr}}) over {window} >= {min_samples}" + value_sql = f"{func_}({{expr}}) over {window}" + return [ + when( + SQLExpression(condition_sql.format(expr=expr)), + SQLExpression(value_sql.format(expr=expr)), + ) + for expr in self(df) + ] + + return func + + def broadcast(self, kind: Literal[ExprKind.AGGREGATION, ExprKind.LITERAL]) -> Self: + if kind is ExprKind.LITERAL: + return self + if self._backend_version < (1, 3): + msg = "At least version 1.3 of DuckDB is required for binary operations between aggregates and columns." + raise NotImplementedError(msg) + return self.over([lit(1)], []) + + @classmethod + def from_column_names( + cls, + evaluate_column_names: EvalNames[DuckDBLazyFrame], + /, + *, + context: _FullContext, + ) -> Self: + def func(df: DuckDBLazyFrame) -> list[Expression]: + return [col(name) for name in evaluate_column_names(df)] + + return cls( + func, + evaluate_output_names=evaluate_column_names, + alias_output_names=None, + backend_version=context._backend_version, + version=context._version, + ) + + @classmethod + def from_column_indices(cls, *column_indices: int, context: _FullContext) -> Self: + def func(df: DuckDBLazyFrame) -> list[Expression]: + columns = df.columns + return [col(columns[i]) for i in column_indices] + + return cls( + func, + evaluate_output_names=cls._eval_names_indices(column_indices), + alias_output_names=None, + backend_version=context._backend_version, + version=context._version, + ) + + def _callable_to_eval_series( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> EvalSeries[DuckDBLazyFrame, Expression]: + def func(df: DuckDBLazyFrame) -> list[Expression]: + native_series_list = self(df) + other_native_series = { + key: df._evaluate_expr(value) if self._is_expr(value) else lit(value) + for key, value in expressifiable_args.items() + } + return [ + call(native_series, **other_native_series) + for native_series in native_series_list + ] + + return func + + def _push_down_window_function( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> DuckDBWindowFunction: + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + # If a function `f` is elementwise, and `g` is another function, then + # - `f(g) over (window)` + # - `f(g over (window)) + # are equivalent. + # Make sure to only use with if `call` is elementwise! + native_series_list = self.window_function(df, window_inputs) + other_native_series = { + key: df._evaluate_window_expr(value, window_inputs) + if self._is_expr(value) + else lit(value) + for key, value in expressifiable_args.items() + } + return [ + call(native_series, **other_native_series) + for native_series in native_series_list + ] + + return window_f + + def _with_callable( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> Self: + """Create expression from callable. + + Arguments: + call: Callable from compliant DataFrame to native Expression + expr_name: Expression name + expressifiable_args: arguments pass to expression which should be parsed + as expressions (e.g. in `nw.col('a').is_between('b', 'c')`) + """ + return self.__class__( + self._callable_to_eval_series(call, **expressifiable_args), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_elementwise( + self, call: Callable[..., Expression], /, **expressifiable_args: Self | Any + ) -> Self: + return self.__class__( + self._callable_to_eval_series(call, **expressifiable_args), + self._push_down_window_function(call, **expressifiable_args), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_binary(self, op: Callable[..., Expression], other: Self | Any) -> Self: + return self.__class__( + self._callable_to_eval_series(op, other=other), + self._push_down_window_function(op, other=other), + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_alias_output_names(self, func: AliasNames | None, /) -> Self: + return type(self)( + self._call, + self._window_function, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=func, + backend_version=self._backend_version, + version=self._version, + ) + + def _with_window_function(self, window_function: DuckDBWindowFunction) -> Self: + return self.__class__( + self._call, + window_function, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + @classmethod + def _alias_native(cls, expr: Expression, name: str) -> Expression: + return expr.alias(name) + + def __and__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr & other, other) + + def __or__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr | other, other) + + def __add__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr + other, other) + + def __truediv__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr / other, other) + + def __rtruediv__(self, other: DuckDBExpr) -> Self: + return self._with_binary( + lambda expr, other: other.__truediv__(expr), other + ).alias("literal") + + def __floordiv__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr // other, other) + + def __rfloordiv__(self, other: DuckDBExpr) -> Self: + return self._with_binary( + lambda expr, other: other.__floordiv__(expr), other + ).alias("literal") + + def __mod__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr % other, other) + + def __rmod__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__mod__(expr), other).alias( + "literal" + ) + + def __sub__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr - other, other) + + def __rsub__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__sub__(expr), other).alias( + "literal" + ) + + def __mul__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr * other, other) + + def __pow__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr**other, other) + + def __rpow__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: other.__pow__(expr), other).alias( + "literal" + ) + + def __lt__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr < other, other) + + def __gt__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr > other, other) + + def __le__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr <= other, other) + + def __ge__(self, other: DuckDBExpr) -> Self: + return self._with_binary(lambda expr, other: expr >= other, other) + + def __eq__(self, other: DuckDBExpr) -> Self: # type: ignore[override] + return self._with_binary(lambda expr, other: expr == other, other) + + def __ne__(self, other: DuckDBExpr) -> Self: # type: ignore[override] + return self._with_binary(lambda expr, other: expr != other, other) + + def __invert__(self) -> Self: + invert = cast("Callable[..., Expression]", operator.invert) + return self._with_elementwise(invert) + + def abs(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("abs", expr)) + + def mean(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("mean", expr)) + + def skew(self) -> Self: + def func(expr: Expression) -> Expression: + count = FunctionExpression("count", expr) + # Adjust population skewness by correction factor to get sample skewness + sample_skewness = ( + FunctionExpression("skewness", expr) + * (count - lit(2)) + / FunctionExpression("sqrt", count * (count - lit(1))) + ) + return when(count == lit(0), lit(None)).otherwise( + when(count == lit(1), lit(float("nan"))).otherwise( + when(count == lit(2), lit(0.0)).otherwise(sample_skewness) + ) + ) + + return self._with_callable(func) + + def median(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("median", expr)) + + def all(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("bool_and", expr), lit(True)) # noqa: FBT003 + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('bool_and', expr)} over ({pb})"), + lit(True), # noqa: FBT003 + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def any(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("bool_or", expr), lit(False)) # noqa: FBT003 + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('bool_or', expr)} over ({pb})"), + lit(False), # noqa: FBT003 + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def quantile( + self, quantile: float, interpolation: RollingInterpolationMethod + ) -> Self: + def func(expr: Expression) -> Expression: + if interpolation == "linear": + return FunctionExpression("quantile_cont", expr, lit(quantile)) + msg = "Only linear interpolation methods are supported for DuckDB quantile." + raise NotImplementedError(msg) + + return self._with_callable(func) + + def clip( + self, + lower_bound: Self | NumericLiteral | TemporalLiteral | None, + upper_bound: Self | NumericLiteral | TemporalLiteral | None, + ) -> Self: + def _clip_lower(expr: Expression, lower_bound: Any) -> Expression: + return FunctionExpression("greatest", expr, lower_bound) + + def _clip_upper(expr: Expression, upper_bound: Any) -> Expression: + return FunctionExpression("least", expr, upper_bound) + + def _clip_both( + expr: Expression, lower_bound: Any, upper_bound: Any + ) -> Expression: + return FunctionExpression( + "greatest", FunctionExpression("least", expr, upper_bound), lower_bound + ) + + if lower_bound is None: + return self._with_elementwise(_clip_upper, upper_bound=upper_bound) + if upper_bound is None: + return self._with_elementwise(_clip_lower, lower_bound=lower_bound) + return self._with_elementwise( + _clip_both, lower_bound=lower_bound, upper_bound=upper_bound + ) + + def sum(self) -> Self: + def f(expr: Expression) -> Expression: + return CoalesceOperator(FunctionExpression("sum", expr), lit(0)) + + def window_f( + df: DuckDBLazyFrame, window_inputs: DuckDBWindowInputs + ) -> list[Expression]: + pb = generate_partition_by_sql(*window_inputs.partition_by) + return [ + CoalesceOperator( + SQLExpression(f"{FunctionExpression('sum', expr)} over ({pb})"), + lit(0), + ) + for expr in self(df) + ] + + return self._with_callable(f)._with_window_function(window_f) + + def n_unique(self) -> Self: + def func(expr: Expression) -> Expression: + # https://stackoverflow.com/a/79338887/4451315 + return FunctionExpression( + "array_unique", FunctionExpression("array_agg", expr) + ) + FunctionExpression( + "max", when(expr.isnotnull(), lit(0)).otherwise(lit(1)) + ) + + return self._with_callable(func) + + def count(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("count", expr)) + + def len(self) -> Self: + return self._with_callable(lambda _expr: FunctionExpression("count")) + + def std(self, ddof: int) -> Self: + if ddof == 0: + return self._with_callable( + lambda expr: FunctionExpression("stddev_pop", expr) + ) + if ddof == 1: + return self._with_callable( + lambda expr: FunctionExpression("stddev_samp", expr) + ) + + def _std(expr: Expression) -> Expression: + n_samples = FunctionExpression("count", expr) + return ( + FunctionExpression("stddev_pop", expr) + * FunctionExpression("sqrt", n_samples) + / (FunctionExpression("sqrt", (n_samples - lit(ddof)))) + ) + + return self._with_callable(_std) + + def var(self, ddof: int) -> Self: + if ddof == 0: + return self._with_callable(lambda expr: FunctionExpression("var_pop", expr)) + if ddof == 1: + return self._with_callable(lambda expr: FunctionExpression("var_samp", expr)) + + def _var(expr: Expression) -> Expression: + n_samples = FunctionExpression("count", expr) + return ( + FunctionExpression("var_pop", expr) * n_samples / (n_samples - lit(ddof)) + ) + + return self._with_callable(_var) + + def max(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("max", expr)) + + def min(self) -> Self: + return self._with_callable(lambda expr: FunctionExpression("min", expr)) + + def null_count(self) -> Self: + return self._with_callable( + lambda expr: FunctionExpression("sum", expr.isnull().cast("int")) + ) + + @requires.backend_version((1, 3)) + def over( + self, partition_by: Sequence[str | Expression], order_by: Sequence[str] + ) -> Self: + def func(df: DuckDBLazyFrame) -> Sequence[Expression]: + return self.window_function(df, WindowInputs(partition_by, order_by)) + + return self.__class__( + func, + evaluate_output_names=self._evaluate_output_names, + alias_output_names=self._alias_output_names, + backend_version=self._backend_version, + version=self._version, + ) + + def is_null(self) -> Self: + return self._with_elementwise(lambda expr: expr.isnull()) + + def is_nan(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("isnan", expr)) + + def is_finite(self) -> Self: + return self._with_elementwise(lambda expr: FunctionExpression("isfinite", expr)) + + def is_in(self, other: Sequence[Any]) -> Self: + return self._with_elementwise( + lambda expr: FunctionExpression("contains", lit(other), expr) + ) + + def round(self, decimals: int) -> Self: + return self._with_elementwise( + lambda expr: FunctionExpression("round", expr, lit(decimals)) + ) + + @requires.backend_version((1, 3)) + def shift(self, n: int) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = f"lag({{expr}}, {n}) over ({partition_by_sql} {order_by_sql})" + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def is_first_distinct(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + if inputs.partition_by: + partition_by_sql = ( + generate_partition_by_sql(*inputs.partition_by) + ", {expr}" + ) + else: + partition_by_sql = "partition by {expr}" + sql = ( + f"{FunctionExpression('row_number')} " + f"over({partition_by_sql} {order_by_sql})" + ) + return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def is_last_distinct(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=False) + if inputs.partition_by: + partition_by_sql = ( + generate_partition_by_sql(*inputs.partition_by) + ", {expr}" + ) + else: + partition_by_sql = "partition by {expr}" + sql = ( + f"{FunctionExpression('row_number')} " + f"over({partition_by_sql} {order_by_sql})" + ) + return [SQLExpression(sql.format(expr=expr)) == lit(1) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def diff(self) -> Self: + def func(df: DuckDBLazyFrame, inputs: DuckDBWindowInputs) -> list[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + sql = f"lag({{expr}}) over ({partition_by_sql} {order_by_sql})" + return [expr - SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(func) + + @requires.backend_version((1, 3)) + def cum_sum(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="sum") + ) + + @requires.backend_version((1, 3)) + def cum_max(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="max") + ) + + @requires.backend_version((1, 3)) + def cum_min(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="min") + ) + + @requires.backend_version((1, 3)) + def cum_count(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="count") + ) + + @requires.backend_version((1, 3)) + def cum_prod(self, *, reverse: bool) -> Self: + return self._with_window_function( + self._cum_window_func(reverse=reverse, func_name="product") + ) + + @requires.backend_version((1, 3)) + def rolling_sum(self, window_size: int, *, min_samples: int, center: bool) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="sum", + center=center, + window_size=window_size, + min_samples=min_samples, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_mean(self, window_size: int, *, min_samples: int, center: bool) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="mean", + center=center, + window_size=window_size, + min_samples=min_samples, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_var( + self, window_size: int, *, min_samples: int, center: bool, ddof: int + ) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="var", + center=center, + window_size=window_size, + min_samples=min_samples, + ddof=ddof, + ) + ) + + @requires.backend_version((1, 3)) + def rolling_std( + self, window_size: int, *, min_samples: int, center: bool, ddof: int + ) -> Self: + return self._with_window_function( + self._rolling_window_func( + func_name="std", + center=center, + window_size=window_size, + min_samples=min_samples, + ddof=ddof, + ) + ) + + def fill_null( + self, + value: Self | NonNestedLiteral, + strategy: FillNullStrategy | None, + limit: int | None, + ) -> Self: + if strategy is not None: + if self._backend_version < (1, 3): # pragma: no cover + msg = f"`fill_null` with `strategy={strategy}` is only available in 'duckdb>=1.3.0'." + raise NotImplementedError(msg) + + def _fill_with_strategy( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + order_by_sql = generate_order_by_sql(*inputs.order_by, ascending=True) + partition_by_sql = generate_partition_by_sql(*inputs.partition_by) + + fill_func = "last_value" if strategy == "forward" else "first_value" + _limit = "unbounded" if limit is None else limit + rows_between = ( + f"{_limit} preceding and current row" + if strategy == "forward" + else f"current row and {_limit} following" + ) + sql = ( + f"{fill_func}({{expr}} ignore nulls) over " + f"({partition_by_sql} {order_by_sql} rows between {rows_between})" + ) + return [SQLExpression(sql.format(expr=expr)) for expr in self(df)] + + return self._with_window_function(_fill_with_strategy) + + def _fill_constant(expr: Expression, value: Any) -> Expression: + return CoalesceOperator(expr, value) + + return self._with_elementwise(_fill_constant, value=value) + + def cast(self, dtype: IntoDType) -> Self: + def func(expr: Expression) -> Expression: + native_dtype = narwhals_to_native_dtype(dtype, self._version) + return expr.cast(DuckDBPyType(native_dtype)) + + return self._with_elementwise(func) + + @requires.backend_version((1, 3)) + def is_unique(self) -> Self: + def _is_unique(expr: Expression, *partition_by: str | Expression) -> Expression: + pb = generate_partition_by_sql(expr, *partition_by) + sql = f"{FunctionExpression('count', col('*'))} over ({pb})" + return SQLExpression(sql) == lit(1) + + def _unpartitioned_is_unique(expr: Expression) -> Expression: + return _is_unique(expr) + + def _partitioned_is_unique( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + assert not inputs.order_by # noqa: S101 + return [_is_unique(expr, *inputs.partition_by) for expr in self(df)] + + return self._with_callable(_unpartitioned_is_unique)._with_window_function( + _partitioned_is_unique + ) + + @requires.backend_version((1, 3)) + def rank(self, method: RankMethod, *, descending: bool) -> Self: + if method in {"min", "max", "average"}: + func = FunctionExpression("rank") + elif method == "dense": + func = FunctionExpression("dense_rank") + else: # method == "ordinal" + func = FunctionExpression("row_number") + + def _rank( + expr: Expression, + *, + descending: bool, + partition_by: Sequence[str | Expression] | None = None, + ) -> Expression: + order_by_sql = ( + f"order by {expr} desc nulls last" + if descending + else f"order by {expr} asc nulls last" + ) + count_expr = FunctionExpression("count", StarExpression()) + if partition_by is not None: + window = f"{generate_partition_by_sql(*partition_by)} {order_by_sql}" + count_window = f"{generate_partition_by_sql(*partition_by, expr)}" + else: + window = order_by_sql + count_window = generate_partition_by_sql(expr) + if method == "max": + rank_expr = ( + SQLExpression(f"{func} OVER ({window})") + + SQLExpression(f"{count_expr} over ({count_window})") + - lit(1) + ) + elif method == "average": + rank_expr = SQLExpression(f"{func} OVER ({window})") + ( + SQLExpression(f"{count_expr} over ({count_window})") - lit(1) + ) / lit(2.0) + else: + rank_expr = SQLExpression(f"{func} OVER ({window})") + return when(expr.isnotnull(), rank_expr) + + def _unpartitioned_rank(expr: Expression) -> Expression: + return _rank(expr, descending=descending) + + def _partitioned_rank( + df: DuckDBLazyFrame, inputs: DuckDBWindowInputs + ) -> Sequence[Expression]: + assert not inputs.order_by # noqa: S101 + return [ + _rank(expr, descending=descending, partition_by=inputs.partition_by) + for expr in self(df) + ] + + return self._with_callable(_unpartitioned_rank)._with_window_function( + _partitioned_rank + ) + + def log(self, base: float) -> Self: + def _log(expr: Expression) -> Expression: + log = FunctionExpression("log", expr) + return ( + when(expr < lit(0), lit(float("nan"))) + .when(expr == lit(0), lit(float("-inf"))) + .otherwise(log / FunctionExpression("log", lit(base))) + ) + + return self._with_elementwise(_log) + + def exp(self) -> Self: + def _exp(expr: Expression) -> Expression: + return FunctionExpression("exp", expr) + + return self._with_elementwise(_exp) + + @property + def str(self) -> DuckDBExprStringNamespace: + return DuckDBExprStringNamespace(self) + + @property + def dt(self) -> DuckDBExprDateTimeNamespace: + return DuckDBExprDateTimeNamespace(self) + + @property + def list(self) -> DuckDBExprListNamespace: + return DuckDBExprListNamespace(self) + + @property + def struct(self) -> DuckDBExprStructNamespace: + return DuckDBExprStructNamespace(self) + + drop_nulls = not_implemented() + unique = not_implemented() |