Skip to content

Commit 310a8cd

Browse files
committed
[SPARK-23341][SQL] define some standard options for data source v2
## What changes were proposed in this pull request? Each data source implementation can define its own options and teach its users how to set them. Spark doesn't have any restrictions about what options a data source should or should not have. It's possible that some options are very common and many data sources use them. However, different data sources may define the common options (key and meaning) differently, which is quite confusing to end users. This PR defines some standard options that data sources can optionally adopt: path, table and database. ## How was this patch tested? A new test case. Author: Wenchen Fan <[email protected]> Closes apache#20535 from cloud-fan/options.
1 parent 1e3b876 commit 310a8cd

File tree

3 files changed

+135
-4
lines changed

3 files changed

+135
-4
lines changed

sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,61 @@
1717

1818
package org.apache.spark.sql.sources.v2;
1919

20+
import java.io.IOException;
2021
import java.util.HashMap;
2122
import java.util.Locale;
2223
import java.util.Map;
2324
import java.util.Optional;
25+
import java.util.stream.Stream;
26+
27+
import com.fasterxml.jackson.databind.ObjectMapper;
2428

2529
import org.apache.spark.annotation.InterfaceStability;
2630

2731
/**
 * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
 * data source options.
 *
 * Each data source implementation can define its own options and teach its users how to set them.
 * Spark doesn't have any restrictions about what options a data source should or should not have.
 * Instead, Spark defines some standard options that data sources can optionally adopt. It's
 * possible that some options are very common and many data sources use them. However, different
 * data sources may define the common options (key and meaning) differently, which is quite
 * confusing to end users.
 *
 * The standard options defined by Spark:
 * <table summary="standard data source options">
 *   <tr>
 *     <th><b>Option key</b></th>
 *     <th><b>Option value</b></th>
 *   </tr>
 *   <tr>
 *     <td>path</td>
 *     <td>A path string of the data files/directories, like
 *     <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
 *     either be relative or absolute, point to either a file or a directory, and can contain
 *     wildcards. This option is commonly used by file-based data sources.</td>
 *   </tr>
 *   <tr>
 *     <td>paths</td>
 *     <td>A JSON-array-style paths string of the data files/directories, like
 *     <code>["path1", "/absolute/file2"]</code>. The format of each path is the same as for the
 *     <code>path</code> option, plus it should follow JSON string literal format, e.g. quotes
 *     should be escaped; <code>pa\"th</code> means pa"th.
 *     </td>
 *   </tr>
 *   <tr>
 *     <td>table</td>
 *     <td>A table name string representing the table name directly without any interpretation.
 *     For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl
 *     inside database db. <code>`t*b.l`</code> means a table called `t*b.l`, not t*b.l.</td>
 *   </tr>
 *   <tr>
 *     <td>database</td>
 *     <td>A database name string representing the database name directly without any
 *     interpretation, which is very similar to the table name option.</td>
 *   </tr>
 * </table>
 */
3176
@InterfaceStability.Evolving
3277
public class DataSourceOptions {
@@ -97,4 +142,59 @@ public double getDouble(String key, double defaultValue) {
97142
return keyLowerCasedMap.containsKey(lcaseKey) ?
98143
Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
99144
}
145+
146+
  /**
   * The option key for a singular path. Per the class-level docs, the value may be relative or
   * absolute, point to a file or a directory, and may contain wildcards.
   */
  public static final String PATH_KEY = "path";

  /**
   * The option key for multiple paths. The value is a JSON array of path strings; each element
   * follows the same format as the singular {@code path} option, with JSON string escaping.
   */
  public static final String PATHS_KEY = "paths";

  /**
   * The option key for a table name. The value is taken literally, without interpreting dots or
   * backticks (e.g. "db.tbl" is a table literally named db.tbl).
   */
  public static final String TABLE_KEY = "table";

  /**
   * The option key for a database name. Like the table name, the value is taken literally
   * without any interpretation.
   */
  public static final String DATABASE_KEY = "database";
165+
166+
/**
167+
* Returns all the paths specified by both the singular path option and the multiple
168+
* paths option.
169+
*/
170+
public String[] paths() {
171+
String[] singularPath =
172+
get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
173+
Optional<String> pathsStr = get(PATHS_KEY);
174+
if (pathsStr.isPresent()) {
175+
ObjectMapper objectMapper = new ObjectMapper();
176+
try {
177+
String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
178+
return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
179+
} catch (IOException e) {
180+
return singularPath;
181+
}
182+
} else {
183+
return singularPath;
184+
}
185+
}
186+
187+
/**
188+
* Returns the value of the table name option.
189+
*/
190+
public Optional<String> tableName() {
191+
return get(TABLE_KEY);
192+
}
193+
194+
/**
195+
* Returns the value of the database name option.
196+
*/
197+
public Optional<String> databaseName() {
198+
return get(DATABASE_KEY);
199+
}
100200
}

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.util.{Locale, Properties}
2121

2222
import scala.collection.JavaConverters._
2323

24+
import com.fasterxml.jackson.databind.ObjectMapper
25+
2426
import org.apache.spark.Partition
2527
import org.apache.spark.annotation.InterfaceStability
2628
import org.apache.spark.api.java.JavaRDD
@@ -34,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
3436
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
3537
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3638
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
37-
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
39+
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
3840
import org.apache.spark.sql.types.{StringType, StructType}
3941
import org.apache.spark.unsafe.types.UTF8String
4042

@@ -171,7 +173,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
171173
* @since 1.4.0
172174
*/
173175
def load(path: String): DataFrame = {
174-
option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
176+
// force invocation of `load(...varargs...)`
177+
option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
175178
}
176179

177180
/**
@@ -193,10 +196,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
193196
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
194197
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
195198
ds = ds, conf = sparkSession.sessionState.conf)
199+
val pathsOption = {
200+
val objectMapper = new ObjectMapper()
201+
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
202+
}
196203
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
197-
ds, extraOptions.toMap ++ sessionOptions,
204+
ds, extraOptions.toMap ++ sessionOptions + pathsOption,
198205
userSpecifiedSchema = userSpecifiedSchema))
199-
200206
} else {
201207
loadV1Source(paths: _*)
202208
}

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,29 @@ class DataSourceOptionsSuite extends SparkFunSuite {
7979
options.getDouble("foo", 0.1d)
8080
}
8181
}
82+
83+
test("standard options") {
84+
val options = new DataSourceOptions(Map(
85+
DataSourceOptions.PATH_KEY -> "abc",
86+
DataSourceOptions.TABLE_KEY -> "tbl").asJava)
87+
88+
assert(options.paths().toSeq == Seq("abc"))
89+
assert(options.tableName().get() == "tbl")
90+
assert(!options.databaseName().isPresent)
91+
}
92+
93+
test("standard options with both singular path and multi-paths") {
94+
val options = new DataSourceOptions(Map(
95+
DataSourceOptions.PATH_KEY -> "abc",
96+
DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)
97+
98+
assert(options.paths().toSeq == Seq("abc", "c", "d"))
99+
}
100+
101+
test("standard options with only multi-paths") {
102+
val options = new DataSourceOptions(Map(
103+
DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)
104+
105+
assert(options.paths().toSeq == Seq("c", "d\"e"))
106+
}
82107
}

0 commit comments

Comments
 (0)