Skip to content

Commit 46d90f7

Browse files
committed
Merge pull request #103 from simon-liubin/feature/resume
分片上传,断点续传
2 parents 360a1c4 + 1d2f5a0 commit 46d90f7

File tree

13 files changed

+1185
-23
lines changed

13 files changed

+1185
-23
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.qiniu.api.resumableio;
2+
3+
import org.json.JSONException;
4+
import org.json.JSONObject;
5+
6+
import com.qiniu.api.net.CallRet;
7+
8+
public class ChunkUploadCallRet extends CallRet {
9+
protected String ctx;
10+
protected String checksum;
11+
protected int offset;
12+
protected String host;
13+
protected long crc32;
14+
15+
public ChunkUploadCallRet(CallRet ret) {
16+
super(ret);
17+
doUnmarshal();
18+
}
19+
20+
public ChunkUploadCallRet(int statusCode, String response) {
21+
super(statusCode, response);
22+
doUnmarshal();
23+
}
24+
25+
public ChunkUploadCallRet(int statusCode, Exception e) {
26+
super(statusCode, e);
27+
}
28+
29+
private void doUnmarshal() {
30+
if (this.exception != null || this.response == null
31+
|| !this.response.trim().startsWith("{")) {
32+
return;
33+
}
34+
try {
35+
unmarshal();
36+
} catch (Exception e) {
37+
e.printStackTrace();
38+
if (this.exception == null) {
39+
this.exception = e;
40+
}
41+
}
42+
43+
}
44+
45+
protected void unmarshal() throws JSONException{
46+
JSONObject jsonObject = new JSONObject(this.response);
47+
ctx = jsonObject.optString("ctx", null);
48+
checksum = jsonObject.optString("checksum", null);
49+
offset = jsonObject.optInt("offset", 0);
50+
host = jsonObject.optString("host", null);
51+
crc32 = jsonObject.optLong("crc32", 0);
52+
}
53+
54+
public String getCtx() {
55+
return ctx;
56+
}
57+
58+
public String getChecksum() {
59+
return checksum;
60+
}
61+
62+
public long getOffset() {
63+
return offset;
64+
}
65+
66+
public String getHost() {
67+
return host;
68+
}
69+
70+
public long getCrc32() {
71+
return crc32;
72+
}
73+
74+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.qiniu.api.resumableio;
2+
3+
import java.io.File;
4+
import java.io.FileNotFoundException;
5+
import java.io.IOException;
6+
import java.io.RandomAccessFile;
7+
import java.util.concurrent.locks.Lock;
8+
import java.util.concurrent.locks.ReentrantLock;
9+
10+
public class RandomAccessFileUpload extends SliceUpload {
11+
protected RandomAccessFile file;
12+
private final Lock fileUploadLock;
13+
14+
// / 断点续传记录实例
15+
private String resumeKey;
16+
private int currentBlockIdx = 0;
17+
18+
public RandomAccessFileUpload(File file, String token,
19+
String key, String mimeType) {
20+
super(token, key, mimeType);
21+
try {
22+
this.contentLength = file.length();
23+
24+
this.file = new RandomAccessFile(file, "r");
25+
fileUploadLock = new ReentrantLock();
26+
resumeKey = key;
27+
if(resumeKey == null){
28+
resumeKey = file.getAbsolutePath();
29+
}
30+
} catch (FileNotFoundException e) {
31+
throw new RuntimeException(e);
32+
}
33+
}
34+
35+
36+
@Override
37+
protected boolean hasNext() {
38+
return contentLength > currentBlockIdx * BLOCK_SIZE;
39+
}
40+
41+
42+
@Override
43+
protected UploadBlock buildNextBlockUpload() throws IOException {
44+
long start = currentBlockIdx * BLOCK_SIZE;
45+
int len = (int) Math.min(BLOCK_SIZE, contentLength - start);
46+
47+
RandomAccessFileUploadBlock fb = new RandomAccessFileUploadBlock(this, httpClient, host,
48+
currentBlockIdx, start, len, file, fileUploadLock);
49+
50+
currentBlockIdx++;
51+
52+
return fb;
53+
}
54+
55+
56+
@Override
57+
protected void clean() throws Exception {
58+
if (file != null) {
59+
file.close();
60+
}
61+
}
62+
63+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.qiniu.api.resumableio;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.io.RandomAccessFile;
7+
import java.util.concurrent.locks.Lock;
8+
9+
import org.apache.http.HttpEntity;
10+
import org.apache.http.client.HttpClient;
11+
import org.apache.http.entity.AbstractHttpEntity;
12+
13+
14+
public class RandomAccessFileUploadBlock extends UploadBlock {
15+
protected RandomAccessFile file;
16+
private final Lock fileUploadLock;
17+
18+
RandomAccessFileUploadBlock(SliceUpload sliceUpload,
19+
HttpClient httpClient, String host, int blockIdx, long offset,
20+
int len, RandomAccessFile file, Lock fileUploadLock) {
21+
super(sliceUpload, httpClient, host, blockIdx, offset, len);
22+
this.file = file;
23+
this.fileUploadLock = fileUploadLock;
24+
}
25+
26+
@Override
27+
protected HttpEntity buildHttpEntity(final int start, final int len) {
28+
AbstractHttpEntity entity = new AbstractHttpEntity() {
29+
private boolean consumed = false;
30+
private long length = len;
31+
32+
@Override
33+
public boolean isRepeatable() {
34+
return true;
35+
}
36+
37+
@Override
38+
public long getContentLength() {
39+
return length;
40+
}
41+
42+
@Override
43+
public InputStream getContent() throws IOException,
44+
IllegalStateException {
45+
return null;
46+
}
47+
48+
@Override
49+
public void writeTo(OutputStream os) throws IOException {
50+
consumed = false;
51+
try {
52+
byte[] b = new byte[1024 * 4];
53+
int len = -1;
54+
long off = offset + start;
55+
56+
fileUploadLock.lock();
57+
file.seek(off);
58+
while (true) {
59+
len = file.read(b);
60+
if (len == -1) {
61+
break;
62+
}
63+
64+
os.write(b, 0, len);
65+
off += len;
66+
}
67+
os.flush();
68+
} finally {
69+
fileUploadLock.unlock();
70+
consumed = true;
71+
}
72+
}
73+
74+
@Override
75+
public boolean isStreaming() {
76+
return !consumed;
77+
}
78+
};
79+
entity.setContentType("application/octet-stream");
80+
return entity;
81+
}
82+
83+
@Override
84+
protected long buildCrc32(int start, int len) {
85+
return Util.crc32(copy2New(start, len));
86+
}
87+
88+
private byte[] copy2New(int start, int len) {
89+
byte[] data = new byte[len];
90+
try {
91+
long off = offset + start;
92+
fileUploadLock.lock();
93+
file.seek(off);
94+
file.read(data);
95+
return data;
96+
} catch (IOException e) {
97+
throw new RuntimeException(e);
98+
} finally {
99+
fileUploadLock.unlock();
100+
}
101+
}
102+
103+
@Override
104+
protected void clean() {
105+
106+
}
107+
108+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package com.qiniu.api.resumableio;
2+
3+
import java.io.File;
4+
import java.io.InputStream;
5+
6+
import com.qiniu.api.io.PutRet;
7+
import com.qiniu.api.net.Http;
8+
9+
public class ResumeableIoApi {
10+
11+
public static RandomAccessFileUpload upload(File file, String upToken) {
12+
return upload(file, upToken, null, null);
13+
}
14+
15+
public static RandomAccessFileUpload upload(File file, String upToken,
16+
String key) {
17+
return upload(file, upToken, key, null);
18+
}
19+
20+
21+
/**
22+
* @param file
23+
* @param upToken
24+
* @param key
25+
* @param mimeType
26+
* @param resumeClass
27+
* @return
28+
*
29+
* 获得upload后执行execute方法,返回 CallRet
30+
* upload。execute()
31+
*
32+
*/
33+
public static RandomAccessFileUpload upload(File file, String upToken,
34+
String key, String mimeType) {
35+
RandomAccessFileUpload upload = new RandomAccessFileUpload(file, upToken, key, mimeType);
36+
37+
upload.httpClient = Http.getClient();
38+
39+
return upload;
40+
}
41+
42+
43+
public static StreamSliceUpload upload(InputStream is, String upToken) {
44+
return upload(is, upToken, null, null);
45+
}
46+
47+
48+
public static StreamSliceUpload upload(InputStream is, String upToken,
49+
String key) {
50+
return upload(is, upToken, key, null);
51+
}
52+
53+
54+
public static StreamSliceUpload upload(InputStream is, String upToken,
55+
String key, String mimeType) {
56+
return upload(is, upToken, key, mimeType, -1);
57+
}
58+
59+
public static StreamSliceUpload upload(InputStream is, String upToken, long streamLength) {
60+
return upload(is, upToken, null, null, streamLength);
61+
}
62+
63+
64+
public static StreamSliceUpload upload(InputStream is, String upToken,
65+
String key, long streamLength) {
66+
return upload(is, upToken, key, null, streamLength);
67+
}
68+
69+
/**
70+
* @param is
71+
* @param upToken
72+
* @param key
73+
* @param mimeType
74+
* @param streamLength
75+
* @return
76+
*
77+
* 获得upload后执行execute方法,返回 CallRet
78+
* upload。execute()
79+
*/
80+
public static StreamSliceUpload upload(InputStream is, String upToken,
81+
String key, String mimeType, long streamLength) {
82+
StreamSliceUpload upload = new StreamSliceUpload(is, upToken, key, mimeType, streamLength);
83+
84+
upload.httpClient = Http.getClient();
85+
86+
return upload;
87+
}
88+
89+
/**
90+
* @param file 具体文件,不能为文件夹
91+
* @param upToken
92+
* @param key
93+
* @param mimeType
94+
* @return
95+
*/
96+
public static PutRet put(File file, String upToken,
97+
String key, String mimeType){
98+
return upload(file, upToken, key, mimeType).execute();
99+
}
100+
101+
public static PutRet put(File file, String upToken,
102+
String key){
103+
return put(file, upToken, key, null);
104+
}
105+
106+
/**
107+
* @param is
108+
* @param upToken
109+
* @param key
110+
* @param mimeType
111+
* @param streamLength 若不知具体大小,可传 -1
112+
* @return
113+
*/
114+
public static PutRet put(InputStream is, String upToken,
115+
String key, String mimeType, long streamLength){
116+
return upload(is, upToken, key, mimeType, streamLength).execute();
117+
}
118+
119+
public static PutRet put(InputStream is, String upToken,
120+
String key, String mimeType){
121+
return upload(is, upToken, key, mimeType).execute();
122+
}
123+
124+
public static PutRet put(InputStream is, String upToken,
125+
String key){
126+
return upload(is, upToken, key, null).execute();
127+
}
128+
129+
}

0 commit comments

Comments
 (0)