@@ -2,59 +2,22 @@ use crate::run::{
22 check_system:: SystemInfo ,
33 config:: Config ,
44 run_environment:: { RunEnvironment , RunEnvironmentProvider } ,
5- runner:: ExecutorName ,
6- runner:: RunData ,
7- uploader:: UploadError ,
5+ runner:: { ExecutorName , RunData } ,
6+ uploader:: { UploadError , profile_archive:: ProfileArchiveContent } ,
87} ;
98use crate :: {
109 prelude:: * ,
1110 request_client:: { REQUEST_CLIENT , STREAMING_CLIENT } ,
1211} ;
1312use async_compression:: tokio:: write:: GzipEncoder ;
14- use base64:: { Engine as _, engine:: general_purpose} ;
1513use console:: style;
1614use reqwest:: StatusCode ;
17- use std:: path:: PathBuf ;
1815use tokio:: fs:: File ;
19- use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
16+ use tokio:: io:: AsyncWriteExt ;
2017use tokio_tar:: Builder ;
2118
2219use super :: interfaces:: { UploadData , UploadMetadata } ;
23-
24- #[ derive( Debug ) ]
25- enum ProfileArchive {
26- CompressedInMemory ( Vec < u8 > ) ,
27- UncompressedOnDisk ( PathBuf ) ,
28- }
29-
30- impl ProfileArchive {
31- async fn size ( & self ) -> Result < u64 > {
32- match self {
33- ProfileArchive :: CompressedInMemory ( data) => Ok ( data. len ( ) as u64 ) ,
34- ProfileArchive :: UncompressedOnDisk ( path) => {
35- let metadata = tokio:: fs:: metadata ( path) . await ?;
36- Ok ( metadata. len ( ) )
37- }
38- }
39- }
40-
41- fn to_content_encoding ( & self ) -> Option < String > {
42- match self {
43- ProfileArchive :: CompressedInMemory ( _) => Some ( "gzip" . to_string ( ) ) ,
44- ProfileArchive :: UncompressedOnDisk ( _) => None ,
45- }
46- }
47- }
48-
49- impl Drop for ProfileArchive {
50- fn drop ( & mut self ) {
51- if let ProfileArchive :: UncompressedOnDisk ( path) = self {
52- if path. exists ( ) {
53- let _ = std:: fs:: remove_file ( path) ;
54- }
55- }
56- }
57- }
20+ use super :: profile_archive:: ProfileArchive ;
5821
5922/// Create a profile archive from the profile folder and return its md5 hash encoded in base64
6023///
@@ -63,7 +26,7 @@ impl Drop for ProfileArchive {
6326async fn create_profile_archive (
6427 run_data : & RunData ,
6528 executor_name : ExecutorName ,
66- ) -> Result < ( ProfileArchive , String ) > {
29+ ) -> Result < ProfileArchive > {
6730 let time_start = std:: time:: Instant :: now ( ) ;
6831 let profile_archive = match executor_name {
6932 ExecutorName :: Valgrind => {
@@ -74,7 +37,8 @@ async fn create_profile_archive(
7437 . await ?;
7538 let mut gzip_encoder = tar. into_inner ( ) . await ?;
7639 gzip_encoder. shutdown ( ) . await ?;
77- ProfileArchive :: CompressedInMemory ( gzip_encoder. into_inner ( ) )
40+ let data = gzip_encoder. into_inner ( ) ;
41+ ProfileArchive :: new_compressed_in_memory ( data)
7842 }
7943 ExecutorName :: WallTime => {
8044 debug ! ( "Creating uncompressed tar archive for WallTime on disk" ) ;
@@ -92,44 +56,18 @@ async fn create_profile_archive(
9256
9357 // Persist the temporary file to prevent deletion when temp_file goes out of scope
9458 let persistent_path = temp_file. into_temp_path ( ) . keep ( ) ?;
95- ProfileArchive :: UncompressedOnDisk ( persistent_path)
96- }
97- } ;
9859
99- let ( archive_digest, archive_size) = match & profile_archive {
100- ProfileArchive :: CompressedInMemory ( data) => {
101- let digest = md5:: compute ( data. as_slice ( ) ) ;
102- ( digest, data. len ( ) as u64 )
103- }
104- ProfileArchive :: UncompressedOnDisk ( path) => {
105- let mut file = File :: open ( path) . await . context ( format ! (
106- "Failed to open uncompressed file at path: {}" ,
107- path. display( )
108- ) ) ?;
109- let mut hasher = md5:: Context :: new ( ) ;
110- let mut buffer = [ 0 ; 8192 ] ;
111- let mut total_size = 0u64 ;
112-
113- loop {
114- let bytes_read = file. read ( & mut buffer) . await ?;
115- if bytes_read == 0 {
116- break ;
117- }
118- hasher. consume ( & buffer[ ..bytes_read] ) ;
119- total_size += bytes_read as u64 ;
120- }
121- ( hasher. compute ( ) , total_size)
60+ ProfileArchive :: new_uncompressed_on_disk ( persistent_path) ?
12261 }
12362 } ;
12463
125- let archive_hash = general_purpose:: STANDARD . encode ( archive_digest. 0 ) ;
12664 debug ! (
12765 "Created archive ({} bytes) in {:.2?}" ,
128- archive_size ,
66+ profile_archive . content . size ( ) . await ? ,
12967 time_start. elapsed( )
13068 ) ;
13169
132- Ok ( ( profile_archive, archive_hash ) )
70+ Ok ( profile_archive)
13371}
13472
13573async fn retrieve_upload_data (
@@ -187,23 +125,26 @@ async fn retrieve_upload_data(
187125async fn upload_profile_archive (
188126 upload_data : & UploadData ,
189127 profile_archive : ProfileArchive ,
190- archive_hash : & String ,
191128) -> Result < ( ) > {
192- let archive_size = profile_archive. size ( ) . await ?;
129+ let archive_size = profile_archive. content . size ( ) . await ?;
130+ let archive_hash = profile_archive. hash ;
193131
194- let response = match & profile_archive {
195- ProfileArchive :: CompressedInMemory ( data) => {
132+ let response = match & profile_archive. content {
133+ ProfileArchiveContent :: CompressedInMemory { data } => {
196134 // Use regular client with retry middleware for compressed data
197- let request = REQUEST_CLIENT
135+ let mut request = REQUEST_CLIENT
198136 . put ( upload_data. upload_url . clone ( ) )
199137 . header ( "Content-Type" , "application/x-tar" )
200138 . header ( "Content-Length" , archive_size)
201- . header ( "Content-MD5" , archive_hash)
202- . header ( "Content-Encoding" , "gzip" ) ;
139+ . header ( "Content-MD5" , archive_hash) ;
140+
141+ if let Some ( encoding) = profile_archive. content . encoding ( ) {
142+ request = request. header ( "Content-Encoding" , encoding) ;
143+ }
203144
204145 request. body ( data. clone ( ) ) . send ( ) . await ?
205146 }
206- ProfileArchive :: UncompressedOnDisk ( path) => {
147+ ProfileArchiveContent :: UncompressedOnDisk { path } => {
207148 // Use streaming client without retry middleware for file streams
208149 let file = File :: open ( path)
209150 . await
@@ -248,21 +189,15 @@ pub async fn upload(
248189 run_data : & RunData ,
249190 executor_name : ExecutorName ,
250191) -> Result < UploadResult > {
251- let ( profile_archive, archive_hash) =
252- create_profile_archive ( run_data, executor_name. clone ( ) ) . await ?;
192+ let profile_archive = create_profile_archive ( run_data, executor_name. clone ( ) ) . await ?;
253193
254194 debug ! (
255195 "Run Environment provider detected: {:?}" ,
256196 provider. get_run_environment( )
257197 ) ;
258198
259- let upload_metadata = provider. get_upload_metadata (
260- config,
261- system_info,
262- & archive_hash,
263- profile_archive. to_content_encoding ( ) ,
264- executor_name,
265- ) ?;
199+ let upload_metadata =
200+ provider. get_upload_metadata ( config, system_info, & profile_archive, executor_name) ?;
266201 debug ! ( "Upload metadata: {upload_metadata:#?}" ) ;
267202 info ! (
268203 "Linked repository: {}\n " ,
@@ -283,8 +218,11 @@ pub async fn upload(
283218 debug ! ( "runId: {}" , upload_data. run_id) ;
284219
285220 info ! ( "Uploading performance data..." ) ;
286- debug ! ( "Uploading {} bytes..." , profile_archive. size( ) . await ?) ;
287- upload_profile_archive ( & upload_data, profile_archive, & archive_hash) . await ?;
221+ debug ! (
222+ "Uploading {} bytes..." ,
223+ profile_archive. content. size( ) . await ?
224+ ) ;
225+ upload_profile_archive ( & upload_data, profile_archive) . await ?;
288226 info ! ( "Performance data uploaded" ) ;
289227
290228 Ok ( UploadResult {
0 commit comments