|
21 | 21 | import com.dtstack.flinkx.constants.ConstantValue; |
22 | 22 | import com.dtstack.flinkx.metadata.MetaDataCons; |
23 | 23 | import com.dtstack.flinkx.metadata.inputformat.BaseMetadataInputFormat; |
| 24 | +import com.dtstack.flinkx.metadata.inputformat.MetadataInputSplit; |
24 | 25 | import com.dtstack.flinkx.metadatasqlserver.constants.SqlServerMetadataCons; |
25 | 26 | import com.dtstack.flinkx.util.ExceptionUtil; |
| 27 | +import org.apache.commons.collections.CollectionUtils; |
26 | 28 | import org.apache.commons.lang.StringUtils; |
27 | 29 | import org.apache.commons.lang3.tuple.Pair; |
| 30 | +import org.apache.flink.core.io.InputSplit; |
28 | 31 | import org.apache.flink.types.Row; |
29 | 32 |
|
30 | 33 | import java.io.IOException; |
@@ -52,6 +55,34 @@ public class MetadatasqlserverInputFormat extends BaseMetadataInputFormat { |
52 | 55 |
|
53 | 56 | protected String table; |
54 | 57 |
|
| 58 | + /** |
| 59 | + * 在use database失败时,不影响下一个任务 |
| 60 | + * @param inputSplit 分片 |
| 61 | + * @throws IOException 异常 |
| 62 | + */ |
| 63 | + @Override |
| 64 | + protected void openInternal(InputSplit inputSplit) throws IOException { |
| 65 | + LOG.info("inputSplit = {}", inputSplit); |
| 66 | + try { |
| 67 | + connection.set(getConnection()); |
| 68 | + statement.set(connection.get().createStatement()); |
| 69 | + currentDb.set(((MetadataInputSplit) inputSplit).getDbName()); |
| 70 | + tableList = ((MetadataInputSplit) inputSplit).getTableList(); |
| 71 | + switchDatabase(currentDb.get()); |
| 72 | + if (CollectionUtils.isEmpty(tableList)) { |
| 73 | + tableList = showTables(); |
| 74 | + queryTable = true; |
| 75 | + } |
| 76 | + } catch (ClassNotFoundException e) { |
| 77 | + LOG.error("could not find suitable driver, e={}", ExceptionUtil.getErrorMessage(e)); |
| 78 | + throw new IOException(e); |
| 79 | + } catch (SQLException e){ |
| 80 | + LOG.error("获取table列表异常, dbUrl = {}, username = {}, inputSplit = {}, e = {}", dbUrl, username, inputSplit, ExceptionUtil.getErrorMessage(e)); |
| 81 | + tableList = new LinkedList<>(); |
| 82 | + } |
| 83 | + tableIterator.set(tableList.iterator()); |
| 84 | + } |
| 85 | + |
55 | 86 | @Override |
56 | 87 | protected List<Object> showTables() throws SQLException { |
57 | 88 | List<Object> tableNameList = new LinkedList<>(); |
|
0 commit comments