@@ -949,24 +949,29 @@ function sliceCopyFile(params, callback) {
949949 var Region = params . Region ;
950950 var Key = params . Key ;
951951 var CopySource = params . CopySource ;
952- var m = CopySource . match ( / ^ ( [ ^ . ] + - \d + ) \. c o s ( v 6 ) ? \. ( [ ^ . ] + ) \. [ ^ / ] + \/ ( . + ) $ / ) ;
952+ var m = util . getSourceParams . call ( this , CopySource ) ;
953953 if ( ! m ) {
954954 callback ( util . error ( new Error ( 'CopySource format error' ) ) ) ;
955955 return ;
956956 }
957957
958- var SourceBucket = m [ 1 ] ;
959- var SourceRegion = m [ 3 ] ;
960- var SourceKey = decodeURIComponent ( m [ 4 ] ) ;
958+ var SourceBucket = m . Bucket ;
959+ var SourceRegion = m . Region ;
960+ var SourceKey = decodeURIComponent ( m . Key ) ;
961961 var CopySliceSize = params . CopySliceSize === undefined ? self . options . CopySliceSize : params . CopySliceSize ;
962962 CopySliceSize = Math . max ( 0 , CopySliceSize ) ;
963963
964964 var ChunkSize = params . CopyChunkSize || this . options . CopyChunkSize ;
965965 var ChunkParallel = this . options . CopyChunkParallelLimit ;
966+ var ChunkRetryTimes = this . options . ChunkRetryTimes + 1 ;
966967
968+ var ChunkCount = 0 ;
967969 var FinishSize = 0 ;
968970 var FileSize ;
969971 var onProgress ;
972+ var SourceResHeaders = { } ;
973+ var SourceHeaders = { } ;
974+ var TargetHeader = { } ;
970975
971976 // 分片复制完成,开始 multipartComplete 操作
972977 ep . on ( 'copy_slice_complete' , function ( UploadData ) {
@@ -980,54 +985,133 @@ function sliceCopyFile(params, callback) {
980985 ETag : item . ETag ,
981986 } ;
982987 } ) ;
983- self . multipartComplete ( {
984- Bucket : Bucket ,
985- Region : Region ,
986- Key : Key ,
987- UploadId : UploadData . UploadId ,
988- Parts : Parts ,
989- } , function ( err , data ) {
990- if ( err ) {
991- onProgress ( null , true ) ;
992- return callback ( err ) ;
993- }
994- onProgress ( { loaded : FileSize , total : FileSize } , true ) ;
995- callback ( null , data ) ;
996- } ) ;
988+ // 完成上传的请求也做重试
989+ Async . retry ( ChunkRetryTimes , function ( tryCallback ) {
990+ self . multipartComplete ( {
991+ Bucket : Bucket ,
992+ Region : Region ,
993+ Key : Key ,
994+ UploadId : UploadData . UploadId ,
995+ Parts : Parts ,
996+ } , tryCallback ) ;
997+ } , function ( err , data ) {
998+ session . removeUsing ( UploadData . UploadId ) ; // 标记 UploadId 没被使用了,因为复制没提供重试,所以只要出错,就是 UploadId 停用了。
999+ if ( err ) {
1000+ onProgress ( null , true ) ;
1001+ return callback ( err ) ;
1002+ }
1003+ session . removeUploadId . call ( self , UploadData . UploadId ) ;
1004+ onProgress ( { loaded : FileSize , total : FileSize } , true ) ;
1005+ callback ( null , data ) ;
1006+ } ) ;
9971007 } ) ;
9981008
9991009 ep . on ( 'get_copy_data_finish' , function ( UploadData ) {
1000- Async . eachLimit ( UploadData . PartList , ChunkParallel , function ( SliceItem , asyncCallback ) {
1010+ // 处理 UploadId 缓存
1011+ var uuid = session . getCopyFileId ( CopySource , SourceResHeaders , ChunkSize , Bucket , Key ) ;
1012+ uuid && session . saveUploadId . call ( self , uuid , UploadData . UploadId , self . options . UploadIdCacheLimit ) ; // 缓存 UploadId
1013+ session . setUsing ( UploadData . UploadId ) ; // 标记 UploadId 为正在使用
1014+
1015+ var needCopySlices = util . filter ( UploadData . PartList , function ( SliceItem ) {
1016+ if ( SliceItem [ 'Uploaded' ] ) {
1017+ FinishSize += SliceItem [ 'PartNumber' ] >= ChunkCount ? ( FileSize % ChunkSize || ChunkSize ) : ChunkSize ;
1018+ }
1019+ return ! SliceItem [ 'Uploaded' ] ;
1020+ } ) ;
1021+ Async . eachLimit ( needCopySlices , ChunkParallel , function ( SliceItem , asyncCallback ) {
10011022 var PartNumber = SliceItem . PartNumber ;
10021023 var CopySourceRange = SliceItem . CopySourceRange ;
10031024 var currentSize = SliceItem . end - SliceItem . start ;
1004-
1005- copySliceItem . call ( self , {
1006- Bucket : Bucket ,
1007- Region : Region ,
1008- Key : Key ,
1009- CopySource : CopySource ,
1010- UploadId : UploadData . UploadId ,
1011- PartNumber : PartNumber ,
1012- CopySourceRange : CopySourceRange ,
1013- } , function ( err , data ) {
1014- if ( err ) return asyncCallback ( err ) ;
1015- FinishSize += currentSize ;
1016- onProgress ( { loaded : FinishSize , total : FileSize } ) ;
1017- SliceItem . ETag = data . ETag ;
1018- asyncCallback ( err || null , data ) ;
1019- } ) ;
1025+ Async . retry ( ChunkRetryTimes , function ( tryCallback ) {
1026+ copySliceItem . call ( self , {
1027+ Bucket : Bucket ,
1028+ Region : Region ,
1029+ Key : Key ,
1030+ CopySource : CopySource ,
1031+ UploadId : UploadData . UploadId ,
1032+ PartNumber : PartNumber ,
1033+ CopySourceRange : CopySourceRange ,
1034+ } , tryCallback ) ;
1035+ } , function ( err , data ) {
1036+ if ( err ) return asyncCallback ( err ) ;
1037+ FinishSize += currentSize ;
1038+ onProgress ( { loaded : FinishSize , total : FileSize } ) ;
1039+ SliceItem . ETag = data . ETag ;
1040+ asyncCallback ( err || null , data ) ;
1041+ } ) ;
10201042 } , function ( err ) {
10211043 if ( err ) {
1044+ session . removeUsing ( UploadData . UploadId ) ; // 标记 UploadId 没被使用了,因为复制没提供重试,所以只要出错,就是 UploadId 停用了。
10221045 onProgress ( null , true ) ;
10231046 return callback ( err ) ;
10241047 }
1025-
10261048 ep . emit ( 'copy_slice_complete' , UploadData ) ;
10271049 } ) ;
10281050 } ) ;
10291051
1030- ep . on ( 'get_file_size_finish' , function ( SourceHeaders ) {
1052+ ep . on ( 'get_chunk_size_finish' , function ( ) {
1053+ var createNewUploadId = function ( ) {
1054+ self . multipartInit ( {
1055+ Bucket : Bucket ,
1056+ Region : Region ,
1057+ Key : Key ,
1058+ Headers : TargetHeader ,
1059+ } , function ( err , data ) {
1060+ if ( err ) return callback ( err ) ;
1061+ params . UploadId = data . UploadId ;
1062+ ep . emit ( 'get_copy_data_finish' , { UploadId : params . UploadId , PartList : params . PartList } ) ;
1063+ } ) ;
1064+ } ;
1065+
1066+ // 在本地找可用的 UploadId
1067+ var uuid = session . getCopyFileId ( CopySource , SourceResHeaders , ChunkSize , Bucket , Key ) ;
1068+ var LocalUploadIdList = session . getUploadIdList . call ( self , uuid ) ;
1069+ if ( ! uuid || ! LocalUploadIdList ) return createNewUploadId ( ) ;
1070+
1071+ var next = function ( index ) {
1072+ // 如果本地找不到可用 UploadId,再一个个遍历校验远端
1073+ if ( index >= LocalUploadIdList . length ) return createNewUploadId ( ) ;
1074+ var UploadId = LocalUploadIdList [ index ] ;
1075+ // 如果正在被使用,跳过
1076+ if ( session . using [ UploadId ] ) return next ( index + 1 ) ;
1077+ // 判断 UploadId 是否存在线上
1078+ wholeMultipartListPart . call ( self , {
1079+ Bucket : Bucket ,
1080+ Region : Region ,
1081+ Key : Key ,
1082+ UploadId : UploadId ,
1083+ } , function ( err , PartListData ) {
1084+ if ( err ) {
1085+ // 如果 UploadId 获取会出错,跳过并删除
1086+ session . removeUploadId . call ( self , UploadId ) ;
1087+ next ( index + 1 ) ;
1088+ } else {
1089+ // 如果异步回来 UploadId 已经被用了,也跳过
1090+ if ( session . using [ UploadId ] ) return next ( index + 1 ) ;
1091+ // 找到可用 UploadId
1092+ var finishETagMap = { } ;
1093+ var offset = 0 ;
1094+ util . each ( PartListData . PartList , function ( PartItem ) {
1095+ var size = parseInt ( PartItem . Size ) ;
1096+ var end = offset + size - 1 ;
1097+ finishETagMap [ PartItem . PartNumber + '|' + offset + '|' + end ] = PartItem . ETag ;
1098+ offset += size ;
1099+ } ) ;
1100+ util . each ( params . PartList , function ( PartItem ) {
1101+ var ETag = finishETagMap [ PartItem . PartNumber + '|' + PartItem . start + '|' + PartItem . end ] ;
1102+ if ( ETag ) {
1103+ PartItem . ETag = ETag ;
1104+ PartItem . Uploaded = true ;
1105+ }
1106+ } ) ;
1107+ ep . emit ( 'get_copy_data_finish' , { UploadId : UploadId , PartList : params . PartList } ) ;
1108+ }
1109+ } ) ;
1110+ } ;
1111+ next ( 0 ) ;
1112+ } ) ;
1113+
1114+ ep . on ( 'get_file_size_finish' , function ( ) {
10311115 // 控制分片大小
10321116 ( function ( ) {
10331117 var SIZE = [ 1 , 2 , 4 , 8 , 16 , 32 , 64 , 128 , 256 , 512 , 1024 , 1024 * 2 , 1024 * 4 , 1024 * 5 ] ;
@@ -1037,8 +1121,7 @@ function sliceCopyFile(params, callback) {
10371121 if ( FileSize / AutoChunkSize <= self . options . MaxPartNumber ) break ;
10381122 }
10391123 params . ChunkSize = ChunkSize = Math . max ( ChunkSize , AutoChunkSize ) ;
1040-
1041- var ChunkCount = Math . ceil ( FileSize / ChunkSize ) ;
1124+ ChunkCount = Math . ceil ( FileSize / ChunkSize ) ;
10421125
10431126 var list = [ ] ;
10441127 for ( var partNumber = 1 ; partNumber <= ChunkCount ; partNumber ++ ) {
@@ -1083,16 +1166,7 @@ function sliceCopyFile(params, callback) {
10831166 delete TargetHeader [ 'x-cos-copy-source-If-Unmodified-Since' ] ;
10841167 delete TargetHeader [ 'x-cos-copy-source-If-Match' ] ;
10851168 delete TargetHeader [ 'x-cos-copy-source-If-None-Match' ] ;
1086- self . multipartInit ( {
1087- Bucket : Bucket ,
1088- Region : Region ,
1089- Key : Key ,
1090- Headers : TargetHeader ,
1091- } , function ( err , data ) {
1092- if ( err ) return callback ( err ) ;
1093- params . UploadId = data . UploadId ;
1094- ep . emit ( 'get_copy_data_finish' , params ) ;
1095- } ) ;
1169+ ep . emit ( 'get_chunk_size_finish' ) ;
10961170 } ) ;
10971171
10981172 // 获取远端复制源文件的大小
@@ -1133,7 +1207,8 @@ function sliceCopyFile(params, callback) {
11331207 } ) ;
11341208 } else {
11351209 var resHeaders = data . headers ;
1136- var SourceHeaders = {
1210+ SourceResHeaders = resHeaders ;
1211+ SourceHeaders = {
11371212 'Cache-Control' : resHeaders [ 'cache-control' ] ,
11381213 'Content-Disposition' : resHeaders [ 'content-disposition' ] ,
11391214 'Content-Encoding' : resHeaders [ 'content-encoding' ] ,
@@ -1147,7 +1222,7 @@ function sliceCopyFile(params, callback) {
11471222 SourceHeaders [ k ] = v ;
11481223 }
11491224 } ) ;
1150- ep . emit ( 'get_file_size_finish' , SourceHeaders ) ;
1225+ ep . emit ( 'get_file_size_finish' ) ;
11511226 }
11521227 } ) ;
11531228}
0 commit comments