Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions Sources/SparkConnect/DataFrameReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//
import Foundation

/// An interface used to load a ``DataFrame`` from external storage systems
/// (e.g. file systems, key-value stores, etc). Use `SparkSession.read` to access this.
public actor DataFrameReader: Sendable {
var source: String = ""
Expand Down Expand Up @@ -64,7 +64,7 @@ public actor DataFrameReader: Sendable {

/// Specifies the input data source format.
/// - Parameter source: A string.
/// - Returns: A ``DataFrameReader``.
public func format(_ source: String) -> DataFrameReader {
self.source = source
return self
Expand All @@ -74,17 +74,53 @@ public actor DataFrameReader: Sendable {
/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: A value string.
/// - Returns: A ``DataFrameReader``.
public func option(_ key: String, _ value: String) -> DataFrameReader {
  // Later calls with the same key overwrite earlier values.
  extraOptions[key] = value
  return self
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: A `Bool` value, stored as `"true"` or `"false"`.
/// - Returns: A ``DataFrameReader``.
public func option(_ key: String, _ value: Bool) -> DataFrameReader {
  option(key, value ? "true" : "false")
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: An `Int` value, stored in its decimal string form.
/// - Returns: A ``DataFrameReader``.
public func option(_ key: String, _ value: Int) -> DataFrameReader {
  option(key, "\(value)")
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: An `Int64` value, stored in its decimal string form.
/// - Returns: A ``DataFrameReader``.
public func option(_ key: String, _ value: Int64) -> DataFrameReader {
  option(key, "\(value)")
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: A `Double` value, stored in its string form.
/// - Returns: A ``DataFrameReader``.
public func option(_ key: String, _ value: Double) -> DataFrameReader {
  option(key, "\(value)")
}

/// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
/// automatically from data. By specifying the schema here, the underlying data source can skip
/// the schema inference step, and thus speed up data loading.
/// - Parameter schema: A DDL schema string.
/// - Returns: A ``DataFrameReader``.
@discardableResult
public func schema(_ schema: String) async throws -> DataFrameReader {
// Validate by parsing.
Expand All @@ -97,25 +133,25 @@ public actor DataFrameReader: Sendable {
return self
}

/// Loads input in as a ``DataFrame``, for data sources that don't require a path (e.g. external
/// key-value stores).
/// - Returns: A ``DataFrame``.
public func load() -> DataFrame {
  // Delegates to the multi-path overload with an empty path list.
  return load([String]())
}

/// Loads input in as a ``DataFrame``, for data sources that require a path (e.g. data backed by a
/// local or distributed file system).
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func load(_ path: String) -> DataFrame {
  // Delegates to the multi-path overload with a single-element list.
  return load([path])
}

/// Loads input in as a `DataFrame`, for data sources that support multiple paths. Only works if
/// Loads input in as a ``DataFrame``, for data sources that support multiple paths. Only works if
/// the source is a HadoopFsRelationProvider.
/// - Parameter paths: An array of path strings.
/// - Returns: A `DataFrame`.
/// - Returns: A ``DataFrame``.
public func load(_ paths: [String]) -> DataFrame {
self.paths = paths

Expand All @@ -139,85 +175,85 @@ public actor DataFrameReader: Sendable {
return DataFrame(spark: sparkSession, plan: plan)
}

/// Loads a CSV file and returns the result as a ``DataFrame``. See the documentation on the other
/// overloaded `csv()` method for more details.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func csv(_ path: String) -> DataFrame {
  source = "csv"
  return load([path])
}

/// Loads CSV files and returns the result as a ``DataFrame``.
/// This function will go through the input once to determine the input schema if `inferSchema`
/// is enabled. To avoid going through the entire data once, disable `inferSchema` option or
/// specify the schema explicitly using `schema`.
/// - Parameter paths: Path strings.
/// - Returns: A ``DataFrame``.
public func csv(_ paths: String...) -> DataFrame {
  source = "csv"
  return load(paths)
}

/// Loads a JSON file and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func json(_ path: String) -> DataFrame {
  source = "json"
  return load([path])
}

/// Loads JSON files and returns the result as a ``DataFrame``.
/// - Parameter paths: Path strings.
/// - Returns: A ``DataFrame``.
public func json(_ paths: String...) -> DataFrame {
  source = "json"
  return load(paths)
}

/// Loads an XML file and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func xml(_ path: String) -> DataFrame {
  source = "xml"
  return load([path])
}

/// Loads XML files and returns the result as a ``DataFrame``.
/// - Parameter paths: Path strings.
/// - Returns: A ``DataFrame``.
public func xml(_ paths: String...) -> DataFrame {
  source = "xml"
  return load(paths)
}

/// Loads an ORC file and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func orc(_ path: String) -> DataFrame {
  source = "orc"
  return load([path])
}

/// Loads ORC files and returns the result as a ``DataFrame``.
/// - Parameter paths: Path strings.
/// - Returns: A ``DataFrame``.
public func orc(_ paths: String...) -> DataFrame {
  source = "orc"
  return load(paths)
}

/// Loads a Parquet file and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func parquet(_ path: String) -> DataFrame {
  source = "parquet"
  return load([path])
}

/// Loads Parquet files, returning the result as a `DataFrame`.
/// Loads Parquet files, returning the result as a ``DataFrame``.
/// - Parameter paths: Path strings
/// - Returns: A `DataFrame`.
/// - Returns: A ``DataFrame``.
public func parquet(_ paths: String...) -> DataFrame {
self.source = "parquet"
return load(paths)
Expand Down
66 changes: 36 additions & 30 deletions Sources/SparkConnect/DataStreamReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//
import Foundation

/// An actor to load a streaming ``DataFrame`` from external storage systems
/// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream` to access this.
public actor DataStreamReader: Sendable {
var source: String = ""
Expand Down Expand Up @@ -47,7 +47,7 @@ public actor DataStreamReader: Sendable {
/// automatically from data. By specifying the schema here, the underlying data source can skip
/// the schema inference step, and thus speed up data loading.
/// - Parameter schema: A DDL schema string.
/// - Returns: A ``DataStreamReader``.
@discardableResult
public func schema(_ schema: String) async throws -> DataStreamReader {
// Validate by parsing.
Expand All @@ -64,7 +64,7 @@ public actor DataStreamReader: Sendable {
/// - Parameters:
/// - key: A key string.
/// - value: A value string.
/// - Returns: A ``DataStreamReader``.
public func option(_ key: String, _ value: String) -> DataStreamReader {
self.extraOptions[key] = value
return self
Expand All @@ -74,53 +74,59 @@ public actor DataStreamReader: Sendable {
/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: A `Bool` value, stored as `"true"` or `"false"`.
/// - Returns: A ``DataStreamReader``.
public func option(_ key: String, _ value: Bool) -> DataStreamReader {
  // Delegate to the String overload so all options flow through one place.
  return option(key, String(value))
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: An `Int` value, stored in its decimal string form.
/// - Returns: A ``DataStreamReader``.
public func option(_ key: String, _ value: Int) -> DataStreamReader {
  option(key, "\(value)")
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: An `Int64` value, stored in its decimal string form.
/// - Returns: A ``DataStreamReader``.
public func option(_ key: String, _ value: Int64) -> DataStreamReader {
  // Delegate to the String overload so all options flow through one place.
  return option(key, String(value))
}

/// Adds an input option for the underlying data source.
/// - Parameters:
///   - key: A key string.
///   - value: A `Double` value, stored in its string form.
/// - Returns: A ``DataStreamReader``.
public func option(_ key: String, _ value: Double) -> DataStreamReader {
  // Delegate to the String overload so all options flow through one place.
  return option(key, String(value))
}

/// Adds input options for the underlying data source.
/// - Parameter options: A string-string dictionary; each entry overwrites any
///   previously-set option with the same key.
/// - Returns: A ``DataStreamReader``.
public func options(_ options: [String: String]) -> DataStreamReader {
  options.forEach { extraOptions[$0.key] = $0.value }
  return self
}

/// Loads input data stream in as a ``DataFrame``, for data streams that don't require a path
/// (e.g. external key-value stores).
/// - Returns: A ``DataFrame``.
public func load() -> DataFrame {
  // Delegates to the multi-path overload with an empty path list.
  return load([String]())
}

/// Loads input data stream in as a ``DataFrame``, for data streams that require a path
/// (e.g. data backed by a local or distributed file system).
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func load(_ path: String) -> DataFrame {
  // Delegates to the multi-path overload with a single-element list.
  return load([path])
}
Expand Down Expand Up @@ -149,7 +155,7 @@ public actor DataStreamReader: Sendable {
return DataFrame(spark: sparkSession, plan: plan)
}

/// Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
/// Define a Streaming ``DataFrame`` on a Table. The DataSource corresponding to the table should
/// support streaming mode.
/// - Parameter tableName: The name of the table.
/// - Returns: A ``DataFrame``.
Expand All @@ -171,49 +177,49 @@ public actor DataStreamReader: Sendable {
return DataFrame(spark: sparkSession, plan: plan)
}

/// Loads a text file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func text(_ path: String) -> DataFrame {
  source = "text"
  return load(path)
}

/// Loads a CSV file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func csv(_ path: String) -> DataFrame {
  source = "csv"
  return load(path)
}

/// Loads a JSON file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func json(_ path: String) -> DataFrame {
  source = "json"
  return load(path)
}

/// Loads an XML file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func xml(_ path: String) -> DataFrame {
  source = "xml"
  return load(path)
}

/// Loads an ORC file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string.
/// - Returns: A ``DataFrame``.
public func orc(_ path: String) -> DataFrame {
  source = "orc"
  return load(path)
}

/// Loads a Parquet file stream and returns the result as a `DataFrame`.
/// Loads a Parquet file stream and returns the result as a ``DataFrame``.
/// - Parameter path: A path string
/// - Returns: A `DataFrame`.
/// - Returns: A ``DataFrame``.
public func parquet(_ path: String) -> DataFrame {
self.source = "parquet"
return load(path)
Expand Down
Loading
Loading