Skip to content

Commit e4b6311

Browse files
committed
【解决Windows系统下FTP模式读取不到文件bug】【31248】
1 parent 0da34b6 commit e4b6311

File tree

3 files changed

+38
-12
lines changed

3 files changed

+38
-12
lines changed

flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpHandler.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public List<String> getFiles(String path) {
152152
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
153153
if(ftpFiles != null) {
154154
for(FTPFile ftpFile : ftpFiles) {
155-
sources.addAll(getFiles(path + ftpFile.getName()));
155+
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
156156
}
157157
}
158158
} catch (IOException e) {
@@ -168,6 +168,34 @@ public List<String> getFiles(String path) {
168168
return sources;
169169
}
170170

171+
/**
172+
* 递归获取指定路径下的所有文件(暂无过滤)
173+
* isDirExist()、isFileExist()方法在Windows系统下判断有误
174+
* @param path
175+
* @param file
176+
* @return
177+
* @throws IOException
178+
*/
179+
private List<String> getFiles(String path, FTPFile file)throws IOException {
180+
List<String> sources = new ArrayList<>();
181+
if(file.isDirectory()){
182+
if(!path.endsWith(SP)) {
183+
path = path + SP;
184+
}
185+
ftpClient.enterLocalPassiveMode();
186+
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
187+
if(ftpFiles != null) {
188+
for(FTPFile ftpFile : ftpFiles) {
189+
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
190+
}
191+
}
192+
}else{
193+
sources.add(path);
194+
}
195+
LOG.info("path = {}, FTPFile is directory = {}", path, file.isDirectory());
196+
return sources;
197+
}
198+
171199
@Override
172200
public void mkDirRecursive(String directoryPath){
173201
StringBuilder dirPath = new StringBuilder();

flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@
1919
package com.dtstack.flinkx.ftp.reader;
2020

2121
import com.dtstack.flinkx.constants.ConstantValue;
22-
import com.dtstack.flinkx.ftp.EProtocol;
2322
import com.dtstack.flinkx.ftp.FtpConfig;
24-
import com.dtstack.flinkx.ftp.FtpHandler;
2523
import com.dtstack.flinkx.ftp.FtpHandlerFactory;
2624
import com.dtstack.flinkx.ftp.IFtpHandler;
27-
import com.dtstack.flinkx.ftp.SftpHandler;
2825
import com.dtstack.flinkx.inputformat.BaseRichInputFormat;
2926
import com.dtstack.flinkx.reader.MetaColumn;
27+
import com.dtstack.flinkx.util.GsonUtil;
3028
import com.dtstack.flinkx.util.StringUtil;
3129
import org.apache.flink.core.io.InputSplit;
3230
import org.apache.flink.types.Row;
31+
3332
import java.io.IOException;
3433
import java.util.ArrayList;
3534
import java.util.List;
@@ -77,7 +76,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception
7776
files.addAll(ftpHandler.getFiles(p.trim()));
7877
}
7978
}
80-
79+
LOG.info("FTP files = {}", GsonUtil.GSON.toJson(files));
8180
int numSplits = (Math.min(files.size(), minNumSplits));
8281
FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits];
8382
for(int index = 0; index < numSplits; ++index) {

flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flinkx.ftp.writer;
2020

21+
import com.dtstack.flinkx.constants.ConstantValue;
2122
import com.dtstack.flinkx.exception.WriteRecordException;
2223
import com.dtstack.flinkx.ftp.FtpConfig;
2324
import com.dtstack.flinkx.ftp.FtpHandlerFactory;
@@ -31,7 +32,7 @@
3132

3233
import java.io.IOException;
3334
import java.io.OutputStream;
34-
import java.util.Arrays;
35+
import java.util.Collections;
3536
import java.util.List;
3637
import java.util.function.Predicate;
3738

@@ -56,8 +57,6 @@ public class FtpOutputFormat extends BaseFileOutputFormat {
5657

5758
private transient OutputStream os;
5859

59-
private static final String DOT = ".";
60-
6160
private static final String FILE_SUFFIX = ".csv";
6261

6362
private static final String OVERWRITE_MODE = "overwrite";
@@ -133,7 +132,7 @@ protected void nextBlock(){
133132

134133
@Override
135134
public void moveTemporaryDataBlockFileToDirectory(){
136-
if (currentBlockFileName == null || !currentBlockFileName.startsWith(DOT)){
135+
if (currentBlockFileName == null || !currentBlockFileName.startsWith(ConstantValue.POINT_SYMBOL)){
137136
return;
138137
}
139138

@@ -242,9 +241,9 @@ protected void waitForAllTasksToFinish(){
242241

243242
@Override
244243
protected void coverageData(){
245-
boolean cleanPath = restoreConfig.isRestore() && OVERWRITE_MODE.equalsIgnoreCase(ftpConfig.getWriteMode()) && !SP.equals(ftpConfig.getPath());
244+
boolean cleanPath = OVERWRITE_MODE.equalsIgnoreCase(ftpConfig.getWriteMode()) && !SP.equals(ftpConfig.getPath());
246245
if(cleanPath){
247-
ftpHandler.deleteAllFilesInDir(ftpConfig.getPath(), Arrays.asList(tmpPath));
246+
ftpHandler.deleteAllFilesInDir(ftpConfig.getPath(), Collections.singletonList(tmpPath));
248247
}
249248
}
250249

@@ -271,7 +270,7 @@ protected void moveAllTemporaryDataFileToDirectory() throws IOException {
271270
List<String> files = ftpHandler.getFiles(tmpPath);
272271
for (String file : files) {
273272
String fileName = file.substring(file.lastIndexOf(SP) + 1);
274-
if (fileName.endsWith(FILE_SUFFIX) && !fileName.startsWith(DOT)){
273+
if (fileName.endsWith(FILE_SUFFIX) && !fileName.startsWith(ConstantValue.POINT_SYMBOL)){
275274
String newPath = ftpConfig.getPath() + SP + fileName;
276275
LOG.info("Move file {} to path {}", file, newPath);
277276
ftpHandler.rename(file, newPath);

0 commit comments

Comments
 (0)