Skip to content

Commit e57aadf

Browse files
committed
[hyperactor] Serialized: permit multiple encoding representations
Pull Request resolved: #832 In preparation for plumbing through multipart support, we extend Serialized to be representation polymorphic. This will allow us to add serde_multipart::Message as an encoding format, plumbing true multipart encoding "all the way down". ghstack-source-id: 302712692 Differential Revision: [D80110110](https://our.internmc.facebook.com/intern/diff/D80110110/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D80110110/)!
1 parent ebd5272 commit e57aadf

File tree

1 file changed

+76
-56
lines changed

1 file changed

+76
-56
lines changed

hyperactor/src/data.rs

Lines changed: 76 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ use std::fmt;
132132
use std::io::Cursor;
133133
use std::sync::LazyLock;
134134

135+
use enum_as_inner::EnumAsInner;
135136
pub use intern_typename;
136137
use serde::Deserialize;
137138
use serde::Serialize;
@@ -291,10 +292,46 @@ macro_rules! register_type {
291292
}
292293

293294
/// The encoding used for a serialized value.
294-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
295-
enum SerializedEncoding {
296-
Bincode,
297-
Json,
295+
#[derive(Clone, Serialize, Deserialize, PartialEq, EnumAsInner)]
296+
enum Encoded {
297+
Bincode(serde_bytes::ByteBuf),
298+
Json(serde_bytes::ByteBuf),
299+
// todo: multipart
300+
}
301+
302+
impl Encoded {
303+
/// The length of the underlying serialized message
304+
pub fn len(&self) -> usize {
305+
match &self {
306+
Encoded::Bincode(data) => data.len(),
307+
Encoded::Json(data) => data.len(),
308+
}
309+
}
310+
311+
/// Is the message empty. This should always return false.
312+
pub fn is_empty(&self) -> bool {
313+
match &self {
314+
Encoded::Bincode(data) => data.is_empty(),
315+
Encoded::Json(data) => data.is_empty(),
316+
}
317+
}
318+
319+
/// Computes the 32bit crc of the encoded data
320+
pub fn crc(&self) -> u32 {
321+
match &self {
322+
Encoded::Bincode(data) => crc32fast::hash(data),
323+
Encoded::Json(data) => crc32fast::hash(data),
324+
}
325+
}
326+
}
327+
328+
impl std::fmt::Debug for Encoded {
329+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330+
match self {
331+
Encoded::Bincode(data) => write!(f, "Encoded::Bincode({})", HexFmt(data.as_slice())),
332+
Encoded::Json(data) => write!(f, "Encoded::Json({})", HexFmt(data.as_slice())),
333+
}
334+
}
298335
}
299336

