1818
1919package com .dtstack .flinkx .kingbase .format ;
2020
21+ import com .dtstack .flinkx .constants .ConstantValue ;
2122import com .dtstack .flinkx .enums .EWriteMode ;
2223import com .dtstack .flinkx .exception .WriteRecordException ;
2324import com .dtstack .flinkx .rdb .outputformat .JdbcOutputFormat ;
25+ import com .dtstack .flinkx .rdb .util .DbUtil ;
26+ import com .dtstack .flinkx .util .ClassUtil ;
2427import com .dtstack .flinkx .util .ExceptionUtil ;
25- import com .dtstack .flinkx .util .StringUtil ;
2628import com .kingbase8 .copy .CopyManager ;
2729import com .kingbase8 .core .BaseConnection ;
2830import org .apache .commons .collections .CollectionUtils ;
@@ -54,6 +56,60 @@ public class KingbaseOutputFormat extends JdbcOutputFormat {
5456
5557 private CopyManager copyManager ;
5658
59+ /**
60+ * schema名
61+ */
62+ public String schema ;
63+
64+ @ Override
65+ protected void openInternal (int taskNumber , int numTasks ){
66+ try {
67+ ClassUtil .forName (driverName , getClass ().getClassLoader ());
68+ dbConn = DbUtil .getConnection (dbUrl , username , password );
69+
70+ if (restoreConfig .isRestore ()){
71+ dbConn .setAutoCommit (false );
72+ }
73+ // 查询主键时用table格式
74+ if (CollectionUtils .isEmpty (fullColumn )) {
75+ fullColumn = probeFullColumns (table , dbConn );
76+ }
77+
78+ if (!EWriteMode .INSERT .name ().equalsIgnoreCase (mode )){
79+ if (updateKey == null || updateKey .size () == 0 ) {
80+ updateKey = probePrimaryKeys (table , dbConn );
81+ }
82+ }
83+ // 其他情况,使用schema.table作为表名
84+ table = schema + ConstantValue .POINT_SYMBOL + table ;
85+ if (fullColumnType == null ) {
86+ fullColumnType = analyzeTable ();
87+ }
88+
89+ for (String col : column ) {
90+ for (int i = 0 ; i < fullColumn .size (); i ++) {
91+ if (col .equalsIgnoreCase (fullColumn .get (i ))){
92+ columnType .add (fullColumnType .get (i ));
93+ break ;
94+ }
95+ }
96+ }
97+
98+ preparedStatement = prepareTemplates ();
99+ readyCheckpoint = false ;
100+
101+ LOG .info ("subTask[{}}] wait finished" , taskNumber );
102+ } catch (SQLException sqe ) {
103+ throw new IllegalArgumentException ("open() failed." , sqe );
104+ }
105+ try {
106+ if (batchInterval > 1 ) {
107+ dbConn .setAutoCommit (false );
108+ }
109+ } catch (Exception e ) {
110+ LOG .warn (ExceptionUtil .getErrorMessage (e ));
111+ }
112+ }
57113
58114 @ Override
59115 protected PreparedStatement prepareTemplates () throws SQLException {
@@ -71,18 +127,6 @@ protected PreparedStatement prepareTemplates() throws SQLException {
71127 return super .prepareTemplates ();
72128 }
73129
74- @ Override
75- protected void openInternal (int taskNumber , int numTasks ){
76- super .openInternal (taskNumber , numTasks );
77- try {
78- if (batchInterval > 1 ) {
79- dbConn .setAutoCommit (false );
80- }
81- } catch (Exception e ) {
82- LOG .warn (ExceptionUtil .getErrorMessage (e ));
83- }
84- }
85-
86130 @ Override
87131 protected void writeSingleRecordInternal (Row row ) throws WriteRecordException {
88132 if (!checkIsCopyMode (insertSqlMode )){
@@ -195,4 +239,7 @@ private boolean checkIsCopyMode(String insertMode){
195239 return true ;
196240 }
197241
242+ public void setSchema (String schema ){
243+ this .schema = schema ;
244+ }
198245}
0 commit comments