Skip to content

Commit ebe21fb

Browse files
committed
add class
1 parent 0142a4f commit ebe21fb

File tree

2 files changed

+267
-0
lines changed

2 files changed

+267
-0
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.rdb;
20+
21+
22+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
23+
24+
import java.util.Objects;
25+
import java.util.Optional;
26+
27+
import static org.apache.flink.util.Preconditions.checkNotNull;
28+
29+
30+
public class JDBCOptions {
31+
32+
private String dbURL;
33+
private String tableName;
34+
private String driverName;
35+
private String username;
36+
private String password;
37+
private String schema;
38+
private JDBCDialect dialect;
39+
40+
private JDBCOptions(String dbURL, String tableName, String driverName, String username,
41+
String password, String schema, JDBCDialect dialect) {
42+
this.dbURL = dbURL;
43+
this.tableName = tableName;
44+
this.driverName = driverName;
45+
this.username = username;
46+
this.password = password;
47+
this.schema = schema;
48+
this.dialect = dialect;
49+
}
50+
51+
public String getDbURL() {
52+
return dbURL;
53+
}
54+
55+
public String getTableName() {
56+
return tableName;
57+
}
58+
59+
public String getDriverName() {
60+
return driverName;
61+
}
62+
63+
public String getUsername() {
64+
return username;
65+
}
66+
67+
public String getPassword() {
68+
return password;
69+
}
70+
71+
public JDBCDialect getDialect() {
72+
return dialect;
73+
}
74+
75+
public String getSchema() {
76+
return schema;
77+
}
78+
79+
public static Builder builder() {
80+
return new Builder();
81+
}
82+
83+
@Override
84+
public boolean equals(Object o) {
85+
if (o instanceof JDBCOptions) {
86+
JDBCOptions options = (JDBCOptions) o;
87+
return Objects.equals(dbURL, options.dbURL) &&
88+
Objects.equals(tableName, options.tableName) &&
89+
Objects.equals(driverName, options.driverName) &&
90+
Objects.equals(username, options.username) &&
91+
Objects.equals(password, options.password) &&
92+
Objects.equals(schema, options.schema) &&
93+
Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName());
94+
} else {
95+
return false;
96+
}
97+
}
98+
99+
/**
100+
* Builder of {@link JDBCOptions}.
101+
*/
102+
public static class Builder {
103+
private String dbURL;
104+
private String tableName;
105+
private String driverName;
106+
private String username;
107+
private String password;
108+
private String scheam;
109+
private JDBCDialect dialect;
110+
111+
/**
112+
* required, table name.
113+
*/
114+
public Builder setTableName(String tableName) {
115+
this.tableName = tableName;
116+
return this;
117+
}
118+
119+
/**
120+
* optional, user name.
121+
*/
122+
public Builder setUsername(String username) {
123+
this.username = username;
124+
return this;
125+
}
126+
127+
/**
128+
* optional, password.
129+
*/
130+
public Builder setPassword(String password) {
131+
this.password = password;
132+
return this;
133+
}
134+
135+
/**
136+
* optional, driver name, dialect has a default driver name,
137+
* See {@link JDBCDialect#defaultDriverName}.
138+
*/
139+
public Builder setDriverName(String driverName) {
140+
this.driverName = driverName;
141+
return this;
142+
}
143+
144+
/**
145+
* optional, schema info
146+
*/
147+
public Builder setScheam(String scheam) {
148+
this.scheam = scheam;
149+
return this;
150+
}
151+
152+
/**
153+
* required, JDBC DB url.
154+
*/
155+
public Builder setDBUrl(String dbURL) {
156+
this.dbURL = dbURL;
157+
return this;
158+
}
159+
160+
public Builder setDialect(JDBCDialect dialect) {
161+
this.dialect = dialect;
162+
return this;
163+
}
164+
165+
public JDBCOptions build() {
166+
checkNotNull(dbURL, "No dbURL supplied.");
167+
checkNotNull(tableName, "No tableName supplied.");
168+
169+
if (this.driverName == null) {
170+
Optional<String> optional = dialect.defaultDriverName();
171+
this.driverName = optional.orElseGet(() -> {
172+
throw new NullPointerException("No driverName supplied.");
173+
});
174+
}
175+
176+
return new JDBCOptions(dbURL, tableName, driverName, username, password, scheam, dialect);
177+
}
178+
}
179+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.rdb.format;
20+
21+
import com.dtstack.flink.sql.sink.MetricOutputFormat;
22+
import com.dtstack.flink.sql.util.JDBCUtils;
23+
import org.apache.flink.api.common.io.RichOutputFormat;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.types.Row;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.IOException;
30+
import java.sql.Connection;
31+
import java.sql.DriverManager;
32+
import java.sql.SQLException;
33+
34+
/**
35+
* OutputFormat to write Rows into a JDBC database.
36+
*
37+
* @see Row
38+
* @see DriverManager
39+
*/
40+
public abstract class AbstractJDBCOutputFormat<T> extends MetricOutputFormat {
41+
42+
private static final long serialVersionUID = 1L;
43+
static final int DEFAULT_FLUSH_MAX_SIZE = 100;
44+
static final long DEFAULT_FLUSH_INTERVAL_MILLS = 10000L;
45+
static final boolean DEFAULT_ALLREPLACE_VALUE = false;
46+
47+
private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCOutputFormat.class);
48+
49+
private final String username;
50+
private final String password;
51+
private final String drivername;
52+
protected final String dbURL;
53+
54+
protected transient Connection connection;
55+
56+
public AbstractJDBCOutputFormat(String username, String password, String drivername, String dbURL) {
57+
this.username = username;
58+
this.password = password;
59+
this.drivername = drivername;
60+
this.dbURL = dbURL;
61+
}
62+
63+
@Override
64+
public void configure(Configuration parameters) {
65+
}
66+
67+
protected void establishConnection() throws SQLException, ClassNotFoundException {
68+
JDBCUtils.forName(drivername, getClass().getClassLoader());
69+
if (username == null) {
70+
connection = DriverManager.getConnection(dbURL);
71+
} else {
72+
connection = DriverManager.getConnection(dbURL, username, password);
73+
}
74+
connection.setAutoCommit(false);
75+
}
76+
77+
protected void closeDbConnection() throws IOException {
78+
if (connection != null) {
79+
try {
80+
connection.close();
81+
} catch (SQLException se) {
82+
LOG.warn("JDBC connection could not be closed: " + se.getMessage());
83+
} finally {
84+
connection = null;
85+
}
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)