Skip to content

Commit 16fc9e4

Browse files
committed
refactor: lazy zen input bytes — Arc avoids clone on v2 non-zen path
add_input_vector: Arc::new(bytes) — zero-copy move, refcount bump into IoProxy add_input_buffer: store &'static [u8] directly — no copy at registration add_copied_input_buffer: one copy into Arc, shared with IoProxy (was two copies) Vec<u8> for zenpipe built lazily in zen_execute_inner via ZenInput::to_vec(), so v2 non-zen jobs pay zero clone cost even with zen-pipeline feature enabled. New: IoBackend::ReadArc(Cursor<ArcBytes>), IoProxy::read_arc, IoProxy::as_input_slice
1 parent dc198f0 commit 16fc9e4

File tree

2 files changed

+81
-11
lines changed

2 files changed

+81
-11
lines changed

imageflow_core/src/context.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,25 @@ use crate::graphics::bitmaps::{Bitmap, BitmapKey, BitmapWindowMut, BitmapsContai
3131
use imageflow_types::ImageInfo;
3232
use itertools::Itertools;
3333

34+
/// Input bytes for the zen pipeline — either an Arc-wrapped owned Vec or a static slice.
35+
/// Stored instead of `Vec<u8>` so the v2 non-zen path pays zero clone cost at registration time.
36+
/// The `Vec<u8>` zenpipe needs is built lazily in `zen_execute_inner`.
37+
#[cfg(feature = "zen-pipeline")]
38+
enum ZenInput {
39+
Owned(Arc<Vec<u8>>),
40+
Static(&'static [u8]),
41+
}
42+
43+
#[cfg(feature = "zen-pipeline")]
44+
impl ZenInput {
45+
fn to_vec(&self) -> Vec<u8> {
46+
match self {
47+
ZenInput::Owned(a) => (**a).clone(),
48+
ZenInput::Static(s) => s.to_vec(),
49+
}
50+
}
51+
}
52+
3453
/// Something of a god object (which is necessary for a reasonable FFI interface).
3554
/// 1025 bytes including 5 heap allocations as of Oct 2025. If on the stack, 312 bytes are taken up
3655
pub struct Context {
@@ -60,8 +79,9 @@ pub struct Context {
6079
/// Input bytes stashed for the zen pipeline (feature-gated).
6180
/// Populated by `add_copied_input_buffer`, `add_input_vector`, etc.
6281
/// The zen pipeline reads from here instead of the codec containers.
82+
/// Uses Arc to avoid cloning on the v2 non-zen path — Vec is built lazily at execute time.
6383
#[cfg(feature = "zen-pipeline")]
64-
zen_input_bytes: std::collections::HashMap<i32, Vec<u8>>,
84+
zen_input_bytes: std::collections::HashMap<i32, ZenInput>,
6585

6686
/// Force a specific backend for testing. None = zen when zen-pipeline is compiled in.
6787
#[cfg(feature = "zen-pipeline")]
@@ -578,17 +598,31 @@ impl Context {
578598

579599
pub fn add_copied_input_buffer(&mut self, io_id: i32, bytes: &[u8]) -> Result<()> {
580600
#[cfg(feature = "zen-pipeline")]
581-
self.zen_input_bytes.insert(io_id, bytes.to_vec());
582-
583-
let io = IoProxy::copy_slice(self, io_id, bytes).map_err(|e| e.at(here!()))?;
584-
self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()))
601+
{
602+
let arc = Arc::new(bytes.to_vec());
603+
self.zen_input_bytes.insert(io_id, ZenInput::Owned(arc.clone()));
604+
let io = IoProxy::read_arc(self, io_id, arc).map_err(|e| e.at(here!()))?;
605+
return self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()));
606+
}
607+
#[cfg(not(feature = "zen-pipeline"))]
608+
{
609+
let io = IoProxy::copy_slice(self, io_id, bytes).map_err(|e| e.at(here!()))?;
610+
self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()))
611+
}
585612
}
586613
pub fn add_input_vector(&mut self, io_id: i32, bytes: Vec<u8>) -> Result<()> {
587614
#[cfg(feature = "zen-pipeline")]
588-
self.zen_input_bytes.insert(io_id, bytes.clone());
589-
590-
let io = IoProxy::read_vec(self, io_id, bytes).map_err(|e| e.at(here!()))?;
591-
self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()))
615+
{
616+
let arc = Arc::new(bytes);
617+
self.zen_input_bytes.insert(io_id, ZenInput::Owned(arc.clone()));
618+
let io = IoProxy::read_arc(self, io_id, arc).map_err(|e| e.at(here!()))?;
619+
return self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()));
620+
}
621+
#[cfg(not(feature = "zen-pipeline"))]
622+
{
623+
let io = IoProxy::read_vec(self, io_id, bytes).map_err(|e| e.at(here!()))?;
624+
self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()))
625+
}
592626
}
593627