300337
/// Represents a serialized value, wrapping the underlying serialization
@@ -303,24 +340,15 @@ enum SerializedEncoding {
303340
///
304341
/// Currently, Serialized passes through to bincode, but in the future we may include
305342
/// content-encoding information to allow for other codecs as well.
306-
#[derive(Clone, Serialize, Deserialize, PartialEq)]
343+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
307344
pub struct Serialized {
308-
encoding: SerializedEncoding,
309-
310-
/// The encoded data for the serialized value.
311-
#[serde(with = "serde_bytes")]
312-
data: Vec<u8>,
345+
/// The encoded data
346+
encoded: Encoded,
313347
/// The typehash of the serialized value, if available. This is used to provide
314348
/// typed introspection of the value.
315349
typehash: Option<u64>,
316350
}
317351

318-
impl std::fmt::Debug for Serialized {
319-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320-
write!(f, "Serialized({})", HexFmt(self.data.as_slice()),)
321-
}
322-
}
323-
324352
impl std::fmt::Display for Serialized {
325353
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326354
match self.dump() {
@@ -331,7 +359,7 @@ impl std::fmt::Display for Serialized {
331359
let basename = typename.split("::").last().unwrap_or(typename);
332360
write!(f, "{}{}", basename, JsonFmt(&value))
333361
}
334-
Err(_) => write!(f, "{}", HexFmt(self.data.as_slice())),
362+
Err(_) => write!(f, "{:?}", self.encoded),
335363
}
336364
}
337365
}
@@ -340,38 +368,32 @@ impl Serialized {
340368
/// Construct a new serialized value by serializing the provided T-typed value.
341369
pub fn serialize<T: Serialize + Named>(value: &T) -> Result<Self, bincode::Error> {
342370
Ok(Self {
343-
encoding: SerializedEncoding::Bincode,
344-
data: bincode::serialize(value)?,
371+
encoded: Encoded::Bincode(bincode::serialize(value)?.into()),
345372
typehash: Some(T::typehash()),
346373
})
347374
}
348375

349376
/// Construct a new anonymous (unnamed) serialized value by serializing the provided T-typed value.
350377
pub fn serialize_anon<T: Serialize>(value: &T) -> Result<Self, bincode::Error> {
351378
Ok(Self {
352-
encoding: SerializedEncoding::Bincode,
353-
data: bincode::serialize(value)?,
379+
encoded: Encoded::Bincode(bincode::serialize(value)?.into()),
354380
typehash: None,
355381
})
356382
}
357383

358384
/// Deserialize a value to the provided type T.
359385
pub fn deserialized<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
360-
match self.encoding {
361-
SerializedEncoding::Bincode => {
362-
bincode::deserialize(&self.data).map_err(anyhow::Error::from)
363-
}
364-
SerializedEncoding::Json => {
365-
serde_json::from_slice(&self.data).map_err(anyhow::Error::from)
366-
}
386+
match &self.encoded {
387+
Encoded::Bincode(data) => bincode::deserialize(data).map_err(anyhow::Error::from),
388+
Encoded::Json(data) => serde_json::from_slice(data).map_err(anyhow::Error::from),
367389
}
368390
}
369391

370392
/// Transcode the serialized value to JSON. This operation will succeed if the type hash
371393
/// is embedded in the value, and the corresponding type is available in this binary.
372394
pub fn transcode_to_json(self) -> Result<Self, Self> {
373-
match self.encoding {
374-
SerializedEncoding::Bincode => {
395+
match self.encoded {
396+
Encoded::Bincode(_) => {
375397
let json_value = match self.dump() {
376398
Ok(json_value) => json_value,
377399
Err(_) => return Err(self),
@@ -381,20 +403,19 @@ impl Serialized {
381403
Err(_) => return Err(self),
382404
};
383405
Ok(Self {
384-
encoding: SerializedEncoding::Json,
385-
data: json_data,
406+
encoded: Encoded::Json(json_data.into()),
386407
typehash: self.typehash,
387408
})
388409
}
389-
SerializedEncoding::Json => Ok(self),
410+
Encoded::Json(_) => Ok(self),
390411
}
391412
}
392413

393414
/// Dump the Serialized message into a JSON value. This will succeed if: 1) the typehash is embedded
394415
/// in the serialized value; 2) the named type is linked into the binary.
395416
pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
396-
match self.encoding {
397-
SerializedEncoding::Bincode => {
417+
match &self.encoded {
418+
Encoded::Bincode(_) => {
398419
let Some(typehash) = self.typehash() else {
399420
anyhow::bail!("serialized value does not contain a typehash");
400421
};
@@ -403,9 +424,7 @@ impl Serialized {
403424
};
404425
typeinfo.dump(self.clone())
405426
}
406-
SerializedEncoding::Json => {
407-
serde_json::from_slice(&self.data).map_err(anyhow::Error::from)
408-
}
427+
Encoded::Json(data) => serde_json::from_slice(data).map_err(anyhow::Error::from),
409428
}
410429
}
411430

@@ -425,11 +444,10 @@ impl Serialized {
425444
// TODO: we should support this by formalizing the notion of a 'prefix'
426445
// serialization, and generalize it to other codecs as well.
427446
pub fn prefix<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
428-
anyhow::ensure!(
429-
self.encoding == SerializedEncoding::Bincode,
430-
"only bincode supports prefix emplacement"
431-
);
432-
bincode::deserialize(&self.data).map_err(anyhow::Error::from)
447+
match &self.encoded {
448+
Encoded::Bincode(data) => bincode::deserialize(data).map_err(anyhow::Error::from),
449+
_ => anyhow::bail!("only bincode supports prefix emplacement"),
450+
}
433451
}
434452

435453
/// Emplace a new prefix to this value. This is currently only supported
@@ -438,38 +456,39 @@ impl Serialized {
438456
&mut self,
439457
prefix: T,
440458
) -> Result<(), anyhow::Error> {
441-
anyhow::ensure!(
442-
self.encoding == SerializedEncoding::Bincode,
443-
"only bincode supports prefix emplacement"
444-
);
459+
let data = match &self.encoded {
460+
Encoded::Bincode(data) => data,
461+
_ => anyhow::bail!("only bincode supports prefix emplacement"),
462+
};
445463

446464
// This is a bit ugly, but: we first deserialize out the old prefix,
447465
// then serialize the new prefix, then splice the two together.
448466
// This is safe because we know that the prefix is the first thing
449467
// in the serialized value, and that the serialization format is stable.
450-
let mut cursor = Cursor::new(self.data.clone());
468+
let mut cursor = Cursor::new(data.clone());
451469
let _prefix: T = bincode::deserialize_from(&mut cursor).unwrap();
452470
let position = cursor.position() as usize;
453471
let suffix = &cursor.into_inner()[position..];
454-
self.data = bincode::serialize(&prefix)?;
455-
self.data.extend_from_slice(suffix);
472+
let mut data = bincode::serialize(&prefix)?;
473+
data.extend_from_slice(suffix);
474+
self.encoded = Encoded::Bincode(data.into());
456475

457476
Ok(())
458477
}
459478

460479
/// The length of the underlying serialized message
461480
pub fn len(&self) -> usize {
462-
self.data.len()
481+
self.encoded.len()
463482
}
464483

465484
/// Is the message empty. This should always return false.
466485
pub fn is_empty(&self) -> bool {
467-
self.len() == 0
486+
self.encoded.is_empty()
468487
}
469488

470489
/// Returns the 32bit crc of the serialized data
471490
pub fn crc(&self) -> u32 {
472-
crc32fast::hash(&self.data)
491+
self.encoded.crc()
473492
}
474493
}
475494

@@ -615,10 +634,11 @@ mod tests {
615634
let serialized = Serialized::serialize(&data).unwrap();
616635
let serialized_json = serialized.clone().transcode_to_json().unwrap();
617636

618-
assert_eq!(serialized.encoding, SerializedEncoding::Bincode);
619-
assert_eq!(serialized_json.encoding, SerializedEncoding::Json);
637+
assert!(serialized.encoded.is_bincode());
638+
assert!(serialized_json.encoded.is_json());
620639

621-
let json_string = String::from_utf8(serialized_json.data.clone()).unwrap();
640+
let json_string =
641+
String::from_utf8(serialized_json.encoded.as_json().unwrap().to_vec().clone()).unwrap();
622642
// The serialized data for JSON is just the (compact) JSON string.
623643
assert_eq!(json_string, "{\"a\":\"hello\",\"b\":1234,\"c\":5678}");
624644

0 commit comments

Comments
 (0)