Skip to content

Commit 19f491a

Browse files
vmagro authored and meta-codesync[bot] committed
[antlir2][sendstream_parser] write a tokio codec
Summary: The code we have to feed the streaming parser is very complicated and has some subtle bugs around large commands / things that are split across boundaries. In practice we usually do not read too deep into sendstreams so this doesn't often cause a problem, but sometimes it does (S584606). By implementing `tokio_util::codec`, we can let `tokio` take care of those tricky boundary cases for us, and just focus on parsing bytes.

Test Plan:
```
❯ buck2 test fbcode//antlir/antlir2/sendstream_parser:
Buck UI: https://www.internalfb.com/buck2/0614533a-88ac-4ccd-baab-a093e505c07d
Tests finished: Pass 4. Fail 0. Fatal 0. Skip 0. Build failure 0
```
https://www.internalfb.com/intern/testinfra/testrun/3940649996430413

Reviewed By: wujj123456

Differential Revision: D86209569

fbshipit-source-id: 4e0d55590efb29418e1611da318003824023614f
1 parent ab2e332 commit 19f491a

File tree

5 files changed

+87
-65
lines changed

5 files changed

+87
-65
lines changed

antlir/antlir2/sendstream_parser/BUCK

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ rust_library(
2222
deps = [
2323
"bytes",
2424
"derive_more",
25+
"futures",
2526
"hex",
2627
"nix",
2728
"nom",
2829
"serde",
2930
"thiserror",
3031
"tokio",
32+
"tokio-util",
3133
"uuid",
3234
],
3335
)

antlir/antlir2/sendstream_parser/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ path = "src/lib.rs"
1717
[dependencies]
1818
bytes = { version = "1.10", features = ["serde"] }
1919
derive_more = { version = "1.0.0", features = ["full"] }
20+
futures = { version = "0.3.31", features = ["async-await", "compat"] }
2021
hex = { version = "0.4.3", features = ["alloc"] }
2122
nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
2223
nom = "8"
2324
serde = { version = "1.0.219", features = ["derive", "rc"], optional = true }
2425
thiserror = "2.0.12"
2526
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
27+
tokio-util = { version = "0.7.15", features = ["full"] }
2628
uuid = { version = "1.17", features = ["rng-getrandom", "serde", "v4", "v5", "v6", "v7", "v8"] }
2729

2830
[dev-dependencies]

antlir/antlir2/sendstream_parser/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ pub mod wire;
4040
#[derive(Debug, thiserror::Error)]
4141
pub enum Error {
4242
#[error(
43-
"Sendstream had unexpected trailing data. This probably means the parser is broken: {0:?}"
43+
"Sendstream had unexpected trailing data ({0} bytes). This probably means the parser is broken"
4444
)]
45-
TrailingData(Vec<u8>),
45+
TrailingData(usize),
4646
#[error("Sendstream is incomplete")]
4747
Incomplete,
4848
#[error("IO error: {0:#}")]
antlir/antlir2/sendstream_parser/src/wire/framed.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
use bytes::Buf;
9+
use bytes::BytesMut;
10+
use nom::IResult;
11+
use nom::Parser;
12+
use tokio_util::codec::Decoder;
13+
14+
use crate::Command;
15+
use crate::Error;
16+
use crate::wire::NomBytes;
17+
/// `tokio_util::codec::Decoder` implementation that splits a raw btrfs
/// sendstream byte stream into [`Item`]s. It is a unit struct, so it carries
/// no state from one `decode` call to the next.
pub(super) struct SendstreamDecoder;
18+
19+
/// One frame yielded by `SendstreamDecoder`: either the header that opens a
/// sendstream or a single parsed command.
pub(super) enum Item {
20+
/// Magic header that starts a sendstream - the only data here is the
21+
/// sendstream version
22+
SendstreamStart(#[allow(dead_code)] u32),
23+
/// A single command parsed by `Command::parse`.
Command(Command),
24+
}
25+
26+
/// NUL-terminated magic tag matched by `sendstream_header`, which expects a
/// little-endian `u32` version to follow it immediately.
static MAGIC_HEADER: &[u8] = b"btrfs-stream\0";
27+
28+
/// Parse a chunk of bytes to see if we can extract the header expected atop each sendstream.
29+
fn sendstream_header(input: NomBytes) -> IResult<NomBytes, u32> {
30+
let (remainder, (_magic, version)) = (
31+
nom::bytes::streaming::tag::<&[u8], NomBytes, nom::error::Error<NomBytes>>(MAGIC_HEADER),
32+
nom::number::streaming::le_u32,
33+
)
34+
.parse(input)?;
35+
Ok((remainder, version))
36+
}
37+
38+
impl Decoder for SendstreamDecoder {
39+
type Item = Item;
40+
type Error = Error;
41+
42+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
43+
// TODO: make a NomBytes for BytesMut too? This copy feels bad
44+
let parsable: NomBytes = src.clone().into();
45+
let starting_len = parsable.len();
46+
match nom::branch::alt((
47+
sendstream_header.map(Item::SendstreamStart),
48+
Command::parse.map(Item::Command),
49+
))
50+
.parse(parsable)
51+
{
52+
Ok((remaining, item)) => {
53+
src.advance(starting_len - remaining.len());
54+
Ok(Some(item))
55+
}
56+
Err(nom::Err::Incomplete(needed)) => {
57+
if let nom::Needed::Size(s) = needed {
58+
src.reserve(s.into());
59+
}
60+
Ok(None)
61+
}
62+
Err(e) => Err(Error::Unparsable(e.to_string())),
63+
}
64+
}
65+
}

