From 375ac9077ba1cf3023bacf52904c09c20750958c Mon Sep 17 00:00:00 2001 From: James Sun Date: Thu, 14 Aug 2025 18:10:37 -0700 Subject: [PATCH] rename pack_meta.rs (#883) Summary: Pull Request resolved: https://github.com/meta-pytorch/monarch/pull/883 **meta.rs is not synced to oss Reviewed By: dcci, vidhyav Differential Revision: D80303347 --- monarch_conda/src/diff.rs | 10 +- monarch_conda/src/lib.rs | 2 +- monarch_conda/src/pack_meta_history.rs | 214 +++++++++++++++++++++++++ monarch_conda/src/sync.rs | 10 +- python/tests/test_python_actors.py | 3 + 5 files changed, 228 insertions(+), 11 deletions(-) create mode 100644 monarch_conda/src/pack_meta_history.rs diff --git a/monarch_conda/src/diff.rs b/monarch_conda/src/diff.rs index f6c24e97a..81c16467d 100644 --- a/monarch_conda/src/diff.rs +++ b/monarch_conda/src/diff.rs @@ -21,8 +21,8 @@ use sha2::Sha256; use tokio::fs; use crate::hash_utils; -use crate::pack_meta::History; -use crate::pack_meta::Offsets; +use crate::pack_meta_history::History; +use crate::pack_meta_history::Offsets; /// Fingerprint of the conda-meta directory, used by `CondaFingerprint` below. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -159,9 +159,9 @@ mod tests { use tokio::fs; use super::*; - use crate::pack_meta::HistoryRecord; - use crate::pack_meta::Offset; - use crate::pack_meta::OffsetRecord; + use crate::pack_meta_history::HistoryRecord; + use crate::pack_meta_history::Offset; + use crate::pack_meta_history::OffsetRecord; /// Helper function to create a conda environment with configurable packages and files async fn setup_conda_env_with_config( diff --git a/monarch_conda/src/lib.rs b/monarch_conda/src/lib.rs index 6aa170ec2..eb0a89b2c 100644 --- a/monarch_conda/src/lib.rs +++ b/monarch_conda/src/lib.rs @@ -10,5 +10,5 @@ pub mod diff; pub mod hash_utils; -pub mod pack_meta; +pub mod pack_meta_history; pub mod sync; diff --git a/monarch_conda/src/pack_meta_history.rs b/monarch_conda/src/pack_meta_history.rs new file mode 100644 index 000000000..3da0d13da --- /dev/null +++ b/monarch_conda/src/pack_meta_history.rs @@ -0,0 +1,214 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use std::path::Path; +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; +use anyhow::ensure; +use rattler_conda_types::package::FileMode; +use serde::Deserialize; +use serde::Serialize; +use tokio::fs; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct Offset { + pub start: usize, + pub len: usize, + pub contents: Option>, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct OffsetRecord { + pub path: PathBuf, + pub mode: FileMode, + pub offsets: Vec, +} + +impl OffsetRecord { + fn to_str(&self) -> Result { + Ok(serde_json::to_string(&( + &self.path, + self.mode, + &self + .offsets + .iter() + .map(|o| { + ( + o.start, + o.len, + o.contents.as_ref().map(|c| { + c.iter() + .map(|(a, b)| (a, b, None::<()>)) + .collect::>() + }), + ) + }) + .collect::>(), + ))?) + } + + fn from_str(str: &str) -> Result { + let (path, mode, offsets): (_, _, Vec<(usize, usize, Option>)>) = + serde_json::from_str(str).with_context(|| format!("parsing: {}", str))?; + Ok(OffsetRecord { + path, + mode, + offsets: offsets + .into_iter() + .map(|(start, len, contents)| Offset { + start, + len, + contents: contents.map(|c| c.into_iter().map(|(a, b, _)| (a, b)).collect()), + }) + .collect(), + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct Offsets { + pub entries: Vec, +} + +impl Offsets { + pub fn from_contents(s: &str) -> Result { + let mut entries = Vec::new(); + for line in s.lines() { + entries.push(OffsetRecord::from_str(line)?); + } + Ok(Offsets { entries }) + } + + pub async fn from_env(env: &Path) -> Result { + let path = env.join("pack-meta").join("offsets.jsonl"); + let s = fs::read_to_string(&path).await?; + Self::from_contents(&s) + } + + pub fn to_str(&self) -> Result { + let mut str = String::new(); + for entry in &self.entries { + str += &entry.to_str()?; + str += "\n"; + } + Ok(str) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct HistoryRecord { + pub timestamp: u64, // timestamps are truncated into seconds + pub prefix: PathBuf, + pub finished: bool, +} + +impl HistoryRecord { + fn to_str(&self) -> Result { + Ok(serde_json::to_string(&( + self.timestamp, + &self.prefix, + self.finished, + ))?) + } + + fn from_str(line: &str) -> Result { + let (timestamp, prefix, finished) = serde_json::from_str(line)?; + Ok(HistoryRecord { + timestamp, + prefix, + finished, + }) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct History { + pub entries: Vec, +} + +impl History { + pub fn from_contents(s: &str) -> Result { + let mut entries = Vec::new(); + for line in s.lines() { + entries.push(HistoryRecord::from_str(line)?); + } + Ok(History { entries }) + } + + pub async fn from_env(env: &Path) -> Result { + let path = env.join("pack-meta").join("history.jsonl"); + let s = fs::read_to_string(&path).await?; + Self::from_contents(&s) + } + + pub fn to_str(&self) -> Result { + let mut str = String::new(); + for entry in &self.entries { + str += &entry.to_str()?; + str += "\n"; + } + Ok(str) + } + + pub fn first(&self) -> Result<(&Path, u64)> { + let first = self.entries.first().context("missing history")?; + ensure!(first.finished); + Ok((&first.prefix, first.timestamp)) + } + + pub fn last_prefix_update(&self) -> Result> { + let last = self.entries.last().context("missing history")?; + ensure!(last.finished); + Ok(if let [.., record, _] = &self.entries[..] { + ensure!(!record.finished); + ensure!(record.prefix == last.prefix); + Some((&record.prefix, record.timestamp, last.timestamp)) + } else { + None + }) + } + + pub fn last_prefix(&self) -> Result<&Path> { + if let Some((prefix, _, _)) = self.last_prefix_update()? { + return Ok(prefix); + } + let (prefix, _) = self.first()?; + Ok(prefix) + } + + pub fn prefix_and_last_update_window(&self) -> Result<(&Path, Option<(u64, u64)>)> { + let src_first = self.first()?; + Ok(if let Some((prefix, s, e)) = self.last_prefix_update()? { + (prefix, Some((s, e))) + } else { + (src_first.0, None) + }) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::*; + + #[test] + fn test_offset_record_parsing() { + let json = r#"["lib/pkgconfig/pthread-stubs.pc", "text", [[7, 67, null]]]"#; + let record: OffsetRecord = serde_json::from_str(json).unwrap(); + + assert_eq!(record.path, PathBuf::from("lib/pkgconfig/pthread-stubs.pc")); + assert_eq!(record.mode, FileMode::Text); + assert_eq!(record.offsets.len(), 1); + assert_eq!(record.offsets[0].start, 7); + assert_eq!(record.offsets[0].len, 67); + assert_eq!(record.offsets[0].contents, None); + } +} diff --git a/monarch_conda/src/sync.rs b/monarch_conda/src/sync.rs index 015e9858c..bb5afd94a 100644 --- a/monarch_conda/src/sync.rs +++ b/monarch_conda/src/sync.rs @@ -693,11 +693,11 @@ mod tests { use super::make_executable; use super::set_mtime; use super::sync; - use crate::pack_meta::History; - use crate::pack_meta::HistoryRecord; - use crate::pack_meta::Offset; - use crate::pack_meta::OffsetRecord; - use crate::pack_meta::Offsets; + use crate::pack_meta_history::History; + use crate::pack_meta_history::HistoryRecord; + use crate::pack_meta_history::Offset; + use crate::pack_meta_history::OffsetRecord; + use crate::pack_meta_history::Offsets; use crate::sync::Receive; /// Helper function to create a basic conda environment structure diff --git a/python/tests/test_python_actors.py b/python/tests/test_python_actors.py index 23a070d74..58f4f16dc 100644 --- a/python/tests/test_python_actors.py +++ b/python/tests/test_python_actors.py @@ -17,6 +17,7 @@ import threading import time import unittest +import unittest.mock from types import ModuleType from typing import cast @@ -958,6 +959,8 @@ async def ls(self) -> list[str]: return os.listdir(self.workspace) +# oss_skip: there are address assignment issues in git CI, needs to be revisited +@pytest.mark.oss_skip async def test_sync_workspace() -> None: # create two workspaces: one for local and one for remote with tempfile.TemporaryDirectory() as workspace_src, tempfile.TemporaryDirectory() as workspace_dst, unittest.mock.patch.dict(