Skip to content

Commit 107cdce

Browse files
committed
feat(fileTransfer): add streaming to fifo
Signed-off-by: Joshua Chapman <joshua.chapman@secomind.com>
1 parent 66f0bf9 commit 107cdce

File tree

11 files changed

+575
-42
lines changed

11 files changed

+575
-42
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ target/
2828

2929
# RustRover IDE
3030
.idea/
31+
# nvim
32+
.nvim.lua
3133

3234
/telemetry.json
3335

Cargo.lock

Lines changed: 49 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ bytes.workspace = true
114114
cfg-if.workspace = true
115115
clap = { workspace = true, features = ["derive", "env"] }
116116
color-eyre.workspace = true
117+
dirs.workspace = true
117118
displaydoc.workspace = true
118119
edgehog-containers = { workspace = true, optional = true }
119120
edgehog-forwarder = { workspace = true, optional = true }
@@ -124,6 +125,7 @@ eyre.workspace = true
124125
futures.workspace = true
125126
hex.workspace = true
126127
pin-project-lite.workspace = true
128+
rustix = { workspace = true, features = ["fs"] }
127129
rustls.workspace = true
128130
serde.workspace = true
129131
serde_json.workspace = true
@@ -191,6 +193,7 @@ color-eyre = "0.6.5"
191193
deadpool = { version = "0.12.3", default-features = false }
192194
diesel = "2.3.5"
193195
diesel_migrations = "2.3.1"
196+
dirs = "6.0.0"
194197
displaydoc = "0.2.5"
195198
edgehog-containers = { package = "edgehog-device-runtime-containers", path = "./edgehog-device-runtime-containers", version = "=0.10.4" }
196199
edgehog-device-forwarder-proto = "0.1.0"
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// This file is part of Edgehog.
2+
//
3+
// Copyright 2026 SECO Mind Srl
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
// SPDX-License-Identifier: Apache-2.0
18+
19+
use uuid::Uuid;
20+
21+
use super::request::FileDigest;
22+
#[cfg(unix)]
23+
use super::request::FilePermissions;
24+
25+
pub(super) mod store;
26+
pub(super) mod stream;
27+
28+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29+
pub(crate) struct FileOptions {
30+
pub(super) id: Uuid,
31+
pub(super) file_size: u64,
32+
pub(super) file_digest: FileDigest,
33+
#[cfg(unix)]
34+
pub(super) perm: FilePermissions,
35+
}
Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,17 @@ use tokio::io::{AsyncBufReadExt, AsyncWrite, BufReader};
2828
use tracing::{info, instrument, trace};
2929
use uuid::Uuid;
3030

31-
use super::request::FileDigest;
32-
#[cfg(unix)]
33-
use super::request::FilePermissions;
31+
use super::FileOptions;
3432

