aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py')
-rw-r--r--venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py771
1 files changed, 771 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py b/venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py
new file mode 100644
index 0000000..19763b9
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/narwhals/_arrow/dataframe.py
@@ -0,0 +1,771 @@
+from __future__ import annotations
+
+from functools import partial
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Collection,
+ Iterator,
+ Literal,
+ Mapping,
+ Sequence,
+ cast,
+ overload,
+)
+
+import pyarrow as pa
+import pyarrow.compute as pc
+
+from narwhals._arrow.series import ArrowSeries
+from narwhals._arrow.utils import align_series_full_broadcast, native_to_narwhals_dtype
+from narwhals._compliant import EagerDataFrame
+from narwhals._expression_parsing import ExprKind
+from narwhals._utils import (
+ Implementation,
+ Version,
+ check_column_names_are_unique,
+ convert_str_slice_to_int_slice,
+ generate_temporary_column_name,
+ not_implemented,
+ parse_columns_to_drop,
+ parse_version,
+ scale_bytes,
+ supports_arrow_c_stream,
+ validate_backend_version,
+)
+from narwhals.dependencies import is_numpy_array_1d
+from narwhals.exceptions import ShapeError
+
+if TYPE_CHECKING:
+ from io import BytesIO
+ from pathlib import Path
+ from types import ModuleType
+
+ import pandas as pd
+ import polars as pl
+ from typing_extensions import Self, TypeAlias, TypeIs
+
+ from narwhals._arrow.expr import ArrowExpr
+ from narwhals._arrow.group_by import ArrowGroupBy
+ from narwhals._arrow.namespace import ArrowNamespace
+ from narwhals._arrow.typing import ( # type: ignore[attr-defined]
+ ChunkedArrayAny,
+ Mask,
+ Order,
+ )
+ from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny
+ from narwhals._translate import IntoArrowTable
+ from narwhals._utils import Version, _FullContext
+ from narwhals.dtypes import DType
+ from narwhals.schema import Schema
+ from narwhals.typing import (
+ JoinStrategy,
+ SizedMultiIndexSelector,
+ SizedMultiNameSelector,
+ SizeUnit,
+ UniqueKeepStrategy,
+ _1DArray,
+ _2DArray,
+ _SliceIndex,
+ _SliceName,
+ )
+
+ JoinType: TypeAlias = Literal[
+ "left semi",
+ "right semi",
+ "left anti",
+ "right anti",
+ "inner",
+ "left outer",
+ "right outer",
+ "full outer",
+ ]
+ PromoteOptions: TypeAlias = Literal["none", "default", "permissive"]
+
+
+class ArrowDataFrame(EagerDataFrame["ArrowSeries", "ArrowExpr", "pa.Table"]):
+ def __init__(
+ self,
+ native_dataframe: pa.Table,
+ *,
+ backend_version: tuple[int, ...],
+ version: Version,
+ validate_column_names: bool,
+ ) -> None:
+ if validate_column_names:
+ check_column_names_are_unique(native_dataframe.column_names)
+ self._native_frame = native_dataframe
+ self._implementation = Implementation.PYARROW
+ self._backend_version = backend_version
+ self._version = version
+ validate_backend_version(self._implementation, self._backend_version)
+
+ @classmethod
+ def from_arrow(cls, data: IntoArrowTable, /, *, context: _FullContext) -> Self:
+ backend_version = context._backend_version
+ if cls._is_native(data):
+ native = data
+ elif backend_version >= (14,) or isinstance(data, Collection):
+ native = pa.table(data)
+ elif supports_arrow_c_stream(data): # pragma: no cover
+ msg = f"'pyarrow>=14.0.0' is required for `from_arrow` for object of type {type(data).__name__!r}."
+ raise ModuleNotFoundError(msg)
+ else: # pragma: no cover
+ msg = f"`from_arrow` is not supported for object of type {type(data).__name__!r}."
+ raise TypeError(msg)
+ return cls.from_native(native, context=context)
+
+ @classmethod
+ def from_dict(
+ cls,
+ data: Mapping[str, Any],
+ /,
+ *,
+ context: _FullContext,
+ schema: Mapping[str, DType] | Schema | None,
+ ) -> Self:
+ from narwhals.schema import Schema
+
+ pa_schema = Schema(schema).to_arrow() if schema is not None else schema
+ native = pa.Table.from_pydict(data, schema=pa_schema)
+ return cls.from_native(native, context=context)
+
+ @staticmethod
+ def _is_native(obj: pa.Table | Any) -> TypeIs[pa.Table]:
+ return isinstance(obj, pa.Table)
+
+ @classmethod
+ def from_native(cls, data: pa.Table, /, *, context: _FullContext) -> Self:
+ return cls(
+ data,
+ backend_version=context._backend_version,
+ version=context._version,
+ validate_column_names=True,
+ )
+
+ @classmethod
+ def from_numpy(
+ cls,
+ data: _2DArray,
+ /,
+ *,
+ context: _FullContext,
+ schema: Mapping[str, DType] | Schema | Sequence[str] | None,
+ ) -> Self:
+ from narwhals.schema import Schema
+
+ arrays = [pa.array(val) for val in data.T]
+ if isinstance(schema, (Mapping, Schema)):
+ native = pa.Table.from_arrays(arrays, schema=Schema(schema).to_arrow())
+ else:
+ native = pa.Table.from_arrays(arrays, cls._numpy_column_names(data, schema))
+ return cls.from_native(native, context=context)
+
+ def __narwhals_namespace__(self) -> ArrowNamespace:
+ from narwhals._arrow.namespace import ArrowNamespace
+
+ return ArrowNamespace(
+ backend_version=self._backend_version, version=self._version
+ )
+
+ def __native_namespace__(self) -> ModuleType:
+ if self._implementation is Implementation.PYARROW:
+ return self._implementation.to_native_namespace()
+
+ msg = f"Expected pyarrow, got: {type(self._implementation)}" # pragma: no cover
+ raise AssertionError(msg)
+
+ def __narwhals_dataframe__(self) -> Self:
+ return self
+
+ def __narwhals_lazyframe__(self) -> Self:
+ return self
+
+ def _with_version(self, version: Version) -> Self:
+ return self.__class__(
+ self.native,
+ backend_version=self._backend_version,
+ version=version,
+ validate_column_names=False,
+ )
+
+ def _with_native(self, df: pa.Table, *, validate_column_names: bool = True) -> Self:
+ return self.__class__(
+ df,
+ backend_version=self._backend_version,
+ version=self._version,
+ validate_column_names=validate_column_names,
+ )
+
+ @property
+ def shape(self) -> tuple[int, int]:
+ return self.native.shape
+
+ def __len__(self) -> int:
+ return len(self.native)
+
+ def row(self, index: int) -> tuple[Any, ...]:
+ return tuple(col[index] for col in self.native.itercolumns())
+
+ @overload
+ def rows(self, *, named: Literal[True]) -> list[dict[str, Any]]: ...
+
+ @overload
+ def rows(self, *, named: Literal[False]) -> list[tuple[Any, ...]]: ...
+
+ @overload
+ def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]: ...
+
+ def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]:
+ if not named:
+ return list(self.iter_rows(named=False, buffer_size=512)) # type: ignore[return-value]
+ return self.native.to_pylist()
+
+ def iter_columns(self) -> Iterator[ArrowSeries]:
+ for name, series in zip(self.columns, self.native.itercolumns()):
+ yield ArrowSeries.from_native(series, context=self, name=name)
+
+ _iter_columns = iter_columns
+
+ def iter_rows(
+ self, *, named: bool, buffer_size: int
+ ) -> Iterator[tuple[Any, ...]] | Iterator[dict[str, Any]]:
+ df = self.native
+ num_rows = df.num_rows
+
+ if not named:
+ for i in range(0, num_rows, buffer_size):
+ rows = df[i : i + buffer_size].to_pydict().values()
+ yield from zip(*rows)
+ else:
+ for i in range(0, num_rows, buffer_size):
+ yield from df[i : i + buffer_size].to_pylist()
+
+ def get_column(self, name: str) -> ArrowSeries:
+ if not isinstance(name, str):
+ msg = f"Expected str, got: {type(name)}"
+ raise TypeError(msg)
+ return ArrowSeries.from_native(self.native[name], context=self, name=name)
+
+ def __array__(self, dtype: Any, *, copy: bool | None) -> _2DArray:
+ return self.native.__array__(dtype, copy=copy)
+
+ def _gather(self, rows: SizedMultiIndexSelector[ChunkedArrayAny]) -> Self:
+ if len(rows) == 0:
+ return self._with_native(self.native.slice(0, 0))
+ if self._backend_version < (18,) and isinstance(rows, tuple):
+ rows = list(rows)
+ return self._with_native(self.native.take(rows))
+
+ def _gather_slice(self, rows: _SliceIndex | range) -> Self:
+ start = rows.start or 0
+ stop = rows.stop if rows.stop is not None else len(self.native)
+ if start < 0:
+ start = len(self.native) + start
+ if stop < 0:
+ stop = len(self.native) + stop
+ if rows.step is not None and rows.step != 1:
+ msg = "Slicing with step is not supported on PyArrow tables"
+ raise NotImplementedError(msg)
+ return self._with_native(self.native.slice(start, stop - start))
+
+ def _select_slice_name(self, columns: _SliceName) -> Self:
+ start, stop, step = convert_str_slice_to_int_slice(columns, self.columns)
+ return self._with_native(self.native.select(self.columns[start:stop:step]))
+
+ def _select_slice_index(self, columns: _SliceIndex | range) -> Self:
+ return self._with_native(
+ self.native.select(self.columns[columns.start : columns.stop : columns.step])
+ )
+
+ def _select_multi_index(
+ self, columns: SizedMultiIndexSelector[ChunkedArrayAny]
+ ) -> Self:
+ selector: Sequence[int]
+ if isinstance(columns, pa.ChunkedArray):
+ # TODO @dangotbanned: Fix upstream with `pa.ChunkedArray.to_pylist(self) -> list[Any]:`
+ selector = cast("Sequence[int]", columns.to_pylist())
+ # TODO @dangotbanned: Fix upstream, it is actually much narrower
+ # **Doesn't accept `ndarray`**
+ elif is_numpy_array_1d(columns):
+ selector = columns.tolist()
+ else:
+ selector = columns
+ return self._with_native(self.native.select(selector))
+
+ def _select_multi_name(
+ self, columns: SizedMultiNameSelector[ChunkedArrayAny]
+ ) -> Self:
+ selector: Sequence[str] | _1DArray
+ if isinstance(columns, pa.ChunkedArray):
+ # TODO @dangotbanned: Fix upstream with `pa.ChunkedArray.to_pylist(self) -> list[Any]:`
+ selector = cast("Sequence[str]", columns.to_pylist())
+ else:
+ selector = columns
+ # NOTE: Fixed in https://github.com/zen-xu/pyarrow-stubs/pull/221
+ return self._with_native(self.native.select(selector)) # pyright: ignore[reportArgumentType]
+
+ @property
+ def schema(self) -> dict[str, DType]:
+ schema = self.native.schema
+ return {
+ name: native_to_narwhals_dtype(dtype, self._version)
+ for name, dtype in zip(schema.names, schema.types)
+ }
+
+ def collect_schema(self) -> dict[str, DType]:
+ return self.schema
+
+ def estimated_size(self, unit: SizeUnit) -> int | float:
+ sz = self.native.nbytes
+ return scale_bytes(sz, unit)
+
+ explode = not_implemented()
+
+ @property
+ def columns(self) -> list[str]:
+ return self.native.column_names
+
+ def simple_select(self, *column_names: str) -> Self:
+ return self._with_native(
+ self.native.select(list(column_names)), validate_column_names=False
+ )
+
+ def select(self: ArrowDataFrame, *exprs: ArrowExpr) -> ArrowDataFrame:
+ new_series = self._evaluate_into_exprs(*exprs)
+ if not new_series:
+ # return empty dataframe, like Polars does
+ return self._with_native(
+ self.native.__class__.from_arrays([]), validate_column_names=False
+ )
+ names = [s.name for s in new_series]
+ reshaped = align_series_full_broadcast(*new_series)
+ df = pa.Table.from_arrays([s.native for s in reshaped], names=names)
+ return self._with_native(df, validate_column_names=True)
+
+ def _extract_comparand(self, other: ArrowSeries) -> ChunkedArrayAny:
+ length = len(self)
+ if not other._broadcast:
+ if (len_other := len(other)) != length:
+ msg = f"Expected object of length {length}, got: {len_other}."
+ raise ShapeError(msg)
+ return other.native
+
+ value = other.native[0]
+ return pa.chunked_array([pa.repeat(value, length)])
+
+ def with_columns(self: ArrowDataFrame, *exprs: ArrowExpr) -> ArrowDataFrame:
+ # NOTE: We use a faux-mutable variable and repeatedly "overwrite" (native_frame)
+ # All `pyarrow` data is immutable, so this is fine
+ native_frame = self.native
+ new_columns = self._evaluate_into_exprs(*exprs)
+ columns = self.columns
+
+ for col_value in new_columns:
+ col_name = col_value.name
+ column = self._extract_comparand(col_value)
+ native_frame = (
+ native_frame.set_column(columns.index(col_name), col_name, column=column)
+ if col_name in columns
+ else native_frame.append_column(col_name, column=column)
+ )
+
+ return self._with_native(native_frame, validate_column_names=False)
+
+ def group_by(
+ self, keys: Sequence[str] | Sequence[ArrowExpr], *, drop_null_keys: bool
+ ) -> ArrowGroupBy:
+ from narwhals._arrow.group_by import ArrowGroupBy
+
+ return ArrowGroupBy(self, keys, drop_null_keys=drop_null_keys)
+
+ def join(
+ self,
+ other: Self,
+ *,
+ how: JoinStrategy,
+ left_on: Sequence[str] | None,
+ right_on: Sequence[str] | None,
+ suffix: str,
+ ) -> Self:
+ how_to_join_map: dict[str, JoinType] = {
+ "anti": "left anti",
+ "semi": "left semi",
+ "inner": "inner",
+ "left": "left outer",
+ "full": "full outer",
+ }
+
+ if how == "cross":
+ plx = self.__narwhals_namespace__()
+ key_token = generate_temporary_column_name(
+ n_bytes=8, columns=[*self.columns, *other.columns]
+ )
+
+ return self._with_native(
+ self.with_columns(
+ plx.lit(0, None).alias(key_token).broadcast(ExprKind.LITERAL)
+ )
+ .native.join(
+ other.with_columns(
+ plx.lit(0, None).alias(key_token).broadcast(ExprKind.LITERAL)
+ ).native,
+ keys=key_token,
+ right_keys=key_token,
+ join_type="inner",
+ right_suffix=suffix,
+ )
+ .drop([key_token])
+ )
+
+ coalesce_keys = how != "full" # polars full join does not coalesce keys
+ return self._with_native(
+ self.native.join(
+ other.native,
+ keys=left_on or [], # type: ignore[arg-type]
+ right_keys=right_on, # type: ignore[arg-type]
+ join_type=how_to_join_map[how],
+ right_suffix=suffix,
+ coalesce_keys=coalesce_keys,
+ )
+ )
+
+ join_asof = not_implemented()
+
+ def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
+ to_drop = parse_columns_to_drop(self, columns, strict=strict)
+ return self._with_native(self.native.drop(to_drop), validate_column_names=False)
+
+ def drop_nulls(self: ArrowDataFrame, subset: Sequence[str] | None) -> ArrowDataFrame:
+ if subset is None:
+ return self._with_native(self.native.drop_null(), validate_column_names=False)
+ plx = self.__narwhals_namespace__()
+ return self.filter(~plx.any_horizontal(plx.col(*subset).is_null()))
+
+ def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
+ if isinstance(descending, bool):
+ order: Order = "descending" if descending else "ascending"
+ sorting: list[tuple[str, Order]] = [(key, order) for key in by]
+ else:
+ sorting = [
+ (key, "descending" if is_descending else "ascending")
+ for key, is_descending in zip(by, descending)
+ ]
+
+ null_placement = "at_end" if nulls_last else "at_start"
+
+ return self._with_native(
+ self.native.sort_by(sorting, null_placement=null_placement),
+ validate_column_names=False,
+ )
+
+ def to_pandas(self) -> pd.DataFrame:
+ return self.native.to_pandas()
+
+ def to_polars(self) -> pl.DataFrame:
+ import polars as pl # ignore-banned-import
+
+ return pl.from_arrow(self.native) # type: ignore[return-value]
+
+ def to_numpy(self, dtype: Any = None, *, copy: bool | None = None) -> _2DArray:
+ import numpy as np # ignore-banned-import
+
+ arr: Any = np.column_stack([col.to_numpy() for col in self.native.columns])
+ return arr
+
+ @overload
+ def to_dict(self, *, as_series: Literal[True]) -> dict[str, ArrowSeries]: ...
+
+ @overload
+ def to_dict(self, *, as_series: Literal[False]) -> dict[str, list[Any]]: ...
+
+ def to_dict(
+ self, *, as_series: bool
+ ) -> dict[str, ArrowSeries] | dict[str, list[Any]]:
+ it = self.iter_columns()
+ if as_series:
+ return {ser.name: ser for ser in it}
+ return {ser.name: ser.to_list() for ser in it}
+
+ def with_row_index(self, name: str) -> Self:
+ df = self.native
+ cols = self.columns
+
+ row_indices = pa.array(range(df.num_rows))
+ return self._with_native(
+ df.append_column(name, row_indices).select([name, *cols])
+ )
+
+ def filter(
+ self: ArrowDataFrame, predicate: ArrowExpr | list[bool | None]
+ ) -> ArrowDataFrame:
+ if isinstance(predicate, list):
+ mask_native: Mask | ChunkedArrayAny = predicate
+ else:
+ # `[0]` is safe as the predicate's expression only returns a single column
+ mask_native = self._evaluate_into_exprs(predicate)[0].native
+ return self._with_native(
+ self.native.filter(mask_native), validate_column_names=False
+ )
+
+ def head(self, n: int) -> Self:
+ df = self.native
+ if n >= 0:
+ return self._with_native(df.slice(0, n), validate_column_names=False)
+ else:
+ num_rows = df.num_rows
+ return self._with_native(
+ df.slice(0, max(0, num_rows + n)), validate_column_names=False
+ )
+
+ def tail(self, n: int) -> Self:
+ df = self.native
+ if n >= 0:
+ num_rows = df.num_rows
+ return self._with_native(
+ df.slice(max(0, num_rows - n)), validate_column_names=False
+ )
+ else:
+ return self._with_native(df.slice(abs(n)), validate_column_names=False)
+
+ def lazy(self, *, backend: Implementation | None = None) -> CompliantLazyFrameAny:
+ if backend is None:
+ return self
+ elif backend is Implementation.DUCKDB:
+ import duckdb # ignore-banned-import
+
+ from narwhals._duckdb.dataframe import DuckDBLazyFrame
+
+ df = self.native # noqa: F841
+ return DuckDBLazyFrame(
+ duckdb.table("df"),
+ backend_version=parse_version(duckdb),
+ version=self._version,
+ )
+ elif backend is Implementation.POLARS:
+ import polars as pl # ignore-banned-import
+
+ from narwhals._polars.dataframe import PolarsLazyFrame
+
+ return PolarsLazyFrame(
+ cast("pl.DataFrame", pl.from_arrow(self.native)).lazy(),
+ backend_version=parse_version(pl),
+ version=self._version,
+ )
+ elif backend is Implementation.DASK:
+ import dask # ignore-banned-import
+ import dask.dataframe as dd # ignore-banned-import
+
+ from narwhals._dask.dataframe import DaskLazyFrame
+
+ return DaskLazyFrame(
+ dd.from_pandas(self.native.to_pandas()),
+ backend_version=parse_version(dask),
+ version=self._version,
+ )
+ raise AssertionError # pragma: no cover
+
+ def collect(
+ self, backend: Implementation | None, **kwargs: Any
+ ) -> CompliantDataFrameAny:
+ if backend is Implementation.PYARROW or backend is None:
+ from narwhals._arrow.dataframe import ArrowDataFrame
+
+ return ArrowDataFrame(
+ self.native,
+ backend_version=self._backend_version,
+ version=self._version,
+ validate_column_names=False,
+ )
+
+ if backend is Implementation.PANDAS:
+ import pandas as pd # ignore-banned-import
+
+ from narwhals._pandas_like.dataframe import PandasLikeDataFrame
+
+ return PandasLikeDataFrame(
+ self.native.to_pandas(),
+ implementation=Implementation.PANDAS,
+ backend_version=parse_version(pd),
+ version=self._version,
+ validate_column_names=False,
+ )
+
+ if backend is Implementation.POLARS:
+ import polars as pl # ignore-banned-import
+
+ from narwhals._polars.dataframe import PolarsDataFrame
+
+ return PolarsDataFrame(
+ cast("pl.DataFrame", pl.from_arrow(self.native)),
+ backend_version=parse_version(pl),
+ version=self._version,
+ )
+
+ msg = f"Unsupported `backend` value: {backend}" # pragma: no cover
+ raise AssertionError(msg) # pragma: no cover
+
+ def clone(self) -> Self:
+ return self._with_native(self.native, validate_column_names=False)
+
+ def item(self, row: int | None, column: int | str | None) -> Any:
+ from narwhals._arrow.series import maybe_extract_py_scalar
+
+ if row is None and column is None:
+ if self.shape != (1, 1):
+ msg = (
+ "can only call `.item()` if the dataframe is of shape (1, 1),"
+ " or if explicit row/col values are provided;"
+ f" frame has shape {self.shape!r}"
+ )
+ raise ValueError(msg)
+ return maybe_extract_py_scalar(self.native[0][0], return_py_scalar=True)
+
+ elif row is None or column is None:
+ msg = "cannot call `.item()` with only one of `row` or `column`"
+ raise ValueError(msg)
+
+ _col = self.columns.index(column) if isinstance(column, str) else column
+ return maybe_extract_py_scalar(self.native[_col][row], return_py_scalar=True)
+
+ def rename(self, mapping: Mapping[str, str]) -> Self:
+ names: dict[str, str] | list[str]
+ if self._backend_version >= (17,):
+ names = cast("dict[str, str]", mapping)
+ else: # pragma: no cover
+ names = [mapping.get(c, c) for c in self.columns]
+ return self._with_native(self.native.rename_columns(names))
+
+ def write_parquet(self, file: str | Path | BytesIO) -> None:
+ import pyarrow.parquet as pp
+
+ pp.write_table(self.native, file)
+
+ @overload
+ def write_csv(self, file: None) -> str: ...
+
+ @overload
+ def write_csv(self, file: str | Path | BytesIO) -> None: ...
+
+ def write_csv(self, file: str | Path | BytesIO | None) -> str | None:
+ import pyarrow.csv as pa_csv
+
+ if file is None:
+ csv_buffer = pa.BufferOutputStream()
+ pa_csv.write_csv(self.native, csv_buffer)
+ return csv_buffer.getvalue().to_pybytes().decode()
+ pa_csv.write_csv(self.native, file)
+ return None
+
+ def is_unique(self) -> ArrowSeries:
+ col_token = generate_temporary_column_name(n_bytes=8, columns=self.columns)
+ row_index = pa.array(range(len(self)))
+ keep_idx = (
+ self.native.append_column(col_token, row_index)
+ .group_by(self.columns)
+ .aggregate([(col_token, "min"), (col_token, "max")])
+ )
+ native = pa.chunked_array(
+ pc.and_(
+ pc.is_in(row_index, keep_idx[f"{col_token}_min"]),
+ pc.is_in(row_index, keep_idx[f"{col_token}_max"]),
+ )
+ )
+ return ArrowSeries.from_native(native, context=self)
+
+ def unique(
+ self: ArrowDataFrame,
+ subset: Sequence[str] | None,
+ *,
+ keep: UniqueKeepStrategy,
+ maintain_order: bool | None = None,
+ ) -> ArrowDataFrame:
+ # The param `maintain_order` is only here for compatibility with the Polars API
+ # and has no effect on the output.
+ import numpy as np # ignore-banned-import
+
+ if subset and (error := self._check_columns_exist(subset)):
+ raise error
+ subset = list(subset or self.columns)
+
+ if keep in {"any", "first", "last"}:
+ from narwhals._arrow.group_by import ArrowGroupBy
+
+ agg_func = ArrowGroupBy._REMAP_UNIQUE[keep]
+ col_token = generate_temporary_column_name(n_bytes=8, columns=self.columns)
+ keep_idx_native = (
+ self.native.append_column(col_token, pa.array(np.arange(len(self))))
+ .group_by(subset)
+ .aggregate([(col_token, agg_func)])
+ .column(f"{col_token}_{agg_func}")
+ )
+ return self._with_native(
+ self.native.take(keep_idx_native), validate_column_names=False
+ )
+
+ keep_idx = self.simple_select(*subset).is_unique()
+ plx = self.__narwhals_namespace__()
+ return self.filter(plx._expr._from_series(keep_idx))
+
+ def gather_every(self, n: int, offset: int) -> Self:
+ return self._with_native(self.native[offset::n], validate_column_names=False)
+
+ def to_arrow(self) -> pa.Table:
+ return self.native
+
+ def sample(
+ self,
+ n: int | None,
+ *,
+ fraction: float | None,
+ with_replacement: bool,
+ seed: int | None,
+ ) -> Self:
+ import numpy as np # ignore-banned-import
+
+ num_rows = len(self)
+ if n is None and fraction is not None:
+ n = int(num_rows * fraction)
+ rng = np.random.default_rng(seed=seed)
+ idx = np.arange(0, num_rows)
+ mask = rng.choice(idx, size=n, replace=with_replacement)
+ return self._with_native(self.native.take(mask), validate_column_names=False)
+
+ def unpivot(
+ self,
+ on: Sequence[str] | None,
+ index: Sequence[str] | None,
+ variable_name: str,
+ value_name: str,
+ ) -> Self:
+ n_rows = len(self)
+ index_ = [] if index is None else index
+ on_ = [c for c in self.columns if c not in index_] if on is None else on
+ concat = (
+ partial(pa.concat_tables, promote_options="permissive")
+ if self._backend_version >= (14, 0, 0)
+ else pa.concat_tables
+ )
+ names = [*index_, variable_name, value_name]
+ return self._with_native(
+ concat(
+ [
+ pa.Table.from_arrays(
+ [
+ *(self.native.column(idx_col) for idx_col in index_),
+ cast(
+ "ChunkedArrayAny",
+ pa.array([on_col] * n_rows, pa.string()),
+ ),
+ self.native.column(on_col),
+ ],
+ names=names,
+ )
+ for on_col in on_
+ ]
+ )
+ )
+ # TODO(Unassigned): Even with promote_options="permissive", pyarrow does not
+ # upcast numeric to non-numeric (e.g. string) datatypes
+
+ pivot = not_implemented()