Skip to content

Commit 816a76f

Browse files
authored
[spark] Allow format table and spark table options to recognize each other
1 parent 6b18fc3 commit 816a76f

File tree

4 files changed

+58
-0
lines changed

4 files changed

+58
-0
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,6 +2368,8 @@ public String formatTableFileCompression() {
23682368
return options.get(FILE_COMPRESSION.key());
23692369
} else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
23702370
return options.get(FORMAT_TABLE_FILE_COMPRESSION.key());
2371+
} else if (options.containsKey("compression")) {
2372+
return options.get("compression");
23712373
} else {
23722374
String format = formatType();
23732375
switch (format) {

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,28 @@ public class CsvOptions {
4747
ConfigOptions.key("csv.quote-character")
4848
.stringType()
4949
.defaultValue("\"")
50+
.withFallbackKeys("quote")
5051
.withDescription("The quote character for CSV format");
5152

5253
/**
 * Escape character for the CSV format. The primary key is {@code csv.escape-character} and
 * the default is a single backslash. {@code escape} is registered as a fallback key
 * (presumably consulted when the primary key is absent; confirm against ConfigOption).
 */
public static final ConfigOption<String> ESCAPE_CHARACTER =
5354
ConfigOptions.key("csv.escape-character")
5455
.stringType()
5556
.defaultValue("\\")
57+
.withFallbackKeys("escape")
5658
.withDescription("The escape character for CSV format");
5759

5860
/**
 * Whether CSV files include a header. The primary key is {@code csv.include-header} and the
 * default is {@code false}. {@code header} is registered as a fallback key (presumably
 * consulted when the primary key is absent; confirm against ConfigOption).
 */
public static final ConfigOption<Boolean> INCLUDE_HEADER =
5961
ConfigOptions.key("csv.include-header")
6062
.booleanType()
6163
.defaultValue(false)
64+
.withFallbackKeys("header")
6265
.withDescription("Whether to include header in CSV files");
6366

6467
/**
 * Literal that represents null values in the CSV format. The primary key is
 * {@code csv.null-literal} and the default is the empty string. {@code nullvalue} is
 * registered as a fallback key (presumably consulted when the primary key is absent;
 * confirm against ConfigOption).
 */
public static final ConfigOption<String> NULL_LITERAL =
6568
ConfigOptions.key("csv.null-literal")
6669
.stringType()
6770
.defaultValue("")
71+
.withFallbackKeys("nullvalue")
6872
.withDescription("The literal for null values in CSV format");
6973

7074
public static final ConfigOption<Mode> MODE =

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ default FileTable convertToFileTable(
8686
CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
8787
if (formatTable.format() == FormatTable.Format.CSV) {
8888
options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
89+
options.set("lineSep", options.get(CsvOptions.LINE_DELIMITER));
90+
options.set("quote", options.get(CsvOptions.QUOTE_CHARACTER));
91+
options.set("header", options.get(CsvOptions.INCLUDE_HEADER).toString());
92+
options.set("escape", options.get(CsvOptions.ESCAPE_CHARACTER));
93+
options.set("nullvalue", options.get(CsvOptions.NULL_LITERAL));
94+
options.set("mode", options.get(CsvOptions.MODE).getValue());
95+
if (options.contains(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION)) {
96+
options.set("compression", options.get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION));
97+
}
8998
dsOptions = new CaseInsensitiveStringMap(options.toMap());
9099
return new PartitionedCSVTable(
91100
ident.name(),

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,47 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
195195
assert(df.queryExecution.executedPlan.toString().contains("BroadcastExchange"))
196196
}
197197
}
198+
199+
// Creates a CSV table three times, each with a different set of table properties
// (Paimon-style keys, Spark-style keys, and an unrelated property only), writes the same
// three rows, and checks that the rows read back identically under both values of
// spark.paimon.format-table.implementation ("engine" and "paimon").
test("Format table: format table and spark table props recognize") {
200+
// CSV settings spelled with the Paimon option keys (csv.* and format-table.*).
val paimonFormatTblProps =
201+
"""
202+
|'csv.field-delimiter'=';',
203+
|'csv.line-delimiter'='?',
204+
|'csv.quote-character'='%',
205+
|'csv.include-header'='true',
206+
|'csv.null-literal'='null',
207+
|'csv.mode'='permissive',
208+
|'format-table.file.compression'='gzip'
209+
|""".stripMargin
210+
211+
// The same values as above, spelled with the short Spark-style option names.
val sparkTblProps =
212+
"""
213+
|'sep'=';',
214+
|'lineSep'='?',
215+
|'quote'='%',
216+
|'header'='true',
217+
|'nullvalue'='null',
218+
|'mode'='permissive',
219+
|'compression'='gzip'
220+
|""".stripMargin
221+
222+
// An unrelated property: none of the CSV options listed above is set explicitly.
val defaultProps = "'k'='v'"
223+
224+
for (tblProps <- Seq(paimonFormatTblProps, sparkTblProps, defaultProps)) {
225+
withTable("t") {
226+
sql(s"CREATE TABLE t (id INT, v STRING) USING CSV TBLPROPERTIES ($tblProps)")
227+
// Rows (0, "1") and (1, "2") come from range(2); the third row carries a null v.
// Both inserts run outside withSparkSQLConf, i.e. under the session's current setting.
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id + 1 FROM range(2)")
228+
sql("INSERT INTO t VALUES (2, null)")
229+
230+
// Read the table back under each implementation; the result must be the same.
for (impl <- Seq("engine", "paimon")) {
231+
withSparkSQLConf("spark.paimon.format-table.implementation" -> impl) {
232+
checkAnswer(
233+
sql("SELECT * FROM t ORDER BY id"),
234+
Seq(Row(0, "1"), Row(1, "2"), Row(2, null))
235+
)
236+
}
237+
}
238+
}
239+
}
240+
}
198241
}

0 commit comments

Comments
 (0)