diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift
index 859b854..7230ee1 100644
--- a/Sources/SparkConnect/DataFrameReader.swift
+++ b/Sources/SparkConnect/DataFrameReader.swift
@@ -18,7 +18,7 @@
 //
 import Foundation
 
-/// An interface used to load a `DataFrame` from external storage systems
+/// An interface used to load a ``DataFrame`` from external storage systems
 /// (e.g. file systems, key-value stores, etc). Use `SparkSession.read` to access this.
 public actor DataFrameReader: Sendable {
   var source: String = ""
@@ -64,7 +64,7 @@ public actor DataFrameReader: Sendable {
 
   /// Specifies the input data source format.
   /// - Parameter source: A string.
-  /// - Returns: A `DataFrameReader`.
+  /// - Returns: A ``DataFrameReader``.
   public func format(_ source: String) -> DataFrameReader {
     self.source = source
     return self
@@ -74,17 +74,53 @@ public actor DataFrameReader: Sendable {
   /// - Parameters:
   ///   - key: A key string.
   ///   - value: A value string.
-  /// - Returns: A `DataFrameReader`.
+  /// - Returns: A ``DataFrameReader``.
   public func option(_ key: String, _ value: String) -> DataFrameReader {
     self.extraOptions[key] = value
     return self
   }
 
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Bool` value.
+  /// - Returns: A ``DataFrameReader``.
+  public func option(_ key: String, _ value: Bool) -> DataFrameReader {
+    return option(key, String(value))
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: An `Int` value.
+  /// - Returns: A ``DataFrameReader``.
+  public func option(_ key: String, _ value: Int) -> DataFrameReader {
+    return option(key, String(value))
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: An `Int64` value.
+  /// - Returns: A ``DataFrameReader``.
+  public func option(_ key: String, _ value: Int64) -> DataFrameReader {
+    return option(key, String(value))
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Double` value.
+  /// - Returns: A ``DataFrameReader``.
+  public func option(_ key: String, _ value: Double) -> DataFrameReader {
+    return option(key, String(value))
+  }
+
   /// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
   /// automatically from data. By specifying the schema here, the underlying data source can skip
   /// the schema inference step, and thus speed up data loading.
   /// - Parameter schema: A DDL schema string.
-  /// - Returns: A `DataFrameReader`.
+  /// - Returns: A ``DataFrameReader``.
   @discardableResult
   public func schema(_ schema: String) async throws -> DataFrameReader {
     // Validate by parsing.
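Each typed overload above funnels into `option(key, String(value))`, so the server still receives string-valued options while call sites drop the manual conversion. A minimal usage sketch (assuming a reachable Spark Connect server; the option keys are standard CSV data source options, and the path is hypothetical):

```swift
import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()

// Typed values are rendered with String(value) before being sent,
// so no new wire format is involved.
let df = await spark.read
  .option("header", true)         // Bool overload
  .option("maxColumns", 20480)    // Int overload
  .option("samplingRatio", 0.5)   // Double overload
  .csv("/tmp/people.csv")         // hypothetical path
```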
@@ -97,25 +133,25 @@ public actor DataFrameReader: Sendable {
     return self
   }
 
-  /// Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external
+  /// Loads input in as a ``DataFrame``, for data sources that don't require a path (e.g. external
   /// key-value stores).
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func load() -> DataFrame {
     return load([])
   }
 
-  /// Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by a
+  /// Loads input in as a ``DataFrame``, for data sources that require a path (e.g. data backed by a
   /// local or distributed file system).
   /// - Parameter path: A path string.
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func load(_ path: String) -> DataFrame {
     return load([path])
   }
 
-  /// Loads input in as a `DataFrame`, for data sources that support multiple paths. Only works if
+  /// Loads input in as a ``DataFrame``, for data sources that support multiple paths. Only works if
   /// the source is a HadoopFsRelationProvider.
   /// - Parameter paths: An array of path strings.
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func load(_ paths: [String]) -> DataFrame {
     self.paths = paths
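The `format`/`schema`/`load` chain above is the general-purpose entry point behind the per-format helpers. A short sketch (hypothetical path; the DDL string follows Spark's `name TYPE, ...` syntax):

```swift
let spark = try await SparkSession.builder.getOrCreate()

// Supplying a DDL schema up front skips inference, so the files
// are scanned only once when the DataFrame is evaluated.
let df = try await spark.read
  .format("json")
  .schema("name STRING, age INT")
  .load("/tmp/events/")  // hypothetical path
```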
@@ -139,85 +175,85 @@ public actor DataFrameReader: Sendable {
     return DataFrame(spark: sparkSession, plan: plan)
   }
 
-  /// Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
+  /// Loads a CSV file and returns the result as a ``DataFrame``. See the documentation on the other
   /// overloaded `csv()` method for more details.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func csv(_ path: String) -> DataFrame {
     self.source = "csv"
     return load(path)
   }
 
-  /// Loads CSV files and returns the result as a `DataFrame`.
+  /// Loads CSV files and returns the result as a ``DataFrame``.
   /// This function will go through the input once to determine the input schema if `inferSchema`
   /// is enabled. To avoid going through the entire data once, disable `inferSchema` option or
   /// specify the schema explicitly using `schema`.
   /// - Parameter paths: Path strings.
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func csv(_ paths: String...) -> DataFrame {
     self.source = "csv"
     return load(paths)
   }
 
-  /// Loads a JSON file and returns the result as a `DataFrame`.
+  /// Loads a JSON file and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func json(_ path: String) -> DataFrame {
     self.source = "json"
     return load(path)
   }
 
-  /// Loads JSON files and returns the result as a `DataFrame`.
+  /// Loads JSON files and returns the result as a ``DataFrame``.
   /// - Parameter paths: Path strings
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func json(_ paths: String...) -> DataFrame {
     self.source = "json"
     return load(paths)
   }
 
-  /// Loads an XML file and returns the result as a `DataFrame`.
+  /// Loads an XML file and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func xml(_ path: String) -> DataFrame {
     self.source = "xml"
     return load(path)
   }
 
-  /// Loads XML files and returns the result as a `DataFrame`.
+  /// Loads XML files and returns the result as a ``DataFrame``.
   /// - Parameter paths: Path strings
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func xml(_ paths: String...) -> DataFrame {
     self.source = "xml"
     return load(paths)
   }
 
-  /// Loads an ORC file and returns the result as a `DataFrame`.
+  /// Loads an ORC file and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func orc(_ path: String) -> DataFrame {
     self.source = "orc"
     return load(path)
   }
 
-  /// Loads ORC files and returns the result as a `DataFrame`.
+  /// Loads ORC files and returns the result as a ``DataFrame``.
   /// - Parameter paths: Path strings
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func orc(_ paths: String...) -> DataFrame {
     self.source = "orc"
     return load(paths)
   }
 
-  /// Loads a Parquet file and returns the result as a `DataFrame`.
+  /// Loads a Parquet file and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func parquet(_ path: String) -> DataFrame {
     self.source = "parquet"
     return load(path)
   }
 
-  /// Loads Parquet files, returning the result as a `DataFrame`.
+  /// Loads Parquet files, returning the result as a ``DataFrame``.
   /// - Parameter paths: Path strings
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func parquet(_ paths: String...) -> DataFrame {
     self.source = "parquet"
     return load(paths)
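Each per-format helper above simply pins `source` and delegates to `load`. A usage sketch of the variadic form (hypothetical paths; `count()` is assumed from the library's DataFrame API):

```swift
let spark = try await SparkSession.builder.getOrCreate()

// Equivalent to format("parquet") followed by load([...]).
let df = await spark.read.parquet("/data/2024/", "/data/2025/")  // hypothetical paths
print(try await df.count())
```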
diff --git a/Sources/SparkConnect/DataStreamReader.swift b/Sources/SparkConnect/DataStreamReader.swift
index 41763e7..e90a1c1 100644
--- a/Sources/SparkConnect/DataStreamReader.swift
+++ b/Sources/SparkConnect/DataStreamReader.swift
@@ -18,7 +18,7 @@
 //
 import Foundation
 
-/// An actor to load a streaming `Dataset` from external storage systems
+/// An actor to load a streaming ``DataFrame`` from external storage systems
 /// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream` to access this.
 public actor DataStreamReader: Sendable {
   var source: String = ""
@@ -47,7 +47,7 @@ public actor DataStreamReader: Sendable {
   /// automatically from data. By specifying the schema here, the underlying data source can skip
   /// the schema inference step, and thus speed up data loading.
   /// - Parameter schema: A DDL schema string.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   @discardableResult
   public func schema(_ schema: String) async throws -> DataStreamReader {
     // Validate by parsing.
@@ -64,7 +64,7 @@ public actor DataStreamReader: Sendable {
   /// - Parameters:
   ///   - key: A key string.
   ///   - value: A value string.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   public func option(_ key: String, _ value: String) -> DataStreamReader {
     self.extraOptions[key] = value
     return self
@@ -74,35 +74,41 @@ public actor DataStreamReader: Sendable {
   /// - Parameters:
   ///   - key: A key string.
   ///   - value: A `Bool` value.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   public func option(_ key: String, _ value: Bool) -> DataStreamReader {
-    self.extraOptions[key] = String(value)
-    return self
+    return option(key, String(value))
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: An `Int` value.
+  /// - Returns: A ``DataStreamReader``.
+  public func option(_ key: String, _ value: Int) -> DataStreamReader {
+    return option(key, String(value))
   }
 
   /// Adds an input option for the underlying data source.
   /// - Parameters:
   ///   - key: A key string.
   ///   - value: A `Int64` value.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   public func option(_ key: String, _ value: Int64) -> DataStreamReader {
-    self.extraOptions[key] = String(value)
-    return self
+    return option(key, String(value))
   }
 
   /// Adds an input option for the underlying data source.
   /// - Parameters:
   ///   - key: A key string.
   ///   - value: A `Double` value.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   public func option(_ key: String, _ value: Double) -> DataStreamReader {
-    self.extraOptions[key] = String(value)
-    return self
+    return option(key, String(value))
   }
 
   /// Adds input options for the underlying data source.
   /// - Parameter options: A string-string dictionary.
-  /// - Returns: A `DataStreamReader`.
+  /// - Returns: A ``DataStreamReader``.
   public func options(_ options: [String: String]) -> DataStreamReader {
     for (key, value) in options {
       self.extraOptions[key] = value
@@ -110,17 +116,17 @@ public actor DataStreamReader: Sendable {
     return self
   }
 
-  /// Loads input data stream in as a `DataFrame`, for data streams that don't require a path
+  /// Loads input data stream in as a ``DataFrame``, for data streams that don't require a path
   /// (e.g. external key-value stores).
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func load() -> DataFrame {
     return load([])
   }
 
-  /// Loads input data stream in as a `DataFrame`, for data streams that require a path
+  /// Loads input data stream in as a ``DataFrame``, for data streams that require a path
   /// (e.g. data backed by a local or distributed file system).
   /// - Parameter path: A path string.
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func load(_ path: String) -> DataFrame {
     return load([path])
   }
@@ -149,7 +155,7 @@ public actor DataStreamReader: Sendable {
     return DataFrame(spark: sparkSession, plan: plan)
   }
 
-  /// Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+  /// Define a streaming ``DataFrame`` on a table. The DataSource corresponding to the table should
   /// support streaming mode.
   /// - Parameter tableName: The name of the table.
   /// - Returns: A ``DataFrame``.
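A sketch of the table-based streaming read defined above (the table name is hypothetical, and the table's data source must support streaming mode):

```swift
let spark = try await SparkSession.builder.getOrCreate()

// Options set on readStream before table() apply to the streaming source.
let events = await spark.readStream.table("events")  // hypothetical table name
```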
@@ -171,49 +177,49 @@ public actor DataStreamReader: Sendable {
     return DataFrame(spark: sparkSession, plan: plan)
   }
 
-  /// Loads a text file stream and returns the result as a `DataFrame`.
+  /// Loads a text file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func text(_ path: String) -> DataFrame {
     self.source = "text"
     return load(path)
   }
 
-  /// Loads a CSV file stream and returns the result as a `DataFrame`.
+  /// Loads a CSV file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func csv(_ path: String) -> DataFrame {
     self.source = "csv"
     return load(path)
   }
 
-  /// Loads a JSON file stream and returns the result as a `DataFrame`.
+  /// Loads a JSON file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func json(_ path: String) -> DataFrame {
     self.source = "json"
     return load(path)
   }
 
-  /// Loads an XML file stream and returns the result as a `DataFrame`.
+  /// Loads an XML file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func xml(_ path: String) -> DataFrame {
     self.source = "xml"
     return load(path)
   }
 
-  /// Loads an ORC file stream and returns the result as a `DataFrame`.
+  /// Loads an ORC file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func orc(_ path: String) -> DataFrame {
     self.source = "orc"
     return load(path)
   }
 
-  /// Loads a Parquet file stream and returns the result as a `DataFrame`.
+  /// Loads a Parquet file stream and returns the result as a ``DataFrame``.
   /// - Parameter path: A path string
-  /// - Returns: A `DataFrame`.
+  /// - Returns: A ``DataFrame``.
   public func parquet(_ path: String) -> DataFrame {
     self.source = "parquet"
     return load(path)
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 392f387..2ef0cef 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -91,14 +91,14 @@ public actor SparkSession {
     await client.stop()
   }
 
-  /// Returns a `DataFrame` with no rows or columns.
+  /// Returns a ``DataFrame`` with no rows or columns.
   public var emptyDataFrame: DataFrame {
     get async {
       return await DataFrame(spark: self, plan: client.getLocalRelation())
     }
   }
 
-  /// Create a ``DataFrame`` with a single ``Int64`` column name `id`, containing elements in a
+  /// Create a ``DataFrame`` with a single `Int64` column named `id`, containing elements in a
   /// range from 0 to `end` (exclusive) with step value 1.
   ///
   /// - Parameter end: A value for the end of range.
@@ -107,7 +107,7 @@ public actor SparkSession {
     return try await range(0, end)
   }
 
-  /// Create a ``DataFrame`` with a single ``Int64`` column named `id`, containing elements in a
+  /// Create a ``DataFrame`` with a single `Int64` column named `id`, containing elements in a
   /// range from `start` to `end` (exclusive) with a step value (default: 1).
   ///
   /// - Parameters:
@@ -138,7 +138,7 @@ public actor SparkSession {
   ///
   /// - Parameter sqlText: A SQL query string
   /// - Returns: A DataFrame containing the query results
-  /// - Throws: `SparkConnectError` if the query execution fails
+  /// - Throws: ``SparkConnectError`` if the query execution fails
   public func sql(_ sqlText: String) async throws -> DataFrame {
     return try await DataFrame(spark: self, sqlText: sqlText)
   }
@@ -161,7 +161,7 @@ public actor SparkSession {
   ///   - sqlText: A SQL query with positional parameter placeholders (`?`)
   ///   - args: Parameter values to substitute for the placeholders
   /// - Returns: A DataFrame containing the query results
-  /// - Throws: `SparkConnectError` if the query execution fails or if parameter conversion fails
+  /// - Throws: ``SparkConnectError`` if the query execution fails or if parameter conversion fails
   public func sql(_ sqlText: String, _ args: Sendable...) async throws -> DataFrame {
     return try await DataFrame(spark: self, sqlText: sqlText, args)
   }
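A sketch of the positional-parameter form documented above; each `?` placeholder is bound to the trailing arguments in order:

```swift
let spark = try await SparkSession.builder.getOrCreate()

// Two placeholders, two arguments, substituted server-side.
let df = try await spark.sql("SELECT * FROM range(10) WHERE id > ? AND id < ?", 2, 7)
try await df.show()
```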
@@ -186,14 +186,14 @@ public actor SparkSession {
   ///   - sqlText: A SQL query with named parameter placeholders (`:paramName`)
   ///   - args: A dictionary mapping parameter names to values
   /// - Returns: A DataFrame containing the query results
-  /// - Throws: `SparkConnectError` if the query execution fails or if parameter conversion fails
+  /// - Throws: ``SparkConnectError`` if the query execution fails or if parameter conversion fails
   public func sql(_ sqlText: String, args: [String: Sendable]) async throws -> DataFrame {
     return try await DataFrame(spark: self, sqlText: sqlText, args)
   }
 
-  /// Returns a DataFrameReader for reading data in various formats.
+  /// Returns a ``DataFrameReader`` for reading data in various formats.
   ///
-  /// The DataFrameReader provides methods to load data from external storage systems
+  /// The ``DataFrameReader`` provides methods to load data from external storage systems
   /// such as file systems and databases.
   ///
   /// ```swift
@@ -212,16 +212,16 @@ public actor SparkSession {
   ///   .orc("path/to/file.orc")
   /// ```
   ///
-  /// - Returns: A DataFrameReader instance configured for this session
+  /// - Returns: A ``DataFrameReader`` instance configured for this session
   public var read: DataFrameReader {
     get {
       DataFrameReader(sparkSession: self)
     }
   }
 
-  /// Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+  /// Returns a ``DataStreamReader`` that can be used to read streaming data in as a ``DataFrame``.
   ///
-  /// The DataFrameReader provides methods to load data from external storage systems
+  /// The ``DataStreamReader`` provides methods to load data from external storage systems
   /// such as file systems, databases, and streaming sources.
   ///
   /// ```swift
@@ -229,16 +229,16 @@ public actor SparkSession {
   /// let orcData = spark.readStream.orc("path/to/file.orc")
   /// ```
   ///
-  /// - Returns: A DataFrameReader instance configured for this session
+  /// - Returns: A ``DataStreamReader`` instance configured for this session
   public var readStream: DataStreamReader {
     get {
       DataStreamReader(sparkSession: self)
     }
   }
 
-  /// Returns a DataFrame representing the specified table or view.
+  /// Returns a ``DataFrame`` representing the specified table or view.
   ///
-  /// This method retrieves a table or view from the Spark catalog and returns it as a DataFrame.
+  /// This method retrieves a table or view from the Spark catalog and returns it as a ``DataFrame``.
   /// The table name can be qualified with a database name (e.g., "database.table") or unqualified.
   ///
   /// ```swift
@@ -253,8 +253,8 @@ public actor SparkSession {
   /// ```
   ///
   /// - Parameter tableName: The name of the table or view to load
-  /// - Returns: A DataFrame representing the table data
-  /// - Throws: `SparkConnectError` if the table doesn't exist or cannot be accessed
+  /// - Returns: A ``DataFrame`` representing the table data
+  /// - Throws: ``SparkConnectError`` if the table doesn't exist or cannot be accessed
   public func table(_ tableName: String) async throws -> DataFrame {
     return await read.table(tableName)
   }
@@ -302,7 +302,7 @@ public actor SparkSession {
   /// ```
   ///
   /// - Parameter tag: The tag to add. Cannot contain commas or be empty
-  /// - Throws: `SparkConnectError` if the tag is invalid
+  /// - Throws: ``SparkConnectError`` if the tag is invalid
   public func addTag(_ tag: String) async throws {
     try await client.addTag(tag: tag)
   }
@@ -317,7 +317,7 @@ public actor SparkSession {
   /// ```
   ///
   /// - Parameter tag: The tag to remove. Cannot contain commas or be empty
-  /// - Throws: `SparkConnectError` if the tag is invalid
+  /// - Throws: ``SparkConnectError`` if the tag is invalid
   public func removeTag(_ tag: String) async throws {
     try await client.removeTag(tag: tag)
   }
@@ -410,7 +410,7 @@ public actor SparkSession {
   public actor Builder {
     var sparkConf: [String: String] = [:]
 
-    /// Sets a configuration option for the SparkSession.
+    /// Sets a configuration option for the ``SparkSession``.
     ///
     /// Configuration options control various aspects of Spark behavior, from execution settings
     /// to SQL optimization parameters.
@@ -431,6 +431,42 @@ public actor SparkSession {
       return self
     }
 
+    /// Sets a configuration option for the ``SparkSession``.
+    /// - Parameters:
+    ///   - key: The configuration key (e.g., "spark.sql.shuffle.partitions")
+    ///   - value: The configuration `Bool` value.
+    /// - Returns: The builder instance for method chaining
+    public func config(_ key: String, _ value: Bool) -> Builder {
+      return config(key, String(value))
+    }
+
+    /// Sets a configuration option for the ``SparkSession``.
+    /// - Parameters:
+    ///   - key: The configuration key (e.g., "spark.sql.shuffle.partitions")
+    ///   - value: The configuration `Int` value.
+    /// - Returns: The builder instance for method chaining
+    public func config(_ key: String, _ value: Int) -> Builder {
+      return config(key, String(value))
+    }
+
+    /// Sets a configuration option for the ``SparkSession``.
+    /// - Parameters:
+    ///   - key: The configuration key (e.g., "spark.sql.shuffle.partitions")
+    ///   - value: The configuration `Int64` value.
+    /// - Returns: The builder instance for method chaining
+    public func config(_ key: String, _ value: Int64) -> Builder {
+      return config(key, String(value))
+    }
+
+    /// Sets a configuration option for the ``SparkSession``.
+    /// - Parameters:
+    ///   - key: The configuration key (e.g., "spark.sql.shuffle.partitions")
+    ///   - value: The configuration `Double` value.
+    /// - Returns: The builder instance for method chaining
+    public func config(_ key: String, _ value: Double) -> Builder {
+      return config(key, String(value))
+    }
+
     /// Removes all configuration options from the builder.
     ///
     /// This method clears all previously set configurations, allowing you to start fresh.
@@ -470,7 +506,7 @@ public actor SparkSession {
       return config("spark.remote", url)
     }
 
-    /// Sets the application name for this SparkSession.
+    /// Sets the application name for this ``SparkSession``.
    ///
    /// The application name is displayed in the Spark UI and helps identify your application
    /// among others running on the cluster.
@@ -505,7 +541,7 @@ public actor SparkSession {
     }
 
     /// Create a new ``SparkSession``. If `spark.remote` is not given, `sc://localhost:15002` is used.
-    /// - Returns: A newly created `SparkSession`.
+    /// - Returns: A newly created ``SparkSession``.
     func create() async throws -> SparkSession {
       let remote = ProcessInfo.processInfo.environment["SPARK_REMOTE"] ?? "sc://localhost:15002"
       let session = SparkSession(sparkConf["spark.remote"] ?? remote)
@@ -516,9 +552,9 @@ public actor SparkSession {
       return session
     }
 
-    /// Creates or retrieves an existing SparkSession.
+    /// Creates or retrieves an existing ``SparkSession``.
     ///
-    /// This is the primary method for obtaining a SparkSession instance. If a session with
+    /// This is the primary method for obtaining a ``SparkSession`` instance. If a session with
     /// the same configuration already exists, it will be returned. Otherwise, a new session
     /// is created with the specified configuration.
     ///
@@ -540,7 +576,7 @@ public actor SparkSession {
     /// ```
     ///
     /// - Returns: A configured SparkSession instance
-    /// - Throws: `SparkConnectError` if connection fails or configuration is invalid
+    /// - Throws: ``SparkConnectError`` if connection fails or configuration is invalid
     public func getOrCreate() async throws -> SparkSession {
       return try await create()
     }
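Taken together, the typed `config` overloads mirror the reader's typed `option` methods. A sketch of a builder chain (assuming a local Spark Connect server at the library's default address; the configuration keys are standard Spark SQL settings and the app name is hypothetical):

```swift
let spark = try await SparkSession.builder
  .remote("sc://localhost:15002")
  .appName("TypedConfigDemo")                   // hypothetical app name
  .config("spark.sql.adaptive.enabled", true)   // Bool overload
  .config("spark.sql.shuffle.partitions", 10)   // Int overload
  .config("spark.sql.broadcastTimeout", 300.0)  // Double overload
  .getOrCreate()
await spark.stop()
```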