44use Upyun \Api \Rest ;
55use Upyun \Api \Form ;
66use GuzzleHttp \Psr7 ;
7+ use GuzzleHttp \Pool ;
8+ use GuzzleHttp \Client ;
79
810class Uploader
911{
@@ -37,13 +39,15 @@ public function upload($path, $file, $params, $withAsyncProcess)
3739 ->withHeaders ($ params )
3840 ->withFile ($ stream )
3941 ->send ();
42+ } elseif ($ this ->config ->uploadType === 'BLOCK_PARALLEL ' ) {
43+ return $ this ->concurrentPointUpload ($ path , $ stream , $ params );
4044 } else {
4145 return $ this ->pointUpload ($ path , $ stream , $ params );
4246 }
4347 }
4448
4549 /**
46- * 断点续传
50+ * 串行式断点续传
4751 * @param $path
4852 * @param $stream
4953 * @param $params
@@ -108,7 +112,8 @@ private function pointUpload($path, $stream, $params)
108112
109113 private function needUseBlock ($ fileSize )
110114 {
111- if ($ this ->config ->uploadType === 'BLOCK ' ) {
115+ if ($ this ->config ->uploadType === 'BLOCK ' ||
116+ $ this ->config ->uploadType === 'BLOCK_PARALLEL ' ) {
112117 return true ;
113118 } elseif ($ this ->config ->uploadType === 'AUTO ' &&
114119 $ fileSize >= $ this ->config ->sizeBoundary ) {
@@ -117,4 +122,81 @@ private function needUseBlock($fileSize)
117122 return false ;
118123 }
119124 }
125+
126+ /**
127+ * 并行式断点续传
128+ * @param $path
129+ * @param $stream
130+ * @param $params
131+ *
132+ * @return mixed|\Psr\Http\Message\ResponseInterface
133+ * @throws \Exception
134+ */
135+ private function concurrentPointUpload ($ path , $ stream , $ params )
136+ {
137+ $ req = new Rest ($ this ->config );
138+
139+ $ headers = array ();
140+ if (is_array ($ params )) {
141+ foreach ($ params as $ key => $ val ) {
142+ $ headers ['X-Upyun-Meta- ' . $ key ] = $ val ;
143+ }
144+ }
145+ $ res = $ req ->request ('PUT ' , $ path )
146+ ->withHeaders (array_merge (array (
147+ 'X-Upyun-Multi-Disorder ' => 'true ' ,
148+ 'X-Upyun-Multi-Stage ' => 'initiate ' ,
149+ 'X-Upyun-Multi-Type ' => Psr7 \mimetype_from_filename ($ path ),
150+ 'X-Upyun-Multi-Length ' => $ stream ->getSize (),
151+ ), $ headers ))
152+ ->send ();
153+ if ($ res ->getStatusCode () !== 204 ) {
154+ throw new \Exception ('init request failed when poinit upload! ' );
155+ }
156+
157+ $ init = Util::getHeaderParams ($ res ->getHeaders ());
158+ $ uuid = $ init ['x-upyun-multi-uuid ' ];
159+ $ requests = function ($ req , $ path , $ stream , $ uuid ) {
160+ $ blockSize = 1024 * 1024 ;
161+ $ total = ceil ($ stream ->getSize () / $ blockSize );
162+ for ($ i = 0 ; $ i < $ total ; $ i ++) {
163+ $ fileBlock = $ stream ->read ($ blockSize );
164+ yield $ req ->request ('PUT ' , $ path )
165+ ->withHeaders (array (
166+ 'X-Upyun-Multi-Stage ' => 'upload ' ,
167+ 'X-Upyun-Multi-Uuid ' => $ uuid ,
168+ 'X-Upyun-Part-Id ' => $ i
169+ ))
170+ ->withFile (Psr7 \stream_for ($ fileBlock ))
171+ ->toRequest ();
172+ }
173+ };
174+ $ client = new Client ([
175+ 'timeout ' => $ this ->config ->timeout ,
176+ ]);
177+ $ pool = new Pool ($ client , $ requests ($ req , $ path , $ stream , $ uuid ), [
178+ 'concurrency ' => $ this ->config ->concurrency ,
179+ 'fulfilled ' => function ($ res ) {
180+ if ($ res ->getStatusCode () !== 204 ) {
181+ throw new \Exception ('upload request failed when poinit upload! ' );
182+ }
183+ },
184+ 'rejected ' => function () {
185+ throw new \Exception ('upload request failed when poinit upload! ' );
186+ },
187+ ]);
188+ $ promise = $ pool ->promise ();
189+ $ promise ->wait ();
190+
191+ $ res = $ req ->request ('PUT ' , $ path )
192+ ->withHeaders (array (
193+ 'X-Upyun-Multi-Uuid ' => $ uuid ,
194+ 'X-Upyun-Multi-Stage ' => 'complete '
195+ ))
196+ ->send ();
197+ if ($ res ->getStatusCode () != 204 && $ res ->getStatusCode () != 201 ) {
198+ throw new \Exception ('end request failed when poinit upload! ' );
199+ }
200+ return $ res ;
201+ }
120202}
0 commit comments