@@ -341,30 +341,81 @@ pub struct ConvertedLayerInfo {
341
341
pub media_type : oci_spec:: image:: MediaType ,
342
342
}
343
343
344
- // Consumes an iterable and tries to convert it to a fixed-size array. Returns Ok([T; N]) if the
345
- // number of items in the iterable was correct, else an error describing the mismatch.
346
- fn fixed_from_iterable < T , const N : usize > (
347
- iterable : impl IntoIterator < IntoIter : FusedIterator , Item = T > ,
348
- ) -> Result < [ T ; N ] > {
349
- let mut iter = iterable. into_iter ( ) ;
350
- // We make use of the fact that [_; N].map() returns [_; N]. That makes this a bit more
351
- // awkward than it would otherwise be, but it's not too bad.
352
- let collected = [ ( ) ; N ] . map ( |_| iter. next ( ) ) ;
353
- // Count the Some() in `collected` plus leftovers in the iter.
354
- let actual = collected. iter ( ) . flatten ( ) . count ( ) + iter. count ( ) ;
355
- if actual == N {
356
- // SAFETY: This is a fused iter, so all N items are in our array
357
- Ok ( collected. map ( Option :: unwrap) )
358
- } else {
359
- let type_name = std:: any:: type_name :: < T > ( ) ;
360
- let basename = type_name
361
- . rsplit_once ( "::" )
362
- . map ( |( _path, name) | name)
363
- . unwrap_or ( type_name) ;
364
-
365
- Err ( Error :: Other (
366
- format ! ( "Expected {N} {basename} but got {actual}" ) . into ( ) ,
367
- ) )
344
+ /// A single fd; requires invoking FinishPipe
345
+ #[ derive( Debug ) ]
346
+ struct FinishPipe {
347
+ pipeid : PipeId ,
348
+ datafd : OwnedFd ,
349
+ }
350
+
351
+ /// There is a data FD and an error FD. The error FD will be JSON.
352
+ #[ derive( Debug ) ]
353
+ struct DualFds {
354
+ datafd : OwnedFd ,
355
+ errfd : OwnedFd ,
356
+ }
357
+
358
+ /// Helper trait for parsing the pipeid and/or file descriptors of a reply
359
+ trait FromReplyFds : Send + ' static
360
+ where
361
+ Self : Sized ,
362
+ {
363
+ fn from_reply (
364
+ iterable : impl IntoIterator < IntoIter : FusedIterator , Item = OwnedFd > ,
365
+ pipeid : u32 ,
366
+ ) -> Result < Self > ;
367
+ }
368
+
369
+ /// No file descriptors or pipeid expected
370
+ impl FromReplyFds for ( ) {
371
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
372
+ if fds. into_iter ( ) . next ( ) . is_some ( ) {
373
+ return Err ( Error :: Other ( "expected no fds" . into ( ) ) ) ;
374
+ }
375
+ if pipeid != 0 {
376
+ return Err ( Error :: Other ( "unexpected pipeid" . into ( ) ) ) ;
377
+ }
378
+ Ok ( ( ) )
379
+ }
380
+ }
381
+
382
+ /// A FinishPipe instance
383
+ impl FromReplyFds for FinishPipe {
384
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
385
+ let mut fds = fds. into_iter ( ) ;
386
+ let Some ( first_fd) = fds. next ( ) else {
387
+ return Err ( Error :: Other ( "Expected fd for FinishPipe" . into ( ) ) ) ;
388
+ } ;
389
+ if fds. next ( ) . is_some ( ) {
390
+ return Err ( Error :: Other ( "More than one fd for FinishPipe" . into ( ) ) ) ;
391
+ }
392
+ let Some ( pipeid) = PipeId :: try_new ( pipeid) else {
393
+ return Err ( Error :: Other ( "Expected pipeid for FinishPipe" . into ( ) ) ) ;
394
+ } ;
395
+ Ok ( Self {
396
+ pipeid,
397
+ datafd : first_fd,
398
+ } )
399
+ }
400
+ }
401
+
402
+ /// A DualFds instance
403
+ impl FromReplyFds for DualFds {
404
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
405
+ let mut fds = fds. into_iter ( ) ;
406
+ let Some ( datafd) = fds. next ( ) else {
407
+ return Err ( Error :: Other ( "Expected data fd for DualFds" . into ( ) ) ) ;
408
+ } ;
409
+ let Some ( errfd) = fds. next ( ) else {
410
+ return Err ( Error :: Other ( "Expected err fd for DualFds" . into ( ) ) ) ;
411
+ } ;
412
+ if fds. next ( ) . is_some ( ) {
413
+ return Err ( Error :: Other ( "More than two fds for DualFds" . into ( ) ) ) ;
414
+ }
415
+ if pipeid != 0 {
416
+ return Err ( Error :: Other ( "Unexpected pipeid with DualFds" . into ( ) ) ) ;
417
+ }
418
+ Ok ( Self { datafd, errfd } )
368
419
}
369
420
}
370
421
@@ -404,7 +455,7 @@ impl ImageProxy {
404
455
} ;
405
456
406
457
// Verify semantic version
407
- let ( protover, [ ] , [ ] ) : ( String , _ , _ ) = r. impl_request ( "Initialize" , [ ( ) ; 0 ] ) . await ?;
458
+ let ( protover, _ ) : ( String , ( ) ) = r. impl_request ( "Initialize" , [ ( ) ; 0 ] ) . await ?;
408
459
tracing:: debug!( "Remote protocol version: {protover}" ) ;
409
460
let protover = semver:: Version :: parse ( protover. as_str ( ) ) ?;
410
461
// Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
@@ -420,14 +471,10 @@ impl ImageProxy {
420
471
Ok ( r)
421
472
}
422
473
423
- async fn impl_request_raw <
424
- T : serde:: de:: DeserializeOwned + Send + ' static ,
425
- const N : usize ,
426
- const M : usize ,
427
- > (
474
+ async fn impl_request_raw < T : serde:: de:: DeserializeOwned + Send + ' static , F : FromReplyFds > (
428
475
sockfd : Arc < Mutex < OwnedFd > > ,
429
476
req : Request ,
430
- ) -> Result < ( T , [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
477
+ ) -> Result < ( T , F ) > {
431
478
tracing:: trace!( "sending request {}" , req. method. as_str( ) ) ;
432
479
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
433
480
let r = tokio:: task:: spawn_blocking ( move || {
@@ -463,11 +510,8 @@ impl ImageProxy {
463
510
error : reply. error . into ( ) ,
464
511
} ) ;
465
512
}
466
- Ok ( (
467
- serde_json:: from_value ( reply. value ) ?,
468
- fixed_from_iterable ( fdret) ?,
469
- fixed_from_iterable ( PipeId :: try_new ( reply. pipeid ) ) ?,
470
- ) )
513
+ let fds = FromReplyFds :: from_reply ( fdret, reply. pipeid ) ?;
514
+ Ok ( ( serde_json:: from_value ( reply. value ) ?, fds) )
471
515
} )
472
516
. await
473
517
. map_err ( |e| Error :: Other ( e. to_string ( ) . into ( ) ) ) ??;
@@ -476,15 +520,11 @@ impl ImageProxy {
476
520
}
477
521
478
522
#[ instrument( skip( args) ) ]
479
- async fn impl_request <
480
- T : serde:: de:: DeserializeOwned + Send + ' static ,
481
- const N : usize ,
482
- const M : usize ,
483
- > (
523
+ async fn impl_request < T : serde:: de:: DeserializeOwned + Send + ' static , F : FromReplyFds > (
484
524
& self ,
485
525
method : & str ,
486
526
args : impl IntoIterator < Item = impl Into < serde_json:: Value > > ,
487
- ) -> Result < ( T , [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
527
+ ) -> Result < ( T , F ) > {
488
528
let req = Self :: impl_request_raw ( Arc :: clone ( & self . sockfd ) , Request :: new ( method, args) ) ;
489
529
let mut childwait = self . childwait . lock ( ) . await ;
490
530
tokio:: select! {
@@ -500,21 +540,21 @@ impl ImageProxy {
500
540
#[ instrument]
501
541
async fn finish_pipe ( & self , pipeid : PipeId ) -> Result < ( ) > {
502
542
tracing:: debug!( "closing pipe" ) ;
503
- let ( r, [ ] , [ ] ) = self . impl_request ( "FinishPipe" , [ pipeid. 0 . get ( ) ] ) . await ?;
543
+ let ( r, ( ) ) = self . impl_request ( "FinishPipe" , [ pipeid. 0 . get ( ) ] ) . await ?;
504
544
Ok ( r)
505
545
}
506
546
507
547
#[ instrument]
508
548
pub async fn open_image ( & self , imgref : & str ) -> Result < OpenedImage > {
509
549
tracing:: debug!( "opening image" ) ;
510
- let ( imgid, [ ] , [ ] ) = self . impl_request ( "OpenImage" , [ imgref] ) . await ?;
550
+ let ( imgid, ( ) ) = self . impl_request ( "OpenImage" , [ imgref] ) . await ?;
511
551
Ok ( OpenedImage ( imgid) )
512
552
}
513
553
514
554
#[ instrument]
515
555
pub async fn open_image_optional ( & self , imgref : & str ) -> Result < Option < OpenedImage > > {
516
556
tracing:: debug!( "opening image" ) ;
517
- let ( imgid, [ ] , [ ] ) = self . impl_request ( "OpenImageOptional" , [ imgref] ) . await ?;
557
+ let ( imgid, ( ) ) = self . impl_request ( "OpenImageOptional" , [ imgref] ) . await ?;
518
558
if imgid == 0 {
519
559
Ok ( None )
520
560
} else {
@@ -525,16 +565,16 @@ impl ImageProxy {
525
565
#[ instrument]
526
566
pub async fn close_image ( & self , img : & OpenedImage ) -> Result < ( ) > {
527
567
tracing:: debug!( "closing image" ) ;
528
- let ( r, [ ] , [ ] ) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
568
+ let ( r, ( ) ) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
529
569
Ok ( r)
530
570
}
531
571
532
- async fn read_all_fd ( & self , datafd : OwnedFd , pipeid : PipeId ) -> Result < Vec < u8 > > {
533
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
572
+ async fn read_finish_pipe ( & self , pipe : FinishPipe ) -> Result < Vec < u8 > > {
573
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( pipe . datafd ) ) ;
534
574
let mut fd = tokio:: io:: BufReader :: new ( fd) ;
535
575
let mut r = Vec :: new ( ) ;
536
576
let reader = fd. read_to_end ( & mut r) ;
537
- let ( nbytes, finish) = tokio:: join!( reader, self . finish_pipe( pipeid) ) ;
577
+ let ( nbytes, finish) = tokio:: join!( reader, self . finish_pipe( pipe . pipeid) ) ;
538
578
finish?;
539
579
assert_eq ! ( nbytes?, r. len( ) ) ;
540
580
Ok ( r)
@@ -544,8 +584,8 @@ impl ImageProxy {
544
584
/// The original digest of the unconverted manifest is also returned.
545
585
/// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
546
586
pub async fn fetch_manifest_raw_oci ( & self , img : & OpenedImage ) -> Result < ( String , Vec < u8 > ) > {
547
- let ( digest, [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetManifest" , [ img. 0 ] ) . await ?;
548
- Ok ( ( digest, self . read_all_fd ( datafd , pipeid ) . await ?) )
587
+ let ( digest, pipefd ) = self . impl_request ( "GetManifest" , [ img. 0 ] ) . await ?;
588
+ Ok ( ( digest, self . read_finish_pipe ( pipefd ) . await ?) )
549
589
}
550
590
551
591
/// Fetch the manifest.
@@ -562,8 +602,8 @@ impl ImageProxy {
562
602
/// Fetch the config.
563
603
/// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
564
604
pub async fn fetch_config_raw ( & self , img : & OpenedImage ) -> Result < Vec < u8 > > {
565
- let ( ( ) , [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetFullConfig" , [ img. 0 ] ) . await ?;
566
- self . read_all_fd ( datafd , pipeid ) . await
605
+ let ( ( ) , pipe ) = self . impl_request ( "GetFullConfig" , [ img. 0 ] ) . await ?;
606
+ self . read_finish_pipe ( pipe ) . await
567
607
}
568
608
569
609
/// Fetch the config.
@@ -600,11 +640,11 @@ impl ImageProxy {
600
640
tracing:: debug!( "fetching blob" ) ;
601
641
let args: Vec < serde_json:: Value > =
602
642
vec ! [ img. 0 . into( ) , digest. to_string( ) . into( ) , size. into( ) ] ;
603
- let ( bloblen, [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetBlob" , args) . await ?;
643
+ let ( bloblen, pipe ) : ( u64 , FinishPipe ) = self . impl_request ( "GetBlob" , args) . await ?;
604
644
let _: u64 = bloblen;
605
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
645
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( pipe . datafd ) ) ;
606
646
let fd = tokio:: io:: BufReader :: new ( fd) ;
607
- let finish = Box :: pin ( self . finish_pipe ( pipeid) ) ;
647
+ let finish = Box :: pin ( self . finish_pipe ( pipe . pipeid ) ) ;
608
648
Ok ( ( fd, finish) )
609
649
}
610
650
@@ -649,9 +689,9 @@ impl ImageProxy {
649
689
) > {
650
690
tracing:: debug!( "fetching blob" ) ;
651
691
let args: Vec < serde_json:: Value > = vec ! [ img. 0 . into( ) , digest. to_string( ) . into( ) ] ;
652
- let ( bloblen, [ datafd , errfd ] , [ ] ) = self . impl_request ( "GetRawBlob" , args) . await ?;
653
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
654
- let err = Self :: read_blob_error ( errfd) . boxed ( ) ;
692
+ let ( bloblen, fds ) : ( u64 , DualFds ) = self . impl_request ( "GetRawBlob" , args) . await ?;
693
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( fds . datafd ) ) ;
694
+ let err = Self :: read_blob_error ( fds . errfd ) . boxed ( ) ;
655
695
Ok ( ( bloblen, fd, err) )
656
696
}
657
697
@@ -677,14 +717,14 @@ impl ImageProxy {
677
717
) -> Result < Option < Vec < ConvertedLayerInfo > > > {
678
718
tracing:: debug!( "Getting layer info" ) ;
679
719
if layer_info_piped_proto_version ( ) . matches ( & self . protover ) {
680
- let ( ( ) , [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetLayerInfoPiped" , [ img. 0 ] ) . await ?;
681
- let buf = self . read_all_fd ( datafd , pipeid ) . await ?;
720
+ let ( ( ) , pipe ) = self . impl_request ( "GetLayerInfoPiped" , [ img. 0 ] ) . await ?;
721
+ let buf = self . read_finish_pipe ( pipe ) . await ?;
682
722
return Ok ( Some ( serde_json:: from_slice ( & buf) ?) ) ;
683
723
}
684
724
if !layer_info_proto_version ( ) . matches ( & self . protover ) {
685
725
return Ok ( None ) ;
686
726
}
687
- let ( layers, [ ] , [ ] ) = self . impl_request ( "GetLayerInfo" , [ img. 0 ] ) . await ?;
727
+ let ( layers, ( ) ) = self . impl_request ( "GetLayerInfo" , [ img. 0 ] ) . await ?;
688
728
Ok ( Some ( layers) )
689
729
}
690
730
@@ -892,31 +932,15 @@ mod tests {
892
932
memfd_create ( c"test-fd" , MemfdFlags :: CLOEXEC ) . unwrap ( )
893
933
}
894
934
895
- fn fds_and_pipeid < const N : usize , const M : usize > (
896
- fds : impl IntoIterator < IntoIter : FusedIterator , Item = OwnedFd > ,
897
- pipeid : u32 ,
898
- ) -> Result < ( [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
899
- Ok ( (
900
- fixed_from_iterable ( fds) ?,
901
- fixed_from_iterable ( PipeId :: try_new ( pipeid) ) ?,
902
- ) )
903
- }
904
-
905
- #[ test]
906
- fn test_new_from_raw_values_no_fds_no_pipeid ( ) {
907
- let ( [ ] , [ ] ) = fds_and_pipeid ( [ ] , 0 ) . unwrap ( ) ;
908
- }
909
-
910
935
#[ test]
911
936
fn test_new_from_raw_values_finish_pipe ( ) {
912
937
let datafd = create_dummy_fd ( ) ;
913
938
// Keep a raw fd to compare later, as fds_and_pipeid consumes datafd
914
939
let raw_datafd_val = datafd. as_raw_fd ( ) ;
915
940
let fds = vec ! [ datafd] ;
916
- let pipeid = PipeId :: try_new ( 1 ) . unwrap ( ) ;
917
- let ( [ res_datafd] , [ res_pipeid] ) = fds_and_pipeid ( fds, pipeid. 0 . get ( ) ) . unwrap ( ) ;
918
- assert_eq ! ( res_pipeid, pipeid) ;
919
- assert_eq ! ( res_datafd. as_raw_fd( ) , raw_datafd_val) ;
941
+ let v = FinishPipe :: from_reply ( fds, 1 ) . unwrap ( ) ;
942
+ assert_eq ! ( v. pipeid. 0 . get( ) , 1 ) ;
943
+ assert_eq ! ( v. datafd. as_raw_fd( ) , raw_datafd_val) ;
920
944
}
921
945
922
946
#[ test]
@@ -926,18 +950,18 @@ mod tests {
926
950
let raw_datafd_val = datafd. as_raw_fd ( ) ;
927
951
let raw_errfd_val = errfd. as_raw_fd ( ) ;
928
952
let fds = vec ! [ datafd, errfd] ;
929
- let ( [ res_datafd , res_errfd ] , [ ] ) = fds_and_pipeid ( fds, 0 ) . unwrap ( ) ;
930
- assert_eq ! ( res_datafd . as_raw_fd( ) , raw_datafd_val) ;
931
- assert_eq ! ( res_errfd . as_raw_fd( ) , raw_errfd_val) ;
953
+ let v = DualFds :: from_reply ( fds, 0 ) . unwrap ( ) ;
954
+ assert_eq ! ( v . datafd . as_raw_fd( ) , raw_datafd_val) ;
955
+ assert_eq ! ( v . errfd . as_raw_fd( ) , raw_errfd_val) ;
932
956
}
933
957
934
958
#[ test]
935
959
fn test_new_from_raw_values_error_too_many_fds ( ) {
936
960
let fds = vec ! [ create_dummy_fd( ) , create_dummy_fd( ) , create_dummy_fd( ) ] ;
937
- match fds_and_pipeid ( fds, 0 ) {
938
- Ok ( ( [ datafd , errfd ] , [ ] ) ) => unreachable ! ( "{datafd:?} {errfd :?}" ) ,
961
+ match DualFds :: from_reply ( fds, 0 ) {
962
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
939
963
Err ( Error :: Other ( msg) ) => {
940
- assert_eq ! ( msg. as_ref( ) , "Expected 2 OwnedFd but got 3 " )
964
+ assert_eq ! ( msg. as_ref( ) , "More than two fds for DualFds " )
941
965
}
942
966
Err ( other) => unreachable ! ( "{other}" ) ,
943
967
}
@@ -946,10 +970,10 @@ mod tests {
946
970
#[ test]
947
971
fn test_new_from_raw_values_error_fd_with_zero_pipeid ( ) {
948
972
let fds = vec ! [ create_dummy_fd( ) ] ;
949
- match fds_and_pipeid ( fds, 0 ) {
950
- Ok ( ( [ datafd ] , [ pipeid ] ) ) => unreachable ! ( "{datafd:?} {pipeid :?}" ) ,
973
+ match FinishPipe :: from_reply ( fds, 0 ) {
974
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
951
975
Err ( Error :: Other ( msg) ) => {
952
- assert_eq ! ( msg. as_ref( ) , "Expected 1 PipeId but got 0 " )
976
+ assert_eq ! ( msg. as_ref( ) , "Expected pipeid for FinishPipe " )
953
977
}
954
978
Err ( other) => unreachable ! ( "{other}" ) ,
955
979
}
@@ -958,10 +982,10 @@ mod tests {
958
982
#[ test]
959
983
fn test_new_from_raw_values_error_pipeid_with_both_fds ( ) {
960
984
let fds = vec ! [ create_dummy_fd( ) , create_dummy_fd( ) ] ;
961
- match fds_and_pipeid ( fds, 1 ) {
962
- Ok ( ( [ datafd , errfd ] , [ ] ) ) => unreachable ! ( "{datafd:?} {errfd :?}" ) ,
985
+ match DualFds :: from_reply ( fds, 1 ) {
986
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
963
987
Err ( Error :: Other ( msg) ) => {
964
- assert_eq ! ( msg. as_ref( ) , "Expected 0 PipeId but got 1 " )
988
+ assert_eq ! ( msg. as_ref( ) , "Unexpected pipeid with DualFds " )
965
989
}
966
990
Err ( other) => unreachable ! ( "{other}" ) ,
967
991
}
@@ -970,10 +994,10 @@ mod tests {
970
994
#[ test]
971
995
fn test_new_from_raw_values_error_no_fd_with_pipeid ( ) {
972
996
let fds: Vec < OwnedFd > = vec ! [ ] ;
973
- match fds_and_pipeid ( fds, 1 ) {
974
- Ok ( ( [ datafd ] , [ pipeid ] ) ) => unreachable ! ( "{datafd:?} {pipeid :?}" ) ,
997
+ match FinishPipe :: from_reply ( fds, 1 ) {
998
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
975
999
Err ( Error :: Other ( msg) ) => {
976
- assert_eq ! ( msg. as_ref( ) , "Expected 1 OwnedFd but got 0 " )
1000
+ assert_eq ! ( msg. as_ref( ) , "Expected fd for FinishPipe " )
977
1001
}
978
1002
Err ( other) => unreachable ! ( "{other}" ) ,
979
1003
}
0 commit comments