 from cudf_polars.dsl.nodebase import Node
 from cudf_polars.dsl.to_ast import to_ast, to_parquet_filter
 from cudf_polars.utils import dtypes
+from cudf_polars.utils.versions import POLARS_VERSION_LT_128

 if TYPE_CHECKING:
     from collections.abc import Callable, Hashable, Iterable, Sequence
@@ -373,7 +374,9 @@ def __init__(
             # TODO: polars has this implemented for parquet,
             # maybe we can do this too?
             raise NotImplementedError("slice pushdown for negative slices")
-        if self.typ in {"csv"} and self.skip_rows != 0:  # pragma: no cover
+        if (
+            POLARS_VERSION_LT_128 and self.typ in {"csv"} and self.skip_rows != 0
+        ):  # pragma: no cover
             # This comes from slice pushdown, but that
             # optimization doesn't happen right now
             raise NotImplementedError("skipping rows in CSV reader")
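On polars < 1.28 a pushed-down slice offset on a CSV scan is rejected here, which makes cudf-polars fall back to the CPU engine; from 1.28 onward the offset is honored by folding it into the reader's skiprows (see the hunk further below). A minimal sketch of a query that exercises this path, assuming, as the version gate suggests, that the optimizer pushes the slice offset into the scan's skip_rows:

```python
import polars as pl

# "data.csv" is a placeholder; .slice(2, 10) gives the optimizer a
# non-zero offset to push down into the CSV scan.
q = pl.scan_csv("data.csv").slice(2, 10)
result = q.collect(engine="gpu")  # raises internally and falls back on polars < 1.28
```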
@@ -383,7 +386,7 @@ def __init__(
             raise NotImplementedError(
                 "Read from cloud storage"
             )  # pragma: no cover; no test yet
-        if any(p.startswith("https://") for p in self.paths):
+        if any(str(p).startswith("https:/") for p in self.paths):
             raise NotImplementedError("Read from https")
         if self.typ == "csv":
             if self.reader_options["skip_rows_after_header"] != 0:
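The single slash in the updated check is deliberate: once entries in self.paths are carried as Path objects, stringifying them collapses the interior double slash of a URL scheme. A quick illustration (the URL is made up):

```python
from pathlib import PurePosixPath

url = "https://example.com/data.parquet"
print(str(PurePosixPath(url)))  # https:/example.com/data.parquet
```

Matching on "https:/" therefore catches the scheme whether or not the path survived normalization.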
@@ -459,7 +462,8 @@ def add_file_paths(
         Each path is repeated according to the number of rows read from it.
         """
         (filepaths,) = plc.filling.repeat(
-            plc.Table([plc.interop.from_arrow(pa.array(paths))]),
+            # TODO: Remove the from_arrow call when we support python list to Column
+            plc.Table([plc.interop.from_arrow(pa.array(map(str, paths)))]),
             plc.interop.from_arrow(pa.array(rows_per_path, type=pa.int32())),
         ).columns()
         return df.with_columns([Column(filepaths, name=name)])
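The repeat expands the per-file row counts into one file-path entry per output row. A pure-Python sketch of the same semantics, with made-up inputs:

```python
from itertools import chain, repeat

paths = ["part-0.csv", "part-1.csv"]  # hypothetical file list
rows_per_path = [2, 3]                # rows read from each file

filepaths = list(
    chain.from_iterable(repeat(str(p), n) for p, n in zip(paths, rows_per_path))
)
print(filepaths)
# ['part-0.csv', 'part-0.csv', 'part-1.csv', 'part-1.csv', 'part-1.csv']
```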
@@ -481,6 +485,17 @@ def do_evaluate(
     ) -> DataFrame:
         """Evaluate and return a dataframe."""
         if typ == "csv":
+
+            def read_csv_header(
+                path: Path | str, sep: str
+            ) -> list[str]:  # pragma: no cover
+                with Path(path).open() as f:
+                    for line in f:
+                        stripped = line.strip()
+                        if stripped:
+                            return stripped.split(sep)
+                return []
+
             parse_options = reader_options["parse_options"]
             sep = chr(parse_options["separator"])
             quote = chr(parse_options["quote_char"])
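A quick check of the read_csv_header helper above: it returns the first non-empty line of the file split on the separator, or an empty list if the file is blank. Assuming the helper is in scope:

```python
import tempfile

with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("\na;b;c\n1;2;3\n")  # the leading blank line is skipped

print(read_csv_header(f.name, ";"))  # ['a', 'b', 'c']
```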
@@ -524,7 +539,9 @@ def do_evaluate(
             options = (
                 plc.io.csv.CsvReaderOptions.builder(plc.io.SourceInfo([path]))
                 .nrows(n_rows)
-                .skiprows(skiprows)
+                .skiprows(
+                    skiprows if POLARS_VERSION_LT_128 else skiprows + skip_rows
+                )  # pragma: no cover
                 .lineterminator(str(eol))
                 .quotechar(str(quote))
                 .decimal(decimal)
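On polars >= 1.28 the slice offset reaches this reader as skip_rows in addition to the dialect-level skiprows, so the two are summed before being handed to libcudf. A worked example with assumed values:

```python
POLARS_VERSION_LT_128 = False  # assume polars >= 1.28

skiprows = 1   # rows skipped before the header, from the reader options
skip_rows = 2  # pushed down from a slice such as .slice(2, ...)

effective = skiprows if POLARS_VERSION_LT_128 else skiprows + skip_rows
print(effective)  # 3
```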
@@ -535,6 +552,13 @@ def do_evaluate(
             options.set_delimiter(str(sep))
             if column_names is not None:
                 options.set_names([str(name) for name in column_names])
+            else:
+                if (
+                    not POLARS_VERSION_LT_128 and skip_rows > header
+                ):  # pragma: no cover
+                    # We need to read the header, otherwise we would skip it
+                    column_names = read_csv_header(path, str(sep))
+                    options.set_names(column_names)
             options.set_header(header)
             options.set_dtypes(schema)
             if usecols is not None:
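The new else branch covers the case where the pushed-down skip overshoots the header row: skipping that many lines would drop the header itself, so the names are recovered out of band and pinned with set_names. A sketch of the guard with assumed values, reusing the helper defined earlier:

```python
header, skip_rows = 0, 3  # header on line 0, slice offset of 3 pushed down

if skip_rows > header:  # the reader would skip past the header line
    # "data.csv" is a placeholder; read_csv_header is the helper above
    column_names = read_csv_header("data.csv", ",")
```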
@@ -691,6 +715,8 @@ def slice_skip(tbl: plc.Table) -> plc.Table:
                 name=name,
             )
             df = DataFrame([index_col, *df.columns])
+            if next(iter(schema)) != name:
+                df = df.select(schema)
         assert all(c.obj.type() == schema[name] for name, c in df.column_map.items())
         if predicate is None:
             return df