@@ -17,17 +17,17 @@ use {
1717 } ,
1818 rayon:: prelude:: * ,
1919 reqwest:: { Client , StatusCode } ,
20- reqwest_middleware:: { self , ClientWithMiddleware } ,
2120 reqwest_retry:: {
22- default_on_request_failure, policies:: ExponentialBackoff , RetryTransientMiddleware ,
23- Retryable , RetryableStrategy ,
21+ default_on_request_failure, policies:: ExponentialBackoff , RetryPolicy , Retryable ,
22+ RetryableStrategy ,
2423 } ,
2524 sha2:: { Digest , Sha256 } ,
2625 std:: {
2726 collections:: { BTreeMap , BTreeSet , HashMap } ,
2827 io:: Read ,
2928 path:: PathBuf ,
3029 str:: FromStr ,
30+ time:: { Duration , SystemTime } ,
3131 } ,
3232 url:: Url ,
3333 zip:: ZipArchive ,
@@ -65,12 +65,19 @@ async fn fetch_artifact(
6565 Ok ( res)
6666}
6767
68+ enum UploadSource {
69+ Filename ( PathBuf ) ,
70+ Data ( Bytes ) ,
71+ }
72+
6873async fn upload_release_artifact (
69- client : & ClientWithMiddleware ,
74+ client : & Client ,
75+ retry_policy : & impl RetryPolicy ,
76+ retryable_strategy : & impl RetryableStrategy ,
7077 auth_token : String ,
7178 release : & Release ,
7279 filename : String ,
73- data : Bytes ,
80+ body : UploadSource ,
7481 dry_run : bool ,
7582) -> Result < ( ) > {
7683 if release. assets . iter ( ) . any ( |asset| asset. name == filename) {
@@ -93,17 +100,52 @@ async fn upload_release_artifact(
93100 return Ok ( ( ) ) ;
94101 }
95102
96- // Octocrab doesn't yet support release artifact upload. And the low-level HTTP API
97- // forces the use of strings on us. So we have to make our own HTTP client.
98-
99- let response = client
100- . put ( url)
101- . header ( "Authorization" , format ! ( "Bearer {auth_token}" ) )
102- . header ( "Content-Length" , data. len ( ) )
103- . header ( "Content-Type" , "application/x-tar" )
104- . body ( data)
105- . send ( )
106- . await ?;
103+ // Octocrab's high-level API for uploading release artifacts doesn't yet support streaming
104+ // bodies, and their low-level API isn't more helpful than using our own HTTP client.
105+ //
106+ // Because we are streaming the body, we can't use the standard retry middleware for reqwest
107+ // (see e.g. https://github.com/seanmonstar/reqwest/issues/2416), so we have to recreate the
108+ // request on each retry and handle the retry logic ourself. This logic is inspired by
109+ // uv/crates/uv-publish/src/lib.rs (which has the same problem), which in turn is inspired by
110+ // reqwest-middleware/reqwest-retry/src/middleware.rs.
111+ //
112+ // (While Octocrab's API would work fine for the non-streaming case, we just use this function
113+ // for both cases so that we can make a homogeneous Vec<impl Future> later in the file.)
114+
115+ let mut n_past_retries = 0 ;
116+ let start_time = SystemTime :: now ( ) ;
117+ let response = loop {
118+ let request = client
119+ . put ( url. clone ( ) )
120+ . timeout ( Duration :: from_secs ( 60 ) )
121+ . header ( "Authorization" , format ! ( "Bearer {auth_token}" ) )
122+ . header ( "Content-Type" , "application/octet-stream" ) ;
123+ let request = match body {
124+ UploadSource :: Filename ( ref path) => {
125+ let file = tokio:: fs:: File :: open ( & path) . await ?;
126+ let len = file. metadata ( ) . await ?. len ( ) ;
127+ request. header ( "Content-Length" , len) . body ( file)
128+ }
129+ UploadSource :: Data ( ref bytes) => request
130+ . header ( "Content-Length" , bytes. len ( ) )
131+ . body ( bytes. clone ( ) ) ,
132+ } ;
133+ let result = request. send ( ) . await . map_err ( |e| e. into ( ) ) ;
134+
135+ if retryable_strategy. handle ( & result) == Some ( Retryable :: Transient ) {
136+ let retry_decision = retry_policy. should_retry ( start_time, n_past_retries) ;
137+ if let reqwest_retry:: RetryDecision :: Retry { execute_after } = retry_decision {
138+ println ! ( "retrying upload to {url} after {result:?}" ) ;
139+ let duration = execute_after
140+ . duration_since ( SystemTime :: now ( ) )
141+ . unwrap_or_else ( |_| Duration :: default ( ) ) ;
142+ tokio:: time:: sleep ( duration) . await ;
143+ n_past_retries += 1 ;
144+ continue ;
145+ }
146+ }
147+ break result?;
148+ } ;
107149
108150 if !response. status ( ) . is_success ( ) {
109151 return Err ( anyhow ! ( "HTTP {}" , response. status( ) ) ) ;
@@ -215,10 +257,8 @@ pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()
215257 . await ?;
216258
217259 for artifact in artifacts {
218- if matches ! (
219- artifact. name. as_str( ) ,
220- "pythonbuild" | "toolchain"
221- ) || artifact. name . contains ( "install-only" )
260+ if matches ! ( artifact. name. as_str( ) , "pythonbuild" | "toolchain" )
261+ || artifact. name . contains ( "install-only" )
222262 {
223263 continue ;
224264 }
@@ -475,12 +515,7 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
475515 let mut digests = BTreeMap :: new ( ) ;
476516
477517 let retry_policy = ExponentialBackoff :: builder ( ) . build_with_max_retries ( 5 ) ;
478- let raw_client = reqwest_middleware:: ClientBuilder :: new ( Client :: new ( ) )
479- . with ( RetryTransientMiddleware :: new_with_policy_and_strategy (
480- retry_policy,
481- GitHubUploadRetryStrategy ,
482- ) )
483- . build ( ) ;
518+ let raw_client = Client :: new ( ) ;
484519
485520 {
486521 let mut fs = vec ! [ ] ;
@@ -490,23 +525,31 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
490525 continue ;
491526 }
492527
493- let file_data = Bytes :: copy_from_slice ( & std:: fs:: read ( dist_dir. join ( & source) ) ?) ;
494-
495- let mut digest = Sha256 :: new ( ) ;
496- digest. update ( & file_data) ;
497-
498- let digest = hex:: encode ( digest. finalize ( ) ) ;
499-
500- digests. insert ( dest. clone ( ) , digest. clone ( ) ) ;
501-
528+ let local_filename = dist_dir. join ( & source) ;
502529 fs. push ( upload_release_artifact (
503530 & raw_client,
531+ & retry_policy,
532+ & GitHubUploadRetryStrategy ,
504533 token. clone ( ) ,
505534 & release,
506535 dest. clone ( ) ,
507- file_data ,
536+ UploadSource :: Filename ( local_filename . clone ( ) ) ,
508537 dry_run,
509538 ) ) ;
539+
540+ // reqwest wants to take ownership of the body, so it's hard for us to do anything
541+ // clever with reading the file once and calculating the sha256sum while we read.
542+ // So we open and read the file again.
543+ let digest = {
544+ let file = tokio:: fs:: File :: open ( local_filename) . await ?;
545+ let mut stream = tokio_util:: io:: ReaderStream :: with_capacity ( file, 1048576 ) ;
546+ let mut hasher = Sha256 :: new ( ) ;
547+ while let Some ( chunk) = stream. next ( ) . await {
548+ hasher. update ( & chunk?) ;
549+ }
550+ hex:: encode ( hasher. finalize ( ) )
551+ } ;
552+ digests. insert ( dest. clone ( ) , digest. clone ( ) ) ;
510553 }
511554
512555 let mut buffered = futures:: stream:: iter ( fs) . buffer_unordered ( 16 ) ;
@@ -526,10 +569,12 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
526569
527570 upload_release_artifact (
528571 & raw_client,
572+ & retry_policy,
573+ & GitHubUploadRetryStrategy ,
529574 token. clone ( ) ,
530575 & release,
531576 "SHA256SUMS" . to_string ( ) ,
532- Bytes :: copy_from_slice ( shasums. as_bytes ( ) ) ,
577+ UploadSource :: Data ( Bytes :: copy_from_slice ( shasums. as_bytes ( ) ) ) ,
533578 dry_run,
534579 )
535580 . await ?;
0 commit comments