594628
/// Zero-copy: borrows `bytes` without copying.
@@ -602,7 +636,7 @@ impl Context {
602636
/// In practice, the ABI layer (imageflow_abi) uses transmute to erase the real lifetime.
603637
pub fn add_input_buffer(&mut self, io_id: i32, bytes: &'static [u8]) -> Result<()> {
604638
#[cfg(feature = "zen-pipeline")]
605-
self.zen_input_bytes.insert(io_id, bytes.to_vec());
639+
self.zen_input_bytes.insert(io_id, ZenInput::Static(bytes));
606640

607641
let io = IoProxy::read_slice(self, io_id, bytes).map_err(|e| e.at(here!()))?;
608642
self.add_io(io, io_id, IoDirection::In).map_err(|e| e.at(here!()))
@@ -875,8 +909,11 @@ impl Context {
875909
}
876910
let job_options = what.job_options.unwrap_or_default();
877911

912+
let io_bytes: std::collections::HashMap<i32, Vec<u8>> =
913+
self.zen_input_bytes.iter().map(|(&id, z)| (id, z.to_vec())).collect();
914+
878915
let output =
879-
crate::zen::zen_execute(&what.framewise, &self.zen_input_bytes, &self.security, &job_options)
916+
crate::zen::zen_execute(&what.framewise, &io_bytes, &self.security, &job_options)
880917
.map_err(|e| e.at(here!()))?;
881918

882919
// Store encoded outputs in Context's output buffer system.

imageflow_core/src/io.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,22 @@ use crate::{Context, ErrorKind, JsonResponse, Result};
66
use imageflow_types::collections::AddRemoveSet;
77
use imageflow_types::IoDirection;
88
use std::rc::Rc;
9+
use std::sync::Arc;
910
use uuid::Uuid;
1011

12+
/// Newtype so `Cursor<ArcBytes>` implements `Read`/`BufRead`/`Seek`.
13+
/// `Arc<Vec<u8>>` doesn't implement `AsRef<[u8]>` directly; this bridges the gap.
14+
struct ArcBytes(Arc<Vec<u8>>);
15+
impl AsRef<[u8]> for ArcBytes {
16+
fn as_ref(&self) -> &[u8] {
17+
self.0.as_slice()
18+
}
19+
}
20+
1121
enum IoBackend {
1222
ReadSlice(Cursor<&'static [u8]>),
1323
ReadVec(Cursor<Vec<u8>>),
24+
ReadArc(Cursor<ArcBytes>),
1425
WriteVec(Cursor<Vec<u8>>),
1526
ReadFile(BufReader<File>),
1627
WriteFile(BufWriter<File>),
@@ -27,6 +38,7 @@ impl IoBackend {
2738
match self {
2839
IoBackend::ReadSlice(w) => Some(w),
2940
IoBackend::ReadVec(w) => Some(w),
41+
IoBackend::ReadArc(w) => Some(w),
3042
IoBackend::ReadFile(w) => Some(w),
3143
_ => None,
3244
}
@@ -35,6 +47,7 @@ impl IoBackend {
3547
match self {
3648
IoBackend::ReadSlice(w) => Some(w),
3749
IoBackend::ReadVec(w) => Some(w),
50+
IoBackend::ReadArc(w) => Some(w),
3851
IoBackend::ReadFile(w) => Some(w),
3952
_ => None,
4053
}
@@ -43,6 +56,7 @@ impl IoBackend {
4356
match self {
4457
IoBackend::ReadSlice(w) => Some(w),
4558
IoBackend::ReadVec(w) => Some(w),
59+
IoBackend::ReadArc(w) => Some(w),
4660
IoBackend::ReadFile(w) => Some(w),
4761
_ => None,
4862
}
@@ -103,6 +117,7 @@ impl IoProxy {
103117
pub fn try_get_length(&mut self) -> Option<u64> {
104118
match &self.backend {
105119
IoBackend::ReadVec(v) => Some(v.get_ref().len() as u64),
120+
IoBackend::ReadArc(v) => Some(v.get_ref().0.len() as u64),
106121
IoBackend::ReadSlice(v) => Some(v.get_ref().len() as u64),
107122
IoBackend::ReadFile(v) => v.get_ref().metadata().map(|m| m.len()).ok(),
108123
_ => None,
@@ -112,12 +127,24 @@ impl IoProxy {
112127
pub fn try_get_position(&mut self) -> Option<u64> {
113128
match &self.backend {
114129
IoBackend::ReadVec(v) => Some(v.position()),
130+
IoBackend::ReadArc(v) => Some(v.position()),
115131
IoBackend::ReadSlice(v) => Some(v.position()),
116132
IoBackend::ReadFile(v) => v.get_ref().stream_position().ok(),
117133
_ => None,
118134
}
119135
}
120136

137+
/// Return a slice view of the input bytes without copying.
138+
/// Returns `None` for file-backed or write-backed proxies.
139+
pub fn as_input_slice(&self) -> Option<&[u8]> {
140+
match &self.backend {
141+
IoBackend::ReadSlice(c) => Some(c.get_ref()),
142+
IoBackend::ReadVec(c) => Some(c.get_ref()),
143+
IoBackend::ReadArc(c) => Some(c.get_ref().0.as_slice()),
144+
_ => None,
145+
}
146+
}
147+
121148
pub fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
122149
self.backend.get_read().expect("cannot read from writer").read_exact(buf)
123150
}
@@ -213,6 +240,12 @@ impl IoProxy {
213240
Ok(IoProxy { path: None, io_id, backend: IoBackend::ReadVec(Cursor::new(bytes)) })
214241
}
215242

243+
/// Wrap a shared `Arc<Vec<u8>>` as an input buffer — zero copy for the caller.
244+
pub fn read_arc(context: &Context, io_id: i32, bytes: Arc<Vec<u8>>) -> Result<IoProxy> {
245+
IoProxy::check_io_id(context, io_id)?;
246+
Ok(IoProxy { path: None, io_id, backend: IoBackend::ReadArc(Cursor::new(ArcBytes(bytes))) })
247+
}
248+
216249
pub fn copy_slice(context: &Context, io_id: i32, bytes: &[u8]) -> Result<IoProxy> {
217250
IoProxy::check_io_id(context, io_id)?;
218251

0 commit comments

Comments
 (0)