
spark-adbc

A Spark DataSource V2 connector for Apache Arrow ADBC (Arrow Database Connectivity). It enables Spark to read from ADBC-compatible databases using Arrow's columnar format, avoiding the row-by-row serialization overhead of traditional JDBC.

Features

  • Columnar data transfer — reads data as Arrow columnar batches, enabling zero-copy integration with Spark's columnar execution engine
  • Read support — read via dbtable or custom query
  • Column pruning — only fetches the columns Spark actually needs
  • Filter pushdown — pushes filter expressions down to the database (equality, comparison, null checks, IN, string LIKE patterns, AND/OR/NOT); see the pushdown sketch after this list
  • Aggregate pushdown — pushes COUNT, SUM, AVG, MIN, MAX (including DISTINCT variants) and GROUP BY down to the database
  • Limit pushdown — pushes LIMIT down to the database
  • Top-N pushdown — pushes ORDER BY ... LIMIT N down to the database
  • SQL dialect support — generates dialect-specific SQL for PostgreSQL, SQLite, and MSSQL (auto-detected from jni.driver or set explicitly via dialect option)
  • Partitioned reads — split reads across multiple Spark partitions by a numeric column for parallel data ingestion
  • Works with any ADBC driver — PostgreSQL, SQLite, MSSQL, DuckDB, Flight SQL, Snowflake, etc.
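
As a rough illustration of how the filter and aggregate pushdowns are exercised from the DataFrame API, the sketch below filters and aggregates a table read through the connector and then prints the physical plan. The table and column names (orders, category, amount) are placeholders; whether a given expression is pushed depends on the dialect and the query shape.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("adbc-pushdown-demo").getOrCreate()

// Read a table through the connector (table/column names are placeholders).
val orders = spark.read
  .format("com.tokoko.spark.adbc")
  .option("driver", "org.apache.arrow.adbc.driver.jni.JniDriverFactory")
  .option("jni.driver", "postgresql")
  .option("uri", "postgresql://user:pass@localhost:5432/mydb")
  .option("dbtable", "orders")
  .load()

// A filter, GROUP BY, aggregates, and a LIMIT that are all candidates for pushdown.
val summary = orders
  .filter(col("category") === "books" && col("amount").isNotNull)
  .groupBy(col("category"))
  .agg(count(lit(1)).as("n"), sum(col("amount")).as("total"), avg(col("amount")).as("mean"))
  .limit(10)

// Inspect the physical plan: expressions that were pushed down should appear in the
// BatchScan node's description rather than as separate Spark Filter/Aggregate operators.
summary.explain(true)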

Usage

// Read from a table
val df = spark.read
  .format("com.tokoko.spark.adbc")
  .option("driver", "org.apache.arrow.adbc.driver.jni.JniDriverFactory")
  .option("jni.driver", "postgresql")
  .option("uri", "postgresql://user:pass@localhost:5432/mydb")
  .option("dbtable", "my_table")
  .load()

// Read from a query
val df = spark.read
  .format("com.tokoko.spark.adbc")
  .option("driver", "org.apache.arrow.adbc.driver.jni.JniDriverFactory")
  .option("jni.driver", "postgresql")
  .option("uri", "postgresql://user:pass@localhost:5432/mydb")
  .option("query", "SELECT id, name FROM my_table WHERE active = true")
  .load()

// Partitioned read (parallel ingestion across 8 tasks)
val df = spark.read
  .format("com.tokoko.spark.adbc")
  .option("driver", "org.apache.arrow.adbc.driver.jni.JniDriverFactory")
  .option("jni.driver", "postgresql")
  .option("uri", "postgresql://user:pass@localhost:5432/mydb")
  .option("dbtable", "my_table")
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .load()

lowerBound and upperBound are used to compute the stride for range splitting — they do not filter data. Rows outside the specified range are still read (they land in the first or last partition).
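
For intuition, here is a minimal sketch of the kind of per-partition predicates that stride-based range splitting typically produces. It mirrors Spark's built-in JDBC partitioning; the exact SQL that spark-adbc emits may be phrased differently. With lowerBound = 0, upperBound = 1000000, and numPartitions = 8, the stride is 125000, and the first and last partitions are left open-ended so rows outside the range are not dropped.

// Hypothetical illustration of stride-based range splitting; the predicates
// spark-adbc actually generates may differ in wording.
val lowerBound = 0L
val upperBound = 1000000L
val numPartitions = 8
val stride = (upperBound - lowerBound) / numPartitions   // 125000

val predicates = (0 until numPartitions).map { i =>
  val lo = lowerBound + i * stride
  val hi = lo + stride
  if (i == 0)                      s"id < $hi OR id IS NULL"  // also catches rows below lowerBound
  else if (i == numPartitions - 1) s"id >= $lo"               // also catches rows above upperBound
  else                             s"id >= $lo AND id < $hi"
}
predicates.foreach(println)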

Benchmarks

Local benchmarks comparing spark-adbc against Spark's built-in JDBC connector, reading 20 million rows from PostgreSQL 16 (single-node, local[*] mode).

Benchmark                                      ADBC     JDBC      Speedup
Full table scan                                73.9s    135.8s    1.84x
Full table scan — columnar*                    38.3s    142.6s    3.72x
Column projection (2 of 6 cols)                34.2s    64.3s     1.88x
Filtered read (category = 'cat_5')             5.6s     6.2s      1.11x
Aggregation (GROUP BY with AVG, SUM, COUNT)    4.1s     49.2s     11.91x
Join + filter + aggregate (with Comet)         15.8s    43.7s     2.76x

*The columnar benchmark measures Arrow batch ingestion without Spark's columnar-to-row conversion. This represents a theoretical upper bound — Spark currently converts columnar batches back to rows for most operations, so real-world queries go through the standard (non-columnar) read path.

Apache DataFusion Comet

Apache DataFusion Comet can operate directly on the Arrow columnar batches returned by spark-adbc, avoiding Spark's columnar-to-row conversion for downstream operations like joins, filters, and aggregations. To enable this, configure Comet's experimental sparkToColumnar feature to accept BatchScan as a columnar source:

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.exec.all.enabled", "true")
  .config("spark.comet.sparkToColumnar.enabled", "true")
  .config("spark.comet.sparkToColumnar.supportedOperatorList",
    "Range,InMemoryTableScan,RDDScan,BatchScan")
  .getOrCreate()

This replaces Spark's ColumnarToRow with Comet's CometSparkColumnarToColumnar bridge, keeping the entire pipeline columnar through DataFusion's native execution engine.

Note: CometSparkColumnarToColumnar is not zero-copy — it reads values element-by-element from Spark's ColumnVector API and writes them into new Arrow vectors for Comet's native format, even though the source data is already Arrow-backed. This adds conversion overhead at the scan boundary, but all subsequent operations (joins, filters, aggregations) run natively in DataFusion without further conversion.
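
A quick way to check that the bridge is active is to inspect the physical plan of a query that reads through the connector. With the configuration above, the plan should show Comet operators and a CometSparkColumnarToColumnar node above the BatchScan instead of a plain ColumnarToRow. The table and column names below are placeholders.

// Read through the connector and print the plan (table/column names are placeholders).
val df = spark.read
  .format("com.tokoko.spark.adbc")
  .option("driver", "org.apache.arrow.adbc.driver.jni.JniDriverFactory")
  .option("jni.driver", "postgresql")
  .option("uri", "postgresql://user:pass@localhost:5432/mydb")
  .option("dbtable", "my_table")
  .load()

// If Comet picked up the scan, the printed plan should contain Comet operators and a
// CometSparkColumnarToColumnar bridge above the BatchScan rather than ColumnarToRow.
df.groupBy("category").count().explain()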
