Skip to content

Commit a75a24d

Browse files
authored
Merge pull request #243 from sxci/crc32
分片上传,每一片添加 crc32 校验
2 parents b5e1f35 + aa8a903 commit a75a24d

File tree

3 files changed

+67
-24
lines changed

3 files changed

+67
-24
lines changed

src/main/java/com/qiniu/storage/ResumeUploader.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.qiniu.http.Client;
77
import com.qiniu.http.Response;
88
import com.qiniu.storage.model.ResumeBlockInfo;
9+
import com.qiniu.util.Crc32;
910
import com.qiniu.util.StringMap;
1011
import com.qiniu.util.StringUtils;
1112
import com.qiniu.util.UrlSafeBase64;
@@ -45,6 +46,7 @@ public final class ResumeUploader {
4546
private final RecordHelper helper;
4647
private FileInputStream file;
4748
private String host;
49+
private int retryMax;
4850

4951
ResumeUploader(Client client, String upToken, String key, File file,
5052
StringMap params, String mime, Recorder recorder, Configuration configuration) {
@@ -62,6 +64,7 @@ public final class ResumeUploader {
6264
this.recorder = recorder;
6365
this.modifyTime = f.lastModified();
6466
helper = new RecordHelper();
67+
retryMax = configuration.retryMax;
6568
}
6669

6770
public Response upload() throws QiniuException {
@@ -91,8 +94,9 @@ public Response upload() throws QiniuException {
9194
throw new QiniuException(e);
9295
}
9396

94-
// long crc = Crc32.bytes(blockBuffer, 0, blockSize);
97+
long crc = Crc32.bytes(blockBuffer, 0, blockSize);
9598
Response response = null;
99+
QiniuException temp = null;
96100
try {
97101
response = makeBlock(blockBuffer, blockSize);
98102
} catch (QiniuException e) {
@@ -101,25 +105,38 @@ public Response upload() throws QiniuException {
101105
}
102106
if (e.response == null || e.response.needRetry()) {
103107
retry = true;
108+
temp = e;
104109
} else {
105110
close();
106111
throw e;
107112
}
108113
}
114+
115+
if (!retry) {
116+
ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class);
117+
if (blockInfo0.crc32 != crc) {
118+
retry = true;
119+
temp = new QiniuException(new Exception("block's crc32 is not match"));
120+
}
121+
}
122+
109123
if (retry) {
110-
try {
111-
response = makeBlock(blockBuffer, blockSize);
112-
retry = false;
113-
} catch (QiniuException e) {
124+
if (retryMax > 0) {
125+
retryMax--;
126+
try {
127+
response = makeBlock(blockBuffer, blockSize);
128+
retry = false;
129+
} catch (QiniuException e) {
130+
close();
131+
throw e;
132+
}
133+
} else {
114134
close();
115-
throw e;
135+
throw temp;
116136
}
117-
118137
}
119-
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
120-
//TODO check return crc32
121-
// if blockInfo.crc32 != crc{}
122138

139+
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
123140
contexts[contextIndex++] = blockInfo.ctx;
124141
uploaded += blockSize;
125142
helper.record(uploaded);
@@ -147,7 +164,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
147164
private void close() {
148165
try {
149166
file.close();
150-
} catch (IOException e) {
167+
} catch (Exception e) {
151168
e.printStackTrace();
152169
}
153170
}

src/main/java/com/qiniu/storage/StreamUploader.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.qiniu.http.Client;
66
import com.qiniu.http.Response;
77
import com.qiniu.storage.model.ResumeBlockInfo;
8+
import com.qiniu.util.Crc32;
89
import com.qiniu.util.StringMap;
910
import com.qiniu.util.StringUtils;
1011
import com.qiniu.util.UrlSafeBase64;
@@ -28,6 +29,7 @@ public final class StreamUploader {
2829
private final InputStream stream;
2930
private long size;
3031
private String host;
32+
private int retryMax;
3133

3234
StreamUploader(Client client, String upToken, String key, InputStream stream,
3335
StringMap params, String mime, Configuration configuration) {
@@ -40,6 +42,7 @@ public final class StreamUploader {
4042
this.contexts = new ArrayList<>();
4143
this.blockBuffer = new byte[Constants.BLOCK_SIZE];
4244
this.stream = stream;
45+
retryMax = configuration.retryMax;
4346
}
4447

4548
public Response upload() throws QiniuException {
@@ -54,9 +57,11 @@ public Response upload() throws QiniuException {
5457

5558
while (size == 0) {
5659
int bufferIndex = 0;
60+
int blockSize = 0;
5761
while (ret != -1 && bufferIndex != blockBuffer.length) {
5862
try {
59-
ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex);
63+
blockSize = blockBuffer.length - bufferIndex;
64+
ret = stream.read(blockBuffer, bufferIndex, blockSize);
6065
} catch (IOException e) {
6166
close();
6267
throw new QiniuException(e);
@@ -75,7 +80,9 @@ public Response upload() throws QiniuException {
7580
}
7681
}
7782

83+
long crc = Crc32.bytes(blockBuffer, 0, blockSize);
7884
Response response = null;
85+
QiniuException temp = null;
7986
try {
8087
response = makeBlock(blockBuffer, bufferIndex);
8188
} catch (QiniuException e) {
@@ -84,24 +91,34 @@ public Response upload() throws QiniuException {
8491
}
8592
if (e.response == null || e.response.needRetry()) {
8693
retry = true;
94+
temp = e;
8795
} else {
8896
close();
8997
throw e;
9098
}
9199
}
100+
if (!retry) {
101+
ResumeBlockInfo blockInfo0 = response.jsonToObject(ResumeBlockInfo.class);
102+
if (blockInfo0.crc32 != crc) {
103+
retry = true;
104+
temp = new QiniuException(new Exception("block's crc32 is not match"));
105+
}
106+
}
92107
if (retry) {
93-
try {
94-
response = makeBlock(blockBuffer, bufferIndex);
95-
retry = false;
96-
} catch (QiniuException e) {
97-
close();
98-
throw e;
108+
if (retryMax > 0) {
109+
retryMax--;
110+
try {
111+
response = makeBlock(blockBuffer, bufferIndex);
112+
retry = false;
113+
} catch (QiniuException e) {
114+
close();
115+
throw e;
116+
}
117+
} else {
118+
throw temp;
99119
}
100-
101120
}
102121
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
103-
//TODO check return crc32
104-
// if blockInfo.crc32 != crc{}
105122
contexts.add(blockInfo.ctx);
106123
uploaded += bufferIndex;
107124
}
@@ -126,7 +143,7 @@ private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
126143
private void close() {
127144
try {
128145
stream.close();
129-
} catch (IOException e) {
146+
} catch (Exception e) {
130147
e.printStackTrace();
131148
}
132149
}

src/test/java/com/qiniu/streaming/StreamingTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,19 @@
1212
import static org.junit.Assert.*;
1313

1414
/**
15-
* Created by bailong on 16/9/22.
15+
* Created by bailong on 16/9/22
1616
*/
1717
public class StreamingTest {
18-
private Auth auth = TestConfig.testAuth;
18+
private Auth auth = null;
19+
20+
{
21+
try {
22+
auth = Auth.create(System.getenv("ak"), System.getenv("sk"));
23+
} catch (Exception e) {
24+
auth = TestConfig.testAuth;
25+
}
26+
}
27+
1928
private String hub = "pilisdktest";
2029
private String streamKeyPrefix = "pilijava" + System.currentTimeMillis();
2130
private StreamingManager manager = new StreamingManager(auth, hub);

0 commit comments

Comments
 (0)