
Conversation

@drindr (Contributor) commented Jan 8, 2026

Currently, the send_output method costs one copy.

This PR provides a zero-copy method for the Python API, named send_output_raw, which gives better performance as messages get larger.
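A minimal usage sketch of the proposed API, based on the context-manager form documented further down in this PR (the output id, buffer size, and metadata below are made-up placeholders, and the exact signature may still change during review):

```python
import numpy as np
from dora import Node  # assumed import path for the dora Python API

node = Node()
size = 1024 * 1024  # allocate a 1 MiB output buffer

# Zero-copy send: Dora allocates the buffer, we fill it in place, and the
# data is sent automatically when the `with` block exits.
with node.send_output_raw("image", size, {"encoding": "raw"}) as buffer:
    view = np.frombuffer(buffer, dtype=np.uint8)  # wraps the memoryview, no copy
    view[:] = 0  # write your payload here instead of copying it in later
```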

Copilot AI review requested due to automatic review settings January 8, 2026 16:07

@drindr (Contributor, Author) commented Jan 8, 2026

It includes two examples: one showing how to use the API, and one benchmarking the performance of send_output against send_output_raw.

Copilot AI left a comment

Pull request overview

This pull request adds a zero-copy send API for Python to improve performance when sending large data buffers. The new send_output_raw method allows Python code to allocate a buffer through Dora, write directly to it, and send it with minimal overhead compared to the existing send_output method which requires copying data.

Key changes:

  • New SampleHandler class in Rust that manages allocated buffers and implements Python's context manager protocol
  • New send_output_raw method on the Python Node class that returns a SampleHandler for zero-copy buffer management
  • Example code and benchmarks demonstrating usage and performance improvements

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 15 comments.

Summary per file:

  • apis/python/node/src/sample_handler.rs: New implementation of SampleHandler for managing zero-copy buffer allocation and sending
  • apis/python/node/src/lib.rs: Added send_output_raw method to Python Node API with documentation
  • apis/rust/node/src/node/mod.rs: Changed validate_output visibility from private to public to support new API
  • apis/python/operator/src/lib.rs: Added Clone implementation for DelayedCleanup to enable sharing across handlers
  • apis/python/node/Cargo.toml: Added dependencies on dora-message and dora-core
  • Cargo.lock: Updated lockfile with new dependencies
  • examples/python-zero-copy-send/node.py: Example demonstrating zero-copy send with numpy arrays
  • examples/python-zero-copy-send/receiver.py: Example receiver validating zero-copy sent data
  • examples/python-zero-copy-send/benchmark.py: Performance benchmark comparing regular vs zero-copy send
  • examples/python-zero-copy-send/benchmark_receiver.py: Benchmark receiver measuring latency and throughput
  • examples/python-zero-copy-send/dataflow.yml: Dataflow configuration for example
  • examples/python-zero-copy-send/benchmark_dataflow.yml: Dataflow configuration for benchmark
  • examples/python-zero-copy-send/README.md: Documentation explaining the zero-copy API and usage


Comment on lines 47 to 77
pub fn as_arrow(&mut self, py: Python) -> PyResult<Py<PyAny>> {
    use pyo3::types::PyAnyMethods;
    use std::ops::DerefMut;

    if let Some(inner) = &mut self.inner {
        let sample_slice: &mut [u8] = inner.sample.deref_mut();
        let len = sample_slice.len();
        let ptr = sample_slice.as_mut_ptr();

        // Import numpy and ctypes
        let np = py.import_bound("numpy")?;
        let ctypes = py.import_bound("ctypes")?;

        // Create ctypes array type: ctypes.c_uint8 * len
        let c_uint8 = ctypes.getattr("c_uint8")?;
        let array_type = c_uint8.call_method1("__mul__", (len,))?;

        // Create array from address
        let c_array = array_type.call_method1("from_address", (ptr as usize,))?;

        // Convert to numpy array (this will be writable)
        let np_ctypeslib = np.getattr("ctypeslib")?;
        let np_array = np_ctypeslib.call_method1("as_array", (c_array,))?;

        Ok(np_array.into())
    } else {
        Err(pyo3::exceptions::PyRuntimeError::new_err(
            "Sample has already been sent",
        ))
    }
}

Copilot AI Jan 8, 2026

The as_arrow() method can be called multiple times before send(), and each call creates a new numpy array view pointing to the same underlying buffer. If Python code keeps a reference to the numpy array after the context manager exits and send() is called, accessing that array could lead to use-after-free or undefined behavior since the underlying buffer ownership is transferred on send. Consider adding a mechanism to invalidate previously created array views when the sample is sent, or document this dangerous behavior clearly.
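Concretely, the pattern this comment warns about looks like this (hypothetical snippet; whether it actually faults depends on how the buffer is reclaimed after sending):

```python
import numpy as np

arr = None
with node.send_output_raw("out", 8) as buffer:
    arr = np.frombuffer(buffer, dtype=np.uint8)  # view into the Dora-allocated sample
# the sample has been sent here; its buffer ownership has been transferred
arr[0] = 1  # dangling view: potential use-after-free / undefined behavior
```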

@@ -0,0 +1,113 @@
use dora_core::metadata::ArrowTypeInfoExt;
use dora_message::metadata::ArrowTypeInfo;
use dora_node_api::{DataSample, DoraNode, MetadataParameters, dora_core::config::DataId};

Copilot AI Jan 8, 2026

Inconsistent import paths for DataId. The lib.rs file imports DataId from dora_message::id::DataId (line 3), while sample_handler.rs imports it from dora_core::config::DataId (line 3). Although these resolve to the same type (as dora_core::config re-exports from dora_message::id), using consistent import paths across the codebase would improve maintainability. Consider standardizing on one import path.

Suggested change
use dora_node_api::{DataSample, DoraNode, MetadataParameters, dora_core::config::DataId};
use dora_message::id::DataId;
use dora_node_api::{DataSample, DoraNode, MetadataParameters};

Comment on lines 46 to 50
# Fill with gradient pattern
for i in range(pixel_count):
    pixels[i*3] = (i % 256) # Red
    pixels[i*3 + 1] = ((i // 256) % 256) # Green
    pixels[i*3 + 2] = 128 # Blue

Copilot AI Jan 8, 2026

Using a Python for-loop to fill 100,000 pixels (300,000 bytes) with individual element assignments is highly inefficient. Consider using NumPy vectorized operations or array slicing for better performance. For example, you could reshape the array and use NumPy broadcasting: pixels_reshaped = pixels.reshape(-1, 3); pixels_reshaped[:, 0] = np.arange(pixel_count) % 256.

Suggested change
# Fill with gradient pattern
for i in range(pixel_count):
    pixels[i*3] = (i % 256) # Red
    pixels[i*3 + 1] = ((i // 256) % 256) # Green
    pixels[i*3 + 2] = 128 # Blue
# Fill with gradient pattern using vectorized operations
pixels_reshaped = pixels.reshape(pixel_count, 3)
idx = np.arange(pixel_count)
pixels_reshaped[:, 0] = idx % 256 # Red
pixels_reshaped[:, 1] = (idx // 256) % 256 # Green
pixels_reshaped[:, 2] = 128 # Blue

def format_size(bytes_size):
"""Format bytes to human-readable size"""
for unit in ["B", "KB", "MB", "GB"]:
if bytes_size <= 1024.0:

Copilot AI Jan 8, 2026

The condition should use < instead of <= to properly transition between units. With <=, a value of exactly 1024.0 B will display as "1024.00 B" instead of transitioning to "1.00 KB".

Suggested change
if bytes_size <= 1024.0:
if bytes_size < 1024.0:
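For reference, a complete version of the helper with the corrected comparison (a sketch reconstructed from the excerpt above; the exact return formatting in the PR may differ):

```python
def format_size(bytes_size):
    """Format bytes to a human-readable size."""
    for unit in ["B", "KB", "MB", "GB"]:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} TB"

# format_size(1024) -> "1.00 KB"; with `<=` it would have returned "1024.00 B"
```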

1. **`SampleHandler`** (`apis/python/node/src/sample_handler.rs`):
- Manages the allocated `DataSample`
- Implements Python's context manager protocol (`__enter__` and `__exit__`)
- Provides `as_arrow()` method to expose the buffer as a PyArrow UInt8Array

Copilot AI Jan 8, 2026

The documentation incorrectly states that as_arrow() returns a "PyArrow UInt8Array", but the actual implementation in sample_handler.rs (lines 57-69) returns a numpy array via ctypes. The documentation should be updated to accurately reflect that this method returns a numpy array, not a PyArrow array.

# Data is automatically sent here
```

The `as_arrow()` method returns a PyArrow UInt8Array that wraps the underlying buffer. You can:

Copilot AI Jan 8, 2026

The documentation incorrectly states that as_arrow() returns a "PyArrow UInt8Array". The actual implementation returns a numpy array via ctypes, not a PyArrow array. This line should be updated to reflect that it returns a numpy array.

Comment on lines 45 to 46
/// Get a writable numpy array wrapping the buffer for zero-copy writing
/// This returns a writable UInt8 numpy array that Python can fill

Copilot AI Jan 8, 2026

The docstring incorrectly states that as_arrow() returns a "PyArrow UInt8Array", but the implementation actually returns a numpy array via ctypes (lines 57-69). The docstring should be updated to accurately describe the return type as a numpy array.

Suggested change
/// Get a writable numpy array wrapping the buffer for zero-copy writing
/// This returns a writable UInt8 numpy array that Python can fill
/// Get a writable NumPy ndarray wrapping the buffer for zero-copy writing.
/// This returns a writable uint8 NumPy array (numpy.ndarray) that Python code can fill.

"""

import numpy as np
import pyarrow as pa

Copilot AI Jan 8, 2026

Import of 'pa' is not used.

Suggested change
import pyarrow as pa

Receiver node that validates the zero-copy sent data.
"""

import numpy as np

Copilot AI Jan 8, 2026

Import of 'np' is not used.

Suggested change
import numpy as np

"""

import numpy as np
import pyarrow as pa

Copilot AI Jan 8, 2026

Import of 'pa' is not used.

Suggested change
import pyarrow as pa

@phil-opp (Collaborator) commented Jan 9, 2026

Thanks for the PR! I didn't have time for a full review yet, but I have one fundamental question: Arrow arrays are usually immutable. Your code does mutate the data after converting it to a numpy array. Does this modify the actual underlying data, or does it create a new copy (i.e. copy-on-write)?

If it modifies the actual data, what's preventing you from modifying shared data received as an input (which might be shared with other receivers)? If modification is possible, this could lead to undefined behavior because of data races...

If it creates a new copy, the code would probably not have the desired effects (since the original sample is still at the default value when sent out).

@drindr (Contributor, Author) commented Jan 9, 2026

> Thanks for the PR! I didn't have time for a full review yet, but I have one fundamental question: Arrow arrays are usually immutable. Your code does mutate the data after converting it to a numpy array. Does this modify the actual underlying data, or does it create a new copy (i.e. copy-on-write)?

Sorry for the misleading method name as_arrow. It is entirely unrelated to Arrow, and I have renamed it.
What it actually does is wrap the pointer of the allocated sample memory in a NumPy array.

@drindr (Contributor, Author) commented Jan 9, 2026

> If it modifies the actual data, what's preventing you from modifying shared data received as an input (which might be shared with other receivers)? If modification is possible, this could lead to undefined behavior because of data races...
>
> If it creates a new copy, the code would probably not have the desired effects (since the original sample is still at the default value when sent out).

The send method consumes the allocated sample, so a double send triggers an error.
As for data races, I have added a flag in the send method that forbids writing just before the sample is sent.
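From the Python side that would look roughly like this (a sketch assuming the manual-send path and the RuntimeError wording from sample_handler.rs; names may differ in the final version):

```python
sample = node.send_output_raw("output_id", 4096)
buffer = sample.as_memoryview()  # writable view into the allocated sample
buffer[:] = bytes(4096)          # fill the buffer before sending
sample.send()                    # consumes the sample and hands it to dora

try:
    sample.send()                # second send fails: the sample was consumed
except RuntimeError as err:
    print(err)                   # e.g. "Sample has already been sent"
```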

drindr force-pushed the python-zero-copy-send branch from 43f237b to 0050230 on January 13, 2026 18:39

@haixuanTao (Collaborator)

Before merging, would it be possible to add benchmark data from your machine for multiple data sizes, so people can get a sense of the point at which this makes a meaningful difference?

@drindr (Contributor, Author) commented Jan 14, 2026

The problem I'm still worried about is that Python's built-in memoryview provides the best flexibility, but it can only reject creating a memoryview after the sample has been sent; it cannot forbid modification through an already-created NumPy array, which might lead to unexpected behavior.

Comment on lines +23 to +24
dora-message = { workspace = true }
dora-core = { workspace = true }

Collaborator

The dora-core dep is not needed, as the dora-node-api crate re-exports the dora-core crate.

Instead of adding a dep on dora-message we should re-export DataId and ArrowTypeInfo from dora-node-api.

Contributor Author

It makes sense. I will modify it soon.

}

-    fn validate_output(&mut self, output_id: &DataId) -> bool {
+    pub fn validate_output(&mut self, output_id: &DataId) -> bool {

Collaborator

If we make this public, we should give it a better name that indicates the boolean return value. Maybe something like valid_output_id?

Comment on lines +389 to +419
/// `send_output_raw` zero-copy send data from the node.
///
/// This method allocates a buffer and returns a SampleHandler that can be used
/// with Python's context manager syntax (`with ... as ...`). The context manager
/// returns a writable memoryview that wraps the allocated memory.
///
/// Example usage:
///
/// ```python
/// # Allocate a 1MB buffer for zero-copy send
/// with node.send_output_raw("output_id", 1024 * 1024, {"key": "value"}) as buffer:
///     # buffer is a writable memoryview - fill it with your data
///     buffer[:] = your_data # Direct assignment
///     # or iterate: for i in range(len(buffer)): buffer[i] = data[i]
///
/// # The data is automatically sent when exiting the `with` block
/// ```
///
/// Alternatively, you can manually send:
///
/// ```python
/// sample = node.send_output_raw("output_id", data_length)
/// buffer = sample.as_memoryview()
/// # ... fill the buffer ...
/// sample.send() # Manually send
/// ```
///
/// :type output_id: str
/// :type data_length: int
/// :type metadata: dict, optional
/// :rtype: SampleHandler

Collaborator

We should mention here that only uint8 is supported as the data type.

Contributor Author

With the memoryview, the buffer can now easily be cast to other types. The exact type should depend on the outer wrapper.
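For example, casting is standard memoryview behavior and independent of this PR (the bytearray below just stands in for the buffer returned by the API):

```python
buf = memoryview(bytearray(16))  # 16 writable bytes, format "B"
floats = buf.cast("f")           # reinterpret the same memory as 4 float32 values
floats[0] = 1.5                  # writes through to the underlying buffer
print(bytes(buf[:4]))            # b'\x00\x00\xc0?' on a little-endian machine
```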

Collaborator

But the type_info is still hardcoded to a byte array when sending it out, isn't it?

@phil-opp (Collaborator)

> The problem I'm still worried about is that Python's built-in memoryview provides the best flexibility, but it can only reject creating a memoryview after the sample has been sent; it cannot forbid modification through an already-created NumPy array, which might lead to unexpected behavior.

Maybe you can implement the Python buffer protocol manually and count the number of active references? I.e. increase the reference count on __getbuffer__ and decrease it on __releasebuffer__? Then you could assert that there are no remaining views to the buffer when sending the data out.

@phil-opp (Collaborator)

Claude suggests something like this, perhaps this is helpful (as reference, not for copy&paste):

use pyo3::prelude::*;
use pyo3::exceptions::{PyBufferError, PyRuntimeError};
use pyo3::ffi;
use bytes::Bytes;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Arc;
use std::os::raw::c_int;

/// A buffer that safely tracks Python views and prevents use-after-consume
#[pyclass]
pub struct SafeBuffer {
    inner: Arc<BufferInner>,
}

struct BufferInner {
    // Using Bytes for reference-counted, immutable buffer
    data: std::sync::Mutex<Bytes>,
    // Track Python buffer protocol exports
    export_count: AtomicUsize,
    // Whether consume() has been called
    consumed: AtomicBool,
}

#[pymethods]
impl SafeBuffer {
    /// Create a new buffer with given size
    #[new]
    fn new(size: usize) -> Self {
        Self {
            inner: Arc::new(BufferInner {
                data: std::sync::Mutex::new(Bytes::from(vec![0u8; size])),
                export_count: AtomicUsize::new(0),
                consumed: AtomicBool::new(false),
            }),
        }
    }
    
    /// Create from existing bytes
    #[staticmethod]
    fn from_bytes(data: Vec<u8>) -> Self {
        Self {
            inner: Arc::new(BufferInner {
                data: std::sync::Mutex::new(Bytes::from(data)),
                export_count: AtomicUsize::new(0),
                consumed: AtomicBool::new(false),
            }),
        }
    }
    
    /// Get the number of active Python exports
    fn active_exports(&self) -> usize {
        self.inner.export_count.load(Ordering::SeqCst)
    }
    
    /// Check if buffer has been consumed
    fn is_consumed(&self) -> bool {
        self.inner.consumed.load(Ordering::SeqCst)
    }
    
    /// Get the buffer size
    fn __len__(&self) -> usize {
        self.inner.data.lock().unwrap().len()
    }
    
    /// Consume the buffer and return the bytes
    /// Fails if there are active Python views
    fn consume(&self) -> PyResult<Vec<u8>> {
        if self.inner.consumed.swap(true, Ordering::SeqCst) {
            return Err(PyRuntimeError::new_err("Buffer has already been consumed"));
        }
        
        let active = self.inner.export_count.load(Ordering::SeqCst);
        if active > 0 {
            // Reset consumed flag since we're failing
            self.inner.consumed.store(false, Ordering::SeqCst);
            return Err(PyRuntimeError::new_err(
                format!(
                    "Cannot consume buffer: {} active view(s) still exist. \
                     Release all memoryviews and numpy arrays before consuming.",
                    active
                )
            ));
        }
        
        let data = self.inner.data.lock().unwrap().clone();
        Ok(data.to_vec())
    }
    
    fn __repr__(&self) -> String {
        let consumed = self.inner.consumed.load(Ordering::SeqCst);
        let exports = self.inner.export_count.load(Ordering::SeqCst);
        let len = self.inner.data.lock().unwrap().len();
        
        format!(
            "SafeBuffer(size={}, consumed={}, active_exports={})",
            len, consumed, exports
        )
    }
    
    /// Implement buffer protocol using PyBuffer_FillInfo helper
    unsafe fn __getbuffer__(
        slf: PyRefMut<'_, Self>,
        view: *mut ffi::Py_buffer,
        flags: c_int,
    ) -> PyResult<()> {
        // Check if buffer has been consumed
        if slf.inner.consumed.load(Ordering::SeqCst) {
            return Err(PyBufferError::new_err(
                "Cannot create view:  buffer has been consumed"
            ));
        }
        
        let data = slf.inner.data.lock().unwrap();
        
        // Increment export count BEFORE releasing the lock
        slf.inner.export_count.fetch_add(1, Ordering::SeqCst);
        
        // Use PyBuffer_FillInfo to fill the buffer structure
        // This handles all the boilerplate for us! 
        let ret = unsafe {
            ffi::PyBuffer_FillInfo(
                view,
                slf.as_ptr() as *mut ffi::PyObject,
                data.as_ptr() as *mut std::ffi::c_void,
                data.len() as isize,
                1, // readonly = 1 (read-only buffer)
                flags,
            )
        };
        
        if ret == -1 {
            // Failed - decrement export count
            slf.inner.export_count.fetch_sub(1, Ordering::SeqCst);
            return Err(PyErr::fetch(slf.py()));
        }
        
        Ok(())
    }
    
    unsafe fn __releasebuffer__(
        slf: PyRefMut<'_, Self>,
        _view: *mut ffi::Py_buffer,
    ) {
        let prev = slf.inner.export_count.fetch_sub(1, Ordering::SeqCst);
        if prev == 0 {
            eprintln!("Warning: __releasebuffer__ called with export_count already at 0");
        }
    }
}

@drindr (Contributor, Author) commented Jan 19, 2026

This PR is currently blocked.

Py_buffer is part of the stable ABI only since Python 3.11.

But there might be problems with pyo3 v0.23.

The Python version info generated by pyo3 0.23 is:

implementation=CPython
version=3.7
shared=true
abi3=true
lib_name=python3.11

While the latest 0.27 generates:

implementation=CPython
version=3.11
shared=true
abi3=false
lib_name=python3.11

I tried to bump the pyo3 version, but that was blocked by huggingface/pyo3-special-method-derive#54, which Dora relies on as well. And the arrow crate release that supports pyo3 0.27 will be published in January, according to apache/arrow-rs#8773.

drindr marked this pull request as draft on January 19, 2026 13:38
