Skip to content

Commit 7be2c44

Browse files
committed
Escape column names
1 parent acd47fa commit 7be2c44

File tree

5 files changed

+128
-0
lines changed

5 files changed

+128
-0
lines changed

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.cloudsql.mysql;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import com.google.common.base.Strings;
2021
import com.google.common.collect.ImmutableMap;
2122
import io.cdap.cdap.api.annotation.Description;
@@ -25,6 +26,7 @@
2526
import io.cdap.cdap.api.annotation.Name;
2627
import io.cdap.cdap.api.annotation.Plugin;
2728
import io.cdap.cdap.api.data.format.StructuredRecord;
29+
import io.cdap.cdap.api.data.schema.Schema;
2830
import io.cdap.cdap.etl.api.FailureCollector;
2931
import io.cdap.cdap.etl.api.PipelineConfigurer;
3032
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -40,7 +42,11 @@
4042
import io.cdap.plugin.util.CloudSQLUtil;
4143
import io.cdap.plugin.util.DBUtils;
4244

45+
import java.util.ArrayList;
46+
import java.util.Collections;
47+
import java.util.List;
4348
import java.util.Map;
49+
import java.util.StringJoiner;
4450
import javax.annotation.Nullable;
4551

4652
/** Sink support for a CloudSQL MySQL database. */
@@ -52,6 +58,7 @@
5258
public class CloudSQLMySQLSink extends AbstractDBSink<CloudSQLMySQLSink.CloudSQLMySQLSinkConfig> {
5359

5460
private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
61+
private static final Character ESCAPE_CHAR = '`';
5562

5663
public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) {
5764
super(cloudsqlMysqlSinkConfig);
@@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) {
7885
return new MysqlDBRecord(output, columnTypes);
7986
}
8087

88+
@Override
89+
protected void setColumnsInfo(List<Schema.Field> fields) {
90+
List<String> columnsList = new ArrayList<>();
91+
StringJoiner columnsJoiner = new StringJoiner(",");
92+
for (Schema.Field field : fields) {
93+
columnsList.add(field.getName());
94+
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
95+
}
96+
97+
super.columns = Collections.unmodifiableList(columnsList);
98+
super.dbColumns = columnsJoiner.toString();
99+
}
100+
101+
@VisibleForTesting
102+
String getDbColumns() {
103+
return dbColumns;
104+
}
105+
81106
@Override
82107
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
83108
String host;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.cloudsql.mysql;
18+
19+
import io.cdap.cdap.api.data.schema.Schema;
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
public class CloudSQLMySQLSinkTest {
24+
@Test
25+
public void testSetColumnsInfo() {
26+
Schema outputSchema = Schema.recordOf("output",
27+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
28+
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
29+
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
30+
CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
31+
Assert.assertNotNull(outputSchema.getFields());
32+
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
33+
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
34+
}
35+
}

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,13 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
9393

9494
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
9595
}
96+
97+
@Override
98+
protected void insertOperation(PreparedStatement stmt) throws SQLException {
99+
for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) {
100+
ColumnType columnType = columnTypes.get(fieldIndex);
101+
Schema.Field field = record.getSchema().getField(columnType.getName(), true);
102+
writeToDB(stmt, field, fieldIndex);
103+
}
104+
}
96105
}

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.mysql;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
@@ -24,6 +25,7 @@
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.api.data.format.StructuredRecord;
28+
import io.cdap.cdap.api.data.schema.Schema;
2729
import io.cdap.cdap.etl.api.FailureCollector;
2830
import io.cdap.cdap.etl.api.batch.BatchSink;
2931
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -39,9 +41,12 @@
3941
import io.cdap.plugin.db.sink.FieldsValidator;
4042
import io.cdap.plugin.util.DBUtils;
4143

44+
import java.util.ArrayList;
4245
import java.util.Collections;
4346
import java.util.List;
4447
import java.util.Map;
48+
import java.util.StringJoiner;
49+
import java.util.stream.Collectors;
4550
import javax.annotation.Nullable;
4651

4752
/**
@@ -54,6 +59,7 @@
5459
public class MysqlSink extends AbstractDBSink<MysqlSink.MysqlSinkConfig> {
5560

5661
private final MysqlSinkConfig mysqlSinkConfig;
62+
private static final Character ESCAPE_CHAR = '`';
5763

5864
public MysqlSink(MysqlSinkConfig mysqlSinkConfig) {
5965
super(mysqlSinkConfig);
@@ -85,6 +91,24 @@ protected SchemaReader getSchemaReader() {
8591
return new MysqlSchemaReader(null);
8692
}
8793

94+
@Override
95+
protected void setColumnsInfo(List<Schema.Field> fields) {
96+
List<String> columnsList = new ArrayList<>();
97+
StringJoiner columnsJoiner = new StringJoiner(",");
98+
for (Schema.Field field : fields) {
99+
columnsList.add(field.getName());
100+
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
101+
}
102+
103+
super.columns = Collections.unmodifiableList(columnsList);
104+
super.dbColumns = columnsJoiner.toString();
105+
}
106+
107+
@VisibleForTesting
108+
String getDbColumns() {
109+
return dbColumns;
110+
}
111+
88112
/**
89113
* MySQL action configuration.
90114
*/
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.mysql;
18+
19+
import io.cdap.cdap.api.data.schema.Schema;
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
public class MysqlSinkTest {
24+
@Test
25+
public void testSetColumnsInfo() {
26+
Schema outputSchema = Schema.recordOf("output",
27+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
28+
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
29+
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
30+
MysqlSink mySQLSink = new MysqlSink(new MysqlSink.MysqlSinkConfig());
31+
Assert.assertNotNull(outputSchema.getFields());
32+
mySQLSink.setColumnsInfo(outputSchema.getFields());
33+
Assert.assertEquals("`id`,`name`,`insert`", mySQLSink.getDbColumns());
34+
}
35+
}

0 commit comments

Comments
 (0)