@@ -310,6 +310,8 @@ pub use self::error::RawOsError;
310310pub use self::error::SimpleMessage;
311311#[unstable(feature = "io_const_error", issue = "133448")]
312312pub use self::error::const_error;
313+ #[unstable(feature = "anonymous_pipe", issue = "127154")]
314+ pub use self::pipe::{PipeReader, PipeWriter, pipe};
313315#[stable(feature = "is_terminal", since = "1.70.0")]
314316pub use self::stdio::IsTerminal;
315317pub(crate) use self::stdio::attempt_print_to_stderr;
@@ -328,16 +330,17 @@ pub use self::{
328330 stdio::{Stderr, StderrLock, Stdin, StdinLock, Stdout, StdoutLock, stderr, stdin, stdout},
329331 util::{Empty, Repeat, Sink, empty, repeat, sink},
330332};
333+
331334use crate::mem::take;
332335use crate::ops::{Deref, DerefMut};
333- use crate::sys::anonymous_pipe::{AnonPipe, pipe as pipe_inner};
334336use crate::{cmp, fmt, slice, str, sys};
335337
336338mod buffered;
337339pub(crate) mod copy;
338340mod cursor;
339341mod error;
340342mod impls;
343+ mod pipe;
341344pub mod prelude;
342345mod stdio;
343346mod util;
@@ -3251,251 +3254,3 @@ impl<B: BufRead> Iterator for Lines<B> {
32513254 }
32523255 }
32533256}
3254-
3255- /// Create anonymous pipe that is close-on-exec and blocking.
3256- ///
3257- /// # Behavior
3258- ///
3259- /// A pipe is a synchronous, unidirectional data channel between two or more processes, like an
3260- /// interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular:
3261- ///
3262- /// * A read on a [`PipeReader`] blocks until the pipe is non-empty.
3263- /// * A write on a [`PipeWriter`] blocks when the pipe is full.
3264- /// * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
3265- /// returns EOF.
3266- /// * [`PipeReader`] can be shared, but only one process will consume the data in the pipe.
3267- ///
3268- /// # Capacity
3269- ///
3270- /// Pipe capacity is platform dependent. To quote the Linux [man page]:
3271- ///
3272- /// > Different implementations have different limits for the pipe capacity. Applications should
3273- /// > not rely on a particular capacity: an application should be designed so that a reading process
3274- /// > consumes data as soon as it is available, so that a writing process does not remain blocked.
3275- ///
3276- /// # Examples
3277- ///
3278- /// ```no_run
3279- /// #![feature(anonymous_pipe)]
3280- /// # #[cfg(miri)] fn main() {}
3281- /// # #[cfg(not(miri))]
3282- /// # fn main() -> std::io::Result<()> {
3283- /// # use std::process::Command;
3284- /// # use std::io::{Read, Write};
3285- /// let (ping_rx, mut ping_tx) = std::io::pipe()?;
3286- /// let (mut pong_rx, pong_tx) = std::io::pipe()?;
3287- ///
3288- /// // Spawn a process that echoes its input.
3289- /// let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?;
3290- ///
3291- /// ping_tx.write_all(b"hello")?;
3292- /// // Close to unblock echo_server's reader.
3293- /// drop(ping_tx);
3294- ///
3295- /// let mut buf = String::new();
3296- /// // Block until echo_server's writer is closed.
3297- /// pong_rx.read_to_string(&mut buf)?;
3298- /// assert_eq!(&buf, "hello");
3299- ///
3300- /// echo_server.wait()?;
3301- /// # Ok(())
3302- /// # }
3303- /// ```
3304- /// [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
3305- /// [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
3306- /// [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
3307- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3308- #[inline]
3309- pub fn pipe() -> Result<(PipeReader, PipeWriter)> {
3310- pipe_inner().map(|(reader, writer)| (PipeReader(reader), PipeWriter(writer)))
3311- }
3312-
3313- /// Read end of the anonymous pipe.
3314- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3315- #[derive(Debug)]
3316- pub struct PipeReader(pub(crate) AnonPipe);
3317-
3318- /// Write end of the anonymous pipe.
3319- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3320- #[derive(Debug)]
3321- pub struct PipeWriter(pub(crate) AnonPipe);
3322-
3323- impl PipeReader {
3324- /// Create a new [`PipeReader`] instance that shares the same underlying file description.
3325- ///
3326- /// # Examples
3327- ///
3328- /// ```no_run
3329- /// #![feature(anonymous_pipe)]
3330- /// # #[cfg(miri)] fn main() {}
3331- /// # #[cfg(not(miri))]
3332- /// # fn main() -> std::io::Result<()> {
3333- /// # use std::fs;
3334- /// # use std::io::Write;
3335- /// # use std::process::Command;
3336- /// const NUM_SLOT: u8 = 2;
3337- /// const NUM_PROC: u8 = 5;
3338- /// const OUTPUT: &str = "work.txt";
3339- ///
3340- /// let mut jobs = vec![];
3341- /// let (reader, mut writer) = std::io::pipe()?;
3342- ///
3343- /// // Write NUM_SLOT characters the pipe.
3344- /// writer.write_all(&[b'|'; NUM_SLOT as usize])?;
3345- ///
3346- /// // Spawn several processes that read a character from the pipe, do some work, then
3347- /// // write back to the pipe. When the pipe is empty, the processes block, so only
3348- /// // NUM_SLOT processes can be working at any given time.
3349- /// for _ in 0..NUM_PROC {
3350- /// jobs.push(
3351- /// Command::new("bash")
3352- /// .args(["-c",
3353- /// &format!(
3354- /// "read -n 1\n\
3355- /// echo -n 'x' >> '{OUTPUT}'\n\
3356- /// echo -n '|'",
3357- /// ),
3358- /// ])
3359- /// .stdin(reader.try_clone()?)
3360- /// .stdout(writer.try_clone()?)
3361- /// .spawn()?,
3362- /// );
3363- /// }
3364- ///
3365- /// // Wait for all jobs to finish.
3366- /// for mut job in jobs {
3367- /// job.wait()?;
3368- /// }
3369- ///
3370- /// // Check our work and clean up.
3371- /// let xs = fs::read_to_string(OUTPUT)?;
3372- /// fs::remove_file(OUTPUT)?;
3373- /// assert_eq!(xs, "x".repeat(NUM_PROC.into()));
3374- /// # Ok(())
3375- /// # }
3376- /// ```
3377- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3378- pub fn try_clone(&self) -> Result<Self> {
3379- self.0.try_clone().map(Self)
3380- }
3381- }
3382-
3383- impl PipeWriter {
3384- /// Create a new [`PipeWriter`] instance that shares the same underlying file description.
3385- ///
3386- /// # Examples
3387- ///
3388- /// ```no_run
3389- /// #![feature(anonymous_pipe)]
3390- /// # #[cfg(miri)] fn main() {}
3391- /// # #[cfg(not(miri))]
3392- /// # fn main() -> std::io::Result<()> {
3393- /// # use std::process::Command;
3394- /// # use std::io::Read;
3395- /// let (mut reader, writer) = std::io::pipe()?;
3396- ///
3397- /// // Spawn a process that writes to stdout and stderr.
3398- /// let mut peer = Command::new("bash")
3399- /// .args([
3400- /// "-c",
3401- /// "echo -n foo\n\
3402- /// echo -n bar >&2"
3403- /// ])
3404- /// .stdout(writer.try_clone()?)
3405- /// .stderr(writer)
3406- /// .spawn()?;
3407- ///
3408- /// // Read and check the result.
3409- /// let mut msg = String::new();
3410- /// reader.read_to_string(&mut msg)?;
3411- /// assert_eq!(&msg, "foobar");
3412- ///
3413- /// peer.wait()?;
3414- /// # Ok(())
3415- /// # }
3416- /// ```
3417- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3418- pub fn try_clone(&self) -> Result<Self> {
3419- self.0.try_clone().map(Self)
3420- }
3421- }
3422-
3423- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3424- impl Read for &PipeReader {
3425- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3426- self.0.read(buf)
3427- }
3428- fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3429- self.0.read_vectored(bufs)
3430- }
3431- #[inline]
3432- fn is_read_vectored(&self) -> bool {
3433- self.0.is_read_vectored()
3434- }
3435- fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3436- self.0.read_to_end(buf)
3437- }
3438- fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3439- self.0.read_buf(buf)
3440- }
3441- }
3442-
3443- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3444- impl Read for PipeReader {
3445- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
3446- self.0.read(buf)
3447- }
3448- fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
3449- self.0.read_vectored(bufs)
3450- }
3451- #[inline]
3452- fn is_read_vectored(&self) -> bool {
3453- self.0.is_read_vectored()
3454- }
3455- fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
3456- self.0.read_to_end(buf)
3457- }
3458- fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
3459- self.0.read_buf(buf)
3460- }
3461- }
3462-
3463- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3464- impl Write for &PipeWriter {
3465- fn write(&mut self, buf: &[u8]) -> Result<usize> {
3466- self.0.write(buf)
3467- }
3468- #[inline]
3469- fn flush(&mut self) -> Result<()> {
3470- Ok(())
3471- }
3472-
3473- fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3474- self.0.write_vectored(bufs)
3475- }
3476-
3477- #[inline]
3478- fn is_write_vectored(&self) -> bool {
3479- self.0.is_write_vectored()
3480- }
3481- }
3482-
3483- #[unstable(feature = "anonymous_pipe", issue = "127154")]
3484- impl Write for PipeWriter {
3485- fn write(&mut self, buf: &[u8]) -> Result<usize> {
3486- self.0.write(buf)
3487- }
3488- #[inline]
3489- fn flush(&mut self) -> Result<()> {
3490- Ok(())
3491- }
3492-
3493- fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
3494- self.0.write_vectored(bufs)
3495- }
3496-
3497- #[inline]
3498- fn is_write_vectored(&self) -> bool {
3499- self.0.is_write_vectored()
3500- }
3501- }
0 commit comments