Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
132 changes: 2 additions & 130 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions filevec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,21 @@ impl<T: Serialize + DeserializeOwned> Extend<T> for FileVec<T> {

pub struct FileVecGuard<'a, T: Serialize + DeserializeOwned>(&'a mut FileVec<T>);

impl<'a, T: Serialize + DeserializeOwned> Deref for FileVecGuard<'a, T> {
impl<T: Serialize + DeserializeOwned> Deref for FileVecGuard<'_, T> {
type Target = Vec<T>;

fn deref(&self) -> &Self::Target {
&self.0.vec
}
}

impl<'a, T: Serialize + DeserializeOwned> DerefMut for FileVecGuard<'a, T> {
impl<T: Serialize + DeserializeOwned> DerefMut for FileVecGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.vec
}
}

impl<'a, T: Serialize + DeserializeOwned> Drop for FileVecGuard<'a, T> {
impl<T: Serialize + DeserializeOwned> Drop for FileVecGuard<'_, T> {
fn drop(&mut self) {
let _ = self.0.write_to_file();
}
Expand Down
1 change: 1 addition & 0 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ strum = { version = "0.26.3", features = ["derive"] }
subprocess = "0.2.9"
thiserror = "1.0.63"
toml = "0.8.19"
zopfli = "0.8.1"

[features]
mock = []
Expand Down
3 changes: 2 additions & 1 deletion scheduler/src/command/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ impl From<CommunicationError> for CommandError {
CommunicationError::PacketInvalidError => CommandError::External(e.into()),
CommunicationError::TimedOut
| CommunicationError::NotAcknowledged
| CommunicationError::CepParsing(_) => CommandError::ProtocolViolation(e.into()),
| CommunicationError::CepParsing(_)
| CommunicationError::TooManyBytes => CommandError::ProtocolViolation(e.into()),
CommunicationError::Io(_) => CommandError::NonRecoverable(e.into()),
}
}
Expand Down
72 changes: 48 additions & 24 deletions scheduler/src/command/execute_program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ use crate::{
command::{
check_length, terminate_student_program, Event, ProgramStatus, ResultId, RetryEvent,
},
communication::{CEPPacket, CommunicationHandle},
communication::{self, CEPPacket, CommunicationHandle},
};
use anyhow::anyhow;
use simple_archive::Compression;
use std::{
io::{ErrorKind, Write},
io::ErrorKind,
path::{Path, PathBuf},
time::Duration,
};
use subprocess::Popen;

const MAXIMUM_FILE_SIZE: usize = 1_000_000;
use zopfli::Options;

/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the
/// status and result queue found in `context`. The result, including logs, is packed into
Expand Down Expand Up @@ -141,21 +139,34 @@ fn run_until_timeout(
Err(())
}

/// The function uses `tar` to create an uncompressed archive that includes the result file specified, as well as
/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive
/// is created without them.
const RESULT_SIZE_LIMIT: usize = 1_000_000;

fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
let out_path = PathBuf::from(&format!("./data/{res}"));
let mut archive = simple_archive::Writer::new(std::fs::File::create(out_path)?);
let mut archive = simple_archive::Writer::new(Vec::new());

let res_path =
PathBuf::from(format!("./archives/{}/results/{}", res.program_id, res.timestamp));
let student_log_path = PathBuf::from(format!("./data/{res}.log"));
let log_path = PathBuf::from("./log");

add_to_archive_if_exists(&mut archive, &res.to_string(), &res_path, Compression::None)?;
add_to_archive_if_exists(&mut archive, "student_log", &student_log_path, Compression::Zopfli)?;
add_to_archive_if_exists(&mut archive, "log", &log_path, Compression::Zopfli)?;
if let Some(d) = open_if_exists(&res_path)? {
if d.len() <= RESULT_SIZE_LIMIT {
archive.append_data(&res.to_string(), &d)?;
} else {
log::warn!("Result file for {res} is too large ({})", d.len());
}
}

if let Some(d) = open_if_exists(&log_path)? {
compress_into_archive_if_it_fits(&mut archive, "log", &d)?;
}

if let Some(d) = open_if_exists(&student_log_path)? {
compress_into_archive_if_it_fits(&mut archive, "student_log", &d)?;
}

std::fs::write(out_path, archive.into_inner())?;

let _ = std::fs::remove_file(res_path);
let _ = std::fs::remove_file(student_log_path);
Expand All @@ -164,19 +175,32 @@ fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
Ok(())
}

fn add_to_archive_if_exists<T: Write>(
archive: &mut simple_archive::Writer<T>,
name: &str,
path: impl AsRef<Path>,
compression: simple_archive::Compression,
) -> std::io::Result<()> {
fn open_if_exists(path: impl AsRef<Path>) -> std::io::Result<Option<Vec<u8>>> {
match std::fs::read(path) {
Ok(mut data) => {
data.truncate(MAXIMUM_FILE_SIZE);
archive.append_data(name, &data, compression)?;
Ok(())
}
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(()),
Ok(d) => Ok(Some(d)),
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e),
}
}

fn compress_into_archive_if_it_fits(
archive: &mut simple_archive::Writer<Vec<u8>>,
path: &str,
data: &[u8],
) -> std::io::Result<()> {
let mut compressed = Vec::new();
zopfli::compress(Options::default(), zopfli::Format::Gzip, data, &mut compressed)?;

if compressed.len()
<= communication::MAXIMUM_DATA_LENGTH
- archive.inner().len()
- path.len()
- simple_archive::HEADER_SIZE
{
archive.append_data(path, &compressed)?;
} else {
log::warn!("Could not fit {path} into result");
}

Ok(())
}
4 changes: 2 additions & 2 deletions scheduler/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn handle_command(com: &mut impl CommunicationHandle, exec: &mut SyncExecuti
Err(CommandError::External(e)) => {
log::error!("External error: {e}");
}
};
}
}

pub fn process_command(
Expand Down Expand Up @@ -70,7 +70,7 @@ pub fn process_command(
b => {
return Err(CommandError::ProtocolViolation(anyhow!("Unknown command {b:#x}")));
}
};
}

Ok(())
}
Loading