
Commit 0fa5f1c

SajidAlamQB, ravi-kumar-pilla and DimedS authored
feat(datasets): SparkDataset Rewrite (#1185)
* rework spark in pyproject.toml Signed-off-by: Sajid Alam <[email protected]>
* Update pyproject.toml Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset.py Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset.py Signed-off-by: Sajid Alam <[email protected]>
* lint Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset.py Signed-off-by: Sajid Alam <[email protected]>
* Update test_spark_dataset.py Signed-off-by: Sajid Alam <[email protected]>
* lint Signed-off-by: Sajid Alam <[email protected]>
* revert and split sparkdataset rewrite into v2 Signed-off-by: Sajid Alam <[email protected]>
* Update test_spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* changes based on feedback Signed-off-by: Sajid Alam <[email protected]>
* lint Signed-off-by: Sajid Alam <[email protected]>
* Update __init__.py Signed-off-by: Sajid Alam <[email protected]>
* fix tests Signed-off-by: Sajid Alam <[email protected]>
* fix docstring and lint Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* fix unity catalog Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* revert Signed-off-by: Sajid Alam <[email protected]>
* clean-up SparkDatasetV2 Signed-off-by: Sajid Alam <[email protected]>
* type check fix Signed-off-by: Sajid Alam <[email protected]>
* Update databricks_utils.py Signed-off-by: Sajid Alam <[email protected]>
* remove duplicate Signed-off-by: Sajid Alam <[email protected]>
* lint Signed-off-by: Sajid Alam <[email protected]>
* Delete .idea/workspace.xml Signed-off-by: Sajid Alam <[email protected]>
* Update pyproject.toml Signed-off-by: Sajid Alam <[email protected]>
* changes based on review Signed-off-by: Sajid Alam <[email protected]>
* changes based on review Signed-off-by: Sajid Alam <[email protected]>
* fix test 1 Signed-off-by: Sajid Alam <[email protected]>
* fix test 2 Signed-off-by: Sajid Alam <[email protected]>
* address review comments Signed-off-by: Sajid Alam <[email protected]>
* secret fix Signed-off-by: Sajid Alam <[email protected]>
* coverage Signed-off-by: Sajid Alam <[email protected]>
* Update test_spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* add SparkDatasetV2 windows tests Signed-off-by: Sajid Alam <[email protected]>
* clean tests and coverage Signed-off-by: Sajid Alam <[email protected]>
* coverage pt 2 Signed-off-by: Sajid Alam <[email protected]>
* coverage Signed-off-by: Sajid Alam <[email protected]>
* skip coverage for unreachable Signed-off-by: Sajid Alam <[email protected]>
* enable spark windows tests Signed-off-by: Sajid Alam <[email protected]>
* windows ci test with spark 4.0.1 Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* use java 11 Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* please spark tests work!! Signed-off-by: Sajid Alam <[email protected]>
* try fix spark test 2 Signed-off-by: Sajid Alam <[email protected]>
* attempt 3 Signed-off-by: Sajid Alam <[email protected]>
* please work spark windows Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* Update unit-tests.yml Signed-off-by: Sajid Alam <[email protected]>
* spark potential fix Signed-off-by: Sajid Alam <[email protected]>
* update makefile Signed-off-by: Sajid Alam <[email protected]>
* Update conftest.py Signed-off-by: Sajid Alam <[email protected]>
* Update conftest.py Signed-off-by: Sajid Alam <[email protected]>
* revert spark windows tests Signed-off-by: Sajid Alam <[email protected]>
* Modify Databrics connect Signed-off-by: Dmitry Sorokin <[email protected]>
* check if DatabricksSession is None Signed-off-by: Sajid Alam <[email protected]>
* coverage Signed-off-by: Sajid Alam <[email protected]>
* Update spark_dataset_v2.py Signed-off-by: Sajid Alam <[email protected]>
* auto convert pandas to spark dataframe Signed-off-by: Sajid Alam <[email protected]>
* revert conftest Signed-off-by: Sajid Alam <[email protected]>
* lint Signed-off-by: Sajid Alam <[email protected]>
* add docs and release notes Signed-off-by: Sajid Alam <[email protected]>
* pin mkdocsstrings 2.0 has breaking changes Signed-off-by: Sajid Alam <[email protected]>

---------

Signed-off-by: Sajid Alam <[email protected]>
Signed-off-by: Sajid Alam <[email protected]>
Signed-off-by: Dmitry Sorokin <[email protected]>
Co-authored-by: Ravi Kumar Pilla <[email protected]>
Co-authored-by: Dmitry Sorokin <[email protected]>
1 parent 7028b65 commit 0fa5f1c

File tree: 13 files changed, +1956 −24 lines changed


kedro-datasets/RELEASE.md

Lines changed: 7 additions & 0 deletions
@@ -1,6 +1,13 @@
 # Upcoming Release
 
 ## Major features and improvements
+
+- Added the following new datasets:
+
+| Type | Description | Location |
+|------|-------------|----------|
+| `spark.SparkDatasetV2` | A Spark dataset with Spark Connect, Databricks Connect support, and automatic Pandas-to-Spark conversion | `kedro_datasets.spark` |
+
 ## Bug fixes and other changes
 ## Community contributions

kedro-datasets/docs/api/kedro_datasets/spark.SparkDatasetV2.md

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+# SparkDatasetV2
+
+`SparkDatasetV2` loads and saves data using Apache Spark DataFrames with improved support for Spark Connect, Databricks Connect, and automatic Pandas DataFrame conversion.
+
+::: kedro_datasets.spark.SparkDatasetV2
+    options:
+      members: true
+      show_source: true
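For orientation, here is a minimal usage sketch of the new dataset. The constructor arguments mirror the existing `SparkDataset` API and are assumptions, since `SparkDatasetV2`'s signature is not part of this excerpt.

    # Hypothetical sketch: argument names mirror SparkDataset and are assumed,
    # as SparkDatasetV2's constructor is not shown in this diff.
    from kedro_datasets.spark import SparkDatasetV2

    dataset = SparkDatasetV2(
        filepath="s3a://my-bucket/trips.parquet",  # assumed parameter name
        file_format="parquet",                     # assumed parameter name
        save_args={"mode": "overwrite"},           # assumed parameter name
    )

    df = dataset.load()   # expected to return a pyspark DataFrame
    dataset.save(df)      # per the commit, a pandas DataFrame would be converted automatically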

kedro-datasets/docs/index.md

Lines changed: 1 addition & 0 deletions

@@ -58,6 +58,7 @@ Name | Description
 [spark.DeltaTableDataset](api/kedro_datasets/spark.DeltaTableDataset.md) | ``DeltaTableDataset`` loads data into DeltaTable objects.
 [spark.GBQQueryDataset](api/kedro_datasets/spark.GBQQueryDataset.md) | ``GBQQueryDataset`` loads data from Google BigQuery with a SQL query using the BigQuery Spark connector.
 [spark.SparkDataset](api/kedro_datasets/spark.SparkDataset.md) | ``SparkDataset`` loads and saves Spark dataframes.
+[spark.SparkDatasetV2](api/kedro_datasets/spark.SparkDatasetV2.md) | ``SparkDatasetV2`` loads and saves Spark dataframes with support for Spark Connect, Databricks Connect, and automatic Pandas-to-Spark conversion.
 [spark.SparkHiveDataset](api/kedro_datasets/spark.SparkHiveDataset.md) | ``SparkHiveDataset`` loads and saves Spark dataframes stored on Hive.
 [spark.SparkJDBCDataset](api/kedro_datasets/spark.SparkJDBCDataset.md) | ``SparkJDBCDataset`` loads data from a database table accessible via a JDBC URL and connection properties and saves the content of a PySpark DataFrame to an external database table via JDBC.
 [spark.SparkStreamingDataset](api/kedro_datasets/spark.SparkStreamingDataset.md) | ``SparkStreamingDataset`` loads data into Spark Streaming DataFrame objects.

kedro-datasets/kedro_datasets/_utils/databricks_utils.py

Lines changed: 133 additions & 7 deletions
@@ -1,14 +1,21 @@
+"""Utility functions for Databricks."""
+from __future__ import annotations
+
+import logging
 import os
 from fnmatch import fnmatch
 from pathlib import PurePosixPath
 from typing import TYPE_CHECKING, Union
 
+from kedro.io.core import get_protocol_and_path
 from pyspark.sql import SparkSession
 
 if TYPE_CHECKING:
     from databricks.connect import DatabricksSession
     from pyspark.dbutils import DBUtils
 
+logger = logging.getLogger(__name__)
+
 
 def parse_glob_pattern(pattern: str) -> str:
     special = ("*", "?", "[")
@@ -31,7 +38,7 @@ def strip_dbfs_prefix(path: str, prefix: str = "/dbfs") -> str:
     return path[len(prefix) :] if path.startswith(prefix) else path
 
 
-def dbfs_glob(pattern: str, dbutils: "DBUtils") -> list[str]:
+def dbfs_glob(pattern: str, dbutils: DBUtils) -> list[str]:
     """Perform a custom glob search in DBFS using the provided pattern.
     It is assumed that version paths are managed by Kedro only.
@@ -40,7 +47,7 @@ def dbfs_glob(pattern: str, dbutils: "DBUtils") -> list[str]:
         dbutils: dbutils instance to operate with DBFS.
 
     Returns:
-        List of DBFS paths prefixed with '/dbfs' that satisfy the glob pattern.
+        List of DBFS paths prefixed with '/dbfs' that satisfy the glob pattern.
     """
     pattern = strip_dbfs_prefix(pattern)
     prefix = parse_glob_pattern(pattern)
@@ -58,7 +65,7 @@ def dbfs_glob(pattern: str, dbutils: "DBUtils") -> list[str]:
     return sorted(matched)
 
 
-def get_dbutils(spark: Union[SparkSession, "DatabricksSession"]) -> "DBUtils":
+def get_dbutils(spark: SparkSession | DatabricksSession) -> DBUtils:
     """Get the instance of 'dbutils' or None if the one could not be found."""
     dbutils = globals().get("dbutils")
     if dbutils:
@@ -71,16 +78,18 @@ def get_dbutils(spark: SparkSession | DatabricksSession) -> DBUtils:
     except ImportError:
         try:
             import IPython  # noqa: PLC0415
-        except ImportError:
-            pass
+        except ImportError:  # pragma: no cover
+            pass  # pragma: no cover
         else:
             ipython = IPython.get_ipython()
-            dbutils = ipython.user_ns.get("dbutils") if ipython else None
+            dbutils = (
+                ipython.user_ns.get("dbutils") if ipython else None
+            )  # pragma: no cover
 
     return dbutils
 
 
-def dbfs_exists(pattern: str, dbutils: "DBUtils") -> bool:
+def dbfs_exists(pattern: str, dbutils: DBUtils) -> bool:
     """Perform an `ls` list operation in DBFS using the provided pattern.
     It is assumed that version paths are managed by Kedro.
     Broad `Exception` is present due to `dbutils.fs.ExecutionError` that
@@ -103,3 +112,120 @@ def dbfs_exists(pattern: str, dbutils: "DBUtils") -> bool:
 def deployed_on_databricks() -> bool:
     """Check if running on Databricks."""
     return "DATABRICKS_RUNTIME_VERSION" in os.environ
+
+
+def parse_spark_filepath(filepath: str) -> tuple[str, str]:
+    """Parse filepath handling special cases like DBFS and Unity Catalog.
+
+    Args:
+        filepath: Path to parse.
+
+    Returns:
+        Tuple of (protocol, path).
+    """
+    # Handle DBFS paths with /dbfs/ prefix
+    if filepath.startswith("/dbfs/"):
+        # /dbfs/path -> dbfs protocol with /path
+        path = filepath[5:]  # Remove /dbfs prefix, keep leading /
+        return "dbfs", path
+
+    # Handle DBFS paths with dbfs:/ prefix (single slash format)
+    if filepath.startswith("dbfs:") and not filepath.startswith("dbfs://"):
+        # dbfs:/path -> dbfs protocol with /path
+        path = filepath[5:]  # Remove "dbfs:", keep the path
+        if not path.startswith("/"):
+            path = "/" + path
+        return "dbfs", path
+
+    # Handle Unity Catalog volumes
+    if filepath.startswith("/Volumes"):
+        return "file", filepath
+
+    # For standard protocols with ://
+    protocol, path = get_protocol_and_path(filepath)
+
+    # Normalise empty protocol to "file"
+    if not protocol:
+        protocol = "file"  # pragma: no cover
+
+    return protocol, path
+
+
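Traced against the branches above, the new helper resolves representative inputs as follows (illustrative only, not part of the diff):

    # Illustrative: expected results follow directly from the branches above.
    from kedro_datasets._utils.databricks_utils import parse_spark_filepath

    parse_spark_filepath("/dbfs/data/trips.parquet")   # ("dbfs", "/data/trips.parquet")
    parse_spark_filepath("dbfs:/data/trips.parquet")   # ("dbfs", "/data/trips.parquet")
    parse_spark_filepath("/Volumes/cat/schema/vol/x")  # ("file", "/Volumes/cat/schema/vol/x")
    parse_spark_filepath("s3://bucket/key.parquet")    # ("s3", "bucket/key.parquet")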
+def validate_databricks_path(filepath: str) -> None:
+    """Warn about potential Databricks path issues.
+
+    Args:
+        filepath: Path to validate.
+    """
+    if not deployed_on_databricks():
+        return
+
+    # Check if path has a valid Databricks format
+    valid_prefixes = ("/dbfs", "dbfs:/", "/Volumes")
+    cloud_protocols = ("s3://", "s3a://", "s3n://", "gs://", "abfs://", "wasbs://")
+
+    has_valid_prefix = any(filepath.startswith(p) for p in valid_prefixes)
+    has_cloud_protocol = any(filepath.startswith(p) for p in cloud_protocols)
+
+    if not has_valid_prefix and not has_cloud_protocol:
+        logger.warning(
+            "Using SparkDataset on Databricks without the `/dbfs/`, `dbfs:/`, or "
+            "`/Volumes` prefix in the filepath may cause errors. Consider adding "
+            "the appropriate prefix to: %s",
+            filepath,
+        )
+
+
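The check above only has an effect on Databricks, where `DATABRICKS_RUNTIME_VERSION` is set; a quick sketch of which paths pass silently and which trigger the warning (illustrative only):

    # Illustrative: behaviour follows the prefix checks above, assuming
    # DATABRICKS_RUNTIME_VERSION is set in the environment.
    from kedro_datasets._utils.databricks_utils import validate_databricks_path

    validate_databricks_path("/dbfs/data/x.parquet")    # valid prefix, no warning
    validate_databricks_path("s3a://bucket/x.parquet")  # cloud protocol, no warning
    validate_databricks_path("data/x.parquet")          # logs a warning about the missing prefix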
+def to_spark_path(protocol: str, path: str) -> str:
+    """Convert protocol and path to Spark-compatible format.
+
+    Args:
+        protocol: Detected protocol (e.g., 'file', 's3', 'dbfs').
+        path: Path component without protocol.
+
+    Returns:
+        Spark-compatible path string.
+    """
+    # For Databricks DBFS paths
+    if protocol == "dbfs":
+        # Ensure dbfs:/ format for Spark
+        if not path.startswith("/"):
+            path = "/" + path
+        return f"dbfs:{path}"
+
+    # Map protocols to Spark-preferred protocols
+    spark_protocols = {
+        "s3": "s3a",  # Spark prefers s3a://
+        "s3n": "s3a",
+        "s3a": "s3a",
+        "gs": "gs",
+        "gcs": "gs",
+        "abfs": "abfs",
+        "abfss": "abfss",
+        "wasbs": "wasbs",
+        "wasb": "wasb",
+        "hdfs": "hdfs",
+        "file": "file",
+    }
+
+    spark_protocol = spark_protocols.get(protocol, protocol)
+
+    # Handle local/file paths
+    if spark_protocol == "file":
+        # Unity Catalog volumes don't use file:// prefix
+        if path.startswith("/Volumes"):
+            return path
+        # Regular local files need file:// prefix
+        if not path.startswith("/"):
+            path = f"/{path}"
+        return f"file://{path}"
+
+    # Handle cloud and other protocols
+    if spark_protocol:
+        # Remove any existing protocol prefix from path
+        if "://" in path:
+            path = path.split("://", 1)[1]
+        return f"{spark_protocol}://{path}"
+
+    # Fallback: return path as-is if we can't determine protocol
+    return path
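Combined with `parse_spark_filepath`, this converts a user-supplied filepath into the form Spark expects; a few illustrative mappings based on the logic above:

    # Illustrative: results follow from the protocol mapping above.
    from kedro_datasets._utils.databricks_utils import to_spark_path

    to_spark_path("dbfs", "/data/x.parquet")     # "dbfs:/data/x.parquet"
    to_spark_path("s3", "bucket/x.parquet")      # "s3a://bucket/x.parquet" (s3 mapped to s3a)
    to_spark_path("file", "/tmp/x.parquet")      # "file:///tmp/x.parquet"
    to_spark_path("file", "/Volumes/cat/s/v/x")  # "/Volumes/cat/s/v/x" (no file:// prefix)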