3533
/// Stores files in the storage
3634
#[derive(Debug)]
37-
pub(super) struct FileStorage<F> {
38-
fs: F,
35+
pub(crate) struct FileStorage<F> {
3936
dir: PathBuf,
37+
fs: F,
4038
}
4139

4240
impl FileStorage<Fs> {
43-
pub(super) fn new(dir: PathBuf) -> Self {
41+
pub(crate) fn new(dir: PathBuf) -> Self {
4442
Self { fs: Fs {}, dir }
4543
}
4644
}
@@ -88,8 +86,6 @@ impl<F> FileStorage<F> {
8886
where
8987
F: Limits,
9088
{
91-
trace!("creating write handle");
92-
9389
let file_path = self.partial_file_path(&opt.id);
9490

9591
self.fs
@@ -147,8 +143,6 @@ impl<F> FileStorage<F> {
147143
where
148144
F: Limits,
149145
{
150-
trace!("finalizing write handle");
151-
152146
handle
153147
.file
154148
.sync_all()
@@ -184,18 +178,9 @@ impl<F> FileStorage<F> {
184178
}
185179
}
186180

187-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
188-
pub(super) struct FileOptions {
189-
pub(super) id: Uuid,
190-
pub(super) file_size: u64,
191-
pub(super) file_digest: FileDigest,
192-
#[cfg(unix)]
193-
pub(super) perm: FilePermissions,
194-
}
195-
196181
pin_project! {
197182
#[derive(Debug)]
198-
pub(super) struct WriteHandle {
183+
pub(crate) struct WriteHandle {
199184
id: Uuid,
200185
current_size: u64,
201186
// TODO limit the size of the file
@@ -285,6 +270,10 @@ mod tests {
285270
use tempdir::TempDir;
286271
use tokio::io::AsyncWriteExt;
287272

273+
use crate::file_transfer::request::FileDigest;
274+
#[cfg(unix)]
275+
use crate::file_transfer::request::FilePermissions;
276+
288277
use super::*;
289278

290279
use pretty_assertions::assert_eq;
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// This file is part of Edgehog.
2+
//
3+
// Copyright 2026 SECO Mind Srl
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
// SPDX-License-Identifier: Apache-2.0
18+
19+
//! Stream the bytes to a named pipe or unix socket.
20+
21+
use tokio::io::{AsyncRead, AsyncWrite};
22+
use tracing::instrument;
23+
use uuid::Uuid;
24+
25+
use super::FileOptions;
26+
27+
#[cfg(unix)]
28+
mod unix;
29+
#[cfg(windows)]
30+
mod windows;
31+
32+
cfg_if::cfg_if! {
33+
if #[cfg(unix)] {
34+
pub(crate) type SysPipe = self::unix::MakeFifo;
35+
} else if #[cfg(windows)] {
36+
pub(crate) type SysPipe = self::windows::MakeNamedPipe;
37+
}
38+
}
39+
40+
#[derive(Debug)]
41+
pub(crate) struct Streaming<S> {
42+
sys: S,
43+
}
44+
45+
impl<S> Streaming<S> {
46+
#[instrument(skip_all, fields(id = %opt.id))]
47+
pub(crate) async fn open_writer(
48+
&self,
49+
opt: &FileOptions,
50+
) -> eyre::Result<(S::Writer, aws_lc_rs::digest::Context)>
51+
where
52+
S: Pipe,
53+
{
54+
let writer = self.sys.open_writer(opt).await?;
55+
let digest = aws_lc_rs::digest::Context::from(opt.file_digest);
56+
57+
Ok((writer, digest))
58+
}
59+
60+
#[instrument(skip(self))]
61+
pub(crate) async fn create_reader(&self, id: &Uuid) -> eyre::Result<S::Reader>
62+
where
63+
S: Pipe,
64+
{
65+
let reader = self.sys.create_reader(id).await?;
66+
67+
Ok(reader)
68+
}
69+
}
70+
71+
impl Streaming<SysPipe> {
72+
pub(crate) fn with_sys() -> Self {
73+
cfg_if::cfg_if! {
74+
if #[cfg(unix)] {
75+
Self {
76+
sys: self::unix::MakeFifo::new(),
77+
}
78+
} else if #[cfg(windows)] {
79+
Self {
80+
sys: self::windows::MakeNamedPipe::new(),
81+
}
82+
}
83+
}
84+
}
85+
}
86+
87+
pub(crate) trait Pipe {
88+
type Reader: AsyncRead;
89+
type Writer: AsyncWrite;
90+
91+
fn open_writer(
92+
&self,
93+
opt: &FileOptions,
94+
) -> impl Future<Output = eyre::Result<Self::Writer>> + Send;
95+
96+
fn create_reader(&self, id: &Uuid) -> impl Future<Output = eyre::Result<Self::Reader>> + Send;
97+
}
98+
99+
#[cfg(test)]
100+
mod tests {
101+
use super::*;
102+
103+
#[test]
104+
fn runtime_dir() {
105+
Streaming::with_sys();
106+
}
107+
}

0 commit comments

Comments
 (0)