diff --git a/.spi.yml b/.spi.yml
index d6c5826..20c68bd 100644
--- a/.spi.yml
+++ b/.spi.yml
@@ -15,3 +15,6 @@
version: 1
metadata:
authors: "Apache Spark project"
+builder:
+ configs:
+ - documentation_targets: [SparkConnect]
diff --git a/Package.swift b/Package.swift
index 5a4eb62..2bd68e9 100644
--- a/Package.swift
+++ b/Package.swift
@@ -47,6 +47,9 @@ let package = Package(
.product(name: "GRPCProtobuf", package: "grpc-swift-protobuf"),
.product(name: "GRPCNIOTransportHTTP2", package: "grpc-swift-nio-transport"),
.product(name: "FlatBuffers", package: "flatbuffers"),
+ ],
+ resources: [
+ .process("Documentation.docc")
]
),
.executableTarget(
diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index cbe4793..ad80b07 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -23,7 +23,154 @@ import GRPCNIOTransportHTTP2
import GRPCProtobuf
import Synchronization
-/// A DataFrame which supports only SQL queries
+/// A distributed collection of data organized into named columns.
+///
+/// A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various
+/// functions in ``SparkSession``. Once created, it can be manipulated using the various domain-specific
+/// language (DSL) functions defined in: ``DataFrame``, ``Column``, and functions.
+///
+/// ## Creating DataFrames
+///
+/// DataFrames can be created from various sources:
+///
+/// ```swift
+/// // From a range
+/// let df1 = try await spark.range(1, 100)
+///
+/// // From a SQL query
+/// let df2 = try await spark.sql("SELECT * FROM users")
+///
+/// // From files
+/// let df3 = try await spark.read.csv("data.csv")
+/// ```
+///
+/// ## Common Operations
+///
+/// ### Transformations
+///
+/// ```swift
+/// // Select specific columns
+/// let names = try await df.select("name", "age")
+///
+/// // Filter rows
+/// let adults = try await df.filter("age >= 18")
+///
+/// // Group and aggregate
+/// let stats = try await df.groupBy("department").agg("avg(salary)", "count(*)")
+/// ```
+///
+/// ### Actions
+///
+/// ```swift
+/// // Show the first 20 rows
+/// try await df.show()
+///
+/// // Collect all data to the driver
+/// let rows = try await df.collect()
+///
+/// // Count rows
+/// let count = try await df.count()
+/// ```
+///
+/// ## Topics
+///
+/// ### Basic Information
+/// - ``columns``
+/// - ``schema``
+/// - ``dtypes``
+/// - ``sparkSession``
+///
+/// ### Data Collection
+/// - ``count()``
+/// - ``collect()``
+/// - ``head(_:)``
+/// - ``tail(_:)``
+/// - ``show()``
+/// - ``show(_:)``
+/// - ``show(_:_:)``
+/// - ``show(_:_:_:)``
+///
+/// ### Transformation Operations
+/// - ``select(_:)``
+/// - ``selectExpr(_:)``
+/// - ``filter(_:)``
+/// - ``where(_:)``
+/// - ``sort(_:)``
+/// - ``orderBy(_:)``
+/// - ``limit(_:)``
+/// - ``offset(_:)``
+/// - ``drop(_:)``
+/// - ``withColumnRenamed(_:_:)``
+///
+/// ### Join Operations
+/// - ``join(_:)``
+/// - ``join(_:_:_:)``
+/// - ``join(_:joinExprs:)``
+/// - ``join(_:joinExprs:joinType:)``
+/// - ``crossJoin(_:)``
+/// - ``lateralJoin(_:)``
+/// - ``lateralJoin(_:joinType:)``
+/// - ``lateralJoin(_:joinExprs:)``
+/// - ``lateralJoin(_:joinExprs:joinType:)``
+///
+/// ### Set Operations
+/// - ``union(_:)``
+/// - ``unionAll(_:)``
+/// - ``unionByName(_:_:)``
+/// - ``intersect(_:)``
+/// - ``intersectAll(_:)``
+/// - ``except(_:)``
+/// - ``exceptAll(_:)``
+///
+/// ### Partitioning
+/// - ``repartition(_:)``
+/// - ``repartition(_:_:)``
+/// - ``repartitionByExpression(_:_:)``
+/// - ``coalesce(_:)``
+///
+/// ### Grouping Operations
+/// - ``groupBy(_:)``
+/// - ``rollup(_:)``
+/// - ``cube(_:)``
+///
+/// ### Persistence
+/// - ``cache()``
+/// - ``persist(storageLevel:)``
+/// - ``unpersist(blocking:)``
+/// - ``storageLevel``
+///
+/// ### Schema Information
+/// - ``printSchema()``
+/// - ``printSchema(_:)``
+/// - ``explain()``
+/// - ``explain(_:)``
+///
+/// ### View Creation
+/// - ``createTempView(_:)``
+/// - ``createOrReplaceTempView(_:)``
+/// - ``createGlobalTempView(_:)``
+/// - ``createOrReplaceGlobalTempView(_:)``
+///
+/// ### Write Operations
+/// - ``write``
+/// - ``writeTo(_:)``
+///
+/// ### Sampling
+/// - ``sample(_:_:_:)``
+/// - ``sample(_:_:)``
+/// - ``sample(_:)``
+///
+/// ### Utility Methods
+/// - ``isEmpty()``
+/// - ``isLocal()``
+/// - ``isStreaming()``
+/// - ``inputFiles()``
+/// - ``semanticHash()``
+/// - ``sameSemantics(other:)``
+///
+/// ### Internal Methods
+/// - ``rdd()``
+/// - ``getPlan()``
public actor DataFrame: Sendable {
var spark: SparkSession
var plan: Plan
@@ -340,23 +487,58 @@ public actor DataFrame: Sendable {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, colsMap))
}
- /// Return a new ``DataFrame`` with filtered rows using the given expression.
- /// - Parameter conditionExpr: A string to filter.
- /// - Returns: A ``DataFrame`` with subset of rows.
+ /// Filters rows using the given condition.
+ ///
+ /// The condition should be a SQL expression that evaluates to a boolean value.
+ ///
+ /// ```swift
+ /// // Filter with simple condition
+ /// let adults = await df.filter("age >= 18")
+ ///
+ /// // Filter with complex condition
+ /// let qualifiedUsers = await df.filter("age >= 21 AND department = 'Engineering'")
+ ///
+ /// // Filter with SQL functions
+ /// let recent = await df.filter("date_diff(current_date(), join_date) < 30")
+ /// ```
+ ///
+ /// - Parameter conditionExpr: A SQL expression string for filtering
+ /// - Returns: A new DataFrame containing only rows that match the condition
public func filter(_ conditionExpr: String) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getFilter(self.plan.root, conditionExpr))
}
- /// Return a new ``DataFrame`` with filtered rows using the given expression.
- /// - Parameter conditionExpr: A string to filter.
- /// - Returns: A ``DataFrame`` with subset of rows.
+ /// Filters rows using the given condition (alias for filter).
+ ///
+ /// This method is an alias for ``filter(_:)`` and behaves identically.
+ ///
+ /// ```swift
+ /// let highSalary = await df.where("salary > 100000")
+ /// ```
+ ///
+ /// - Parameter conditionExpr: A SQL expression string for filtering
+ /// - Returns: A new DataFrame containing only rows that match the condition
public func `where`(_ conditionExpr: String) -> DataFrame {
return filter(conditionExpr)
}
- /// Return a new ``DataFrame`` sorted by the specified column(s).
- /// - Parameter cols: Column names.
- /// - Returns: A sorted ``DataFrame``
+ /// Returns a new DataFrame sorted by the specified columns.
+ ///
+ /// By default, rows are sorted in ascending order. Use a `desc(...)` expression string for
+ /// descending order.
+ ///
+ /// ```swift
+ /// // Sort by single column (ascending)
+ /// let sorted = await df.sort("age")
+ ///
+ /// // Sort by multiple columns
+ /// let multiSort = await df.sort("department", "salary")
+ ///
+ /// // Sort with mixed order
+ /// let mixedSort = await df.sort("department", "desc(salary)")
+ /// ```
+ ///
+ /// - Parameter cols: Column names or expressions to sort by
+ /// - Returns: A new DataFrame sorted by the specified columns
public func sort(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getSort(self.plan.root, cols))
}
@@ -369,8 +551,22 @@ public actor DataFrame: Sendable {
}
/// Limits the result count to the number specified.
- /// - Parameter n: Number of records to return. Will return this number of records or all records if the ``DataFrame`` contains less than this number of records.
- /// - Returns: A subset of the records
+ ///
+ /// This transformation is often used for:
+ /// - Previewing data
+ /// - Reducing data size for testing
+ /// - Implementing pagination
+ ///
+ /// ```swift
+ /// // Get top 10 records
+ /// let top10 = await df.limit(10)
+ ///
+ /// // Preview data
+ /// let preview = await df.filter("status = 'active'").limit(100)
+ /// ```
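+ ///
+ /// Combined with ``offset(_:)``, `limit` can implement simple pagination. A minimal sketch,
+ /// assuming a stable sort key so that page boundaries do not shift between queries:
+ ///
+ /// ```swift
+ /// // Second page of 20 rows (rows 21-40 by id)
+ /// let page2 = await df.sort("id").offset(20).limit(20)
+ /// ```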
+ ///
+ /// - Parameter n: Maximum number of rows to return
+ /// - Returns: A new DataFrame with at most n rows
public func limit(_ n: Int32) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getLimit(self.plan.root, n))
}
@@ -420,9 +616,21 @@ public actor DataFrame: Sendable {
return sample(false, fraction)
}
- /// Returns the first `n` rows.
- /// - Parameter n: The number of rows. (default: 1)
- /// - Returns: ``[Row]``
+ /// Returns the first n rows.
+ ///
+ /// This method is useful for quickly examining the contents of a DataFrame.
+ ///
+ /// ```swift
+ /// // Get the first row
+ /// let firstRow = try await df.head()
+ ///
+ /// // Get the first 5 rows
+ /// let firstFive = try await df.head(5)
+ /// ```
+ ///
+ /// - Parameter n: Number of rows to return (default: 1)
+ /// - Returns: An array of ``Row`` objects
+ /// - Throws: `SparkConnectError` if the operation fails
public func head(_ n: Int32 = 1) async throws -> [Row] {
return try await limit(n).collect()
}
@@ -463,8 +671,22 @@ public actor DataFrame: Sendable {
return try await select().limit(1).count() == 0
}
- /// Persist this `DataFrame` with the default storage level (`MEMORY_AND_DISK`).
- /// - Returns: A `DataFrame`.
+ /// Persists this DataFrame with the default storage level (MEMORY_AND_DISK).
+ ///
+ /// Caching can significantly improve performance when a DataFrame is accessed multiple times.
+ /// The cached data is stored in memory and/or on disk, depending on the storage level.
+ ///
+ /// ```swift
+ /// // Cache a frequently used DataFrame
+ /// let cachedDf = try await df.cache()
+ ///
+ /// // Use the cached DataFrame multiple times
+ /// let count1 = try await cachedDf.count()
+ /// let count2 = try await cachedDf.filter("age > 30").count()
+ /// ```
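+ ///
+ /// When the cached data is no longer needed, it can be released with ``unpersist(blocking:)``.
+ /// A minimal sketch; passing `blocking: false` and using `try await` follow the conventions of
+ /// the surrounding API:
+ ///
+ /// ```swift
+ /// // Release the cached data (non-blocking)
+ /// try await cachedDf.unpersist(blocking: false)
+ /// ```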
+ ///
+ /// - Returns: The cached DataFrame
+ /// - Throws: `SparkConnectError` if the operation fails
public func cache() async throws -> DataFrame {
return try await persist()
}
@@ -575,24 +797,47 @@ public actor DataFrame: Sendable {
}
}
- /// Join with another `DataFrame`.
- /// Behaves as an INNER JOIN and requires a subsequent join predicate.
- /// - Parameter right: Right side of the join operation.
- /// - Returns: A `DataFrame`.
+ /// Join with another DataFrame.
+ ///
+ /// This performs an inner join and requires a subsequent join predicate.
+ /// For other join types, use the overloaded methods with join type parameter.
+ ///
+ /// ```swift
+ /// // Basic join (requires join condition)
+ /// let joined = await df1.join(df2)
+ ///   .where("df1.id = df2.user_id")
+ ///
+ /// // Join using a common column
+ /// let result = await users.join(orders, "id")
+ /// ```
+ ///
+ /// - Parameter right: The DataFrame to join with
+ /// - Returns: A new DataFrame representing the join result
public func join(_ right: DataFrame) async -> DataFrame {
let right = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getJoin(self.plan.root, right, JoinType.inner)
return DataFrame(spark: self.spark, plan: plan)
}
- /// Equi-join with another `DataFrame` using the given column. A cross join with a predicate is
- /// specified as an inner join. If you would explicitly like to perform a cross join use the
- /// `crossJoin` method.
+ /// Equi-join with another DataFrame using the given column.
+ ///
+ /// This method performs an equi-join on a single column that exists in both DataFrames.
+ ///
+ /// ```swift
+ /// // Inner join on a single column
+ /// let joined = await users.join(orders, "user_id")
+ ///
+ /// // Left outer join
+ /// let leftJoined = await users.join(orders, "user_id", "left")
+ ///
+ /// // Other join types: "inner", "outer", "left", "right", "semi", "anti"
+ /// ```
+ ///
/// - Parameters:
- /// - right: Right side of the join operation.
- /// - usingColumn: Name of the column to join on. This column must exist on both sides.
- /// - joinType: Type of join to perform. Default `inner`.
- /// - Returns: A `DataFrame`.
+ /// - right: The DataFrame to join with
+ /// - usingColumn: Column name that exists in both DataFrames
+ /// - joinType: Type of join (default: "inner")
+ /// - Returns: A new DataFrame with the join result
public func join(_ right: DataFrame, _ usingColumn: String, _ joinType: String = "inner") async -> DataFrame {
await join(right, [usingColumn], joinType)
}
@@ -863,9 +1108,26 @@ public actor DataFrame: Sendable {
return buildRepartition(numPartitions: numPartitions, shuffle: false)
}
- /// Groups the ``DataFrame`` using the specified columns, so we can run aggregation on them.
- /// - Parameter cols: Grouping column names.
- /// - Returns: A ``GroupedData``.
+ /// Groups the DataFrame using the specified columns.
+ ///
+ /// This method is used to perform aggregations on groups of data.
+ /// After grouping, you can apply aggregation functions like count(), sum(), avg(), etc.
+ ///
+ /// ```swift
+ /// // Group by single column
+ /// let byDept = try await df.groupBy("department")
+ ///   .agg("count(*)")
+ ///
+ /// // Group by multiple columns
+ /// let byDeptAndLocation = try await df.groupBy("department", "location")
+ ///   .agg("avg(salary)", "max(salary)")
+ /// ```
+ ///
+ /// - Parameter cols: Column names to group by
+ /// - Returns: A ``GroupedData`` object for aggregation operations
public func groupBy(_ cols: String...) -> GroupedData {
return GroupedData(self, GroupType.groupby, cols)
}
diff --git a/Sources/SparkConnect/Documentation.docc/GettingStarted.md b/Sources/SparkConnect/Documentation.docc/GettingStarted.md
new file mode 100644
index 0000000..9aed405
--- /dev/null
+++ b/Sources/SparkConnect/Documentation.docc/GettingStarted.md
@@ -0,0 +1,100 @@
+# Getting Started with SparkConnect
+
+A step-by-step guide to get started with SparkConnect for Swift.
+
+## Installation
+
+Add SparkConnect to your Swift package dependencies:
+
+```swift
+dependencies: [
+ .package(url: "https://github.com/apache/spark-connect-swift.git", branch: "main")
+]
+```
+
+Then, add it to your target dependencies:
+
+```swift
+targets: [
+ .target(
+ name: "YourApp",
+ dependencies: ["SparkConnect"]
+ )
+]
+```
+
+## Prerequisites
+
+- Swift 6.0 or later
+- macOS 15+, iOS 18+, watchOS 11+, or tvOS 18+
+- A running Apache Spark cluster with Spark Connect enabled
+
+## Basic Usage
+
+### 1. Create a SparkSession
+
+```swift
+import SparkConnect
+
+let spark = try await SparkSession.builder
+  .appName("MySwiftApp")
+  .remote("sc://localhost:15002")
+  .getOrCreate()
+```
+
+### 2. DataFrame Operations
+
+```swift
+// From a range
+let df1 = try await spark.range(1, 10)
+
+// Show data
+try await df1.show()
+
+// Select columns
+try await df1.select("id").show()
+
+let df2 = await df1.selectExpr("id", "id % 3 as value")
+try await df2.show()
+
+// Filter data
+try await df2.filter("value == 0").show()
+
+// Group and aggregate
+try await df2
+ .groupBy("value")
+ .agg("count(*)", "sum(value)")
+ .show()
+```
+
+### 3. SQL Queries
+
+```swift
+// Register a temporary view
+try await df2.createOrReplaceTempView("v1")
+
+// Run SQL Queries
+let result = try await spark.sql("""
+ SELECT id, sum(value) as value_sum
+ FROM v1
+ GROUP BY id
+ ORDER BY value_sum DESC
+""")
+
+try await result.show()
+```
+
+### 4. Reading and Writing Data
+
+```swift
+// Read CSV
+let csvDf = try await spark.read
+  .option("header", "true")
+  .option("inferSchema", "true")
+  .csv("path/to/data.csv")
+
+// Write ORC
+try await csvDf.write
+  .mode("overwrite")
+  .orc("path/to/output")
+```
diff --git a/Sources/SparkConnect/Documentation.docc/Info.plist b/Sources/SparkConnect/Documentation.docc/Info.plist
new file mode 100644
index 0000000..7137f3f
--- /dev/null
+++ b/Sources/SparkConnect/Documentation.docc/Info.plist
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>CFBundleDisplayName</key>
+    <string>SparkConnect</string>
+    <key>CFBundleIdentifier</key>
+    <string>org.apache.spark.connect.swift.SparkConnect</string>
+    <key>CFBundleName</key>
+    <string>SparkConnect</string>
+    <key>CFBundleVersion</key>
+    <string>0.1.0</string>
+    <key>NSHumanReadableCopyright</key>
+    <string>© 2025 Apache Software Foundation</string>
+</dict>
+</plist>
diff --git a/Sources/SparkConnect/Documentation.docc/SparkConnect.md b/Sources/SparkConnect/Documentation.docc/SparkConnect.md
new file mode 100644
index 0000000..6c1f49d
--- /dev/null
+++ b/Sources/SparkConnect/Documentation.docc/SparkConnect.md
@@ -0,0 +1,25 @@
+# ``SparkConnect``
+
+A Swift implementation of the Apache Spark Connect client for distributed data processing.
+
+## Overview
+
+SparkConnect is a modern Swift library that provides a native interface to Apache Spark clusters using the Spark Connect protocol. This library enables Swift developers to leverage the power of Apache Spark for distributed data processing, machine learning, and analytical workloads directly from their Swift applications.
+
+### Key Features
+
+- Native Swift API for Apache Spark operations
+- Support for DataFrame and SQL operations
+- Support for grouped data operations and aggregations
+- Efficient data serialization using Arrow format
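+
+A minimal end-to-end example, assuming a Spark Connect server is reachable at
+`sc://localhost:15002`:
+
+```swift
+import SparkConnect
+
+let spark = try await SparkSession.builder
+  .remote("sc://localhost:15002")
+  .getOrCreate()
+
+let df = try await spark.sql("SELECT 1 AS id")
+try await df.show()
+```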
+
+## Topics
+
+### Getting Started
+
+- <doc:GettingStarted>
+- ``SparkSession``
+
+### DataFrame Operations
+
+- ``DataFrame``
diff --git a/Sources/SparkConnect/Documentation.docc/SparkSession.md b/Sources/SparkConnect/Documentation.docc/SparkSession.md
new file mode 100644
index 0000000..5a86034
--- /dev/null
+++ b/Sources/SparkConnect/Documentation.docc/SparkSession.md
@@ -0,0 +1,55 @@
+# ``SparkConnect/SparkSession``
+
+The entry point for SparkConnect functionality.
+
+## Overview
+
+`SparkSession` is the primary interaction point with Apache Spark. It provides an interface to create DataFrames, execute SQL queries, and manage cluster configurations.
+
+### Creating a SparkSession
+
+```swift
+let spark = try await SparkSession.builder
+  .appName("My Swift Spark App")
+  .remote("sc://localhost:15002")
+  .getOrCreate()
+```
+
+### Basic Usage
+
+```swift
+// Create a DataFrame from a range
+let df = try await spark.range(1, 10)
+
+// Execute SQL query
+let result = try await spark.sql("SELECT * FROM table")
+
+// Read data from files
+let csvDf = try await spark.read.csv("path/to/file.csv")
+```
+
+## Topics
+
+### Creating Sessions
+
+- ``builder()``
+- ``active()``
+- ``stop()``
+
+### DataFrame Operations
+
+- ``range(_:_:)``
+- ``sql(_:)``
+- ``createDataFrame(_:_:)``
+
+### Data I/O
+
+- ``read``
+
+### Configuration
+
+- ``conf``
+
+### Catalog Operations
+
+- ``catalog``
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index ebaf190..abfe908 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -105,58 +105,145 @@ public actor SparkSession {
return await DataFrame(spark: self, plan: client.getPlanRange(start, end, step))
}
- /// Create a ``DataFrame`` for the given SQL statement.
- /// - Parameter sqlText: A SQL string.
- /// - Returns: A ``DataFrame`` instance.
+ /// Executes a SQL query and returns the result as a DataFrame.
+ ///
+ /// This method allows you to run SQL queries against tables and views registered in the Spark catalog.
+ ///
+ /// ```swift
+ /// // Simple query
+ /// let users = try await spark.sql("SELECT * FROM users")
+ ///
+ /// // Query with filtering and aggregation
+ /// let stats = try await spark.sql("""
+ /// SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
+ /// FROM employees
+ /// GROUP BY department
+ /// ORDER BY avg_salary DESC
+ /// """)
+ /// ```
+ ///
+ /// - Parameter sqlText: A SQL query string
+ /// - Returns: A DataFrame containing the query results
+ /// - Throws: `SparkConnectError` if the query execution fails
public func sql(_ sqlText: String) async throws -> DataFrame {
return try await DataFrame(spark: self, sqlText: sqlText)
}
- /// Executes a SQL query substituting positional parameters by the given arguments, returning the
- /// result as a `DataFrame`.
+ /// Executes a SQL query with positional parameters.
+ ///
+ /// This method allows you to execute parameterized SQL queries using positional placeholders (`?`).
+ /// Parameters are automatically converted to SQL literal expressions.
+ ///
+ /// ```swift
+ /// // Query with positional parameters
+ /// let result = try await spark.sql(
+ /// "SELECT * FROM users WHERE age > ? AND department = ?",
+ /// 21,
+ /// "Engineering"
+ /// )
+ /// ```
+ ///
/// - Parameters:
- /// - sqlText: A SQL statement with positional parameters to execute.
- /// - args: ``Sendable`` values that can be converted to SQL literal expressions.
- /// - Returns: A ``DataFrame``.
+ /// - sqlText: A SQL query with positional parameter placeholders (`?`)
+ /// - args: Parameter values to substitute for the placeholders
+ /// - Returns: A DataFrame containing the query results
+ /// - Throws: `SparkConnectError` if the query execution fails or if parameter conversion fails
public func sql(_ sqlText: String, _ args: Sendable...) async throws -> DataFrame {
return try await DataFrame(spark: self, sqlText: sqlText, args)
}
- /// Executes a SQL query substituting named parameters by the given arguments, returning the
- /// result as a `DataFrame`.
+ /// Executes a SQL query with named parameters.
+ ///
+ /// This method allows you to execute parameterized SQL queries using named placeholders.
+ /// Named parameters provide better readability and maintainability for complex queries.
+ ///
+ /// ```swift
+ /// // Query with named parameters
+ /// let result = try await spark.sql(
+ /// "SELECT * FROM users WHERE age > :minAge AND department = :dept",
+ /// args: [
+ /// "minAge": 21,
+ /// "dept": "Engineering"
+ /// ]
+ /// )
+ /// ```
+ ///
/// - Parameters:
- /// - sqlText: A SQL statement with named parameters to execute.
- /// - args: A dictionary with key string and ``Sendable`` value.
- /// - Returns: A ``DataFrame``.
+ /// - sqlText: A SQL query with named parameter placeholders (`:paramName`)
+ /// - args: A dictionary mapping parameter names to values
+ /// - Returns: A DataFrame containing the query results
+ /// - Throws: `SparkConnectError` if the query execution fails or if parameter conversion fails
public func sql(_ sqlText: String, args: [String: Sendable]) async throws -> DataFrame {
return try await DataFrame(spark: self, sqlText: sqlText, args)
}
- /// Returns a ``DataFrameReader`` that can be used to read non-streaming data in as a
- /// `DataFrame`
+ /// Returns a DataFrameReader for reading data in various formats.
+ ///
+ /// The DataFrameReader provides methods to load non-streaming data from external storage
+ /// systems such as file systems and tables.
+ ///
+ /// ```swift
+ /// // Read a CSV file
+ /// let csvData = try await spark.read
+ ///   .option("header", "true")
+ ///   .option("inferSchema", "true")
+ ///   .csv("path/to/file.csv")
+ ///
+ /// // Read a JSON file
+ /// let jsonData = try await spark.read
+ ///   .json("path/to/file.json")
+ ///
+ /// // Read an ORC file
+ /// let orcData = try await spark.read
+ ///   .orc("path/to/file.orc")
+ /// ```
+ ///
+ /// - Returns: A DataFrameReader instance configured for this session
public var read: DataFrameReader {
get {
return DataFrameReader(sparkSession: self)
}
}
- /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
- /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
- /// view, the returned ``DataFrame`` is simply the query plan of the view, which can either be a
- /// batch or streaming query plan.
+ /// Returns a DataFrame representing the specified table or view.
///
- /// - Parameter tableName: a qualified or unqualified name that designates a table or view. If a database is
- /// specified, it identifies the table/view from the database. Otherwise, it first attempts to
- /// find a temporary view with the given name and then match the table/view from the current
- /// database. Note that, the global temporary view database is also valid here.
- /// - Returns: A ``DataFrame`` instance.
+ /// This method retrieves a table or view from the Spark catalog and returns it as a DataFrame.
+ /// The table name can be qualified with a database name (e.g., "database.table") or unqualified.
+ ///
+ /// ```swift
+ /// // Load a table from the default database
+ /// let users = try await spark.table("users")
+ ///
+ /// // Load a table from a specific database
+ /// let sales = try await spark.table("analytics.sales_data")
+ ///
+ /// // Load a temporary view
+ /// let tempView = try await spark.table("temp_user_stats")
+ /// ```
+ ///
+ /// - Parameter tableName: The name of the table or view to load
+ /// - Returns: A DataFrame representing the table data
+ /// - Throws: `SparkConnectError` if the table doesn't exist or cannot be accessed
public func table(_ tableName: String) async throws -> DataFrame {
return await read.table(tableName)
}
- /// Executes some code block and prints to stdout the time taken to execute the block.
- /// - Parameter f: A function to execute.
- /// - Returns: The result of the executed code.
+ /// Executes a code block and prints the execution time.
+ ///
+ /// This utility method is useful for performance testing and optimization.
+ /// It measures the time taken to execute the provided async closure and prints it to stdout.
+ ///
+ /// ```swift
+ /// // Measure query execution time
+ /// let result = try await spark.time {
+ /// try await spark.sql("SELECT COUNT(*) FROM large_table").collect()
+ /// }
+ /// // Prints: Time taken: 1234 ms
+ /// ```
+ ///
+ /// - Parameter f: An async closure to execute and measure
+ /// - Returns: The result of the executed closure
+ /// - Throws: Any error thrown by the closure
public func time<T: Sendable>(_ f: () async throws -> T) async throws -> T {
let start = DispatchTime.now()
let ret = try await f()
@@ -166,27 +253,63 @@ public actor SparkSession {
return ret
}
- /// Add a tag to be assigned to all the operations started by this thread in this session.
- /// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
+ /// Adds a tag to all operations started by this thread in this session.
+ ///
+ /// Tags are useful for tracking and monitoring Spark operations. They can help identify
+ /// specific workloads in the Spark UI and logs.
+ ///
+ /// ```swift
+ /// // Add a tag for a specific operation
+ /// try await spark.addTag("etl_job_2024")
+ ///
+ /// // Perform operations that will be tagged
+ /// let df = try await spark.sql("SELECT * FROM source_table")
+ /// try await df.write.saveAsTable("processed_table")
+ ///
+ /// // Remove the tag when done
+ /// try await spark.removeTag("etl_job_2024")
+ /// ```
+ ///
+ /// - Parameter tag: The tag to add. Cannot contain commas or be empty
+ /// - Throws: `SparkConnectError` if the tag is invalid
public func addTag(_ tag: String) async throws {
try await client.addTag(tag: tag)
}
- /// Remove a tag previously added to be assigned to all the operations started by this thread in this session.
- /// Noop if such a tag was not added earlier.
- /// - Parameter tag: The tag to be removed. Cannot contain ',' (comma) character or be an empty string.
+ /// Removes a previously added tag from operations in this session.
+ ///
+ /// If the specified tag was not previously added, this method does nothing.
+ ///
+ /// ```swift
+ /// // Remove a specific tag
+ /// try await spark.removeTag("etl_job_2024")
+ /// ```
+ ///
+ /// - Parameter tag: The tag to remove. Cannot contain commas or be empty
+ /// - Throws: `SparkConnectError` if the tag is invalid
public func removeTag(_ tag: String) async throws {
try await client.removeTag(tag: tag)
}
- /// Get the operation tags that are currently set to be assigned to all the operations started by
- /// this thread in this session.
- /// - Returns: A set of string.
+ /// Returns all operation tags currently set for this thread in this session.
+ ///
+ /// ```swift
+ /// // Get all current tags
+ /// let currentTags = await spark.getTags()
+ /// print("Active tags: \(currentTags)")
+ /// ```
+ ///
+ /// - Returns: A set of currently active tags
public func getTags() async -> Set<String> {
return await client.getTags()
}
- /// Clear the current thread's operation tags.
+ /// Removes all operation tags for this thread in this session.
+ ///
+ /// ```swift
+ /// // Clear all tags
+ /// await spark.clearTags()
+ /// ```
public func clearTags() async {
await client.clearTags()
}
@@ -205,44 +328,126 @@ public actor SparkSession {
public struct SparkContext: Sendable {
}
- /// A builder to create ``SparkSession``
+ /// A builder to create ``SparkSession`` instances.
+ ///
+ /// The Builder pattern provides a fluent interface for configuring and creating SparkSession instances.
+ /// This is the recommended way to create a SparkSession.
+ ///
+ /// ## Creating a Session
+ ///
+ /// ```swift
+ /// // Basic session creation
+ /// let spark = try await SparkSession.builder
+ /// .remote("sc://localhost:15002")
+ /// .getOrCreate()
+ ///
+ /// // With additional configuration
+ /// let configuredSpark = try await SparkSession.builder
+ /// .appName("MyAnalyticsApp")
+ /// .config("spark.sql.shuffle.partitions", "200")
+ /// .remote("sc://spark-cluster:15002")
+ /// .getOrCreate()
+ /// ```
+ ///
+ /// ## Environment Variables
+ ///
+ /// The builder will use the `SPARK_REMOTE` environment variable if no remote URL is specified.
+ /// If neither is provided, it defaults to `sc://localhost:15002`.
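+ ///
+ /// For example, if `SPARK_REMOTE` is set in the environment, the remote URL can be omitted
+ /// (a sketch; the variable is picked up by ``getOrCreate()`` as described above):
+ ///
+ /// ```swift
+ /// // Launched with: SPARK_REMOTE=sc://spark-cluster:15002 swift run
+ /// let spark = try await SparkSession.builder.getOrCreate()
+ /// ```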
+ ///
+ /// - Important: This is a singleton builder. Multiple calls to `SparkSession.builder` return the same instance.
public actor Builder {
var sparkConf: [String: String] = [:]
- /// Set a new configuration.
+ /// Sets a configuration option for the SparkSession.
+ ///
+ /// Configuration options control various aspects of Spark behavior, from execution settings
+ /// to SQL optimization parameters.
+ ///
+ /// ```swift
+ /// let spark = try await SparkSession.builder
+ /// .config("spark.sql.shuffle.partitions", "200")
+ /// .config("spark.sql.adaptive.enabled", "true")
+ /// .getOrCreate()
+ /// ```
+ ///
/// - Parameters:
- /// - key: A string for the configuration key.
- /// - value: A string for the configuration value.
- /// - Returns: self
+ /// - key: The configuration key (e.g., "spark.sql.shuffle.partitions")
+ /// - value: The configuration value as a string
+ /// - Returns: The builder instance for method chaining
public func config(_ key: String, _ value: String) -> Builder {
sparkConf[key] = value
return self
}
- /// Remove all stored configurations.
- /// - Returns: self
+ /// Removes all configuration options from the builder.
+ ///
+ /// This method clears all previously set configurations, allowing you to start fresh.
+ ///
+ /// ```swift
+ /// // Clear all configurations
+ /// SparkSession.builder.clear()
+ /// ```
+ ///
+ /// - Returns: The builder instance for method chaining
@discardableResult
func clear() -> Builder {
sparkConf.removeAll()
return self
}
- /// Set a url for remote connection.
- /// - Parameter url: A connection string in a pattern, `sc://{host}:{post}`.
- /// - Returns: self
+ /// Sets the remote URL for the Spark Connect server.
+ ///
+ /// The remote URL specifies which Spark Connect server to connect to.
+ /// The URL format is `sc://{host}:{port}`.
+ ///
+ /// ```swift
+ /// // Connect to a local Spark server
+ /// let localSpark = try await SparkSession.builder
+ /// .remote("sc://localhost:15002")
+ /// .getOrCreate()
+ ///
+ /// // Connect to a remote cluster
+ /// let remoteSpark = try await SparkSession.builder
+ /// .remote("sc://spark-cluster.example.com:15002")
+ /// .getOrCreate()
+ /// ```
+ ///
+ /// - Parameter url: The connection URL in format `sc://{host}:{port}`
+ /// - Returns: The builder instance for method chaining
public func remote(_ url: String) -> Builder {
return config("spark.remote", url)
}
- /// Set `appName` of this session.
- /// - Parameter name: A string for application name
- /// - Returns: self
+ /// Sets the application name for this SparkSession.
+ ///
+ /// The application name is displayed in the Spark UI and helps identify your application
+ /// among others running on the cluster.
+ ///
+ /// ```swift
+ /// let spark = try await SparkSession.builder
+ /// .appName("ETL Pipeline - Q4 2024")
+ /// .remote("sc://localhost:15002")
+ /// .getOrCreate()
+ /// ```
+ ///
+ /// - Parameter name: The name of your Spark application
+ /// - Returns: The builder instance for method chaining
public func appName(_ name: String) -> Builder {
return config("spark.app.name", name)
}
- /// Enable `Apache Hive` metastore support configuration.
- /// - Returns: self
+ /// Enables Apache Hive metastore support.
+ ///
+ /// When Hive support is enabled, Spark can read and write data from Hive tables
+ /// and use the Hive metastore for table metadata.
+ ///
+ /// ```swift
+ /// let spark = try await SparkSession.builder
+ /// .enableHiveSupport()
+ /// .getOrCreate()
+ /// ```
+ ///
+ /// - Returns: The builder instance for method chaining
func enableHiveSupport() -> Builder {
return config("spark.sql.catalogImplementation", "hive")
}
@@ -259,8 +464,31 @@ public actor SparkSession {
return session
}
- /// Create a ``SparkSession`` from the given configurations.
- /// - Returns: A spark session.
+ /// Creates or retrieves an existing SparkSession.
+ ///
+ /// This is the primary method for obtaining a SparkSession instance. If a session with
+ /// the same configuration already exists, it will be returned. Otherwise, a new session
+ /// is created with the specified configuration.
+ ///
+ /// The method will:
+ /// 1. Check for the `SPARK_REMOTE` environment variable if no remote URL is set
+ /// 2. Use `sc://localhost:15002` as the default if neither is specified
+ /// 3. Connect to the Spark server and set up the session
+ /// 4. Apply all configured settings
+ ///
+ /// ```swift
+ /// // Basic usage
+ /// let spark = try await SparkSession.builder.getOrCreate()
+ ///
+ /// // With configuration
+ /// let configuredSpark = try await SparkSession.builder
+ /// .appName("DataAnalysis")
+ /// .config("spark.sql.shuffle.partitions", "100")
+ /// .getOrCreate()
+ /// ```
+ ///
+ /// - Returns: A configured SparkSession instance
+ /// - Throws: `SparkConnectError` if connection fails or configuration is invalid
public func getOrCreate() async throws -> SparkSession {
return try await create()
}