Skip to content

Commit 8b34014

Browse files
vmagrometa-codesync[bot]
authored andcommitted
[antlir2][sendstream_parser] handle v2 sendstreams (and add more future-proofing)
Summary: In sendstream v2, `write` no longer has a separate length We also need to parse `encoded_write` (but very minimally - we don't actually need to get the data out in a usable fashion) Additionally, expand to parse unknown commands (that may appear in the future) into a generic variant that only has the header (which is stable) and ignores the data contained within the command. Test Plan: ``` ❯ buck2 test fbcode//antlir/antlir2/sendstream_parser: -- Buck UI: https://www.internalfb.com/buck2/bf045432-7b90-44ad-b144-71cfaff7406d Tests finished: Pass 4. Fail 0. Fatal 0. Skip 0. Build failure 0 ``` https://www.internalfb.com/intern/testinfra/testrun/17169973703982447 Reviewed By: vjt Differential Revision: D86239502 fbshipit-source-id: 8ca8b06926e459b871ca1d184081f4f37803edcf
1 parent 19f491a commit 8b34014

File tree

10 files changed

+541
-188
lines changed

10 files changed

+541
-188
lines changed

antlir/antlir2/sendstream_parser/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ rust_library(
2626
"hex",
2727
"nix",
2828
"nom",
29+
"num_enum",
2930
"serde",
3031
"thiserror",
3132
"tokio",

antlir/antlir2/sendstream_parser/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ futures = { version = "0.3.31", features = ["async-await", "compat"] }
2121
hex = { version = "0.4.3", features = ["alloc"] }
2222
nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
2323
nom = "8"
24+
num_enum = "0.7.4"
2425
serde = { version = "1.0.219", features = ["derive", "rc"], optional = true }
2526
thiserror = "2.0.12"
2627
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }

antlir/antlir2/sendstream_parser/src/lib.rs

Lines changed: 113 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub type Result<R> = std::result::Result<R, Error>;
5757
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
5858
#[cfg_attr(feature = "serde", serde(untagged))]
5959
pub enum Command {
60+
Unknown(Unknown),
6061
Chmod(Chmod),
6162
Chown(Chown),
6263
Clone(Clone),
@@ -79,6 +80,7 @@ pub enum Command {
7980
UpdateExtent(UpdateExtent),
8081
Utimes(Utimes),
8182
Write(Write),
83+
EncodedWrite(EncodedWrite),
8284
}
8385

8486
impl Command {
@@ -87,6 +89,7 @@ impl Command {
8789
#[cfg(test)]
8890
pub(crate) fn command_type(&self) -> wire::cmd::CommandType {
8991
match self {
92+
Self::Unknown(u) => u.command_type,
9093
Self::Chmod(_) => wire::cmd::CommandType::Chmod,
9194
Self::Chown(_) => wire::cmd::CommandType::Chown,
9295
Self::Clone(_) => wire::cmd::CommandType::Clone,
@@ -109,6 +112,7 @@ impl Command {
109112
Self::UpdateExtent(_) => wire::cmd::CommandType::UpdateExtent,
110113
Self::Utimes(_) => wire::cmd::CommandType::Utimes,
111114
Self::Write(_) => wire::cmd::CommandType::Write,
115+
Self::EncodedWrite(_) => wire::cmd::CommandType::EncodedWrite,
112116
}
113117
}
114118
}
@@ -123,6 +127,15 @@ macro_rules! from_cmd {
123127
};
124128
}
125129

130+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef)]
131+
#[as_ref(forward)]
132+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
133+
#[cfg_attr(feature = "serde", serde(transparent))]
134+
pub struct Unknown {
135+
command_type: wire::cmd::CommandType,
136+
}
137+
from_cmd!(Unknown);
138+
126139
macro_rules! one_getter {
127140
($f:ident, $ft:ty, copy) => {
128141
pub fn $f(&self) -> $ft {
@@ -536,6 +549,7 @@ macro_rules! time_alias {
536549
time_alias!(Atime);
537550
time_alias!(Ctime);
538551
time_alias!(Mtime);
552+
time_alias!(Otime);
539553

540554
#[derive(Debug, Clone, PartialEq, Eq)]
541555
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
@@ -544,6 +558,7 @@ pub struct Utimes {
544558
pub(crate) atime: Atime,
545559
pub(crate) mtime: Mtime,
546560
pub(crate) ctime: Ctime,
561+
pub(crate) otime: Option<Otime>,
547562
}
548563
from_cmd!(Utimes);
549564
getters! {Utimes, [(path, Path, borrow), (atime, Atime, copy), (mtime, Mtime,copy), (ctime, Ctime, copy)]}
@@ -566,7 +581,7 @@ impl FileOffset {
566581

567582
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, From)]
568583
#[as_ref(forward)]
569-
pub struct Data(Bytes);
584+
pub struct Data(pub(crate) Bytes);
570585

571586
impl Data {
572587
#[inline]
@@ -590,16 +605,19 @@ impl std::fmt::Debug for Data {
590605
Ok(s) => Cow::Borrowed(s),
591606
Err(_) => Cow::Owned(hex::encode(&self.0)),
592607
};
593-
if s.len() <= 128 {
608+
if s.chars().count() <= 128 {
594609
write!(f, "{s:?}")
595610
} else {
596-
write!(
597-
f,
598-
"{:?} <truncated ({}b total)> {:?}",
599-
&s[..64],
600-
s.len(),
601-
&s[s.len() - 64..]
602-
)
611+
let left: String = s.chars().take(64).collect();
612+
let right: String = s
613+
.chars()
614+
.rev()
615+
.take(64)
616+
.collect::<Vec<_>>()
617+
.into_iter()
618+
.rev()
619+
.collect();
620+
write!(f, "{left:?} <truncated ({}b total)> {right:?}", s.len(),)
603621
}
604622
}
605623
}
@@ -614,6 +632,86 @@ pub struct Write {
614632
from_cmd!(Write);
615633
getters! {Write, [(path, Path, borrow), (offset, FileOffset, copy), (data, Data, borrow)]}
616634

635+
#[derive(Debug, Clone, PartialEq, Eq)]
636+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
637+
pub struct EncodedWrite {
638+
pub(crate) path: BytesPath,
639+
pub(crate) offset: FileOffset,
640+
pub(crate) unencoded_file_len: UnencodedFileLen,
641+
pub(crate) unencoded_len: UnencodedLen,
642+
pub(crate) unencoded_offset: UnencodedOffset,
643+
pub(crate) compression: Compression,
644+
pub(crate) encryption: Option<Encryption>,
645+
pub(crate) data: Data,
646+
}
647+
from_cmd!(EncodedWrite);
648+
getters! {EncodedWrite, [(path, Path, borrow), (offset, FileOffset, copy), (data, Data, borrow)]}
649+
650+
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, Deref)]
651+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
652+
#[cfg_attr(feature = "serde", serde(transparent))]
653+
pub struct UnencodedFileLen(u64);
654+
655+
impl UnencodedFileLen {
656+
pub fn as_u64(self) -> u64 {
657+
self.0
658+
}
659+
}
660+
661+
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, Deref)]
662+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
663+
#[cfg_attr(feature = "serde", serde(transparent))]
664+
pub struct UnencodedLen(u64);
665+
666+
impl UnencodedLen {
667+
pub fn as_u64(self) -> u64 {
668+
self.0
669+
}
670+
}
671+
672+
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, Deref)]
673+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
674+
#[cfg_attr(feature = "serde", serde(transparent))]
675+
pub struct UnencodedOffset(u64);
676+
677+
impl UnencodedOffset {
678+
pub fn as_u64(self) -> u64 {
679+
self.0
680+
}
681+
}
682+
683+
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, Deref)]
684+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
685+
#[cfg_attr(feature = "serde", serde(transparent))]
686+
pub struct Compression(u32);
687+
688+
impl Compression {
689+
pub fn as_u32(self) -> u32 {
690+
self.0
691+
}
692+
}
693+
694+
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, AsRef, Deref)]
695+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
696+
#[cfg_attr(feature = "serde", serde(transparent))]
697+
pub struct Encryption(u32);
698+
699+
impl Encryption {
700+
pub fn as_u32(self) -> u32 {
701+
self.0
702+
}
703+
}
704+
705+
#[derive(Debug, Clone, PartialEq, Eq)]
706+
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
707+
pub struct End;
708+
709+
impl From<End> for Command {
710+
fn from(_: End) -> Self {
711+
Self::End
712+
}
713+
}
714+
617715
#[allow(clippy::expect_used)]
618716
#[cfg(test)]
619717
mod tests {
@@ -661,11 +759,11 @@ mod tests {
661759
if parsed_txt != good_txt {
662760
panic!(
663761
"{}",
664-
SimpleDiff::from_str(&parsed_txt, good_txt, "parsed", "good")
762+
SimpleDiff::from_str(good_txt, &parsed_txt, "good", "parsed")
665763
)
666764
}
667765
}
668-
assert_eq!(num_cmds_parsed, 94);
766+
assert_eq!(num_cmds_parsed, 106);
669767
}
670768

671769
/// Demonstrate how we might eagerly abort parsing after collecting information embedded in an
@@ -696,8 +794,9 @@ mod tests {
696794

697795
#[tokio::test]
698796
async fn sendstream_covers_all_commands() {
699-
let all_cmds: BTreeSet<_> = wire::cmd::CommandType::iter()
700-
.filter(|c| *c != wire::cmd::CommandType::Unspecified)
797+
let all_cmds: BTreeSet<_> = wire::cmd::PARSED_SUBTYPES
798+
.iter()
799+
.copied()
701800
// update_extent is used for no-file-data sendstreams (`btrfs send
702801
// --no-data`), so it's not super useful to cover here
703802
.filter(|c| *c != wire::cmd::CommandType::UpdateExtent)
@@ -711,9 +810,6 @@ mod tests {
711810
})
712811
.await
713812
.expect("while parsing");
714-
if all_cmds != seen_cmds {
715-
let missing: BTreeSet<_> = all_cmds.difference(&seen_cmds).collect();
716-
panic!("sendstream did not include some commands: {:?}", missing,);
717-
}
813+
assert_eq!(seen_cmds, all_cmds);
718814
}
719815
}

0 commit comments

Comments
 (0)