Skip to content

Commit d66acef

Browse files
committed
pg sink
1 parent e902d31 commit d66acef

File tree

13 files changed

+122
-385
lines changed

13 files changed

+122
-385
lines changed

docs/postgresqlSink.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ CREATE TABLE tableName(
1616
```
1717

1818
## 2.支持版本
19-
postgresql-8.2+
19+
postgresql-9.5+
2020

2121
## 3.表结构定义
2222

@@ -36,8 +36,6 @@ CREATE TABLE tableName(
3636
| password | postgresql连接密码|||
3737
| tableName | postgresqll表名称|||
3838
| parallelism | 并行度设置||1|
39-
| isUpsert | 使用upsert模式插入数据(版本9.5之后才支持upsert) |否|false
40-
| keyField | 设置更新主键字段名(isupsert为true时为必填项)||
4139

4240
## 5.样例:
4341
```

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MySQLDialect.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public Optional<String> getUpsertStatement(String schema, String tableName, Stri
6363
public Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldNames) {
6464
String updateClause = Arrays.stream(fieldNames).map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
6565
.collect(Collectors.joining(", "));
66-
return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
66+
return Optional.of(getInsertIntoStatement("", tableName, fieldNames) +
6767
" ON DUPLICATE KEY UPDATE " + updateClause
6868
);
6969
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 0 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2121
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
2222
import com.dtstack.flink.sql.sink.rdb.RdbSink;
23-
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2423
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2524

2625
/**
@@ -53,190 +52,4 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5352
.setAllReplace(allReplace)
5453
.setUpdateMode(updateMode).build();
5554
}
56-
57-
58-
// @Override
59-
// public RetractJDBCOutputFormat getOutputFormat() {
60-
// return new ExtendOutputFormat();
61-
// }
62-
//
63-
// @Override
64-
// public void buildSql(String scheam, String tableName, List<String> fields) {
65-
// buildInsertSql(scheam, tableName, fields);
66-
// }
67-
//
68-
// private void buildInsertSql(String scheam, String tableName, List<String> fields) {
69-
// tableName = DtStringUtil.getTableFullPath(scheam, tableName);
70-
// String columns = fields.stream()
71-
// .map(this::quoteIdentifier)
72-
// .collect(Collectors.joining(", "));
73-
//
74-
// String placeholders = fields.stream()
75-
// .map(f -> "?")
76-
// .collect(Collectors.joining(", "));
77-
// this.sql = "INSERT INTO " + tableName + "(" + columns + ")" + " VALUES (" + placeholders + ")";
78-
// }
79-
//
80-
// /**
81-
// * use MERGE INTO build oracle replace into sql
82-
// * @param tableName
83-
// * @param fieldNames create table contained column columns
84-
// * @param realIndexes <key: indexName, value: index contains columns >
85-
// * @param fullField real columns , query from db
86-
// * @return
87-
// */
88-
// @Override
89-
// public String buildUpdateSql(String scheam, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
90-
// tableName = DtStringUtil.getTableFullPath(scheam, tableName);
91-
//
92-
// StringBuilder sb = new StringBuilder();
93-
//
94-
// sb.append("MERGE INTO " + tableName + " T1 USING "
95-
// + "(" + makeValues(fieldNames) + ") T2 ON ("
96-
// + updateKeySql(realIndexes) + ") ");
97-
//
98-
//
99-
// String updateSql1 = buildUpdateSqlForAllValue(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
100-
// String updateSql = buildUpdateSqlForNotnullValue(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
101-
//
102-
// if (StringUtils.isNotEmpty(updateSql)) {
103-
// sb.append(" WHEN MATCHED THEN UPDATE SET ");
104-
// sb.append(updateSql);
105-
// }
106-
//
107-
// sb.append(" WHEN NOT MATCHED THEN "
108-
// + "INSERT (" + quoteColumns(fieldNames) + ") VALUES ("
109-
// + quoteColumns(fieldNames, "T2") + ")");
110-
//
111-
// return sb.toString();
112-
// }
113-
//
114-
//
115-
// public String quoteColumns(List<String> column) {
116-
// return quoteColumns(column, null);
117-
// }
118-
//
119-
// public String quoteColumns(List<String> column, String table) {
120-
// String prefix = StringUtils.isBlank(table) ? "" : DtStringUtil.addQuoteForStr(table) + ".";
121-
// List<String> list = new ArrayList<>();
122-
// for (String col : column) {
123-
// list.add(prefix + DtStringUtil.addQuoteForStr(col));
124-
// }
125-
// return StringUtils.join(list, ",");
126-
// }
127-
//
128-
// /**
129-
// * extract all distinct index column
130-
// * @param realIndexes
131-
// * @return
132-
// */
133-
// protected List<String> keyColList(Map<String, List<String>> realIndexes) {
134-
// List<String> keyCols = new ArrayList<>();
135-
// for (Map.Entry<String, List<String>> entry : realIndexes.entrySet()) {
136-
// List<String> list = entry.getValue();
137-
// for (String col : list) {
138-
// if (!containsIgnoreCase(keyCols,col)) {
139-
// keyCols.add(col);
140-
// }
141-
// }
142-
// }
143-
// return keyCols;
144-
// }
145-
//
146-
// /**
147-
// * build update sql , such as UPDATE SET "T1".A="T2".A
148-
// * @param updateColumn create table contained column columns
149-
// * @param fullColumn real columns , query from db
150-
// * @param leftTable alias
151-
// * @param rightTable alias
152-
// * @param indexCols index column
153-
// * @return
154-
// */
155-
// public String buildUpdateSqlForAllValue(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
156-
// String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
157-
// String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
158-
//
159-
// String sql = fullColumn.stream().filter(col -> {
160-
// return !(indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols, col));
161-
// }).map(col -> {
162-
// String leftCol = prefixLeft + DtStringUtil.addQuoteForStr(col);
163-
// String rightCol = prefixRight + DtStringUtil.addQuoteForStr(col);
164-
//
165-
// if (containsIgnoreCase(updateColumn, col)) {
166-
// return (leftCol + "=" + rightCol);
167-
// } else {
168-
// return (leftCol + "=null");
169-
// }
170-
// }).collect(Collectors.joining(","));
171-
//
172-
// return sql;
173-
// }
174-
//
175-
// public String buildUpdateSqlForNotnullValue(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
176-
// String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
177-
// String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
178-
//
179-
// String sql = fullColumn.stream().filter(col -> {
180-
// return !(indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols, col));
181-
// }).map(col -> {
182-
// String leftCol = prefixLeft + DtStringUtil.addQuoteForStr(col);
183-
// String rightCol = prefixRight + DtStringUtil.addQuoteForStr(col);
184-
//
185-
// if (containsIgnoreCase(updateColumn, col)) {
186-
// return leftCol + "= nvl(" + rightCol + "," + leftCol + ")";
187-
// }
188-
// return "";
189-
// }).collect(Collectors.joining(","));
190-
//
191-
// return sql;
192-
// }
193-
//
194-
//
195-
//
196-
//
197-
// /**
198-
// * build connect sql by index column, such as T1."A"=T2."A"
199-
// * @param updateKey
200-
// * @return
201-
// */
202-
// public String updateKeySql(Map<String, List<String>> updateKey) {
203-
// List<String> exprList = new ArrayList<>();
204-
// for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
205-
// List<String> colList = new ArrayList<>();
206-
// for (String col : entry.getValue()) {
207-
// colList.add("T1." + DtStringUtil.addQuoteForStr(col) + "=T2." + DtStringUtil.addQuoteForStr(col));
208-
// }
209-
// exprList.add(StringUtils.join(colList, " AND "));
210-
// }
211-
// return StringUtils.join(exprList, " OR ");
212-
// }
213-
//
214-
// /**
215-
// * build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
216-
// *
217-
// * @param column destination column
218-
// * @return
219-
// */
220-
// public String makeValues(List<String> column) {
221-
// StringBuilder sb = new StringBuilder("SELECT ");
222-
// for (int i = 0; i < column.size(); ++i) {
223-
// if (i != 0) {
224-
// sb.append(",");
225-
// }
226-
// sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i)));
227-
// }
228-
// sb.append(" FROM DUAL");
229-
// return sb.toString();
230-
// }
231-
//
232-
// public boolean containsIgnoreCase(List<String> l, String s) {
233-
// Iterator<String> it = l.iterator();
234-
// while (it.hasNext()) {
235-
// if (it.next().equalsIgnoreCase(s))
236-
// return true;
237-
// }
238-
// return false;
239-
// }
240-
241-
24255
}

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
<!--<module>kafka10</module>-->
1515
<!--<module>kafka11</module>-->
1616
<!--<module>kafka</module>-->
17-
<module>mysql</module>
17+
<!--<module>mysql</module>-->
1818
<!--<module>hbase</module>-->
1919
<!--<module>elasticsearch5</module>-->
2020
<!--<module>mongo</module>-->
2121
<!--<module>redis5</module>-->
2222
<!--<module>launcher</module>-->
2323
<module>rdb</module>
2424
<!--<module>sqlserver</module>-->
25-
<!--<module>oracle</module>-->
25+
<module>oracle</module>
2626
<!--<module>cassandra</module>-->
2727
<!--<module>kudu</module>-->
28-
<!--<module>postgresql</module>-->
28+
<module>postgresql</module>
2929
<!--<module>kafka08</module>-->
3030
<!--<module>serversocket</module>-->
3131
<!--<module>console</module>-->
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.postgresql;
20+
21+
22+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
23+
import com.dtstack.flink.sql.util.DtStringUtil;
24+
25+
import java.util.Arrays;
26+
import java.util.Optional;
27+
import java.util.stream.Collectors;
28+
29+
/**
30+
* Date: 2020/1/3
31+
* Company: www.dtstack.com
32+
* @author maqi
33+
*/
34+
public class PostgresqlDialect implements JDBCDialect {
35+
@Override
36+
public boolean canHandle(String url) {
37+
return url.startsWith("jdbc:postgresql:");
38+
}
39+
40+
@Override
41+
public Optional<String> defaultDriverName() {
42+
return Optional.of("org.postgresql.Driver");
43+
}
44+
45+
@Override
46+
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
47+
String uniqueColumns = Arrays.stream(uniqueKeyFields)
48+
.map(this::quoteIdentifier)
49+
.collect(Collectors.joining(", "));
50+
51+
String updateClause = Arrays.stream(fieldNames)
52+
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
53+
.collect(Collectors.joining(", "));
54+
55+
return Optional.of(getInsertIntoStatement(schema, tableName, fieldNames) +
56+
" ON CONFLICT (" + uniqueColumns + ")" +
57+
" DO UPDATE SET " + updateClause
58+
);
59+
60+
}
61+
62+
}

0 commit comments

Comments
 (0)