
Unify streaming source callbacks into shared factory (_streaming.py) #306

@mwiewior

Description

Context

polars-bio has three independent streaming callbacks that plug Rust DataFusion table providers into Polars register_io_source:

  1. _overlap_source (io.py) — used by scan_bam, scan_vcf, scan_gff, etc.
  2. _pileup_source (pileup_op.py) — used by pb.depth()
  3. _range_source (range_op_io.py) — used by overlap, nearest, count_overlaps

All three follow the same pattern: acquire a DataFusion DataFrame → apply pushdowns → execute_stream() → yield batches with tqdm progress and client-side fallbacks. This results in ~100 lines of duplicated code per callback, plus duplicated utilities.
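
For reference, register_io_source consumes a callback of this shape (a minimal sketch against the public Polars plugin API; the schema and body here are illustrative):

from typing import Iterator, Optional

import polars as pl
from polars.io.plugins import register_io_source

def _example_source(
    with_columns: Optional[list],   # projection pushdown from the optimizer
    predicate: Optional[pl.Expr],   # predicate pushdown
    n_rows: Optional[int],          # limit pushdown
    batch_size: Optional[int],      # preferred batch size
) -> Iterator[pl.DataFrame]:
    # Each of the three callbacks fills this body with the DataFusion pipeline.
    yield pl.DataFrame({"chrom": ["chr1"]})

lf = register_io_source(_example_source, schema={"chrom": pl.String})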

Goal

Extract a shared make_streaming_source() factory and shared utilities into a new polars_bio/_streaming.py module, and consolidate Rust-side alignment provider creation.

Shared utilities

extract_column_names(with_columns)

Replaces identical copies in io.py and pileup_op.py. range_op_io.py already imports it from io.py; update that import to point at _streaming.
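
A minimal sketch, assuming register_io_source hands over either pl.Expr objects or plain column-name strings:

from typing import Optional

import polars as pl

def extract_column_names(with_columns: Optional[list]) -> Optional[list[str]]:
    # Normalize whatever register_io_source passes (pl.Expr or str)
    # into a flat list of column names.
    if with_columns is None:
        return None
    names: list[str] = []
    for item in with_columns:
        if isinstance(item, pl.Expr):
            names.extend(item.meta.root_names())
        else:
            names.append(item)
    return names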

pyarrow_schema_to_polars_dict(schema: pa.Schema) -> dict

Replaces an identical pattern in io.py, pileup_op.py, and range_op_io.py:

import polars as pl
import pyarrow as pa

def pyarrow_schema_to_polars_dict(schema: pa.Schema) -> dict:
    # Build an empty Arrow table with the target schema and let Polars
    # infer the matching Polars dtypes from it.
    empty_table = pa.table({f.name: pa.array([], type=f.type) for f in schema})
    return dict(pl.from_arrow(empty_table).schema)
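
For example:

schema = pa.schema([("chrom", pa.string()), ("start", pa.uint32())])
pyarrow_schema_to_polars_dict(schema)  # {'chrom': String, 'start': UInt32}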

The factory: make_streaming_source(df_factory, table_name, config)

Returns a callback compatible with register_io_source. Parameterizes the dimensions where callbacks differ:

Dimension             | _overlap_source                         | _pileup_source           | _range_source
----------------------|-----------------------------------------|--------------------------|--------------------------
DataFrame acquisition | py_read_sql(ctx, "SELECT * FROM {tn}")  | same                     | Rust range_operation_*()
Predicate pushdown    | Yes (format-specific column types)      | No                       | No
Projection method     | parse_sql_expr() bridge                 | parse_sql_expr() bridge  | Direct select(col_names)
Pre-query hook        | GFF re-registration                     | None                     | None
Limit in factory      | Yes                                     | Yes                      | No (handled in Rust)

Configuration via dataclasses:

from dataclasses import dataclass
from typing import Callable, Optional, Set

@dataclass(frozen=True)
class PredicatePushdownConfig:
    string_cols: Optional[Set[str]]
    uint32_cols: Optional[Set[str]]
    float32_cols: Optional[Set[str]]

@dataclass(frozen=True)
class StreamingConfig:
    use_sql_expr_projection: bool = True     # False for range_op (uses direct select)
    predicate_config: Optional[PredicatePushdownConfig] = None  # None = disabled
    pre_query_hook: Optional[Callable] = None  # GFF re-registration hook
    apply_limit_pushdown: bool = True        # False for range_op (limit in Rust)

The df_factory callable has the signature (table_name: Optional[str], n_rows: Optional[int]) -> DataFusion DataFrame. Range ops forward n_rows to Rust; scan/pileup ignore it.

The factory body implements the unified pipeline: pre-hook → df_factory → predicate pushdown → projection pushdown → limit → execute_stream → tqdm → client-side fallbacks → yield.
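
A minimal sketch of the factory, using the StreamingConfig and extract_column_names defined above and assuming the datafusion-python DataFrame API (select/limit/execute_stream); predicate pushdown, the parse_sql_expr() bridge, tqdm, and the client-side fallbacks are elided:

from typing import Callable, Iterator, Optional

import polars as pl
from datafusion import col

def make_streaming_source(
    df_factory: Callable,       # (table_name, n_rows) -> DataFusion DataFrame
    table_name: Optional[str],
    config: StreamingConfig,
) -> Callable:
    def source(with_columns, predicate, n_rows, batch_size) -> Iterator[pl.DataFrame]:
        if config.pre_query_hook is not None:
            config.pre_query_hook()              # e.g. GFF re-registration
        df = df_factory(table_name, n_rows)      # range ops consume n_rows here
        # Predicate pushdown would run here when config.predicate_config is set.
        cols = extract_column_names(with_columns)
        if cols:                                 # projection pushdown
            df = df.select(*[col(c) for c in cols])
        if n_rows is not None and config.apply_limit_pushdown:
            df = df.limit(n_rows)                # limit pushdown
        for batch in df.execute_stream():        # stream Arrow RecordBatches
            yield pl.from_arrow(batch.to_pyarrow())
    return source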

Migration steps

1. Foundation — Create _streaming.py

  • extract_column_names(), pyarrow_schema_to_polars_dict(), StreamingConfig, PredicatePushdownConfig, make_streaming_source()

2. Pileup migration (pileup_op.py)

  • Simplest callback (no predicate pushdown, no pre-hook)
  • Delete the local _extract_column_names_from_expr copy
  • Schema conversion → pyarrow_schema_to_polars_dict()
  • Replace the _pileup_source closure with make_streaming_source(df_factory, table_name, config) where config.predicate_config=None (sketched below)
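
Sketched wiring for this step (ctx and the surrounding function are stand-ins for pileup_op.py's actual setup; the exact registration call may differ):

from polars.io.plugins import register_io_source

def pileup_lazyframe(ctx, table_name, arrow_schema):
    # No predicate pushdown and no pre-hook; the df_factory ignores n_rows
    # because the factory applies the limit itself (apply_limit_pushdown=True).
    config = StreamingConfig(predicate_config=None)
    df_factory = lambda tn, n_rows: py_read_sql(ctx, f"SELECT * FROM {tn}")
    source = make_streaming_source(df_factory, table_name, config)
    return register_io_source(source, schema=pyarrow_schema_to_polars_dict(arrow_schema))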

3. IO scan migration (io.py)

  • Extract _make_gff_pre_hook() as a standalone function (format-specific, stays in io.py)
  • Schema conversion → pyarrow_schema_to_polars_dict()
  • Replace the _overlap_source closure with make_streaming_source(df_factory, table_name, config)
  • Delete _extract_column_names_from_expr (use the _streaming copy)
  • _FORMAT_COLUMN_TYPES stays in io.py and is used to build PredicatePushdownConfig (sketched below)
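
A sketch of building the pushdown config from _FORMAT_COLUMN_TYPES (the per-type keys and nesting assumed here are illustrative; the real layout in io.py may differ):

def _predicate_config_for(fmt: str) -> PredicatePushdownConfig:
    # Assumes _FORMAT_COLUMN_TYPES maps format name -> {dtype -> set of columns}.
    column_types = _FORMAT_COLUMN_TYPES[fmt]
    return PredicatePushdownConfig(
        string_cols=column_types.get("string"),
        uint32_cols=column_types.get("uint32"),
        float32_cols=column_types.get("float32"),
    )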

4. Range ops migration (range_op_io.py)

  • Import extract_column_names from _streaming instead of io
  • Replace the _range_source closure with make_streaming_source(df_factory, None, config) where config.use_sql_expr_projection=False and apply_limit_pushdown=False
  • The df_factory captures all range setup logic (file/lazy/frame dispatch) and uses the n_rows param (sketched below)
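
A sketch of that df_factory (range_operation_frames and its limit parameter are illustrative stand-ins for the real Rust bindings):

def make_range_df_factory(ctx, df1, df2, range_options):
    # Captures the prepared inputs; table_name is unused for range ops and
    # n_rows is forwarded to Rust, where the limit is applied.
    def df_factory(table_name, n_rows):
        return range_operation_frames(ctx, df1, df2, range_options, limit=n_rows)
    return df_factory

# Passed to make_streaming_source alongside the factory:
config = StreamingConfig(use_sql_expr_projection=False, apply_limit_pushdown=False)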

5. Dead code cleanup

  • utils.py: Delete _lazy_scan() and _extract_column_names_from_expr() (dead code, never imported)
  • interval_op_helpers.py: Clean up dead df_to_lazyframe() docstring referencing _overlap_source

6. Rust helper — create_alignment_provider() in scan.rs

Extract the BAM/CRAM provider creation shared by three call sites:

pub(crate) async fn create_alignment_provider(
    path: String,
    object_storage_options: Option<ObjectStorageOptions>,
    zero_based: bool,
    tag_fields: Option<Vec<String>>,
    binary_cigar: bool,
    reference_path: Option<String>,  // CRAM only
) -> Result<Arc<dyn TableProvider>, String>

Simplifies:

  • register_table() BAM/SAM/CRAM arms in scan.rs
  • py_register_pileup_table() in lib.rs
  • DepthFunction::call() in pileup.rs

Files to modify

File                              | Change
----------------------------------|------------------------------------------------------------------
polars_bio/_streaming.py          | NEW — shared utilities + factory
polars_bio/io.py                  | Replace _overlap_source + utility with factory; extract GFF hook
polars_bio/pileup_op.py           | Replace _pileup_source + local utility with factory
polars_bio/range_op_io.py         | Replace _range_source with factory; update import
polars_bio/utils.py               | Delete dead _lazy_scan and _extract_column_names_from_expr
polars_bio/interval_op_helpers.py | Clean up dead docstring
src/scan.rs                       | Add create_alignment_provider(); simplify BAM/SAM/CRAM arms
src/lib.rs                        | Simplify py_register_pileup_table() to use helper
src/pileup.rs                     | Simplify DepthFunction::call() to use helper

Relationship to Rust-side pushdown (#305)

Once #305 is implemented (especially py_read_sql_with_pushdowns()), the factory simplifies further by eliminating:

  • The parse_sql_expr() binding bridge workaround
  • The PredicatePushdownConfig dataclass
  • The use_sql_expr_projection flag
  • Client-side fallback logic

The GFF pre-query hook would still be needed (attr_fields must be known at registration time).
