Skip to content

[Question]: How to use asyncio's stream types as tokio's AsyncRead/AsyncWrite? #52

@JadKHaddad

Description

@JadKHaddad

I took a look at the unstable feature unstable-streams to convert a stream of bytes into an AsyncRead, which looks roughly like this:

fn internal_function_takes_async_read<T: AsyncRead>(reader: T) {}


#[pyclass]
pub struct AsyncGenBytes {
    /// An async generator that produces byte arrays
    py_async_gen: Py<PyAny>,
}

// Does not really compile because AsyncGenBytes does not implement PyFunctionArgument (not important for this example)
#[pyfunction]
fn takes_async_gen_bytes_exposed_to_python(py_async_gen: AsyncGenBytes) -> PyResult<()> {
    let stream = Python::with_gil(|py| {
        let bound = py_async_gen.py_async_gen.into_bound(py);

        pyo3_async_runtimes::tokio::into_stream_v1(bound)
    })?;

    let byte_stream = stream.then(|res| async {
        Python::with_gil(|py| match res {
            Ok(obj) => match obj.extract::<Vec<u8>>(py) {
                Ok(vec) => Ok(Bytes::from(vec)),
                Err(e) => Err(std::io::Error::other(format!(
                    "Failed to extract bytes: {e}"
                ))),
            },
            Err(e) => Err(std::io::Error::other(format!("Python error: {e}"))),
        })
    });
    
    // StreamReader implements AsyncRead
    let stream_reader = tokio_util::io::StreamReader::new(byte_stream);

    // do the things you want to do
    internal_function_takes_async_read(stream_reader);

    Ok(())
}

This seems to me like hacking around to get things done.

Suggestions on how to do it the right way are very appreciated.

Thanks all for this GREAT project!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions