Skip to content

Commit bc654f1

Browse files
committed
【LogMiner支持Oracle10】【31707】
1 parent c1ac926 commit bc654f1

File tree

6 files changed

+160
-39
lines changed

6 files changed

+160
-39
lines changed

flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ public static ClusterClient createYarnClient(Options launcherOptions) throws Exc
115115
continue;
116116
}
117117

118+
if(!report.getQueue().equals(launcherOptions.getQueue())) {
119+
continue;
120+
}
121+
118122
int thisMemory = report.getApplicationResourceUsageReport().getNeededResources().getMemory();
119123
int thisCores = report.getApplicationResourceUsageReport().getNeededResources().getVirtualCores();
120124

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogFile.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
19-
2018
package com.dtstack.flinkx.oraclelogminer.format;
2119

2220
import java.util.Objects;

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class LogMinerConfig implements Serializable {
6161

6262
private List<String> table;
6363

64-
private Long queryTimeout;
64+
private Long queryTimeout = 100L;
6565

6666
/**
6767
* Oracle 12c第二个版本之后LogMiner不支持自动添加日志

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,20 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33-
import java.sql.*;
34-
import java.util.*;
33+
import java.sql.CallableStatement;
34+
import java.sql.Connection;
35+
import java.sql.DriverManager;
36+
import java.sql.PreparedStatement;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.sql.Timestamp;
41+
import java.util.ArrayList;
42+
import java.util.Arrays;
43+
import java.util.HashMap;
44+
import java.util.List;
45+
import java.util.Map;
46+
3547

3648
/**
3749
* @author jiangbo
@@ -48,6 +60,7 @@ public class LogMinerConnection {
4860
public static final String EXECUTE_CATALOG_ROLE = "EXECUTE_CATALOG_ROLE";
4961

5062
public static final int ORACLE_11_VERSION = 11;
63+
public int oracleVersion;
5164

5265
public static final List<String> PRIVILEGES_NEEDED = Arrays.asList(
5366
"CREATE SESSION",
@@ -139,15 +152,15 @@ public void startOrUpdateLogMiner(Long startScn) {
139152
String startSql = null;
140153
try {
141154
if (logMinerConfig.getSupportAutoAddLog()) {
142-
startSql = SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG;
155+
startSql = oracleVersion == 10 ? SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG_10 : SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG;
143156
} else {
144157
List<LogFile> newLogFiles = queryLogFiles(startScn);
145158
if (addedLogFiles.equals(newLogFiles)) {
146159
return;
147160
} else {
148161
LOG.info("Log group changed, new log group = {}", GsonUtil.GSON.toJson(newLogFiles));
149162
addedLogFiles = newLogFiles;
150-
startSql = SqlUtil.SQL_START_LOG_MINER;
163+
startSql = oracleVersion == 10 ? SqlUtil.SQL_START_LOG_MINER_10 : SqlUtil.SQL_START_LOG_MINER;
151164
}
152165
}
153166

@@ -378,20 +391,23 @@ private List<LogFile> queryLogFiles(Long scn) throws SQLException{
378391
PreparedStatement statement = null;
379392
ResultSet rs = null;
380393
try {
381-
statement = connection.prepareStatement(SqlUtil.SQL_QUERY_LOG_FILE);
394+
statement = connection.prepareStatement(oracleVersion == 10 ? SqlUtil.SQL_QUERY_LOG_FILE_10 : SqlUtil.SQL_QUERY_LOG_FILE);
382395
statement.setLong(1, scn);
383396
statement.setLong(2, scn);
384397
rs = statement.executeQuery();
385398
while (rs.next()) {
386399
LogFile logFile = new LogFile();
387400
logFile.setFileName(rs.getString("name"));
388401
logFile.setFirstChange(rs.getLong("first_change#"));
389-
390-
String nextChangeString = rs.getString("next_change#");
391-
if (nextChangeString.length() == 20) {
402+
if(oracleVersion == 10){
392403
logFile.setNextChange(Long.MAX_VALUE);
393-
} else {
394-
logFile.setNextChange(Long.parseLong(nextChangeString));
404+
}else{
405+
String nextChangeString = rs.getString("next_change#");
406+
if (nextChangeString.length() == 20) {
407+
logFile.setNextChange(Long.MAX_VALUE);
408+
} else {
409+
logFile.setNextChange(Long.parseLong(nextChangeString));
410+
}
395411
}
396412

397413
logFiles.add(logFile);
@@ -455,6 +471,10 @@ public boolean hasNext() throws SQLException{
455471

456472
public void checkPrivileges() {
457473
try (Statement statement = connection.createStatement()) {
474+
475+
oracleVersion = connection.getMetaData().getDatabaseMajorVersion();
476+
LOG.info("Oracle版本为:{}", oracleVersion);
477+
458478
List<String> roles = getUserRoles(statement);
459479
if (roles.contains(DBA_ROLE)) {
460480
return;
@@ -464,15 +484,12 @@ public void checkPrivileges() {
464484
throw new IllegalArgumentException("非DBA角色的用户必须是[EXECUTE_CATALOG_ROLE]角色,请执行sql赋权:GRANT EXECUTE_CATALOG_ROLE TO USERNAME");
465485
}
466486

467-
int version = connection.getMetaData().getDatabaseMajorVersion();
468-
LOG.info("Oracle版本为:{}", version);
469-
470-
if (containsNeededPrivileges(statement, version)) {
487+
if (containsNeededPrivileges(statement)) {
471488
return;
472489
}
473490

474491
String message;
475-
if(ORACLE_11_VERSION == version){
492+
if(ORACLE_11_VERSION <= oracleVersion){
476493
message = "权限不足,请执行sql赋权:GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, FLASHBACK ANY TABLE, SELECT ANY TABLE, LOCK ANY TABLE, SELECT ANY DICTIONARY TO USER_ROLE;";
477494
}else{
478495
message = "权限不足,请执行sql赋权:GRANT LOGMINING, CREATE SESSION, SELECT ANY TRANSACTION ,SELECT ANY DICTIONARY TO USER_ROLE;";
@@ -484,7 +501,7 @@ public void checkPrivileges() {
484501
}
485502
}
486503

487-
private boolean containsNeededPrivileges(Statement statement, int version) {
504+
private boolean containsNeededPrivileges(Statement statement) {
488505
try (ResultSet rs = statement.executeQuery(SqlUtil.SQL_QUERY_PRIVILEGES)) {
489506
List<String> privileges = new ArrayList<>();
490507
while (rs.next()) {
@@ -496,7 +513,7 @@ private boolean containsNeededPrivileges(Statement statement, int version) {
496513

497514
int privilegeCount = 0;
498515
List<String> privilegeList;
499-
if (version == ORACLE_11_VERSION) {
516+
if (oracleVersion <= ORACLE_11_VERSION) {
500517
privilegeList = ORACLE_11_PRIVILEGES_NEEDED;
501518
} else {
502519
privilegeList = PRIVILEGES_NEEDED;

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/util/SqlUtil.java

Lines changed: 120 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,27 @@ public class SqlUtil {
4545
* 例如NLS_DATE_FORMAT,NLS_NUMERIC_CHARACTERS等)。使用此选项,将使用ANSI / ISO字符串文字格式对重构的SQL语句进行格式化。
4646
*/
4747
public final static String SQL_START_LOG_MINER_AUTO_ADD_LOG = "" +
48-
"BEGIN DBMS_LOGMNR.START_LOGMNR(" +
48+
"BEGIN SYS.DBMS_LOGMNR.START_LOGMNR(" +
4949
" STARTSCN => ?," +
50-
" OPTIONS => DBMS_LOGMNR.SKIP_CORRUPTION " +
51-
" + DBMS_LOGMNR.NO_SQL_DELIMITER " +
52-
" + DBMS_LOGMNR.NO_ROWID_IN_STMT " +
53-
" + DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG " +
54-
" + DBMS_LOGMNR.CONTINUOUS_MINE " +
55-
" + DBMS_LOGMNR.COMMITTED_DATA_ONLY " +
56-
" + DBMS_LOGMNR.STRING_LITERALS_IN_STMT" +
50+
" OPTIONS => SYS.DBMS_LOGMNR.SKIP_CORRUPTION " +
51+
" + SYS.DBMS_LOGMNR.NO_SQL_DELIMITER " +
52+
" + SYS.DBMS_LOGMNR.NO_ROWID_IN_STMT " +
53+
" + SYS.DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG " +
54+
" + SYS.DBMS_LOGMNR.CONTINUOUS_MINE " +
55+
" + SYS.DBMS_LOGMNR.COMMITTED_DATA_ONLY " +
56+
" + SYS.DBMS_LOGMNR.STRING_LITERALS_IN_STMT" +
57+
");" +
58+
"END;";
59+
60+
public final static String SQL_START_LOG_MINER_AUTO_ADD_LOG_10 = "" +
61+
"BEGIN SYS.DBMS_LOGMNR.START_LOGMNR(" +
62+
" STARTSCN => ?," +
63+
" OPTIONS => SYS.DBMS_LOGMNR.SKIP_CORRUPTION " +
64+
" + SYS.DBMS_LOGMNR.NO_SQL_DELIMITER " +
65+
" + SYS.DBMS_LOGMNR.NO_ROWID_IN_STMT " +
66+
" + SYS.DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG " +
67+
" + SYS.DBMS_LOGMNR.CONTINUOUS_MINE " +
68+
" + SYS.DBMS_LOGMNR.COMMITTED_DATA_ONLY " +
5769
");" +
5870
"END;";
5971

@@ -105,20 +117,75 @@ public class SqlUtil {
105117
" ORDER BY\n" +
106118
" first_change#\n" +
107119
" ) LOOP IF st THEN\n" +
108-
" dbms_logmnr.add_logfile(l_log_rec.name, dbms_logmnr.new);\n" +
120+
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name, SYS.DBMS_LOGMNR.new);\n" +
109121
" st := false;\n" +
110122
" ELSE\n" +
111-
" dbms_logmnr.add_logfile(l_log_rec.name);\n" +
123+
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name);\n" +
112124
" END IF;\n" +
113125
" END LOOP;\n" +
114126
"\n" +
115-
" dbms_logmnr.start_logmnr(" +
127+
" SYS.DBMS_LOGMNR.start_logmnr(" +
116128
" options => " +
117-
" dbms_logmnr.skip_corruption " +
118-
" + dbms_logmnr.no_sql_delimiter " +
119-
" + dbms_logmnr.no_rowid_in_stmt\n" +
120-
" + dbms_logmnr.dict_from_online_catalog " +
121-
" + dbms_logmnr.string_literals_in_stmt" +
129+
" SYS.DBMS_LOGMNR.skip_corruption " +
130+
" + SYS.DBMS_LOGMNR.no_sql_delimiter " +
131+
" + SYS.DBMS_LOGMNR.no_rowid_in_stmt\n" +
132+
" + SYS.DBMS_LOGMNR.dict_from_online_catalog " +
133+
" + SYS.DBMS_LOGMNR.string_literals_in_stmt" +
134+
" );\n" +
135+
"END;";
136+
137+
public final static String SQL_START_LOG_MINER_10 = "" +
138+
"DECLARE\n" +
139+
" st BOOLEAN := true;\n" +
140+
" start_scn NUMBER := ?;\n" +
141+
"BEGIN\n" +
142+
" FOR l_log_rec IN (\n" +
143+
" SELECT\n" +
144+
" MIN(name) name,\n" +
145+
" first_change#\n" +
146+
" FROM\n" +
147+
" (\n" +
148+
" SELECT\n" +
149+
" MIN(member) AS name,\n" +
150+
" first_change#,\n" +
151+
" 281474976710655 AS next_change#\n" +
152+
" FROM\n" +
153+
" v$log l\n" +
154+
" INNER JOIN v$logfile f ON l.group# = f.group#\n" +
155+
" WHERE l.STATUS != 'UNUSED'\n" +
156+
" GROUP BY\n" +
157+
" first_change#\n" +
158+
" UNION\n" +
159+
" SELECT\n" +
160+
" name,\n" +
161+
" first_change#,\n" +
162+
" next_change#\n" +
163+
" FROM\n" +
164+
" v$archived_log\n" +
165+
" WHERE\n" +
166+
" name IS NOT NULL\n" +
167+
" )\n" +
168+
" WHERE\n" +
169+
" first_change# >= start_scn\n" +
170+
" OR start_scn < next_change#\n" +
171+
" GROUP BY\n" +
172+
" first_change#\n" +
173+
" ORDER BY\n" +
174+
" first_change#\n" +
175+
" ) LOOP IF st THEN\n" +
176+
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name, SYS.DBMS_LOGMNR.new);\n" +
177+
" st := false;\n" +
178+
" ELSE\n" +
179+
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name);\n" +
180+
" END IF;\n" +
181+
" END LOOP;\n" +
182+
"\n" +
183+
" SYS.DBMS_LOGMNR.start_logmnr(" +
184+
" options => " +
185+
" SYS.DBMS_LOGMNR.skip_corruption " +
186+
" + SYS.DBMS_LOGMNR.no_sql_delimiter " +
187+
" + SYS.DBMS_LOGMNR.no_rowid_in_stmt\n" +
188+
" + SYS.DBMS_LOGMNR.dict_from_online_catalog " +
122189
" );\n" +
123190
"END;";
124191

@@ -158,10 +225,45 @@ public class SqlUtil {
158225
"ORDER BY\n" +
159226
" first_change#";
160227

228+
public final static String SQL_QUERY_LOG_FILE_10 =
229+
"SELECT\n" +
230+
" MIN(name) name,\n" +
231+
" first_change#\n" +
232+
"FROM\n" +
233+
" (\n" +
234+
" SELECT\n" +
235+
" MIN(member) AS name,\n" +
236+
" first_change#,\n" +
237+
" 281474976710655 AS next_change#\n" +
238+
" FROM\n" +
239+
" v$log l\n" +
240+
" INNER JOIN v$logfile f ON l.group# = f.group#\n" +
241+
" WHERE l.STATUS != 'UNUSED'\n" +
242+
" GROUP BY\n" +
243+
" first_change#\n" +
244+
" UNION\n" +
245+
" SELECT\n" +
246+
" name,\n" +
247+
" first_change#,\n" +
248+
" next_change#\n" +
249+
" FROM\n" +
250+
" v$archived_log\n" +
251+
" WHERE\n" +
252+
" name IS NOT NULL\n" +
253+
" )\n" +
254+
"WHERE\n" +
255+
" first_change# >= ?\n" +
256+
" OR ? < next_change#\n" +
257+
"GROUP BY\n" +
258+
" first_change#\n" +
259+
"ORDER BY\n" +
260+
" first_change#";
261+
161262
public final static String SQL_SELECT_DATA = "" +
162263
"SELECT\n" +
163264
" scn,\n" +
164-
" commit_scn,\n" +
265+
//oracle 10 没有该字段
266+
// " commit_scn,\n" +
165267
" timestamp,\n" +
166268
" operation,\n" +
167269
" seg_owner,\n" +
@@ -174,7 +276,7 @@ public class SqlUtil {
174276
"WHERE\n" +
175277
" scn > ?";
176278

177-
public final static String SQL_STOP_LOG_MINER = "BEGIN DBMS_LOGMNR.END_LOGMNR; end;";
279+
public final static String SQL_STOP_LOG_MINER = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR; end;";
178280

179281
public final static String SQL_GET_CURRENT_SCN = "select min(CURRENT_SCN) CURRENT_SCN from gv$database";
180282

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
<module>flinkx-launcher</module>
1616
<!-- <module>flinkx-test</module>-->
17-
<module>flinkx-examples</module>
17+
<!-- <module>flinkx-examples</module>-->
1818
<module>flinkx-stream</module>
1919

2020
<!--******离线******-->

0 commit comments

Comments
 (0)