@@ -138,26 +138,29 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
138
138
}
139
139
}
140
140
141
- /// Write container fetch progress to standard output.
142
- async fn handle_layer_progress_print (
143
- mut layers : tokio:: sync:: mpsc:: Receiver < ostree_container:: store:: ImportProgress > ,
144
- mut layer_bytes : tokio:: sync:: watch:: Receiver < Option < ostree_container:: store:: LayerProgress > > ,
141
+ /// Configuration for layer progress printing
142
+ struct LayerProgressConfig {
143
+ layers : tokio:: sync:: mpsc:: Receiver < ostree_container:: store:: ImportProgress > ,
144
+ layer_bytes : tokio:: sync:: watch:: Receiver < Option < ostree_container:: store:: LayerProgress > > ,
145
145
digest : Box < str > ,
146
146
n_layers_to_fetch : usize ,
147
147
layers_total : usize ,
148
148
bytes_to_download : u64 ,
149
149
bytes_total : u64 ,
150
150
prog : ProgressWriter ,
151
151
quiet : bool ,
152
- ) -> ProgressWriter {
152
+ }
153
+
154
+ /// Write container fetch progress to standard output.
155
+ async fn handle_layer_progress_print ( mut config : LayerProgressConfig ) -> ProgressWriter {
153
156
let start = std:: time:: Instant :: now ( ) ;
154
157
let mut total_read = 0u64 ;
155
158
let bar = indicatif:: MultiProgress :: new ( ) ;
156
- if quiet {
159
+ if config . quiet {
157
160
bar. set_draw_target ( indicatif:: ProgressDrawTarget :: hidden ( ) ) ;
158
161
}
159
162
let layers_bar = bar. add ( indicatif:: ProgressBar :: new (
160
- n_layers_to_fetch. try_into ( ) . unwrap ( ) ,
163
+ config . n_layers_to_fetch . try_into ( ) . unwrap ( ) ,
161
164
) ) ;
162
165
let byte_bar = bar. add ( indicatif:: ProgressBar :: new ( 0 ) ) ;
163
166
// let byte_bar = indicatif::ProgressBar::new(0);
@@ -185,7 +188,7 @@ async fn handle_layer_progress_print(
185
188
tokio:: select! {
186
189
// Always handle layer changes first.
187
190
biased;
188
- layer = layers. recv( ) => {
191
+ layer = config . layers. recv( ) => {
189
192
if let Some ( l) = layer {
190
193
let layer = descriptor_of_progress( & l) ;
191
194
let layer_type = prefix_of_progress( & l) ;
@@ -213,16 +216,16 @@ async fn handle_layer_progress_print(
213
216
// Emit an event where bytes == total to signal completion.
214
217
subtask. bytes = layer_size;
215
218
subtasks. push( subtask. clone( ) ) ;
216
- prog. send( Event :: ProgressBytes {
219
+ config . prog. send( Event :: ProgressBytes {
217
220
task: "pulling" . into( ) ,
218
- description: format!( "Pulling Image: {digest}" ) . into( ) ,
219
- id: ( * digest) . into( ) ,
220
- bytes_cached: bytes_total - bytes_to_download,
221
+ description: format!( "Pulling Image: {}" , config . digest ) . into( ) ,
222
+ id: ( * config . digest) . into( ) ,
223
+ bytes_cached: config . bytes_total - config . bytes_to_download,
221
224
bytes: total_read,
222
- bytes_total: bytes_to_download,
223
- steps_cached: ( layers_total - n_layers_to_fetch) as u64 ,
225
+ bytes_total: config . bytes_to_download,
226
+ steps_cached: ( config . layers_total - config . n_layers_to_fetch) as u64 ,
224
227
steps: layers_bar. position( ) ,
225
- steps_total: n_layers_to_fetch as u64 ,
228
+ steps_total: config . n_layers_to_fetch as u64 ,
226
229
subtasks: subtasks. clone( ) ,
227
230
} ) . await ;
228
231
}
@@ -231,28 +234,28 @@ async fn handle_layer_progress_print(
231
234
break
232
235
} ;
233
236
} ,
234
- r = layer_bytes. changed( ) => {
237
+ r = config . layer_bytes. changed( ) => {
235
238
if r. is_err( ) {
236
239
// If the receiver is disconnected, then we're done
237
240
break
238
241
}
239
242
let bytes = {
240
- let bytes = layer_bytes. borrow_and_update( ) ;
243
+ let bytes = config . layer_bytes. borrow_and_update( ) ;
241
244
bytes. as_ref( ) . cloned( )
242
245
} ;
243
246
if let Some ( bytes) = bytes {
244
247
byte_bar. set_position( bytes. fetched) ;
245
248
subtask. bytes = byte_bar. position( ) ;
246
- prog. send_lossy( Event :: ProgressBytes {
249
+ config . prog. send_lossy( Event :: ProgressBytes {
247
250
task: "pulling" . into( ) ,
248
- description: format!( "Pulling Image: {digest}" ) . into( ) ,
249
- id: ( * digest) . into( ) ,
250
- bytes_cached: bytes_total - bytes_to_download,
251
+ description: format!( "Pulling Image: {}" , config . digest ) . into( ) ,
252
+ id: ( * config . digest) . into( ) ,
253
+ bytes_cached: config . bytes_total - config . bytes_to_download,
251
254
bytes: total_read + byte_bar. position( ) ,
252
- bytes_total: bytes_to_download,
253
- steps_cached: ( layers_total - n_layers_to_fetch) as u64 ,
255
+ bytes_total: config . bytes_to_download,
256
+ steps_cached: ( config . layers_total - config . n_layers_to_fetch) as u64 ,
254
257
steps: layers_bar. position( ) ,
255
- steps_total: n_layers_to_fetch as u64 ,
258
+ steps_total: config . n_layers_to_fetch as u64 ,
256
259
subtasks: subtasks. clone( ) . into_iter( ) . chain( [ subtask. clone( ) ] ) . collect( ) ,
257
260
} ) . await ;
258
261
}
@@ -280,25 +283,27 @@ async fn handle_layer_progress_print(
280
283
// Since the progress notifier closed, we know import has started
281
284
// use as a heuristic to begin import progress
282
285
// Cannot be lossy or it is dropped
283
- prog. send ( Event :: ProgressSteps {
284
- task : "importing" . into ( ) ,
285
- description : "Importing Image" . into ( ) ,
286
- id : ( * digest) . into ( ) ,
287
- steps_cached : 0 ,
288
- steps : 0 ,
289
- steps_total : 1 ,
290
- subtasks : [ SubTaskStep {
291
- subtask : "importing" . into ( ) ,
286
+ config
287
+ . prog
288
+ . send ( Event :: ProgressSteps {
289
+ task : "importing" . into ( ) ,
292
290
description : "Importing Image" . into ( ) ,
293
- id : "importing" . into ( ) ,
294
- completed : false ,
295
- } ]
296
- . into ( ) ,
297
- } )
298
- . await ;
291
+ id : ( * config. digest ) . into ( ) ,
292
+ steps_cached : 0 ,
293
+ steps : 0 ,
294
+ steps_total : 1 ,
295
+ subtasks : [ SubTaskStep {
296
+ subtask : "importing" . into ( ) ,
297
+ description : "Importing Image" . into ( ) ,
298
+ id : "importing" . into ( ) ,
299
+ completed : false ,
300
+ } ]
301
+ . into ( ) ,
302
+ } )
303
+ . await ;
299
304
300
305
// Return the writer
301
- prog
306
+ config . prog
302
307
}
303
308
304
309
/// Gather all bound images in all deployments, then prune the image store,
@@ -332,7 +337,7 @@ pub(crate) struct PreparedImportMeta {
332
337
}
333
338
334
339
pub ( crate ) enum PreparedPullResult {
335
- Ready ( PreparedImportMeta ) ,
340
+ Ready ( Box < PreparedImportMeta > ) ,
336
341
AlreadyPresent ( Box < ImageState > ) ,
337
342
}
338
343
@@ -372,7 +377,7 @@ pub(crate) async fn prepare_for_pull(
372
377
prep,
373
378
} ;
374
379
375
- Ok ( PreparedPullResult :: Ready ( prepared_image) )
380
+ Ok ( PreparedPullResult :: Ready ( Box :: new ( prepared_image) ) )
376
381
}
377
382
378
383
#[ context( "Pulling" ) ]
@@ -388,17 +393,17 @@ pub(crate) async fn pull_from_prepared(
388
393
let digest_imp = prepared_image. digest . clone ( ) ;
389
394
390
395
let printer = tokio:: task:: spawn ( async move {
391
- handle_layer_progress_print (
392
- layer_progress,
393
- layer_byte_progress,
394
- digest. as_ref ( ) . into ( ) ,
395
- prepared_image. n_layers_to_fetch ,
396
- prepared_image. layers_total ,
397
- prepared_image. bytes_to_fetch ,
398
- prepared_image. bytes_total ,
396
+ handle_layer_progress_print ( LayerProgressConfig {
397
+ layers : layer_progress,
398
+ layer_bytes : layer_byte_progress,
399
+ digest : digest . as_ref ( ) . into ( ) ,
400
+ n_layers_to_fetch : prepared_image. n_layers_to_fetch ,
401
+ layers_total : prepared_image. layers_total ,
402
+ bytes_to_download : prepared_image. bytes_to_fetch ,
403
+ bytes_total : prepared_image. bytes_total ,
399
404
prog,
400
405
quiet,
401
- )
406
+ } )
402
407
. await
403
408
} ) ;
404
409
let import = prepared_image. imp . import ( prepared_image. prep ) . await ;
@@ -444,7 +449,7 @@ pub(crate) async fn pull(
444
449
match prepare_for_pull ( repo, imgref, target_imgref) . await ? {
445
450
PreparedPullResult :: AlreadyPresent ( existing) => Ok ( existing) ,
446
451
PreparedPullResult :: Ready ( prepared_image_meta) => {
447
- Ok ( pull_from_prepared ( imgref, quiet, prog, prepared_image_meta) . await ?)
452
+ Ok ( pull_from_prepared ( imgref, quiet, prog, * prepared_image_meta) . await ?)
448
453
}
449
454
}
450
455
}
0 commit comments