Skip to content

Commit 5cab09f

Browse files
[Flink] Fix comments for create table (lakesoul-io#579)
* fix comments for create table Signed-off-by: chenxu <chenxu@dmetasoul.com> * fix path qualifying Signed-off-by: chenxu <chenxu@dmetasoul.com> --------- Signed-off-by: chenxu <chenxu@dmetasoul.com> Co-authored-by: chenxu <chenxu@dmetasoul.com>
1 parent 4d968b8 commit 5cab09f

File tree

6 files changed

+99
-25
lines changed

6 files changed

+99
-25
lines changed

lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ private void initialize() {
185185
dataBaseProperty.getDbName(),
186186
dataBaseProperty.getUsername(),
187187
dataBaseProperty.getPassword());
188-
System.out.println("initialize native metadata client: " + config);
189188
final CompletableFuture<Boolean> future = new CompletableFuture<>();
190189
tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
191190
new ReferencedBooleanCallback((bool, msg) -> {

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,17 @@
1111
import com.dmetasoul.lakesoul.meta.entity.Namespace;
1212
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
1313
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
14+
import org.apache.commons.lang3.StringUtils;
1415
import org.apache.flink.configuration.GlobalConfiguration;
1516
import org.apache.flink.core.fs.FileSystem;
1617
import org.apache.flink.core.fs.Path;
1718
import org.apache.flink.lakesoul.table.LakeSoulDynamicTableFactory;
1819
import org.apache.flink.lakesoul.tool.FlinkUtil;
20+
import org.apache.flink.table.api.Schema;
1921
import org.apache.flink.table.api.TableColumn;
2022
import org.apache.flink.table.api.TableSchema;
2123
import org.apache.flink.table.api.constraints.UniqueConstraint;
22-
import org.apache.flink.table.catalog.Catalog;
23-
import org.apache.flink.table.catalog.CatalogBaseTable;
24-
import org.apache.flink.table.catalog.CatalogDatabase;
25-
import org.apache.flink.table.catalog.CatalogFunction;
26-
import org.apache.flink.table.catalog.CatalogPartition;
27-
import org.apache.flink.table.catalog.CatalogPartitionSpec;
28-
import org.apache.flink.table.catalog.ObjectPath;
29-
import org.apache.flink.table.catalog.ResolvedCatalogTable;
30-
import org.apache.flink.table.catalog.ResolvedCatalogView;
24+
import org.apache.flink.table.catalog.*;
3125
import org.apache.flink.table.catalog.exceptions.CatalogException;
3226
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
3327
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -48,6 +42,7 @@
4842

4943
import java.io.IOException;
5044
import java.util.*;
45+
import java.util.stream.Collectors;
5146

5247
import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_HASH_PARTITION_SPLITTER;
5348
import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH;
@@ -271,6 +266,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
271266
checkNotNull(table);
272267
TableSchema schema = table.getSchema();
273268
schema.getTableColumns().forEach(this::validateType);
269+
List<Optional<String>> comments = table.getUnresolvedSchema().getColumns().stream()
270+
.map(Schema.UnresolvedColumn::getComment).collect(Collectors.toList());
274271

275272
if (!databaseExists(tablePath.getDatabaseName())) {
276273
throw new DatabaseNotExistException(CATALOG_NAME, tablePath.getDatabaseName());
@@ -316,7 +313,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
316313
}
317314
String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
318315
String qualifiedPath = "";
319-
String sparkSchema = FlinkUtil.toArrowSchema(schema, cdcColumn).toJson();
316+
String sparkSchema = FlinkUtil.toArrowSchema(schema, comments, cdcColumn).toJson();
320317
List<String> partitionKeys = Collections.emptyList();
321318
if (table instanceof ResolvedCatalogTable) {
322319
partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys();
@@ -328,11 +325,12 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
328325
String flinkWarehouseDir = GlobalConfiguration.loadConfiguration().get(FLINK_WAREHOUSE_DIR);
329326
if (null != flinkWarehouseDir) {
330327
path = String.join("/", flinkWarehouseDir, tablePath.getDatabaseName(), tablePath.getObjectName());
328+
} else {
329+
throw new CatalogException("Cannot determine table path");
331330
}
332331
}
333332
try {
334-
FileSystem fileSystem = new Path(path).getFileSystem();
335-
Path qp = new Path(path).makeQualified(fileSystem);
333+
Path qp = FlinkUtil.makeQualifiedPath(path);
336334
FlinkUtil.createAndSetTableDirPermission(qp, false);
337335
qualifiedPath = qp.toUri().toString();
338336
} catch (IOException e) {
@@ -350,6 +348,9 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
350348
if (!schema.getWatermarkSpecs().isEmpty()) {
351349
tableOptions.put(WATERMARK_SPEC_JSON, FlinkUtil.serializeWatermarkSpec(schema.getWatermarkSpecs()));
352350
}
351+
if (!StringUtils.isEmpty(table.getComment())) {
352+
tableOptions.put("comment", table.getComment());
353+
}
353354

354355
Map<String, String> computedColumns = new HashMap<>();
355356
schema.getTableColumns().forEach(tableColumn -> {

lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
1717
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
1818
import org.apache.arrow.vector.types.pojo.Field;
19+
import org.apache.arrow.vector.types.pojo.FieldType;
1920
import org.apache.flink.configuration.ConfigOption;
2021
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.configuration.GlobalConfiguration;
@@ -120,6 +121,7 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(RowType ro
120121
}
121122

122123
public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchema tableSchema,
124+
List<Optional<String>> comments,
123125
Optional<String> cdcColumn)
124126
throws CatalogException {
125127
List<Field> fields = new ArrayList<>();
@@ -130,6 +132,14 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchem
130132
fields.add(cdcField);
131133
}
132134

135+
java.lang.reflect.Field metaField;
136+
try {
137+
metaField = FieldType.class.getDeclaredField("metadata");
138+
metaField.setAccessible(true);
139+
} catch (NoSuchFieldException e) {
140+
throw new CatalogException(e);
141+
}
142+
133143
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
134144
if (tableSchema.getTableColumn(i).get() instanceof TableColumn.ComputedColumn) {
135145
continue;
@@ -153,6 +163,16 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchem
153163
fields.get(0).toString());
154164
}
155165
} else {
166+
if (comments.get(i).isPresent()) {
167+
try {
168+
Map<String, String> metadata = new HashMap<>();
169+
metadata.put("spark_comment", comments.get(i).get());
170+
metadata.putAll(arrowField.getMetadata());
171+
metaField.set(arrowField.getFieldType(), metadata);
172+
} catch (IllegalAccessException e) {
173+
throw new CatalogException(e);
174+
}
175+
}
156176
fields.add(arrowField);
157177
}
158178
}
@@ -236,13 +256,15 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
236256
Builder bd = Schema.newBuilder();
237257

238258
String lakesoulCdcColumnName = properties.getString(CDC_CHANGE_COLUMN);
239-
boolean contains = (lakesoulCdcColumnName != null && !lakesoulCdcColumnName.isEmpty());
259+
boolean containsCdc = (lakesoulCdcColumnName != null && !lakesoulCdcColumnName.isEmpty());
240260

241261
for (RowType.RowField field : rowType.getFields()) {
242-
if (contains && field.getName().equals(lakesoulCdcColumnName)) {
262+
if (containsCdc && field.getName().equals(lakesoulCdcColumnName)) {
243263
continue;
244264
}
245-
bd.column(field.getName(), field.getType().asSerializableString());
265+
bd.column(field.getName(), field.getType().asSerializableString())
266+
.withComment(arrowSchema.findField(field.getName())
267+
.getMetadata().getOrDefault("spark_comment", null));
246268
}
247269
if (properties.getString(COMPUTE_COLUMN_JSON) != null) {
248270
JSONObject computeColumnJson = JSONObject.parseObject(properties.getString(COMPUTE_COLUMN_JSON));
@@ -259,16 +281,20 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
259281
watermarkJson.forEach((column, watermarkExpr) -> bd.watermark(column, (String) watermarkExpr));
260282
}
261283

262-
263284
List<String> parKeys = partitionKeys.rangeKeys;
264285
HashMap<String, String> conf = new HashMap<>();
265286
properties.forEach((key, value) -> conf.put(key, value.toString()));
287+
288+
String comment = "";
289+
if (properties.containsKey("comment")) {
290+
comment = properties.getString("comment");
291+
}
292+
266293
if (FlinkUtil.isView(tableInfo)) {
267-
return CatalogView.of(bd.build(), "", properties.getString(VIEW_ORIGINAL_QUERY),
294+
return CatalogView.of(bd.build(), comment, properties.getString(VIEW_ORIGINAL_QUERY),
268295
properties.getString(VIEW_EXPANDED_QUERY), conf);
269296
} else {
270-
return CatalogTable.of(bd.build(), "", parKeys, conf);
271-
297+
return CatalogTable.of(bd.build(), comment, parKeys, conf);
272298
}
273299
}
274300

@@ -337,13 +363,20 @@ public static List<DataType> getFieldDataTypes(DataType dataType) {
337363

338364
public static Path makeQualifiedPath(String path) throws IOException {
339365
Path p = new Path(path);
340-
FileSystem fileSystem = p.getFileSystem();
341-
return p.makeQualified(fileSystem);
366+
return makeQualifiedPath(p);
342367
}
343368

344369
public static Path makeQualifiedPath(Path p) throws IOException {
345-
FileSystem fileSystem = p.getFileSystem();
346-
return p.makeQualified(fileSystem);
370+
String uriString = p.toUri().toString();
371+
if (uriString.startsWith("file:///")) {
372+
return p;
373+
} else if (uriString.startsWith("file:/")) {
374+
FileSystem fileSystem = p.getFileSystem();
375+
return new Path(uriString.substring("file:/".length() - 1)).makeQualified(fileSystem);
376+
} else {
377+
FileSystem fileSystem = p.getFileSystem();
378+
return p.makeQualified(fileSystem);
379+
}
347380
}
348381

349382
public static String getDatabaseName(String fullDatabaseName) {

lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/LakeSoulCatalogTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
1717
import org.apache.flink.table.catalog.Catalog;
1818
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
19+
import org.apache.flink.types.Row;
20+
import org.apache.flink.util.CollectionUtil;
1921
import org.apache.spark.sql.arrow.ArrowUtils;
2022
import org.apache.spark.sql.types.StructType;
2123
import org.assertj.core.api.Assertions;
2224
import org.junit.Before;
2325
import org.junit.Test;
2426

2527
import java.util.HashMap;
28+
import java.util.List;
2629
import java.util.Map;
2730

2831
import static org.apache.spark.sql.types.DataTypes.LongType;
@@ -107,6 +110,29 @@ public void createTableWithLike() {
107110
tEnvs.executeSql("DROP TABLE like_table");
108111
}
109112

113+
@Test
114+
public void createTableWithComments() {
115+
tEnvs.executeSql("create table table_with_comments (" +
116+
"user_id BIGINT PRIMARY KEY NOT ENFORCED COMMENT 'user id'," +
117+
"name STRING COMMENT 'user name') " +
118+
"COMMENT 'this is user table' " +
119+
"WITH (" +
120+
"'connector'='lakesoul'," +
121+
"'hashBucketNum'='2'," +
122+
"'path'='file:///tmp/table_with_comments')");
123+
List<Row> showCreateTableResult = CollectionUtil.iteratorToList(
124+
tEnvs.executeSql("show create table table_with_comments").collect());
125+
Assertions.assertThat(showCreateTableResult.get(0).getField(0).toString())
126+
.contains("COMMENT 'this is user table'");
127+
List<Row> descTableResult = CollectionUtil.iteratorToList(
128+
tEnvs.executeSql("desc table_with_comments").collect());
129+
Assertions.assertThat(descTableResult.get(0).getField(6).toString())
130+
.contains("user id");
131+
Assertions.assertThat(descTableResult.get(1).getField(6).toString())
132+
.contains("user name");
133+
tEnvs.executeSql("drop table table_with_comments").print();
134+
}
135+
110136
@Test
111137
public void dropTable() {
112138
tEnvs.executeSql("drop table if exists user_behavior7464434");

lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulCatalog.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class LakeSoulCatalog(val spark: SparkSession) extends TableCatalog
7777
val tableProperties = properties.asScala.filterKeys {
7878
case TableCatalog.PROP_LOCATION => false
7979
case TableCatalog.PROP_PROVIDER => false
80-
case TableCatalog.PROP_COMMENT => false
8180
case TableCatalog.PROP_OWNER => false
8281
case "path" => false
8382
case _ => true

lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,4 +1322,20 @@ class TableCreationSuite
13221322
}
13231323
}
13241324
}
1325+
1326+
test("create table with comment") {
1327+
withTempDir { dir =>
1328+
withTable("lakesoul_test") {
1329+
sql(
1330+
s"""CREATE TABLE lakesoul_test
1331+
|(a string comment 'this is a string')
1332+
|USING lakesoul
1333+
|comment 'this is a table'
1334+
""".stripMargin)
1335+
val createSql = sql("show create table lakesoul_test").collect()(0).getString(0)
1336+
assert(createSql.contains("a STRING COMMENT 'this is a string'"))
1337+
assert(createSql.contains("COMMENT 'this is a table'"))
1338+
}
1339+
}
1340+
}
13251341
}

0 commit comments

Comments
 (0)