@@ -341,30 +341,81 @@ pub struct ConvertedLayerInfo {
341
341
pub media_type : oci_spec:: image:: MediaType ,
342
342
}
343
343
344
- // Consumes an iterable and tries to convert it to a fixed-size array. Returns Ok([T; N]) if the
345
- // number of items in the iterable was correct, else an error describing the mismatch.
346
- fn fixed_from_iterable < T , const N : usize > (
347
- iterable : impl IntoIterator < IntoIter : FusedIterator , Item = T > ,
348
- ) -> Result < [ T ; N ] > {
349
- let mut iter = iterable. into_iter ( ) ;
350
- // We make use of the fact that [_; N].map() returns [_; N]. That makes this a bit more
351
- // awkward than it would otherwise be, but it's not too bad.
352
- let collected = [ ( ) ; N ] . map ( |_| iter. next ( ) ) ;
353
- // Count the Some() in `collected` plus leftovers in the iter.
354
- let actual = collected. iter ( ) . flatten ( ) . count ( ) + iter. count ( ) ;
355
- if actual == N {
356
- // SAFETY: This is a fused iter, so all N items are in our array
357
- Ok ( collected. map ( Option :: unwrap) )
358
- } else {
359
- let type_name = std:: any:: type_name :: < T > ( ) ;
360
- let basename = type_name
361
- . rsplit_once ( "::" )
362
- . map ( |( _path, name) | name)
363
- . unwrap_or ( type_name) ;
364
-
365
- Err ( Error :: Other (
366
- format ! ( "Expected {N} {basename} but got {actual}" ) . into ( ) ,
367
- ) )
344
+ /// A single fd; requires invoking FinishPipe
345
+ #[ derive( Debug ) ]
346
+ struct FinishPipe {
347
+ pipeid : PipeId ,
348
+ datafd : OwnedFd ,
349
+ }
350
+
351
+ /// There is a data FD and an error FD. The error FD will be JSON.
352
+ #[ derive( Debug ) ]
353
+ struct DualFds {
354
+ datafd : OwnedFd ,
355
+ errfd : OwnedFd ,
356
+ }
357
+
358
+ /// Helper trait for parsing the pipeid and/or file descriptors of a reply
359
+ trait FromReplyFds : Send + ' static
360
+ where
361
+ Self : Sized ,
362
+ {
363
+ fn from_reply (
364
+ iterable : impl IntoIterator < IntoIter : FusedIterator , Item = OwnedFd > ,
365
+ pipeid : u32 ,
366
+ ) -> Result < Self > ;
367
+ }
368
+
369
+ /// No file descriptors or pipeid expected
370
+ impl FromReplyFds for ( ) {
371
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
372
+ if fds. into_iter ( ) . next ( ) . is_some ( ) {
373
+ return Err ( Error :: Other ( "expected no fds" . into ( ) ) ) ;
374
+ }
375
+ if pipeid != 0 {
376
+ return Err ( Error :: Other ( "unexpected pipeid" . into ( ) ) ) ;
377
+ }
378
+ Ok ( ( ) )
379
+ }
380
+ }
381
+
382
+ /// A FinishPipe instance
383
+ impl FromReplyFds for FinishPipe {
384
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
385
+ let mut fds = fds. into_iter ( ) ;
386
+ let Some ( first_fd) = fds. next ( ) else {
387
+ return Err ( Error :: Other ( "Expected fd for FinishPipe" . into ( ) ) ) ;
388
+ } ;
389
+ if fds. next ( ) . is_some ( ) {
390
+ return Err ( Error :: Other ( "More than one fd for FinishPipe" . into ( ) ) ) ;
391
+ }
392
+ let Some ( pipeid) = PipeId :: try_new ( pipeid) else {
393
+ return Err ( Error :: Other ( "Expected pipeid for FinishPipe" . into ( ) ) ) ;
394
+ } ;
395
+ Ok ( Self {
396
+ pipeid,
397
+ datafd : first_fd,
398
+ } )
399
+ }
400
+ }
401
+
402
+ /// A DualFds instance
403
+ impl FromReplyFds for DualFds {
404
+ fn from_reply ( fds : impl IntoIterator < Item = OwnedFd > , pipeid : u32 ) -> Result < Self > {
405
+ let mut fds = fds. into_iter ( ) ;
406
+ let Some ( datafd) = fds. next ( ) else {
407
+ return Err ( Error :: Other ( "Expected data fd for DualFds" . into ( ) ) ) ;
408
+ } ;
409
+ let Some ( errfd) = fds. next ( ) else {
410
+ return Err ( Error :: Other ( "Expected err fd for DualFds" . into ( ) ) ) ;
411
+ } ;
412
+ if fds. next ( ) . is_some ( ) {
413
+ return Err ( Error :: Other ( "More than two fds for DualFds" . into ( ) ) ) ;
414
+ }
415
+ if pipeid != 0 {
416
+ return Err ( Error :: Other ( "Unexpected pipeid with DualFds" . into ( ) ) ) ;
417
+ }
418
+ Ok ( Self { datafd, errfd } )
368
419
}
369
420
}
370
421
@@ -404,7 +455,7 @@ impl ImageProxy {
404
455
} ;
405
456
406
457
// Verify semantic version
407
- let ( protover, [ ] , [ ] ) : ( String , _ , _ ) = r. impl_request ( "Initialize" , [ ( ) ; 0 ] ) . await ?;
458
+ let ( protover, _ ) : ( String , ( ) ) = r. impl_request ( "Initialize" , [ ( ) ; 0 ] ) . await ?;
408
459
tracing:: debug!( "Remote protocol version: {protover}" ) ;
409
460
let protover = semver:: Version :: parse ( protover. as_str ( ) ) ?;
410
461
// Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
@@ -420,14 +471,10 @@ impl ImageProxy {
420
471
Ok ( r)
421
472
}
422
473
423
- async fn impl_request_raw <
424
- T : serde:: de:: DeserializeOwned + Send + ' static ,
425
- const N : usize ,
426
- const M : usize ,
427
- > (
474
+ async fn impl_request_raw < T : serde:: de:: DeserializeOwned + Send + ' static , F : FromReplyFds > (
428
475
sockfd : Arc < Mutex < OwnedFd > > ,
429
476
req : Request ,
430
- ) -> Result < ( T , [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
477
+ ) -> Result < ( T , F ) > {
431
478
tracing:: trace!( "sending request {}" , req. method. as_str( ) ) ;
432
479
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
433
480
let r = tokio:: task:: spawn_blocking ( move || {
@@ -464,11 +511,8 @@ impl ImageProxy {
464
511
error : reply. error . into ( ) ,
465
512
} ) ;
466
513
}
467
- Ok ( (
468
- serde_json:: from_value ( reply. value ) ?,
469
- fixed_from_iterable ( fdret) ?,
470
- fixed_from_iterable ( PipeId :: try_new ( reply. pipeid ) ) ?,
471
- ) )
514
+ let fds = FromReplyFds :: from_reply ( fdret, reply. pipeid ) ?;
515
+ Ok ( ( serde_json:: from_value ( reply. value ) ?, fds) )
472
516
} )
473
517
. await
474
518
. map_err ( |e| Error :: Other ( e. to_string ( ) . into ( ) ) ) ??;
@@ -477,15 +521,11 @@ impl ImageProxy {
477
521
}
478
522
479
523
#[ instrument( skip( args) ) ]
480
- async fn impl_request <
481
- T : serde:: de:: DeserializeOwned + Send + ' static ,
482
- const N : usize ,
483
- const M : usize ,
484
- > (
524
+ async fn impl_request < T : serde:: de:: DeserializeOwned + Send + ' static , F : FromReplyFds > (
485
525
& self ,
486
526
method : & str ,
487
527
args : impl IntoIterator < Item = impl Into < serde_json:: Value > > ,
488
- ) -> Result < ( T , [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
528
+ ) -> Result < ( T , F ) > {
489
529
let req = Self :: impl_request_raw ( Arc :: clone ( & self . sockfd ) , Request :: new ( method, args) ) ;
490
530
let mut childwait = self . childwait . lock ( ) . await ;
491
531
tokio:: select! {
@@ -501,21 +541,21 @@ impl ImageProxy {
501
541
#[ instrument]
502
542
async fn finish_pipe ( & self , pipeid : PipeId ) -> Result < ( ) > {
503
543
tracing:: debug!( "closing pipe" ) ;
504
- let ( r, [ ] , [ ] ) = self . impl_request ( "FinishPipe" , [ pipeid. 0 . get ( ) ] ) . await ?;
544
+ let ( r, ( ) ) = self . impl_request ( "FinishPipe" , [ pipeid. 0 . get ( ) ] ) . await ?;
505
545
Ok ( r)
506
546
}
507
547
508
548
#[ instrument]
509
549
pub async fn open_image ( & self , imgref : & str ) -> Result < OpenedImage > {
510
550
tracing:: debug!( "opening image" ) ;
511
- let ( imgid, [ ] , [ ] ) = self . impl_request ( "OpenImage" , [ imgref] ) . await ?;
551
+ let ( imgid, ( ) ) = self . impl_request ( "OpenImage" , [ imgref] ) . await ?;
512
552
Ok ( OpenedImage ( imgid) )
513
553
}
514
554
515
555
#[ instrument]
516
556
pub async fn open_image_optional ( & self , imgref : & str ) -> Result < Option < OpenedImage > > {
517
557
tracing:: debug!( "opening image" ) ;
518
- let ( imgid, [ ] , [ ] ) = self . impl_request ( "OpenImageOptional" , [ imgref] ) . await ?;
558
+ let ( imgid, ( ) ) = self . impl_request ( "OpenImageOptional" , [ imgref] ) . await ?;
519
559
if imgid == 0 {
520
560
Ok ( None )
521
561
} else {
@@ -526,16 +566,16 @@ impl ImageProxy {
526
566
#[ instrument]
527
567
pub async fn close_image ( & self , img : & OpenedImage ) -> Result < ( ) > {
528
568
tracing:: debug!( "closing image" ) ;
529
- let ( r, [ ] , [ ] ) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
569
+ let ( r, ( ) ) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
530
570
Ok ( r)
531
571
}
532
572
533
- async fn read_all_fd ( & self , datafd : OwnedFd , pipeid : PipeId ) -> Result < Vec < u8 > > {
534
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
573
+ async fn read_finish_pipe ( & self , pipe : FinishPipe ) -> Result < Vec < u8 > > {
574
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( pipe . datafd ) ) ;
535
575
let mut fd = tokio:: io:: BufReader :: new ( fd) ;
536
576
let mut r = Vec :: new ( ) ;
537
577
let reader = fd. read_to_end ( & mut r) ;
538
- let ( nbytes, finish) = tokio:: join!( reader, self . finish_pipe( pipeid) ) ;
578
+ let ( nbytes, finish) = tokio:: join!( reader, self . finish_pipe( pipe . pipeid) ) ;
539
579
finish?;
540
580
assert_eq ! ( nbytes?, r. len( ) ) ;
541
581
Ok ( r)
@@ -545,8 +585,8 @@ impl ImageProxy {
545
585
/// The original digest of the unconverted manifest is also returned.
546
586
/// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
547
587
pub async fn fetch_manifest_raw_oci ( & self , img : & OpenedImage ) -> Result < ( String , Vec < u8 > ) > {
548
- let ( digest, [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetManifest" , [ img. 0 ] ) . await ?;
549
- Ok ( ( digest, self . read_all_fd ( datafd , pipeid ) . await ?) )
588
+ let ( digest, pipefd ) = self . impl_request ( "GetManifest" , [ img. 0 ] ) . await ?;
589
+ Ok ( ( digest, self . read_finish_pipe ( pipefd ) . await ?) )
550
590
}
551
591
552
592
/// Fetch the manifest.
@@ -563,8 +603,8 @@ impl ImageProxy {
563
603
/// Fetch the config.
564
604
/// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
565
605
pub async fn fetch_config_raw ( & self , img : & OpenedImage ) -> Result < Vec < u8 > > {
566
- let ( ( ) , [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetFullConfig" , [ img. 0 ] ) . await ?;
567
- self . read_all_fd ( datafd , pipeid ) . await
606
+ let ( ( ) , pipe ) = self . impl_request ( "GetFullConfig" , [ img. 0 ] ) . await ?;
607
+ self . read_finish_pipe ( pipe ) . await
568
608
}
569
609
570
610
/// Fetch the config.
@@ -601,11 +641,11 @@ impl ImageProxy {
601
641
tracing:: debug!( "fetching blob" ) ;
602
642
let args: Vec < serde_json:: Value > =
603
643
vec ! [ img. 0 . into( ) , digest. to_string( ) . into( ) , size. into( ) ] ;
604
- let ( bloblen, [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetBlob" , args) . await ?;
644
+ let ( bloblen, pipe ) : ( u64 , FinishPipe ) = self . impl_request ( "GetBlob" , args) . await ?;
605
645
let _: u64 = bloblen;
606
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
646
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( pipe . datafd ) ) ;
607
647
let fd = tokio:: io:: BufReader :: new ( fd) ;
608
- let finish = Box :: pin ( self . finish_pipe ( pipeid) ) ;
648
+ let finish = Box :: pin ( self . finish_pipe ( pipe . pipeid ) ) ;
609
649
Ok ( ( fd, finish) )
610
650
}
611
651
@@ -650,9 +690,9 @@ impl ImageProxy {
650
690
) > {
651
691
tracing:: debug!( "fetching blob" ) ;
652
692
let args: Vec < serde_json:: Value > = vec ! [ img. 0 . into( ) , digest. to_string( ) . into( ) ] ;
653
- let ( bloblen, [ datafd , errfd ] , [ ] ) = self . impl_request ( "GetRawBlob" , args) . await ?;
654
- let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( datafd) ) ;
655
- let err = Self :: read_blob_error ( errfd) . boxed ( ) ;
693
+ let ( bloblen, fds ) : ( u64 , DualFds ) = self . impl_request ( "GetRawBlob" , args) . await ?;
694
+ let fd = tokio:: fs:: File :: from_std ( std:: fs:: File :: from ( fds . datafd ) ) ;
695
+ let err = Self :: read_blob_error ( fds . errfd ) . boxed ( ) ;
656
696
Ok ( ( bloblen, fd, err) )
657
697
}
658
698
@@ -678,14 +718,14 @@ impl ImageProxy {
678
718
) -> Result < Option < Vec < ConvertedLayerInfo > > > {
679
719
tracing:: debug!( "Getting layer info" ) ;
680
720
if layer_info_piped_proto_version ( ) . matches ( & self . protover ) {
681
- let ( ( ) , [ datafd ] , [ pipeid ] ) = self . impl_request ( "GetLayerInfoPiped" , [ img. 0 ] ) . await ?;
682
- let buf = self . read_all_fd ( datafd , pipeid ) . await ?;
721
+ let ( ( ) , pipe ) = self . impl_request ( "GetLayerInfoPiped" , [ img. 0 ] ) . await ?;
722
+ let buf = self . read_finish_pipe ( pipe ) . await ?;
683
723
return Ok ( Some ( serde_json:: from_slice ( & buf) ?) ) ;
684
724
}
685
725
if !layer_info_proto_version ( ) . matches ( & self . protover ) {
686
726
return Ok ( None ) ;
687
727
}
688
- let ( layers, [ ] , [ ] ) = self . impl_request ( "GetLayerInfo" , [ img. 0 ] ) . await ?;
728
+ let ( layers, ( ) ) = self . impl_request ( "GetLayerInfo" , [ img. 0 ] ) . await ?;
689
729
Ok ( Some ( layers) )
690
730
}
691
731
@@ -893,31 +933,15 @@ mod tests {
893
933
memfd_create ( c"test-fd" , MemfdFlags :: CLOEXEC ) . unwrap ( )
894
934
}
895
935
896
- fn fds_and_pipeid < const N : usize , const M : usize > (
897
- fds : impl IntoIterator < IntoIter : FusedIterator , Item = OwnedFd > ,
898
- pipeid : u32 ,
899
- ) -> Result < ( [ OwnedFd ; N ] , [ PipeId ; M ] ) > {
900
- Ok ( (
901
- fixed_from_iterable ( fds) ?,
902
- fixed_from_iterable ( PipeId :: try_new ( pipeid) ) ?,
903
- ) )
904
- }
905
-
906
- #[ test]
907
- fn test_new_from_raw_values_no_fds_no_pipeid ( ) {
908
- let ( [ ] , [ ] ) = fds_and_pipeid ( [ ] , 0 ) . unwrap ( ) ;
909
- }
910
-
911
936
#[ test]
912
937
fn test_new_from_raw_values_finish_pipe ( ) {
913
938
let datafd = create_dummy_fd ( ) ;
914
939
// Keep a raw fd to compare later, as fds_and_pipeid consumes datafd
915
940
let raw_datafd_val = datafd. as_raw_fd ( ) ;
916
941
let fds = vec ! [ datafd] ;
917
- let pipeid = PipeId :: try_new ( 1 ) . unwrap ( ) ;
918
- let ( [ res_datafd] , [ res_pipeid] ) = fds_and_pipeid ( fds, pipeid. 0 . get ( ) ) . unwrap ( ) ;
919
- assert_eq ! ( res_pipeid, pipeid) ;
920
- assert_eq ! ( res_datafd. as_raw_fd( ) , raw_datafd_val) ;
942
+ let v = FinishPipe :: from_reply ( fds, 1 ) . unwrap ( ) ;
943
+ assert_eq ! ( v. pipeid. 0 . get( ) , 1 ) ;
944
+ assert_eq ! ( v. datafd. as_raw_fd( ) , raw_datafd_val) ;
921
945
}
922
946
923
947
#[ test]
@@ -927,18 +951,18 @@ mod tests {
927
951
let raw_datafd_val = datafd. as_raw_fd ( ) ;
928
952
let raw_errfd_val = errfd. as_raw_fd ( ) ;
929
953
let fds = vec ! [ datafd, errfd] ;
930
- let ( [ res_datafd , res_errfd ] , [ ] ) = fds_and_pipeid ( fds, 0 ) . unwrap ( ) ;
931
- assert_eq ! ( res_datafd . as_raw_fd( ) , raw_datafd_val) ;
932
- assert_eq ! ( res_errfd . as_raw_fd( ) , raw_errfd_val) ;
954
+ let v = DualFds :: from_reply ( fds, 0 ) . unwrap ( ) ;
955
+ assert_eq ! ( v . datafd . as_raw_fd( ) , raw_datafd_val) ;
956
+ assert_eq ! ( v . errfd . as_raw_fd( ) , raw_errfd_val) ;
933
957
}
934
958
935
959
#[ test]
936
960
fn test_new_from_raw_values_error_too_many_fds ( ) {
937
961
let fds = vec ! [ create_dummy_fd( ) , create_dummy_fd( ) , create_dummy_fd( ) ] ;
938
- match fds_and_pipeid ( fds, 0 ) {
939
- Ok ( ( [ datafd , errfd ] , [ ] ) ) => unreachable ! ( "{datafd:?} {errfd :?}" ) ,
962
+ match DualFds :: from_reply ( fds, 0 ) {
963
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
940
964
Err ( Error :: Other ( msg) ) => {
941
- assert_eq ! ( msg. as_ref( ) , "Expected 2 OwnedFd but got 3 " )
965
+ assert_eq ! ( msg. as_ref( ) , "More than two fds for DualFds " )
942
966
}
943
967
Err ( other) => unreachable ! ( "{other}" ) ,
944
968
}
@@ -947,10 +971,10 @@ mod tests {
947
971
#[ test]
948
972
fn test_new_from_raw_values_error_fd_with_zero_pipeid ( ) {
949
973
let fds = vec ! [ create_dummy_fd( ) ] ;
950
- match fds_and_pipeid ( fds, 0 ) {
951
- Ok ( ( [ datafd ] , [ pipeid ] ) ) => unreachable ! ( "{datafd:?} {pipeid :?}" ) ,
974
+ match FinishPipe :: from_reply ( fds, 0 ) {
975
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
952
976
Err ( Error :: Other ( msg) ) => {
953
- assert_eq ! ( msg. as_ref( ) , "Expected 1 PipeId but got 0 " )
977
+ assert_eq ! ( msg. as_ref( ) , "Expected pipeid for FinishPipe " )
954
978
}
955
979
Err ( other) => unreachable ! ( "{other}" ) ,
956
980
}
@@ -959,10 +983,10 @@ mod tests {
959
983
#[ test]
960
984
fn test_new_from_raw_values_error_pipeid_with_both_fds ( ) {
961
985
let fds = vec ! [ create_dummy_fd( ) , create_dummy_fd( ) ] ;
962
- match fds_and_pipeid ( fds, 1 ) {
963
- Ok ( ( [ datafd , errfd ] , [ ] ) ) => unreachable ! ( "{datafd:?} {errfd :?}" ) ,
986
+ match DualFds :: from_reply ( fds, 1 ) {
987
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
964
988
Err ( Error :: Other ( msg) ) => {
965
- assert_eq ! ( msg. as_ref( ) , "Expected 0 PipeId but got 1 " )
989
+ assert_eq ! ( msg. as_ref( ) , "Unexpected pipeid with DualFds " )
966
990
}
967
991
Err ( other) => unreachable ! ( "{other}" ) ,
968
992
}
@@ -971,10 +995,10 @@ mod tests {
971
995
#[ test]
972
996
fn test_new_from_raw_values_error_no_fd_with_pipeid ( ) {
973
997
let fds: Vec < OwnedFd > = vec ! [ ] ;
974
- match fds_and_pipeid ( fds, 1 ) {
975
- Ok ( ( [ datafd ] , [ pipeid ] ) ) => unreachable ! ( "{datafd:?} {pipeid :?}" ) ,
998
+ match FinishPipe :: from_reply ( fds, 1 ) {
999
+ Ok ( v ) => unreachable ! ( "{v :?}" ) ,
976
1000
Err ( Error :: Other ( msg) ) => {
977
- assert_eq ! ( msg. as_ref( ) , "Expected 1 OwnedFd but got 0 " )
1001
+ assert_eq ! ( msg. as_ref( ) , "Expected fd for FinishPipe " )
978
1002
}
979
1003
Err ( other) => unreachable ! ( "{other}" ) ,
980
1004
}
0 commit comments