antlir/antlir2/sendstream_parser/src/wire/mod.rs

Lines changed: 16 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@
55
* LICENSE file in the root directory of this source tree.
66
*/
77

8-
use bytes::BytesMut;
9-
use nom::IResult;
10-
use nom::Parser as _;
8+
use futures::StreamExt;
119
use tokio::io::AsyncRead;
12-
use tokio::io::AsyncReadExt;
13-
14-
static MAGIC_HEADER: &[u8] = b"btrfs-stream\0";
10+
use tokio_util::codec::FramedRead;
1511

1612
pub(crate) mod cmd;
13+
mod framed;
1714
mod nombytes;
1815
mod tlv;
1916
pub use nombytes::NomBytes;
@@ -24,16 +21,6 @@ pub enum ParserControl {
2421
Enough,
2522
}
2623

27-
/// Parse a chunk of bytes to see if we can extract the header expected atop each sendstream.
28-
fn parse_header<'a>(input: &'a [u8]) -> IResult<&'a [u8], u32> {
29-
let (remainder, (_magic, version)) = (
30-
nom::bytes::streaming::tag::<&[u8], &[u8], nom::error::Error<&[u8]>>(MAGIC_HEADER),
31-
nom::number::streaming::le_u32,
32-
)
33-
.parse(input)?;
34-
Ok((remainder, version))
35-
}
36-
3724
/// Parse an async source of bytes, expecting to find it to contain one or more sendstreams.
3825
/// Because the parsed commands reference data owned by the source, we do not collect the commands.
3926
/// Instead, we allow the caller to process them via `f`, which can instruct the processing to
@@ -46,61 +33,27 @@ fn parse_header<'a>(input: &'a [u8]) -> IResult<&'a [u8], u32> {
4633
/// Returns number of commands parsed.
4734
///
4835
/// See https://btrfs.readthedocs.io/en/latest/dev/dev-send-stream.html for reference.
49-
pub async fn parse<R, F>(mut reader: R, mut f: F) -> crate::Result<u128>
36+
pub async fn parse<R, F>(reader: R, mut f: F) -> crate::Result<u128>
5037
where
5138
R: AsyncRead + Unpin + Send,
5239
F: FnMut(&crate::Command) -> ParserControl + Send,
5340
{
54-
let mut unparsed = BytesMut::with_capacity(1000);
41+
let mut reader = FramedRead::new(reader, framed::SendstreamDecoder);
5542
let mut command_count = 0;
56-
let mut header: Option<u32> = None;
57-
'read_bytes: loop {
58-
let bytes_read = reader.read_buf(&mut unparsed).await?;
59-
if bytes_read != 0 || !unparsed.is_empty() {
60-
while header.is_some() {
61-
match crate::Command::parse(unparsed.clone().into()) {
62-
Ok((remainder, command)) => {
63-
command_count += 1;
64-
if let ParserControl::Enough = f(&command) {
65-
// caller got what they needed, no need to continue parsing
66-
return Ok(command_count);
67-
}
68-
if let crate::Command::End = command {
69-
unparsed = remainder.into();
70-
header = None;
71-
continue 'read_bytes;
72-
}
73-
unparsed = remainder.into();
74-
}
75-
Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
76-
Err(crate::Error::Unparsable(format!("{err:?}")))?
77-
}
78-
Err(nom::Err::Incomplete(_)) => {
79-
if bytes_read == 0 {
80-
// we've found extra data that cannot be parsed w/nothing more to read
81-
Err(crate::Error::TrailingData(unparsed.clone().into()))?
82-
}
83-
continue 'read_bytes;
84-
}
43+
while let Some(item_res) = reader.next().await {
44+
match item_res {
45+
Ok(framed::Item::Command(command)) => {
46+
command_count += 1;
47+
if let ParserControl::Enough = f(&command) {
48+
// caller got what they needed, no need to continue parsing
49+
break;
8550
}
8651
}
87-
match parse_header(&unparsed) {
88-
Ok((remainder, _version)) => {
89-
header = Some(_version);
90-
unparsed = remainder.into();
91-
}
92-
Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => {
93-
Err(crate::Error::Unparsable(format!("{err:?}")))?
94-
}
95-
Err(nom::Err::Incomplete(_)) => continue,
96-
};
97-
} else {
98-
break 'read_bytes;
52+
Ok(framed::Item::SendstreamStart(_)) => {}
53+
Err(e) => {
54+
return Err(e);
55+
}
9956
}
10057
}
101-
if header.is_some() {
102-
// We've found the end of the data but not the end of the last sendstream
103-
Err(crate::Error::Incomplete)?;
104-
}
10558
Ok(command_count)
10659
}

0 commit comments

Comments
 (0)