Skip to content

Commit 9edf839

Browse files
authored
fix bug: add check file isUploaded, add table for file transport status (#631)
1 parent 9fd032e commit 9edf839

File tree

37 files changed

+501
-209
lines changed

37 files changed

+501
-209
lines changed

weevent-broker/src/main/java/com/webank/weevent/broker/config/WeEventConfig.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,4 @@ public class WeEventConfig {
3737

3838
@Value("${mqtt.broker.keepalive:60}")
3939
private Integer keepAlive;
40-
41-
@Value("${file.path:./logs/file}")
42-
private String filePath;
43-
44-
@Value("${file.chunk.size:1048576}")
45-
private int fileChunkSize;
4640
}

weevent-broker/src/main/resources/weevent.properties

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,4 @@ block.chain.type=fisco
88
stomp.heartbeats=30
99
#mqtt broker
1010
#mqtt.broker.tcp.port=7001
11-
mqtt.broker.keepalive=60
12-
#upload/download path
13-
file.chunk.path=~/file
14-
#chunk size default 1MB
15-
file.chunk.size=1048576
11+
mqtt.broker.keepalive=60

weevent-client/src/main/java/com/webank/weevent/client/SendResult.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,4 @@ public SendResult(SendResultStatus status) {
4848
this.status = status;
4949
}
5050

51-
public SendResult(SendResultStatus status, String topic, String eventId) {
52-
this.status = status;
53-
this.topic = topic;
54-
this.eventId = eventId;
55-
}
5651
}

