Skip to content

Commit 122b25c

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_31248' into 1.8_release_3.10.x
2 parents dfa847f + ffa2d51 commit 122b25c

File tree

3 files changed

+48
-26
lines changed

3 files changed

+48
-26
lines changed

flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpHandler.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -143,28 +143,52 @@ public boolean isFileExist(String filePath) {
143143
@Override
144144
public List<String> getFiles(String path) {
145145
List<String> sources = new ArrayList<>();
146-
if(isDirExist(path)) {
146+
if(isFileExist(path)) {
147+
sources.add(path);
148+
return sources;
149+
}else{
150+
path = path + SP;
151+
}
152+
try {
153+
ftpClient.enterLocalPassiveMode();
154+
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
155+
if(ftpFiles != null) {
156+
for(FTPFile ftpFile : ftpFiles) {
157+
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
158+
}
159+
}
160+
} catch (IOException e) {
161+
LOG.error("", e);
162+
throw new RuntimeException(e);
163+
}
164+
return sources;
165+
}
166+
167+
/**
168+
* 递归获取指定路径下的所有文件(暂无过滤)
169+
* isDirExist()、isFileExist()方法在Windows系统下判断有误
170+
* @param path
171+
* @param file
172+
* @return
173+
* @throws IOException
174+
*/
175+
private List<String> getFiles(String path, FTPFile file)throws IOException {
176+
List<String> sources = new ArrayList<>();
177+
if(file.isDirectory()){
147178
if(!path.endsWith(SP)) {
148179
path = path + SP;
149180
}
150-
try {
151-
ftpClient.enterLocalPassiveMode();
152-
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
153-
if(ftpFiles != null) {
154-
for(FTPFile ftpFile : ftpFiles) {
155-
sources.addAll(getFiles(path + ftpFile.getName()));
156-
}
181+
ftpClient.enterLocalPassiveMode();
182+
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
183+
if(ftpFiles != null) {
184+
for(FTPFile ftpFile : ftpFiles) {
185+
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
157186
}
158-
} catch (IOException e) {
159-
LOG.error("", e);
160-
throw new RuntimeException(e);
161187
}
162-
163-
} else if(isFileExist(path)) {
188+
}else{
164189
sources.add(path);
165-
return sources;
166190
}
167-
191+
LOG.info("path = {}, FTPFile is directory = {}", path, file.isDirectory());
168192
return sources;
169193
}
170194

flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@
1919
package com.dtstack.flinkx.ftp.reader;
2020

2121
import com.dtstack.flinkx.constants.ConstantValue;
22-
import com.dtstack.flinkx.ftp.EProtocol;
2322
import com.dtstack.flinkx.ftp.FtpConfig;
24-
import com.dtstack.flinkx.ftp.FtpHandler;
2523
import com.dtstack.flinkx.ftp.FtpHandlerFactory;
2624
import com.dtstack.flinkx.ftp.IFtpHandler;
27-
import com.dtstack.flinkx.ftp.SftpHandler;
2825
import com.dtstack.flinkx.inputformat.BaseRichInputFormat;
2926
import com.dtstack.flinkx.reader.MetaColumn;
27+
import com.dtstack.flinkx.util.GsonUtil;
3028
import com.dtstack.flinkx.util.StringUtil;
3129
import org.apache.flink.core.io.InputSplit;
3230
import org.apache.flink.types.Row;
31+
3332
import java.io.IOException;
3433
import java.util.ArrayList;
3534
import java.util.List;
@@ -77,7 +76,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception
7776
files.addAll(ftpHandler.getFiles(p.trim()));
7877
}
7978
}
80-
79+
LOG.info("FTP files = {}", GsonUtil.GSON.toJson(files));
8180
int numSplits = (Math.min(files.size(), minNumSplits));
8281
FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits];
8382
for(int index = 0; index < numSplits; ++index) {

flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flinkx.ftp.writer;
2020

21+
import com.dtstack.flinkx.constants.ConstantValue;
2122
import com.dtstack.flinkx.exception.WriteRecordException;
2223
import com.dtstack.flinkx.ftp.FtpConfig;
2324
import com.dtstack.flinkx.ftp.FtpHandlerFactory;
@@ -31,7 +32,7 @@
3132

3233
import java.io.IOException;
3334
import java.io.OutputStream;
34-
import java.util.Arrays;
35+
import java.util.Collections;
3536
import java.util.List;
3637
import java.util.function.Predicate;
3738

@@ -56,8 +57,6 @@ public class FtpOutputFormat extends BaseFileOutputFormat {
5657

5758
private transient OutputStream os;
5859

59-
private static final String DOT = ".";
60-
6160
private static final String FILE_SUFFIX = ".csv";
6261

6362
private static final String OVERWRITE_MODE = "overwrite";
@@ -133,7 +132,7 @@ protected void nextBlock(){
133132

134133
@Override
135134
public void moveTemporaryDataBlockFileToDirectory(){
136-
if (currentBlockFileName == null || !currentBlockFileName.startsWith(DOT)){
135+
if (currentBlockFileName == null || !currentBlockFileName.startsWith(ConstantValue.POINT_SYMBOL)){
137136
return;
138137
}
139138

@@ -242,9 +241,9 @@ protected void waitForAllTasksToFinish(){
242241

243242
@Override
244243
protected void coverageData(){
245-
boolean cleanPath = restoreConfig.isRestore() && OVERWRITE_MODE.equalsIgnoreCase(ftpConfig.getWriteMode()) && !SP.equals(ftpConfig.getPath());
244+
boolean cleanPath = OVERWRITE_MODE.equalsIgnoreCase(ftpConfig.getWriteMode()) && !SP.equals(ftpConfig.getPath());
246245
if(cleanPath){
247-
ftpHandler.deleteAllFilesInDir(ftpConfig.getPath(), Arrays.asList(tmpPath));
246+
ftpHandler.deleteAllFilesInDir(ftpConfig.getPath(), Collections.singletonList(tmpPath));
248247
}
249248
}
250249

@@ -271,7 +270,7 @@ protected void moveAllTemporaryDataFileToDirectory() throws IOException {
271270
List<String> files = ftpHandler.getFiles(tmpPath);
272271
for (String file : files) {
273272
String fileName = file.substring(file.lastIndexOf(SP) + 1);
274-
if (fileName.endsWith(FILE_SUFFIX) && !fileName.startsWith(DOT)){
273+
if (fileName.endsWith(FILE_SUFFIX) && !fileName.startsWith(ConstantValue.POINT_SYMBOL)){
275274
String newPath = ftpConfig.getPath() + SP + fileName;
276275
LOG.info("Move file {} to path {}", file, newPath);
277276
ftpHandler.rename(file, newPath);

0 commit comments

Comments
 (0)