Skip to content

Commit 898188b

Browse files
authored
Merge pull request #244 from jemygraw/master
add resume upload function and test case and example code
2 parents f7f082b + 172914d commit 898188b

File tree

8 files changed

+426
-17
lines changed

8 files changed

+426
-17
lines changed

examples/upload_simple.js renamed to examples/form_upload_simple.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@ var putPolicy = new qiniu.rs.PutPolicy(options);
1212

1313
var uploadToken = putPolicy.uploadToken(mac);
1414
var config = new qiniu.conf.Config();
15-
var localFile = "/Users/jemy/Documents/github.png";
15+
var localFile = "/Users/jemy/Documents/qiniu.mp4";
1616
//config.zone = qiniu.zone.Zone_z0;
1717
var formUploader = new qiniu.form_io.FormUploader(config);
1818
var putExtra = new qiniu.form_io.PutExtra();
1919

20-
2120
//bytes
2221
formUploader.put(uploadToken, null, "hello", null, function(respErr,
2322
respBody, respInfo) {

examples/resume_upload_simple.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
const qiniu = require("../index.js");
2+
const proc = require("process");
3+
4+
var bucket = 'if-pbl';
5+
var accessKey = proc.env.QINIU_ACCESS_KEY;
6+
var secretKey = proc.env.QINIU_SECRET_KEY;
7+
var mac = new qiniu.auth.digest.Mac(accessKey, secretKey);
8+
var options = {
9+
scope: bucket,
10+
}
11+
var putPolicy = new qiniu.rs.PutPolicy(options);
12+
13+
var uploadToken = putPolicy.uploadToken(mac);
14+
var config = new qiniu.conf.Config();
15+
var localFile = "/Users/jemy/Documents/qiniu.mp4";
16+
config.zone = qiniu.zone.Zone_z0;
17+
config.useCdnDomain = true;
18+
var resumeUploader = new qiniu.resume_io.ResumeUploader(config);
19+
var putExtra = new qiniu.resume_io.PutExtra();
20+
putExtra.params = {
21+
"x:name": "",
22+
"x:age": 27,
23+
}
24+
putExtra.fname = 'testfile.mp4';
25+
26+
//file
27+
resumeUploader.putFile(uploadToken, null, localFile, putExtra, function(respErr,
28+
respBody, respInfo) {
29+
if (respErr) {
30+
throw respErr;
31+
}
32+
33+
if (respInfo.statusCode == 200) {
34+
console.log(respBody);
35+
} else {
36+
console.log(respInfo.statusCode);
37+
console.log(respBody);
38+
}
39+
});

index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module.exports = {
66
},
77
cdn: require(libPath + "/cdn.js"),
88
form_io: require(libPath + '/io/form.js'),
9+
resume_io: require(libPath + '/io/resume.js'),
910
rs: require(libPath + '/rs.js'),
1011
fop: require(libPath + '/fop.js'),
1112
conf: require(libPath + '/conf.js'),

qiniu/conf.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ var defaultUserAgent = function() {
1212
}
1313

1414
exports.USER_AGENT = defaultUserAgent();
15-
const BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change
15+
exports.BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change
1616

1717
//define api form mime type
1818
exports.FormMimeUrl = "application/x-www-form-urlencoded";
1919
exports.FormMimeJson = "application/json";
2020
exports.FormMimeRaw = "application/octet-stream";
2121
exports.RS_HOST = "http://rs.qiniu.com";
22-
exports.RPC_TIMEOUT = 30000; //30s
22+
exports.RPC_TIMEOUT = 60000; //60s
2323

2424
exports.Config = function Config(options) {
2525
options = options || {};
@@ -28,11 +28,11 @@ exports.Config = function Config(options) {
2828
//response timeout, in seconds
2929
this.responseTimeout = options.responseTimeout || 30;
3030
//put threshold, in bytes
31-
this.putThreshold = options.putThreshold || BLOCK_SIZE;
31+
this.putThreshold = options.putThreshold || exports.BLOCK_SIZE;
3232
//use http or https protocol
3333
this.useHttpsDomain = options.useHttpsDomain || false;
3434
//use cdn accerlated domains
35-
this.useCdnDomain = options.useCdnDomain || false;
35+
this.useCdnDomain = options.useCdnDomain || true;
3636
//max retry times for chunk upload
3737
this.maxRetryTimes = options.maxRetryTimes || 3;
3838
//zone of the bucket

qiniu/io/resume.js

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
const conf = require('../conf');
2+
const zone = require('../zone');
3+
const util = require('../util');
4+
const rpc = require('../rpc');
5+
const path = require('path');
6+
const mime = require('mime');
7+
const fs = require('fs');
8+
9+
exports.ResumeUploader = ResumeUploader;
10+
exports.PutExtra = PutExtra;
11+
12+
function ResumeUploader(config) {
13+
this.config = config || new conf.Config();
14+
}
15+
16+
// 上传可选参数
17+
// @params fname 请求体中的文件的名称
18+
// @params params 额外参数设置,参数名称必须以x:开头
19+
// @param mimeType 指定文件的mimeType
20+
// @param progressCallback(BlkputRet) 上传进度回调
21+
// @param ctxList 断点续传的已上传的文件ctx列表,
22+
// 在上传的过程中可以在progressCallback中写入本地文件
23+
function PutExtra(fname, params, mimeType, blkputRets, progressCallback) {
24+
this.fname = fname || '';
25+
this.params = params || {};
26+
this.mimeType = mimeType || null;
27+
this.blkputRets = blkputRets || [];
28+
this.progressCallback = progressCallback || null;
29+
}
30+
31+
function BlkputRet(options) {
32+
this.ctx = options.ctx || null;
33+
this.checksum = options.checksum || null;
34+
this.crc32 = options.crc32 || null;
35+
this.offset = options.offset || null;
36+
this.host = options.host || null;
37+
this.expired_at = options.expired_at || null;
38+
}
39+
40+
ResumeUploader.prototype.putStream = function(uploadToken, key, rsStream,
41+
rsStreamLen, putExtra, callbackFunc) {
42+
putExtra = putExtra || new PutExtra();
43+
if (!putExtra.mimeType) {
44+
putExtra.mimeType = 'application/octet-stream';
45+
}
46+
47+
if (!putExtra.fname) {
48+
putExtra.fname = key ? key : '?';
49+
}
50+
51+
rsStream.on("error", function(err) {
52+
//callbackFunc
53+
callbackFunc(err, null, null);
54+
return;
55+
});
56+
57+
var useCache = false;
58+
var that = this;
59+
if (this.config.zone) {
60+
if (this.config.zoneExpire == -1) {
61+
useCache = true;
62+
} else {
63+
if (!util.isTimestampExpired(this.config.zoneExpire)) {
64+
useCache = true;
65+
}
66+
}
67+
}
68+
69+
var accessKey = util.getAKFromUptoken(uploadToken);
70+
var bucket = util.getBucketFromUptoken(uploadToken);
71+
if (useCache) {
72+
putReq(this.config, uploadToken, key, rsStream, rsStreamLen, putExtra,
73+
callbackFunc);
74+
} else {
75+
zone.getZoneInfo(accessKey, bucket, function(err, cZoneInfo,
76+
cZoneExpire) {
77+
if (err) {
78+
callbackFunc(err, null, null);
79+
return;
80+
}
81+
82+
//update object
83+
that.config.zone = cZoneInfo;
84+
that.config.zoneExpire = cZoneExpire;
85+
86+
//req
87+
putReq(that.config, uploadToken, key, rsStream, rsStreamLen,
88+
putExtra,
89+
callbackFunc);
90+
});
91+
}
92+
}
93+
94+
function putReq(config, uploadToken, key, rsStream, rsStreamLen, putExtra,
95+
callbackFunc) {
96+
//set up hosts order
97+
var upHosts = [];
98+
99+
if (config.useCdnDomain) {
100+
if (config.zone.cdnUpHosts) {
101+
config.zone.cdnUpHosts.forEach(function(host) {
102+
upHosts.push(host);
103+
});
104+
}
105+
config.zone.srcUpHosts.forEach(function(host) {
106+
upHosts.push(host);
107+
});
108+
} else {
109+
config.zone.srcUpHosts.forEach(function(host) {
110+
upHosts.push(host);
111+
});
112+
config.zone.cdnUpHosts.forEach(function(host) {
113+
upHosts.push(host);
114+
});
115+
}
116+
117+
var scheme = config.useHttpsDomain ? "https://" : "http://";
118+
var upDomain = scheme + upHosts[0];
119+
// block upload
120+
121+
var fileSize = rsStreamLen;
122+
//console.log("file size:" + fileSize);
123+
var blockCnt = fileSize / conf.BLOCK_SIZE
124+
var totalBlockNum = (fileSize % conf.BLOCK_SIZE == 0) ? blockCnt : (blockCnt +
125+
1);
126+
var finishedBlock = 0;
127+
var curBlock = 0;
128+
var readLen = 0;
129+
var readBuffers = [];
130+
var finishedCtxList = [];
131+
132+
//check putExtra.blkputRets
133+
if (putExtra.blkputRets) {
134+
for (var index = 0; index < putExtra.blkputRets.length; index++) {
135+
//check ctx expired or not
136+
var blkputRet = putExtra.blkputRets[index];
137+
var expiredAt = blkputRet.expired_at;
138+
//make sure the ctx at least has one day expiration
139+
expiredAt += 3600 * 24;
140+
if (util.isTimestampExpired(expiredAt)) {
141+
//discard these ctxs
142+
break;
143+
}
144+
145+
finishedBlock += 1;
146+
finishedCtxList.push(blkputRet.ctx);
147+
}
148+
}
149+
150+
//check when to mkblk
151+
rsStream.on('data', function(chunk) {
152+
readLen += chunk.length;
153+
readBuffers.push(chunk);
154+
155+
if (readLen % conf.BLOCK_SIZE == 0 || readLen == fileSize) {
156+
//console.log(readLen);
157+
var readData = Buffer.concat(readBuffers);
158+
readBuffers = []; //reset read buffer
159+
curBlock += 1; //set current block
160+
if (curBlock > finishedBlock) {
161+
rsStream.pause();
162+
mkblkReq(upDomain, uploadToken, readData, function(respErr,
163+
respBody,
164+
respInfo) {
165+
if (respInfo.statusCode != 200) {
166+
callbackFunc(respErr, respBody, respInfo);
167+
return;
168+
} else {
169+
finishedBlock += 1;
170+
rsStream.resume();
171+
var blkputRet = respBody;
172+
finishedCtxList.push(blkputRet.ctx);
173+
if (putExtra.progressCallback) {
174+
putExtra.progressCallback(blkputRet);
175+
}
176+
}
177+
});
178+
}
179+
}
180+
});
181+
182+
//check when to mkfile
183+
rsStream.on('end', function() {
184+
//console.log("end");
185+
mkfileReq(upDomain, uploadToken, fileSize, finishedCtxList, key,
186+
putExtra, callbackFunc);
187+
});
188+
}
189+
190+
function mkblkReq(upDomain, uploadToken, blkData, callbackFunc) {
191+
//console.log("mkblk");
192+
var requestURI = upDomain + "/mkblk/" + blkData.length;
193+
var auth = 'UpToken ' + uploadToken;
194+
var headers = {
195+
'Authorization': auth,
196+
'Content-Type': 'application/octet-stream'
197+
}
198+
rpc.post(requestURI, blkData, headers, callbackFunc);
199+
}
200+
201+
function mkfileReq(upDomain, uploadToken, fileSize, ctxList, key, putExtra,
202+
callbackFunc) {
203+
//console.log("mkfile");
204+
var requestURI = upDomain + "/mkfile/" + fileSize;
205+
if (key) {
206+
requestURI += "/key/" + util.urlsafeBase64Encode(key);
207+
}
208+
if (putExtra.mimeType) {
209+
requestURI += "/mimeType/" + util.urlsafeBase64Encode(putExtra.mimeType);
210+
}
211+
if (putExtra.fname) {
212+
requestURI += "/fname/" + util.urlsafeBase64Encode(putExtra.fname);
213+
}
214+
if (putExtra.params) {
215+
//putExtra params
216+
for (var k in putExtra.params) {
217+
if (k.startsWith("x:") && putExtra.params[k]) {
218+
requestURI += "/" + k + "/" + util.urlsafeBase64Encode(putExtra.params[
219+
k].toString());
220+
}
221+
}
222+
}
223+
var auth = 'UpToken ' + uploadToken;
224+
var headers = {
225+
'Authorization': auth,
226+
'Content-Type': 'application/octet-stream'
227+
}
228+
var postBody = ctxList.join(",");
229+
rpc.post(requestURI, postBody, headers, callbackFunc);
230+
}
231+
232+
ResumeUploader.prototype.putFile = function(uploadToken, key, localFile,
233+
putExtra, callbackFunc) {
234+
putExtra = putExtra || new PutExtra();
235+
var rsStream = fs.createReadStream(localFile);
236+
var rsStreamLen = fs.statSync(localFile).size;
237+
if (!putExtra.mimeType) {
238+
putExtra.mimeType = mime.lookup(localFile);
239+
}
240+
241+
if (!putExtra.fname) {
242+
putExtra.fname = path.basename(localFile);
243+
}
244+
245+
return this.putStream(uploadToken, key, rsStream, rsStreamLen, putExtra,
246+
callbackFunc);
247+
}
248+
249+
ResumeUploader.prototype.putFileWithoutKey = function(uploadToken, localFile,
250+
putExtra, callbackFunc) {
251+
return this.putFile(uploadToken, null, localFile, putExtra, callbackFunc);
252+
}

qiniu/rpc.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ var urllib = require('urllib');
22
var util = require('./util');
33
var conf = require('./conf');
44

5+
exports.post = post;
56
exports.postMultipart = postMultipart;
67
exports.postWithForm = postWithForm;
78
exports.postWithoutForm = postWithoutForm;
@@ -31,14 +32,18 @@ function postWithoutForm(requestURI, token, callbackFunc) {
3132
}
3233

3334
function post(requestURI, requestForm, headers, callbackFunc) {
35+
//var start = parseInt(Date.now() / 1000);
3436
headers = headers || {};
3537
headers['User-Agent'] = headers['User-Agent'] || conf.USER_AGENT;
38+
headers['Connection'] = 'keep-alive';
3639

3740
var data = {
3841
headers: headers,
3942
method: 'POST',
4043
dataType: 'json',
4144
timeout: conf.RPC_TIMEOUT,
45+
gzip: true,
46+
// timing: true,
4247
};
4348

4449
if (Buffer.isBuffer(requestForm) || typeof requestForm === 'string') {
@@ -51,6 +56,15 @@ function post(requestURI, requestForm, headers, callbackFunc) {
5156

5257
var req = urllib.request(requestURI, data, function(respErr, respBody,
5358
respInfo) {
59+
//var end = parseInt(Date.now() / 1000);
60+
// console.log((end - start) + " seconds");
61+
// console.log("queuing:\t" + respInfo.timing.queuing);
62+
// console.log("dnslookup:\t" + respInfo.timing.dnslookup);
63+
// console.log("connected:\t" + respInfo.timing.connected);
64+
// console.log("requestSent:\t" + respInfo.timing.requestSent);
65+
// console.log("waiting:\t" + respInfo.timing.waiting);
66+
// console.log("contentDownload:\t" + respInfo.timing.contentDownload);
67+
5468
callbackFunc(respErr, respBody, respInfo);
5569
});
5670

0 commit comments

Comments
 (0)