weevent-file/src/main/java/com/webank/weevent/file/IWeEventFileClient.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ static IWeEventFileClient build(String groupId, String filePath, FtpInfo ftpInfo
4545
* @param topic topic name
4646
* @param publicPem public pem path string
4747
* @throws BrokerException broker exception
48-
* @throws BrokerException BrokerException
48+
* @throws IOException IOException
4949
*/
5050
void openTransport4Sender(String topic, String publicPem) throws BrokerException, IOException;
5151

@@ -59,7 +59,6 @@ static IWeEventFileClient build(String groupId, String filePath, FtpInfo ftpInfo
5959
* @return send result, SendResult.SUCCESS if success, and return SendResult.eventId
6060
* @throws BrokerException broker exception
6161
* @throws IOException IOException
62-
* @throws BrokerException BrokerException
6362
*/
6463
FileChunksMeta publishFile(String topic, String localFile, boolean overwrite) throws BrokerException, IOException;
6564

@@ -172,5 +171,14 @@ interface FileListener {
172171
*/
173172
void genPemFile(String filePath) throws BrokerException;
174173

174+
/**
175+
* Check if the receiver end has a file.
176+
*
177+
* @param fileName file name
178+
* @param topic topic name
179+
* @param groupId group id
180+
* @return is file exist
181+
* @throws BrokerException BrokerException
182+
*/
175183
boolean isFileExist(String fileName, String topic, String groupId) throws BrokerException;
176184
}

weevent-file/src/main/java/com/webank/weevent/file/service/WeEventFileClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,6 @@ public void genPemFile(String filePath) throws BrokerException {
365365
PemFile privatePemFile = new PemFile(pair.getPrivate(), PRIVATE_KEY_DESC);
366366
PemFile publicPemFile = new PemFile(pair.getPublic(), PUBLIC_KEY_DESC);
367367

368-
369-
System.out.println(filePath + PATH_SEPARATOR + account + PRIVATE_KEY_SUFFIX);
370368
privatePemFile.write(filePath + PATH_SEPARATOR + account + PRIVATE_KEY_SUFFIX);
371369
publicPemFile.write(filePath + PATH_SEPARATOR + account + PUBLIC_KEY_SUFFIX);
372370
} catch (IOException | NoSuchProviderException | NoSuchAlgorithmException | InvalidAlgorithmParameterException e) {
@@ -416,7 +414,7 @@ public interface EventListener {
416414
void onException(Throwable e);
417415
}
418416

419-
static class FileEventListener implements EventListener{
417+
static class FileEventListener implements EventListener {
420418
private final String receivePath;
421419
private final FtpInfo ftpInfo;
422420
private final FileListener fileListener;

weevent-governance/src/main/java/com/webank/weevent/governance/common/ConstantProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,11 @@ public class ConstantProperties {
6363
public final static int UPLOAD_CHUNK_FAIL_RETRY_COUNT = 5;
6464
public final static long WAIT1S = 1000;
6565

66+
67+
// file transport status
68+
public final static String UPLOADING = "0";
69+
public final static String SUCCESS = "1";
70+
public final static String FAILED = "2";
71+
72+
6673
}

weevent-governance/src/main/java/com/webank/weevent/governance/common/ErrorCode.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ public enum ErrorCode {
2929
FILE_CHUNK_INDEX_ILLEGAL(102007, "the upload file chunk idx is illegal"),
3030
FILE_CHUNK_DATA_IS_NULL(102008, "the upload file chunk data is null"),
3131
FILE_UPLOAD_FAILED(102009, "file upload failed"),
32+
FILE_NOT_EXIST(1020010, "file not exist"),
3233
FILE_DOWNLOAD_ERROR(102011, "file download failed"),
3334
GENERATE_PEM_FAILED(102012, "generate pem key failed"),
35+
CHECK_FILE_IS_UPLOADED_ERROR(102013, "check file is uploaded error"),
3436

3537
PARSE_CHUNK_REQUEST_ERROR(102020, "parse chunk request error"),
3638
;

weevent-governance/src/main/java/com/webank/weevent/governance/controller/FileController.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import com.webank.weevent.governance.common.GovernanceException;
1515
import com.webank.weevent.governance.common.GovernanceResult;
16-
import com.webank.weevent.governance.entity.FileTransportEntity;
16+
import com.webank.weevent.governance.entity.FileTransportChannelEntity;
1717
import com.webank.weevent.governance.service.FileService;
1818
import com.webank.weevent.governance.utils.ParamCheckUtils;
1919

@@ -52,7 +52,7 @@ public void setFileService(FileService fileService) {
5252

5353
@PostMapping(path = "/openTransport")
5454
@ResponseBody
55-
public GovernanceResult openTransport(@RequestBody FileTransportEntity fileTransport) throws GovernanceException {
55+
public GovernanceResult openTransport(@RequestBody FileTransportChannelEntity fileTransport) throws GovernanceException {
5656
log.info("openTransport, fileTransport:{}.", fileTransport.toString());
5757

5858
return this.fileService.openTransport(fileTransport);
@@ -80,20 +80,19 @@ public GovernanceResult prepareUploadFile(@RequestParam(name = "groupId") String
8080
}
8181

8282
@RequestMapping(path = "/download")
83-
public void download(@RequestParam(name = "groupId") String groupId,
84-
@RequestParam(name = "brokerId") Integer brokerId,
85-
@RequestParam(name = "fileId") String fileId,
83+
public void download(@RequestParam(name = "topic") String topic,
84+
@RequestParam(name = "fileName") String fileName,
8685
HttpServletResponse response) throws GovernanceException {
87-
log.info("download file, groupId:{}, brokerId:{}, fileId:{}.", groupId, brokerId, fileId);
86+
log.info("download file, topic:{}, fileName:{}.", topic, fileName);
8887
response.setHeader("content-type", "application/octet-stream");
8988
response.setContentType("application/octet-stream; charset=UTF-8");
9089

91-
ParamCheckUtils.validateFileId(fileId);
92-
String downloadFile = this.fileService.downloadFile(groupId, brokerId, fileId);
90+
ParamCheckUtils.validateFileName(fileName);
91+
String downloadFile = this.fileService.downloadFile(topic, fileName);
9392
if (StringUtils.isBlank(downloadFile)) {
9493
throw new GovernanceException("download file not exist");
9594
}
96-
String fileName = downloadFile.substring(downloadFile.lastIndexOf("/") + 1);
95+
9796
try {
9897
response.setHeader("filename", URLEncoder.encode(fileName, StandardCharsets.UTF_8.toString()));
9998
} catch (UnsupportedEncodingException e) {
@@ -110,9 +109,9 @@ public void download(@RequestParam(name = "groupId") String groupId,
110109
os.flush();
111110
i = bis.read(buffer);
112111
}
113-
log.info("download file success, fileId:{}, fileName:{}", fileId, fileName);
112+
log.info("download file success, topic:{}, fileName:{}", topic, fileName);
114113
} catch (IOException e) {
115-
log.error("download file error, groupId:{} fileId:{}", groupId, fileId, e);
114+
log.error("download file error, topic:{} fileName:{}", topic, fileName, e);
116115
throw new GovernanceException("download file error", e);
117116
}
118117
}
@@ -126,14 +125,22 @@ public GovernanceResult listFile(@RequestParam(name = "groupId") String groupId,
126125
return this.fileService.listFile(groupId, brokerId, topicName);
127126
}
128127

129-
@RequestMapping(path = "/status")
128+
@RequestMapping(path = "/downLoadStatus")
129+
@ResponseBody
130+
public GovernanceResult downLoadStatus(@RequestParam(name = "groupId") String groupId,
131+
@RequestParam(name = "brokerId") Integer brokerId,
132+
@RequestParam(name = "topicName") String topicName) throws GovernanceException {
133+
log.info("status, groupId:{}, topic:{}.", groupId, topicName);
134+
return this.fileService.downLoadStatus(groupId, brokerId, topicName);
135+
}
136+
137+
@RequestMapping(path = "/uploadStatus")
130138
@ResponseBody
131-
public GovernanceResult status(@RequestParam(name = "groupId") String groupId,
132-
@RequestParam(name = "brokerId") Integer brokerId,
133-
@RequestParam(name = "topicName") String topicName,
134-
@RequestParam(name = "role") String role) throws GovernanceException {
135-
log.info("status, groupId:{}, topic:{}, role:{}.", groupId, topicName, role);
136-
return this.fileService.status(groupId, brokerId, topicName, role);
139+
public GovernanceResult uploadStatus(@RequestParam(name = "groupId") String groupId,
140+
@RequestParam(name = "brokerId") Integer brokerId,
141+
@RequestParam(name = "topicName") String topicName) throws GovernanceException {
142+
log.info("status, groupId:{}, topic:{}.", groupId, topicName);
143+
return this.fileService.uploadStatus(groupId, brokerId, topicName);
137144
}
138145

139146
@RequestMapping(path = "/listTransport")
@@ -146,7 +153,7 @@ public GovernanceResult listTransport(@RequestParam(name = "groupId") String gro
146153

147154
@PostMapping(path = "/closeTransport")
148155
@ResponseBody
149-
public GovernanceResult closeTransport(@RequestBody FileTransportEntity fileTransport) throws GovernanceException {
156+
public GovernanceResult closeTransport(@RequestBody FileTransportChannelEntity fileTransport) throws GovernanceException {
150157
log.info("closeTransport, groupId:{}, brokerId:{}, transportId:{}, roleId:{}, topic:{}.", fileTransport.getGroupId(),
151158
fileTransport.getBrokerId(), fileTransport.getId(), fileTransport.getRole(), fileTransport.getTopicName());
152159
return fileService.closeTransport(fileTransport);
@@ -160,4 +167,14 @@ public void genPemFile(@RequestParam(name = "groupId") String groupId,
160167
this.fileService.genPemFile(groupId, brokerId, filePath);
161168
}
162169

170+
@RequestMapping(path = "/checkUploaded")
171+
@ResponseBody
172+
public GovernanceResult checkFileIsUploaded(@RequestParam(name = "groupId") String groupId,
173+
@RequestParam(name = "brokerId") Integer brokerId,
174+
@RequestParam(name = "topicName") String topicName,
175+
@RequestParam(name = "fileName") String fileName) throws GovernanceException {
176+
log.info("checkFileIsUploaded, groupId:{}, topic:{}, fileName:{}.", groupId, topicName, fileName);
177+
return this.fileService.checkFileIsUploaded(groupId, brokerId, topicName, fileName);
178+
}
179+
163180
}

weevent-governance/src/main/java/com/webank/weevent/governance/entity/FileTransportEntity.java renamed to weevent-governance/src/main/java/com/webank/weevent/governance/entity/FileTransportChannelEntity.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import lombok.Setter;
1414

1515
/**
16-
* FileTransportEntity class.
16+
* FileTransportChannelEntity class.
1717
*
1818
* @author v_wbhwliu
1919
* @version 1.3
@@ -23,10 +23,10 @@
2323
@Getter
2424
@EqualsAndHashCode(callSuper = false)
2525
@Entity
26-
@Table(name = "t_file_transport",
26+
@Table(name = "t_file_transport_channel",
2727
uniqueConstraints = {@UniqueConstraint(name = "topicBrokerGroupDelete",
2828
columnNames = {"topic_name", "broker_id", "group_id"})})
29-
public class FileTransportEntity extends TopicBase {
29+
public class FileTransportChannelEntity extends TopicBase {
3030

3131
@Column(name = "role", columnDefinition = "varchar(1)")
3232
private String role;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.webank.weevent.governance.entity;
2+
3+
import javax.persistence.Column;
4+
import javax.persistence.Entity;
5+
import javax.persistence.Table;
6+
import javax.persistence.Transient;
7+
import javax.persistence.UniqueConstraint;
8+
9+
import com.webank.weevent.governance.entity.base.TopicBase;
10+
11+
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
13+
import lombok.Setter;
14+
15+
/**
16+
* FileTransportStatusEntity class.
17+
*
18+
* @author v_wbhwliu
19+
* @version 1.3
20+
* @since 2020/5/20
21+
*/
22+
@Setter
23+
@Getter
24+
@EqualsAndHashCode(callSuper = false)
25+
@Entity
26+
@Table(name = "t_file_transport_status",
27+
uniqueConstraints = {@UniqueConstraint(name = "topicBrokerGroupFileName",
28+
columnNames = {"broker_id", "group_id", "topic_name", "file_name"})})
29+
public class FileTransportStatusEntity extends TopicBase {
30+
31+
@Column(name = "file_name", columnDefinition = "varchar(256)")
32+
private String fileName;
33+
34+
@Column(name = "transport_status", columnDefinition = "varchar(1)")
35+
private String status;
36+
37+
@Column(name = "file_md5", columnDefinition = "varchar(32)")
38+
private String fileMD5;
39+
40+
@Column(name = "file_size")
41+
private Long fileSize;
42+
43+
// cost time in second
44+
@Transient
45+
private String time;
46+
// sender ready chunk
47+
@Transient
48+
private int readyChunk;
49+
// processing
50+
@Transient
51+
private String process;
52+
// speed in Byte/s
53+
@Transient
54+
private String speed;
55+
56+
// @Transient
57+
// private FileChunksMetaStatus fileChunksMetaStatus;
58+
59+
60+
}

0 commit comments

Comments
 (0)