Skip to content

Commit 8b7ca0b

Browse files
committed
impala sink
1 parent 977a20b commit 8b7ca0b

File tree

23 files changed

+285
-281
lines changed

23 files changed

+285
-281
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends MetricOutputFormat {
72+
public class CassandraOutputFormat extends MetricOutputFormat<Tuple2> {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class ConsoleOutputFormat extends MetricOutputFormat {
40+
public class ConsoleOutputFormat extends MetricOutputFormat<Tuple2> {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
4343

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.db;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
23+
import java.util.Optional;
24+
25+
/**
26+
* Date: 2020/1/19
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class DbDialect implements JDBCDialect {
31+
@Override
32+
public boolean canHandle(String url) {
33+
return url.startsWith("jdbc:db2:");
34+
}
35+
36+
@Override
37+
public Optional<String> defaultDriverName() {
38+
return Optional.of("com.ibm.db2.jcc.DB2Driver");
39+
}
40+
41+
@Override
42+
public String quoteIdentifier(String identifier) {
43+
return identifier;
44+
}
45+
46+
}
Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.dtstack.flink.sql.sink.db;
22

3+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
34
import com.dtstack.flink.sql.sink.rdb.RdbSink;
5+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
46

57
import java.util.List;
68
import java.util.Map;
@@ -10,42 +12,23 @@ public class DbSink extends RdbSink {
1012
private static final String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
1113

1214
public DbSink() {
15+
super(new DbDialect());
1316
}
14-
15-
@Override
16-
public void buildSql(String schema, String tableName, List<String> fields) {
17-
buildInsertSql(tableName, fields);
18-
}
19-
20-
private void buildInsertSql(String tableName, List<String> fields) {
21-
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
22-
String fieldsStr = "";
23-
String placeholder = "";
24-
25-
for (String fieldName : fields) {
26-
fieldsStr += "," + fieldName;
27-
placeholder += ",?";
28-
}
29-
30-
fieldsStr = fieldsStr.replaceFirst(",", "");
31-
placeholder = placeholder.replaceFirst(",", "");
32-
33-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
34-
this.sql = sqlTmp;
35-
}
36-
37-
@Override
38-
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
39-
return null;
40-
}
41-
42-
@Override
43-
public String getDriverName() {
44-
return DB2_DRIVER;
45-
}
46-
4717
@Override
48-
public RetractJDBCOutputFormat getOutputFormat() {
49-
return new RetractJDBCOutputFormat();
18+
public JDBCUpsertOutputFormat getOutputFormat() {
19+
JDBCOptions jdbcOptions = JDBCOptions.builder()
20+
.setDBUrl(dbURL).setDialect(jdbcDialect)
21+
.setUsername(userName).setPassword(password)
22+
.setTableName(tableName).build();
23+
24+
return JDBCUpsertOutputFormat.builder()
25+
.setOptions(jdbcOptions)
26+
.setFieldNames(fieldNames)
27+
.setFlushMaxSize(batchNum)
28+
.setFlushIntervalMills(batchWaitInterval)
29+
.setFieldTypes(sqlTypes)
30+
.setKeyFields(primaryKeys)
31+
.setAllReplace(allReplace)
32+
.setUpdateMode(updateMode).build();
5033
}
5134
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
4646
* date: 2017-6-29
4747
*/
48-
public class HbaseOutputFormat extends MetricOutputFormat {
48+
public class HbaseOutputFormat extends MetricOutputFormat<Tuple2> {
4949

5050
private static final Logger LOG = LoggerFactory.getLogger(HbaseOutputFormat.class);
5151

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.impala;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
import org.apache.commons.lang3.StringUtils;
23+
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Optional;
27+
import java.util.stream.Collectors;
28+
29+
/**
30+
* Date: 2020/1/17
31+
* Company: www.dtstack.com
32+
* @author maqi
33+
*/
34+
public class ImpalaDialect implements JDBCDialect {
35+
private static final long serialVersionUID = 1L;
36+
37+
private static final String IMPALA_PARTITION_KEYWORD = " partition";
38+
39+
@Override
40+
public boolean canHandle(String url) {
41+
return url.startsWith("jdbc:impala:");
42+
}
43+
44+
@Override
45+
public Optional<String> defaultDriverName() {
46+
return Optional.of("com.cloudera.impala.jdbc41.Driver");
47+
}
48+
49+
@Override
50+
public String quoteIdentifier(String identifier) {
51+
return identifier;
52+
}
53+
54+
@Override
55+
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
56+
throw new RuntimeException("impala does not support update sql, please remove primary key or use append mode");
57+
}
58+
59+
@Override
60+
public String getInsertIntoStatement(String schema, String tableName, String[] fieldNames, String[] partitionFields) {
61+
62+
String schemaInfo = StringUtils.isEmpty(schema) ? "" : quoteIdentifier(schema) + ".";
63+
64+
List<String> partitionFieldsList = Arrays.asList(partitionFields);
65+
66+
String columns = Arrays.stream(fieldNames)
67+
.filter(f -> !partitionFieldsList.contains(f))
68+
.map(this::quoteIdentifier)
69+
.collect(Collectors.joining(", "));
70+
71+
String placeholders = Arrays.stream(fieldNames)
72+
.map(f -> "?")
73+
.collect(Collectors.joining(", "));
74+
75+
String partitionFieldStr = Arrays.stream(partitionFields)
76+
.map(field -> field.replaceAll("\"", "'"))
77+
.collect(Collectors.joining(", "));
78+
79+
String partitionStatement = StringUtils.isEmpty(partitionFieldStr) ? "" : IMPALA_PARTITION_KEYWORD + "(" + partitionFieldStr + ")";
80+
81+
return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
82+
"(" + columns + ")" + partitionStatement + " VALUES (" + placeholders + ")";
83+
}
84+
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 0 additions & 119 deletions
This file was deleted.

0 commit comments

Comments
 (0)