@@ -17,17 +17,17 @@ use {
1717    } , 
1818    rayon:: prelude:: * , 
1919    reqwest:: { Client ,  StatusCode } , 
20-     reqwest_middleware:: { self ,  ClientWithMiddleware } , 
2120    reqwest_retry:: { 
22-         default_on_request_failure,  policies:: ExponentialBackoff ,  RetryTransientMiddleware , 
23-         Retryable ,   RetryableStrategy , 
21+         default_on_request_failure,  policies:: ExponentialBackoff ,  RetryPolicy ,   Retryable , 
22+         RetryableStrategy , 
2423    } , 
2524    sha2:: { Digest ,  Sha256 } , 
2625    std:: { 
2726        collections:: { BTreeMap ,  BTreeSet ,  HashMap } , 
2827        io:: Read , 
2928        path:: PathBuf , 
3029        str:: FromStr , 
30+         time:: { Duration ,  SystemTime } , 
3131    } , 
3232    url:: Url , 
3333    zip:: ZipArchive , 
@@ -65,12 +65,19 @@ async fn fetch_artifact(
6565    Ok ( res) 
6666} 
6767
68+ enum  UploadSource  { 
69+     Filename ( PathBuf ) , 
70+     Data ( Bytes ) , 
71+ } 
72+ 
6873async  fn  upload_release_artifact ( 
69-     client :  & ClientWithMiddleware , 
74+     client :  & Client , 
75+     retry_policy :  & impl  RetryPolicy , 
76+     retryable_strategy :  & impl  RetryableStrategy , 
7077    auth_token :  String , 
7178    release :  & Release , 
7279    filename :  String , 
73-     data :   Bytes , 
80+     body :   UploadSource , 
7481    dry_run :  bool , 
7582)  -> Result < ( ) >  { 
7683    if  release. assets . iter ( ) . any ( |asset| asset. name  == filename)  { 
@@ -93,17 +100,52 @@ async fn upload_release_artifact(
93100        return  Ok ( ( ) ) ; 
94101    } 
95102
96-     // Octocrab doesn't yet support release artifact upload. And the low-level HTTP API 
97-     // forces the use of strings on us. So we have to make our own HTTP client. 
98- 
99-     let  response = client
100-         . put ( url) 
101-         . header ( "Authorization" ,  format ! ( "Bearer {auth_token}" ) ) 
102-         . header ( "Content-Length" ,  data. len ( ) ) 
103-         . header ( "Content-Type" ,  "application/x-tar" ) 
104-         . body ( data) 
105-         . send ( ) 
106-         . await ?; 
103+     // Octocrab's high-level API for uploading release artifacts doesn't yet support streaming 
104+     // bodies, and their low-level API isn't more helpful than using our own HTTP client. 
105+     // 
106+     // Because we are streaming the body, we can't use the standard retry middleware for reqwest 
107+     // (see e.g. https://github.com/seanmonstar/reqwest/issues/2416), so we have to recreate the 
108+     // request on each retry and handle the retry logic ourself. This logic is inspired by 
109+     // uv/crates/uv-publish/src/lib.rs (which has the same problem), which in turn is inspired by 
110+     // reqwest-middleware/reqwest-retry/src/middleware.rs. 
111+     // 
112+     // (While Octocrab's API would work fine for the non-streaming case, we just use this function 
113+     // for both cases so that we can make a homogeneous Vec<impl Future> later in the file.) 
114+ 
115+     let  mut  n_past_retries = 0 ; 
116+     let  start_time = SystemTime :: now ( ) ; 
117+     let  response = loop  { 
118+         let  request = client
119+             . put ( url. clone ( ) ) 
120+             . timeout ( Duration :: from_secs ( 60 ) ) 
121+             . header ( "Authorization" ,  format ! ( "Bearer {auth_token}" ) ) 
122+             . header ( "Content-Type" ,  "application/octet-stream" ) ; 
123+         let  request = match  body { 
124+             UploadSource :: Filename ( ref  path)  => { 
125+                 let  file = tokio:: fs:: File :: open ( & path) . await ?; 
126+                 let  len = file. metadata ( ) . await ?. len ( ) ; 
127+                 request. header ( "Content-Length" ,  len) . body ( file) 
128+             } 
129+             UploadSource :: Data ( ref  bytes)  => request
130+                 . header ( "Content-Length" ,  bytes. len ( ) ) 
131+                 . body ( bytes. clone ( ) ) , 
132+         } ; 
133+         let  result = request. send ( ) . await . map_err ( |e| e. into ( ) ) ; 
134+ 
135+         if  retryable_strategy. handle ( & result)  == Some ( Retryable :: Transient )  { 
136+             let  retry_decision = retry_policy. should_retry ( start_time,  n_past_retries) ; 
137+             if  let  reqwest_retry:: RetryDecision :: Retry  {  execute_after }  = retry_decision { 
138+                 println ! ( "retrying upload to {url} after {result:?}" ) ; 
139+                 let  duration = execute_after
140+                     . duration_since ( SystemTime :: now ( ) ) 
141+                     . unwrap_or_else ( |_| Duration :: default ( ) ) ; 
142+                 tokio:: time:: sleep ( duration) . await ; 
143+                 n_past_retries += 1 ; 
144+                 continue ; 
145+             } 
146+         } 
147+         break  result?; 
148+     } ; 
107149
108150    if  !response. status ( ) . is_success ( )  { 
109151        return  Err ( anyhow ! ( "HTTP {}" ,  response. status( ) ) ) ; 
@@ -215,10 +257,8 @@ pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()
215257            . await ?; 
216258
217259        for  artifact in  artifacts { 
218-             if  matches ! ( 
219-                 artifact. name. as_str( ) , 
220-                 "pythonbuild"  | "toolchain" 
221-             )  || artifact. name . contains ( "install-only" ) 
260+             if  matches ! ( artifact. name. as_str( ) ,  "pythonbuild"  | "toolchain" ) 
261+                 || artifact. name . contains ( "install-only" ) 
222262            { 
223263                continue ; 
224264            } 
@@ -475,12 +515,7 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
475515    let  mut  digests = BTreeMap :: new ( ) ; 
476516
477517    let  retry_policy = ExponentialBackoff :: builder ( ) . build_with_max_retries ( 5 ) ; 
478-     let  raw_client = reqwest_middleware:: ClientBuilder :: new ( Client :: new ( ) ) 
479-         . with ( RetryTransientMiddleware :: new_with_policy_and_strategy ( 
480-             retry_policy, 
481-             GitHubUploadRetryStrategy , 
482-         ) ) 
483-         . build ( ) ; 
518+     let  raw_client = Client :: new ( ) ; 
484519
485520    { 
486521        let  mut  fs = vec ! [ ] ; 
@@ -490,23 +525,31 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
490525                continue ; 
491526            } 
492527
493-             let  file_data = Bytes :: copy_from_slice ( & std:: fs:: read ( dist_dir. join ( & source) ) ?) ; 
494- 
495-             let  mut  digest = Sha256 :: new ( ) ; 
496-             digest. update ( & file_data) ; 
497- 
498-             let  digest = hex:: encode ( digest. finalize ( ) ) ; 
499- 
500-             digests. insert ( dest. clone ( ) ,  digest. clone ( ) ) ; 
501- 
528+             let  local_filename = dist_dir. join ( & source) ; 
502529            fs. push ( upload_release_artifact ( 
503530                & raw_client, 
531+                 & retry_policy, 
532+                 & GitHubUploadRetryStrategy , 
504533                token. clone ( ) , 
505534                & release, 
506535                dest. clone ( ) , 
507-                 file_data , 
536+                 UploadSource :: Filename ( local_filename . clone ( ) ) , 
508537                dry_run, 
509538            ) ) ; 
539+ 
540+             // reqwest wants to take ownership of the body, so it's hard for us to do anything 
541+             // clever with reading the file once and calculating the sha256sum while we read. 
542+             // So we open and read the file again. 
543+             let  digest = { 
544+                 let  file = tokio:: fs:: File :: open ( local_filename) . await ?; 
545+                 let  mut  stream = tokio_util:: io:: ReaderStream :: with_capacity ( file,  1048576 ) ; 
546+                 let  mut  hasher = Sha256 :: new ( ) ; 
547+                 while  let  Some ( chunk)  = stream. next ( ) . await  { 
548+                     hasher. update ( & chunk?) ; 
549+                 } 
550+                 hex:: encode ( hasher. finalize ( ) ) 
551+             } ; 
552+             digests. insert ( dest. clone ( ) ,  digest. clone ( ) ) ; 
510553        } 
511554
512555        let  mut  buffered = futures:: stream:: iter ( fs) . buffer_unordered ( 16 ) ; 
@@ -526,10 +569,12 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
526569
527570    upload_release_artifact ( 
528571        & raw_client, 
572+         & retry_policy, 
573+         & GitHubUploadRetryStrategy , 
529574        token. clone ( ) , 
530575        & release, 
531576        "SHA256SUMS" . to_string ( ) , 
532-         Bytes :: copy_from_slice ( shasums. as_bytes ( ) ) , 
577+         UploadSource :: Data ( Bytes :: copy_from_slice ( shasums. as_bytes ( ) ) ) , 
533578        dry_run, 
534579    ) 
535580    . await ?; 
0 commit comments