Skip to content

Commit 55cbee0

Browse files
committed
implement msgpack support for server ws, switch from duplexes to simplexes
1 parent 6be4f0d commit 55cbee0

File tree

23 files changed

+365
-233
lines changed

23 files changed

+365
-233
lines changed

Cargo.lock

Lines changed: 15 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

application/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "wings-rs"
33
rust-version = { workspace = true }
4-
version = "0.24.5"
4+
version = "0.24.6"
55
edition = "2024"
66

77
[dependencies]

application/src/io/compression/reader.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ impl<'a, R: Read + Seek> Read for CompressionReaderMt<'a, R> {
120120

121121
pub struct AsyncCompressionReader {
122122
inner_error_receiver: tokio::sync::oneshot::Receiver<std::io::Error>,
123-
inner_reader: tokio::io::DuplexStream,
123+
inner_reader: tokio::io::ReadHalf<tokio::io::SimplexStream>,
124124
}
125125

126126
impl AsyncCompressionReader {
127127
pub fn new(reader: impl Read + Send + 'static, compression_type: CompressionType) -> Self {
128-
let (inner_reader, inner_writer) = tokio::io::duplex(crate::BUFFER_SIZE * 4);
128+
let (inner_reader, inner_writer) = tokio::io::simplex(crate::BUFFER_SIZE * 2);
129129
let (inner_error_sender, inner_error_receiver) = tokio::sync::oneshot::channel();
130130

131131
tokio::task::spawn_blocking(move || {
@@ -142,8 +142,13 @@ impl AsyncCompressionReader {
142142
Ok(_) => {}
143143
Err(err) => {
144144
let _ = inner_error_sender.send(err);
145+
return;
145146
}
146147
}
148+
149+
if let Err(err) = writer.shutdown() {
150+
let _ = inner_error_sender.send(err);
151+
}
147152
});
148153

149154
Self {
@@ -157,7 +162,7 @@ impl AsyncCompressionReader {
157162
compression_type: CompressionType,
158163
threads: usize,
159164
) -> Self {
160-
let (inner_reader, inner_writer) = tokio::io::duplex(crate::BUFFER_SIZE * 4);
165+
let (inner_reader, inner_writer) = tokio::io::simplex(crate::BUFFER_SIZE * 4);
161166
let (inner_error_sender, inner_error_receiver) = tokio::sync::oneshot::channel();
162167

163168
tokio::task::spawn_blocking(move || {

application/src/io/compression/writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl<'a, W: Write + Send + 'static> Write for CompressionWriter<'a, W> {
156156

157157
pub struct AsyncCompressionWriter {
158158
inner_error_receiver: tokio::sync::oneshot::Receiver<std::io::Error>,
159-
inner_writer: tokio::io::DuplexStream,
159+
inner_writer: tokio::io::WriteHalf<tokio::io::SimplexStream>,
160160
}
161161

162162
impl AsyncCompressionWriter {
@@ -166,7 +166,7 @@ impl AsyncCompressionWriter {
166166
compression_level: CompressionLevel,
167167
threads: usize,
168168
) -> Self {
169-
let (inner_reader, inner_writer) = tokio::io::duplex(crate::BUFFER_SIZE * 4);
169+
let (inner_reader, inner_writer) = tokio::io::simplex(crate::BUFFER_SIZE * 4);
170170
let (inner_error_sender, inner_error_receiver) = tokio::sync::oneshot::channel();
171171

172172
tokio::task::spawn_blocking(move || {

application/src/routes/api/servers/_server_/files/copy_remote.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,9 @@ mod post {
359359
async move {
360360
let (checksum_sender, checksum_receiver) =
361361
tokio::sync::oneshot::channel();
362-
let (mut checksummed_writer, mut checksummed_reader) =
363-
tokio::io::duplex(crate::BUFFER_SIZE);
364-
let (mut writer, reader) = tokio::io::duplex(crate::BUFFER_SIZE);
362+
let (mut checksummed_reader, mut checksummed_writer) =
363+
tokio::io::simplex(crate::BUFFER_SIZE);
364+
let (reader, mut writer) = tokio::io::simplex(crate::BUFFER_SIZE);
365365

366366
let archive_task = async {
367367
let is_ignored = if filesystem.is_primary_server_fs() {

application/src/routes/api/servers/_server_/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ mod get {
3636
pub async fn route(server: GetServer, Query(data): Query<Params>) -> ApiResponseResult {
3737
let mut log_stream = server.read_log(data.lines).await;
3838

39-
let (logs_reader, mut logs_writer) = tokio::io::duplex(crate::BUFFER_SIZE);
39+
let (logs_reader, mut logs_writer) = tokio::io::simplex(crate::BUFFER_SIZE);
4040

4141
tokio::spawn(async move {
4242
while let Some(Ok(line)) = log_stream.next().await {

application/src/server/activity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub enum ActivityEvent {
5858

5959
impl ActivityEvent {
6060
#[inline]
61-
pub fn is_sftp_event(self) -> bool {
61+
pub const fn is_sftp_event(self) -> bool {
6262
matches!(
6363
self,
6464
ActivityEvent::SftpWrite

application/src/server/backup/adapters/btrfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl BackupExt for BtrfsBackup {
245245
let names = filesystem.async_read_dir_all(Path::new("")).await?;
246246
let ignore = Self::get_ignore(config, self.uuid).await?;
247247

248-
let (reader, writer) = tokio::io::duplex(crate::BUFFER_SIZE);
248+
let (reader, writer) = tokio::io::simplex(crate::BUFFER_SIZE);
249249

250250
tokio::spawn({
251251
let config = Arc::clone(config);

application/src/server/backup/adapters/ddup_bak.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ impl DdupBakBackup {
127127
entry: &Entry,
128128
repository: &ddup_bak::repository::Repository,
129129
zip: &mut zip::ZipWriter<
130-
zip::write::StreamWriter<tokio_util::io::SyncIoBridge<tokio::io::DuplexStream>>,
130+
zip::write::StreamWriter<
131+
tokio_util::io::SyncIoBridge<tokio::io::WriteHalf<tokio::io::SimplexStream>>,
132+
>,
131133
>,
132134
compression_level: CompressionLevel,
133135
parent_path: &Path,
@@ -399,7 +401,7 @@ impl BackupExt for DdupBakBackup {
399401

400402
let archive = self.archive.clone();
401403
let compression_level = config.system.backups.compression_level;
402-
let (reader, writer) = tokio::io::duplex(crate::BUFFER_SIZE);
404+
let (reader, writer) = tokio::io::simplex(crate::BUFFER_SIZE);
403405

404406
match archive_format {
405407
StreamableArchiveFormat::Zip => {

application/src/server/backup/adapters/restic.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ impl BackupExt for ResticBackup {
577577
_range: Option<ByteRange>,
578578
) -> Result<crate::response::ApiResponse, anyhow::Error> {
579579
let compression_level = config.system.backups.compression_level;
580-
let (reader, writer) = tokio::io::duplex(crate::BUFFER_SIZE);
580+
let (reader, writer) = tokio::io::simplex(crate::BUFFER_SIZE);
581581

582582
match archive_format {
583583
StreamableArchiveFormat::Zip => {
@@ -1221,7 +1221,7 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
12211221
compression_level: CompressionLevel,
12221222
bytes_archived: Option<Arc<AtomicU64>>,
12231223
is_ignored: IsIgnoredFn,
1224-
) -> Result<tokio::io::DuplexStream, anyhow::Error> {
1224+
) -> Result<tokio::io::ReadHalf<tokio::io::SimplexStream>, anyhow::Error> {
12251225
let path = path.as_ref().to_path_buf();
12261226
let entry = self
12271227
.entries
@@ -1237,7 +1237,7 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
12371237

12381238
let full_path = PathBuf::from(&self.server_path).join(&entry.path);
12391239

1240-
let (reader, writer) = tokio::io::duplex(crate::BUFFER_SIZE);
1240+
let (reader, writer) = tokio::io::simplex(crate::BUFFER_SIZE);
12411241

12421242
let configuration = self.configuration.clone();
12431243
let short_id = self.short_id.clone();
@@ -1265,10 +1265,10 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
12651265
let mut child = spawn_restic()?;
12661266

12671267
let writer = tokio_util::io::SyncIoBridge::new(writer);
1268-
let mut archive = zip::ZipWriter::new_stream(writer);
1268+
let mut zip = zip::ZipWriter::new_stream(writer);
12691269

1270-
let mut subtar = tar::Archive::new(child.stdout.take().unwrap());
1271-
let mut entries = subtar.entries()?;
1270+
let mut restic_tar = tar::Archive::new(child.stdout.take().unwrap());
1271+
let mut entries = restic_tar.entries()?;
12721272

12731273
let mut read_buffer = vec![0; crate::BUFFER_SIZE];
12741274
while let Some(Ok(mut entry)) = entries.next() {
@@ -1311,17 +1311,17 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
13111311

13121312
match header.entry_type() {
13131313
tar::EntryType::Directory => {
1314-
archive.add_directory(relative.to_string_lossy(), options)?;
1314+
zip.add_directory(relative.to_string_lossy(), options)?;
13151315
}
13161316
tar::EntryType::Regular => {
1317-
archive.start_file(relative.to_string_lossy(), options)?;
1317+
zip.start_file(relative.to_string_lossy(), options)?;
13181318

13191319
loop {
13201320
let n = entry.read(&mut read_buffer)?;
13211321
if n == 0 {
13221322
break;
13231323
}
1324-
archive.write_all(&read_buffer[..n])?;
1324+
zip.write_all(&read_buffer[..n])?;
13251325
if let Some(counter) = &bytes_archived {
13261326
counter.fetch_add(n as u64, Ordering::SeqCst);
13271327
}
@@ -1331,8 +1331,9 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
13311331
}
13321332
}
13331333

1334-
let mut inner = archive.finish()?;
1334+
let mut inner = zip.finish()?.into_inner();
13351335
inner.flush()?;
1336+
inner.shutdown()?;
13361337

13371338
Ok(())
13381339
});
@@ -1348,10 +1349,10 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
13481349
file_compression_threads,
13491350
)?;
13501351

1351-
let mut subtar = tar::Archive::new(child.stdout.take().unwrap());
1352-
let mut entries = subtar.entries()?;
1352+
let mut restic_tar = tar::Archive::new(child.stdout.take().unwrap());
1353+
let mut entries = restic_tar.entries()?;
13531354

1354-
let mut out_tar = tar::Builder::new(writer);
1355+
let mut tar = tar::Builder::new(writer);
13551356

13561357
while let Some(Ok(entry)) = entries.next() {
13571358
let mut header = entry.header().clone();
@@ -1373,18 +1374,19 @@ impl VirtualReadableFilesystem for VirtualResticBackup {
13731374
if let Some(counter) = &bytes_archived {
13741375
let counting_reader =
13751376
CountingReader::new_with_bytes_read(entry, counter.clone());
1376-
out_tar.append_data(&mut header, relative, counting_reader)?;
1377+
tar.append_data(&mut header, relative, counting_reader)?;
13771378
} else {
1378-
out_tar.append_data(&mut header, relative, entry)?;
1379+
tar.append_data(&mut header, relative, entry)?;
13791380
}
13801381
} else {
1381-
out_tar.append_data(&mut header, relative, std::io::empty())?;
1382+
tar.append_data(&mut header, relative, std::io::empty())?;
13821383
}
13831384
}
13841385

1385-
out_tar.finish()?;
1386-
let mut inner = out_tar.into_inner()?.finish()?;
1386+
tar.finish()?;
1387+
let mut inner = tar.into_inner()?.finish()?;
13871388
inner.flush()?;
1389+
inner.shutdown()?;
13881390

13891391
Ok(())
13901392
});

0 commit comments

Comments
 (0)