Skip to content

Commit fefbaf6

Browse files
authored
[core] support format table for json (#5306)
1 parent 24d690a commit fefbaf6

File tree

11 files changed

+130
-15
lines changed

11 files changed

+130
-15
lines changed

docs/content/concepts/table-types.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ directory structure.
145145

146146
Format Table is enabled by default; you can disable it by configuring the Catalog option `'format-table.enabled'`.
147147

148-
Currently only support `CSV`, `Parquet`, `ORC` formats.
148+
Currently, only the `CSV`, `Parquet`, `ORC`, and `JSON` formats are supported.
149149

150150
{{< tabs "format-table" >}}
151151
{{< tab "Flink-CSV" >}}
@@ -197,6 +197,30 @@ CREATE TABLE my_parquet_table (
197197

198198
{{< /tab >}}
199199

200+
{{< tab "Flink-JSON" >}}
201+
202+
```sql
203+
CREATE TABLE my_json_table (
204+
a INT,
205+
b STRING
206+
) WITH (
207+
'type'='format-table',
208+
'file.format'='json'
209+
)
210+
```
211+
{{< /tab >}}
212+
213+
{{< tab "Spark-JSON" >}}
214+
215+
```sql
216+
CREATE TABLE my_json_table (
217+
a INT,
218+
b STRING
219+
) USING json
220+
```
221+
222+
{{< /tab >}}
223+
200224
{{< /tabs >}}
201225

202226
## Object Table

paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public interface FormatTable extends Table {
6868
enum Format {
6969
ORC,
7070
PARQUET,
71-
CSV
71+
CSV,
72+
JSON
7273
}
7374

7475
/** Parses a file format string to a corresponding {@link Format} enum constant. */

paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,6 +1404,8 @@ private String getSerdeClassName(@Nullable FormatTable.Format provider) {
14041404
return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
14051405
case ORC:
14061406
return "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
1407+
case JSON:
1408+
return "org.apache.hive.hcatalog.data.JsonSerDe";
14071409
}
14081410
return SERDE_CLASS_NAME;
14091411
}
@@ -1414,6 +1416,7 @@ private String getInputFormatName(@Nullable FormatTable.Format provider) {
14141416
}
14151417
switch (provider) {
14161418
case CSV:
1419+
case JSON:
14171420
return "org.apache.hadoop.mapred.TextInputFormat";
14181421
case PARQUET:
14191422
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
@@ -1429,6 +1432,7 @@ private String getOutputFormatClassName(@Nullable FormatTable.Format provider) {
14291432
}
14301433
switch (provider) {
14311434
case CSV:
1435+
case JSON:
14321436
return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
14331437
case PARQUET:
14341438
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";

paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,15 @@ public static Schema tryToFormatSchema(Table hiveTable) {
7272
} else if (serLib.contains("orc")) {
7373
format = Format.ORC;
7474
} else if (inputFormat.contains("Text")) {
75-
format = Format.CSV;
76-
// hive default field delimiter is '\u0001'
77-
options.set(
78-
FIELD_DELIMITER, serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
75+
if (serLib.contains("json")) {
76+
format = Format.JSON;
77+
} else {
78+
format = Format.CSV;
79+
// hive default field delimiter is '\u0001'
80+
options.set(
81+
FIELD_DELIMITER,
82+
serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
83+
}
7984
} else {
8085
throw new UnsupportedOperationException("Unsupported table: " + hiveTable);
8186
}

paimon-hive/paimon-hive-connector-2.3/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ under the License.
113113
<scope>test</scope>
114114
</dependency>
115115

116+
<dependency>
117+
<groupId>org.apache.flink</groupId>
118+
<artifactId>flink-json</artifactId>
119+
<version>${test.flink.version}</version>
120+
<scope>test</scope>
121+
</dependency>
122+
116123
<dependency>
117124
<groupId>org.apache.flink</groupId>
118125
<artifactId>flink-connector-test-utils</artifactId>

paimon-hive/paimon-hive-connector-3.1/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ under the License.
108108
<scope>test</scope>
109109
</dependency>
110110

111+
<dependency>
112+
<groupId>org.apache.flink</groupId>
113+
<artifactId>flink-json</artifactId>
114+
<version>${test.flink.version}</version>
115+
<scope>test</scope>
116+
</dependency>
117+
111118
<dependency>
112119
<groupId>org.apache.flink</groupId>
113120
<artifactId>flink-orc</artifactId>

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.lang.annotation.Retention;
3939
import java.lang.annotation.RetentionPolicy;
4040
import java.lang.annotation.Target;
41+
import java.time.LocalDateTime;
4142
import java.util.ArrayList;
4243
import java.util.HashMap;
4344
import java.util.List;
@@ -122,46 +123,63 @@ public void evaluate() throws Throwable {
122123
public void testCsvFormatTable() throws Exception {
123124
hiveShell.execute(
124125
"CREATE TABLE csv_table (a INT COMMENT 'comment a', b STRING COMMENT 'comment b')");
125-
doTestFormatTable("csv_table");
126+
doTestCSVFormatTable("csv_table");
126127
}
127128

128129
@Test
129130
public void testCsvFormatTableWithDelimiter() throws Exception {
130131
hiveShell.execute(
131132
"CREATE TABLE csv_table_delimiter (a INT COMMENT 'comment a', b STRING COMMENT 'comment b') ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'");
132-
doTestFormatTable("csv_table_delimiter");
133+
doTestCSVFormatTable("csv_table_delimiter");
133134
}
134135

135136
@Test
136137
public void testPartitionTable() throws Exception {
137138
hiveShell.execute(
138139
"CREATE TABLE partition_table (a INT COMMENT 'comment a') PARTITIONED BY (b STRING COMMENT 'comment b')");
139-
doTestFormatTable("partition_table");
140+
doTestCSVFormatTable("partition_table");
140141
}
141142

142143
@Test
143144
public void testFlinkCreateCsvFormatTable() throws Exception {
144145
tEnv.executeSql(
145146
"CREATE TABLE flink_csv_table (a INT COMMENT 'comment a', b STRING COMMENT 'comment b') with ('type'='format-table', 'file.format'='csv')")
146147
.await();
147-
doTestFormatTable("flink_csv_table");
148+
doTestCSVFormatTable("flink_csv_table");
148149
}
149150

150151
@Test
151152
public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
152153
tEnv.executeSql(
153154
"CREATE TABLE flink_csv_table_delimiter (a INT COMMENT 'comment a', b STRING COMMENT 'comment b') with ('type'='format-table', 'file.format'='csv', 'field-delimiter'=';')");
154-
doTestFormatTable("flink_csv_table_delimiter");
155+
doTestCSVFormatTable("flink_csv_table_delimiter");
155156
}
156157

157158
@Test
158159
public void testFlinkCreatePartitionTable() throws Exception {
159160
tEnv.executeSql(
160161
"CREATE TABLE flink_partition_table (a INT COMMENT 'comment a', b STRING COMMENT 'comment b') PARTITIONED BY (b) with ('type'='format-table', 'file.format'='csv')");
161-
doTestFormatTable("flink_partition_table");
162+
doTestCSVFormatTable("flink_partition_table");
162163
}
163164

164-
private void doTestFormatTable(String tableName) throws Exception {
165+
@Test
166+
public void testJsonFormatTable() throws Exception {
167+
hiveShell.execute(
168+
"CREATE TABLE json_table (a INT COMMENT 'comment a', b TIMESTAMP COMMENT 'comment b') "
169+
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' "
170+
+ "STORED AS TEXTFILE");
171+
doTestJSONFormatTable("json_table");
172+
}
173+
174+
@Test
175+
public void testFlinkCreateJsonFormatTable() throws Exception {
176+
tEnv.executeSql(
177+
"CREATE TABLE flink_json_table (a INT COMMENT 'comment a', b TIMESTAMP COMMENT 'comment b') with ('type'='format-table', 'file.format'='json')")
178+
.await();
179+
doTestJSONFormatTable("flink_json_table");
180+
}
181+
182+
private void doTestCSVFormatTable(String tableName) throws Exception {
165183
List<String> descResult =
166184
collect("DESC " + tableName).stream()
167185
.map(Objects::toString)
@@ -180,6 +198,35 @@ private void doTestFormatTable(String tableName) throws Exception {
180198
Row.of(100, "Hive"), Row.of(200, "Table"), Row.of(300, "Paimon"));
181199
}
182200

201+
private void doTestJSONFormatTable(String tableName) throws Exception {
202+
List<String> descResult =
203+
collect("DESC " + tableName).stream()
204+
.map(Objects::toString)
205+
.collect(Collectors.toList());
206+
assertThat(descResult)
207+
.containsExactly(
208+
"+I[a, INT, true, null, null, null, comment a]",
209+
"+I[b, TIMESTAMP(3), true, null, null, null, comment b]");
210+
hiveShell.execute(
211+
String.format(
212+
"INSERT INTO %s VALUES (1, '2025-03-17 10:15:30'), (2, '2025-03-18 10:15:30')",
213+
tableName));
214+
assertThat(collect(String.format("SELECT * FROM %s", tableName)))
215+
.containsExactlyInAnyOrder(
216+
Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
217+
Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
218+
tEnv.executeSql(
219+
String.format(
220+
"INSERT INTO %s VALUES (3, CAST('2025-03-19 10:15:30' AS TIMESTAMP))",
221+
tableName))
222+
.await();
223+
assertThat(collect(String.format("SELECT * FROM %s", tableName)))
224+
.containsExactlyInAnyOrder(
225+
Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
226+
Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")),
227+
Row.of(3, LocalDateTime.parse("2025-03-19T10:15:30")));
228+
}
229+
183230
@Test
184231
public void testListTables() throws Exception {
185232
hiveShell.execute("CREATE TABLE list_table ( a INT, b STRING)");

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@
4646
import org.apache.spark.sql.connector.expressions.NamedReference;
4747
import org.apache.spark.sql.connector.expressions.Transform;
4848
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
49+
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
4950
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
5051
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
5152
import org.apache.spark.sql.execution.datasources.v2.FileTable;
5253
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
54+
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable;
5355
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
5456
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
5557
import org.apache.spark.sql.types.StructField;
@@ -489,6 +491,14 @@ private static FileTable convertToFileTable(Identifier ident, FormatTable format
489491
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
490492
scala.Option.apply(schema),
491493
ParquetFileFormat.class);
494+
} else if (formatTable.format() == FormatTable.Format.JSON) {
495+
return new JsonTable(
496+
ident.name(),
497+
SparkSession.active(),
498+
dsOptions,
499+
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
500+
scala.Option.apply(schema),
501+
JsonFileFormat.class);
492502
} else {
493503
throw new UnsupportedOperationException(
494504
"Unsupported format table "

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ object SparkSource {
118118

119119
val NAME = "paimon"
120120

121-
val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")
121+
val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet", "json")
122122

123123
def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
124124
new BaseRelation {

paimon-spark/paimon-spark-ut/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ under the License.
3333

3434
<properties>
3535
<spark.version>${paimon-spark-common.spark.version}</spark.version>
36-
<jackson.version>${paimon.shade.jackson.version}</jackson.version>
36+
<jackson.version>2.15.2</jackson.version>
3737
</properties>
3838

3939
<dependencies>

0 commit comments

Comments
 (0)