diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6c6da2a9b7..41dba0cf12 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1630,16 +1630,17 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression: return parser.parse(expr) if isinstance(expr, str) else expr -S = TypeVar("S", bound="TableScan", covariant=True) +A = TypeVar("A", bound="AbstractTableScan", covariant=True) + +class AbstractTableScan(ABC): + """A base class for all table scans.""" -class TableScan(ABC): table_metadata: TableMetadata io: FileIO row_filter: BooleanExpression selected_fields: Tuple[str, ...] case_sensitive: bool - snapshot_id: Optional[int] options: Properties limit: Optional[int] @@ -1650,7 +1651,6 @@ def __init__( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] = ("*",), case_sensitive: bool = True, - snapshot_id: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ): @@ -1659,10 +1659,108 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + def select(self: A, *field_names: str) -> A: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: A, expr: Union[str, BooleanExpression]) -> A: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: A, case_sensitive: bool = True) -> A: + return self.update(case_sensitive=case_sensitive) + + def update(self: A, **overrides: Any) -> A: + """Create a copy of this table scan with updated fields.""" + from inspect import signature + + # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the + # constructors because it may contain additional attributes that are not part of the constructor signature. + params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter + kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes + + return type(self)(**{**kwargs, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. + """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. + + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +S = TypeVar("S", bound="TableScan", covariant=True) + + +class TableScan(AbstractTableScan, ABC): + """A base class for table scans targeting a single snapshot.""" + + snapshot_id: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + snapshot_id: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.snapshot_id = snapshot_id + def snapshot(self) -> Optional[Snapshot]: if self.snapshot_id: return self.table_metadata.snapshot_by_id(self.snapshot_id) @@ -1688,29 +1786,6 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) - @abstractmethod - def plan_files(self) -> Iterable[ScanTask]: ... - - @abstractmethod - def to_arrow(self) -> pa.Table: ... - - @abstractmethod - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... - - @abstractmethod - def to_polars(self) -> pl.DataFrame: ... - - def update(self: S, **overrides: Any) -> S: - """Create a copy of this table scan with updated fields.""" - from inspect import signature - - # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the - # constructors because it may contain additional attributes that are not part of the constructor signature. - params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter - kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes - - return type(self)(**{**kwargs, **overrides}) - def use_ref(self: S, name: str) -> S: if self.snapshot_id: raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") @@ -1719,17 +1794,6 @@ def use_ref(self: S, name: str) -> S: raise ValueError(f"Cannot scan unknown ref={name}") - def select(self: S, *field_names: str) -> S: - if "*" in self.selected_fields: - return self.update(selected_fields=field_names) - return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) - - def filter(self: S, expr: Union[str, BooleanExpression]) -> S: - return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) - - def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: - return self.update(case_sensitive=case_sensitive) - @abstractmethod def count(self) -> int: ... @@ -2002,51 +2066,6 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: batches, ).cast(target_schema) - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: - """Read a Pandas DataFrame eagerly from this Iceberg table. - - Returns: - pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table - """ - return self.to_arrow().to_pandas(**kwargs) - - def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: - """Shorthand for loading the Iceberg Table in DuckDB. - - Returns: - DuckDBPyConnection: In memory DuckDB connection with the Iceberg table. - """ - import duckdb - - con = connection or duckdb.connect(database=":memory:") - con.register(table_name, self.to_arrow()) - - return con - - def to_ray(self) -> ray.data.dataset.Dataset: - """Read a Ray Dataset eagerly from this Iceberg table. - - Returns: - ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table - """ - import ray - - return ray.data.from_arrow(self.to_arrow()) - - def to_polars(self) -> pl.DataFrame: - """Read a Polars DataFrame from this Iceberg table. - - Returns: - pl.DataFrame: Materialized Polars Dataframe from the Iceberg table - """ - import polars as pl - - result = pl.from_arrow(self.to_arrow()) - if isinstance(result, pl.Series): - result = result.to_frame() - - return result - def count(self) -> int: from pyiceberg.io.pyarrow import ArrowScan