Skip to content

Commit 2bb76d2

Browse files
Paddy0523FlechazoW
authored andcommitted
[feat-4826][inceptor] Fix the get DistributeCache error
1 parent 3c911b0 commit 2bb76d2

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flinkx.connector.inceptor.sink;
20+
21+
import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf;
22+
import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil;
23+
import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat;
24+
import com.dtstack.flinkx.connector.jdbc.util.JdbcUtil;
25+
import com.dtstack.flinkx.enums.EWriteMode;
26+
import com.dtstack.flinkx.enums.Semantic;
27+
import com.dtstack.flinkx.throwable.FlinkxRuntimeException;
28+
import com.dtstack.flinkx.util.PluginUtil;
29+
30+
import org.apache.flink.api.common.cache.DistributedCache;
31+
import org.apache.flink.api.common.functions.RuntimeContext;
32+
33+
import java.sql.Connection;
34+
import java.sql.SQLException;
35+
36+
/** @author liuliu 2022/3/7 */
37+
public class InceptorHyperbaseOutputFormat extends JdbcOutputFormat {
38+
@Override
39+
protected void openInternal(int taskNumber, int numTasks) {
40+
try {
41+
dbConn = getConnection();
42+
// 默认关闭事务自动提交,手动控制事务
43+
if (Semantic.EXACTLY_ONCE == semantic) {
44+
dbConn.setAutoCommit(false);
45+
}
46+
initColumnList();
47+
if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) {
48+
throw new FlinkxRuntimeException(
49+
String.format(
50+
"inceptor hyperbase not support %s mode", jdbcConf.getMode()));
51+
}
52+
53+
buildStmtProxy();
54+
LOG.info("subTask[{}}] wait finished", taskNumber);
55+
} catch (SQLException sqe) {
56+
throw new IllegalArgumentException("open() failed.", sqe);
57+
} finally {
58+
JdbcUtil.commit(dbConn);
59+
}
60+
}
61+
62+
@Override
63+
public Connection getConnection() {
64+
DistributedCache distributedCache;
65+
try {
66+
distributedCache = getRuntimeContext().getDistributedCache();
67+
} catch (Exception e) {
68+
distributedCache = PluginUtil.createDistributedCacheFromContextClassLoader();
69+
}
70+
return InceptorDbUtil.getConnection((InceptorConf) jdbcConf, distributedCache, jobId);
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flinkx.connector.inceptor.sink;
20+
21+
import com.dtstack.flinkx.connector.inceptor.conf.InceptorConf;
22+
import com.dtstack.flinkx.connector.inceptor.util.InceptorDbUtil;
23+
import com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat;
24+
import com.dtstack.flinkx.connector.jdbc.util.JdbcUtil;
25+
import com.dtstack.flinkx.enums.EWriteMode;
26+
import com.dtstack.flinkx.enums.Semantic;
27+
import com.dtstack.flinkx.throwable.FlinkxRuntimeException;
28+
import com.dtstack.flinkx.util.PluginUtil;
29+
30+
import org.apache.flink.api.common.cache.DistributedCache;
31+
32+
import java.sql.Connection;
33+
import java.sql.SQLException;
34+
35+
/** @author liuliu 2022/2/24 */
36+
public class InceptorSearchOutputFormat extends JdbcOutputFormat {
37+
38+
@Override
39+
protected void openInternal(int taskNumber, int numTasks) {
40+
try {
41+
dbConn = getConnection();
42+
// 默认关闭事务自动提交,手动控制事务
43+
if (Semantic.EXACTLY_ONCE == semantic) {
44+
dbConn.setAutoCommit(false);
45+
}
46+
initColumnList();
47+
if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) {
48+
throw new FlinkxRuntimeException(
49+
String.format("inceptor search not support %s mode", jdbcConf.getMode()));
50+
}
51+
52+
buildStmtProxy();
53+
LOG.info("subTask[{}}] wait finished", taskNumber);
54+
} catch (SQLException sqe) {
55+
throw new IllegalArgumentException("open() failed.", sqe);
56+
} finally {
57+
JdbcUtil.commit(dbConn);
58+
}
59+
}
60+
61+
@Override
62+
public Connection getConnection() {
63+
DistributedCache distributedCache;
64+
try {
65+
distributedCache = getRuntimeContext().getDistributedCache();
66+
} catch (Exception e) {
67+
distributedCache = PluginUtil.createDistributedCacheFromContextClassLoader();
68+
}
69+
return InceptorDbUtil.getConnection((InceptorConf) jdbcConf, distributedCache, jobId);
70+
}
71+
}

0 commit comments

Comments
 (0)