1- //! Module for anonymous pipe
1+ //! A cross-platform anonymous pipe.
22//!
3- //! ```
4- //! #![feature(anonymous_pipe)]
3+ //! This module provides support for anonymous OS pipes, like [pipe] on Linux or [CreatePipe] on
4+ //! Windows, which can be used as synchronous communication channels between related processes.
5+ //!
6+ //! # Behavior
7+ //!
8+ //! A pipe can be thought of as a bounded, interprocess [`mpsc`](crate::sync::mpsc), provided by
9+ //! the OS, with a platform-dependent capacity. In particular:
10+ //!
11+ //! * A read on a [`PipeReader`] blocks until the pipe is non-empty.
12+ //! * A write on a [`PipeWriter`] blocks when the pipe is full.
13+ //! * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
14+ //! returns EOF.
15+ //! * [`PipeReader`] can be shared (by copying the underlying file descriptor), but only one process
16+ //! will consume data in the pipe at any given time. See [`PipeReader::try_clone`] for an example
17+ //! of this.
18+ //!
19+ //! # Capacity
20+ //!
21+ //! Pipe capacity is platform-dependent. To quote the Linux [man page]:
22+ //!
23+ //! > Different implementations have different limits for the pipe capacity. Applications should
24+ //! > not rely on a particular capacity: an application should be designed so that a reading process
25+ //! > consumes data as soon as it is available, so that a writing process does not remain blocked.
526//!
27+ //! # Examples
28+ //!
29+ //! ```no_run
30+ //! #![feature(anonymous_pipe)]
631//! # #[cfg(miri)] fn main() {}
732//! # #[cfg(not(miri))]
33+ //! # use std::process::Command;
34+ //! # use std::io::{Read, Write};
835//! # fn main() -> std::io::Result<()> {
9- //! let (reader, writer) = std::pipe::pipe()?;
36+ //! let (mut ping_rx, ping_tx) = std::pipe::pipe()?;
37+ //! let (pong_rx, mut pong_tx) = std::pipe::pipe()?;
38+ //!
39+ //! let mut peer = Command::new("python")
40+ //! .args([
41+ //! "-c",
42+ //! "from os import close\n\
43+ //! from sys import stdin, stdout\n\
44+ //! stdout.write('ping')\n\
45+ //! stdout.flush()\n\
46+ //! close(stdout.fileno())\n\
47+ //! msg = stdin.read()\n\
48+ //! assert(msg == 'pong')"
49+ //! ])
50+ //! .stdin(pong_rx)
51+ //! .stdout(ping_tx)
52+ //! .spawn()?;
53+ //!
54+ //! let mut msg = String::new();
55+ //! // Block until peer's write end is closed.
56+ //! ping_rx.read_to_string(&mut msg)?;
57+ //! assert_eq!(&msg, "ping");
58+ //!
59+ //! pong_tx.write_all(b"pong")?;
60+ //! // Close to unblock peer's read end.
61+ //! drop(pong_tx);
62+ //!
63+ //! peer.wait()?;
1064//! # Ok(())
1165//! # }
1266//! ```
13-
67+ //! [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
68+ //! [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
69+ //! [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
1470use crate :: io;
1571use crate :: sys:: anonymous_pipe:: { AnonPipe , pipe as pipe_inner} ;
1672
1773/// Create anonymous pipe that is close-on-exec and blocking.
74+ ///
75+ /// # Examples
76+ ///
77+ /// See the [module-level](crate::pipe) documentation for examples.
1878#[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
1979#[ inline]
2080pub fn pipe ( ) -> io:: Result < ( PipeReader , PipeWriter ) > {
@@ -33,6 +93,61 @@ pub struct PipeWriter(pub(crate) AnonPipe);
3393
3494impl PipeReader {
3595 /// Create a new [`PipeReader`] instance that shares the same underlying file description.
96+ ///
97+ /// # Examples
98+ ///
99+ /// ```no_run
100+ /// #![feature(anonymous_pipe)]
101+ /// # #[cfg(miri)] fn main() {}
102+ /// # #[cfg(not(miri))]
103+ /// # use std::process::Command;
104+ /// # use std::io::{Read, Write};
105+ /// # use std::fs::{self, File};
106+ /// # fn main() -> std::io::Result<()> {
107+ /// const NUM_SLOT: u8 = 2;
108+ /// const NUM_PROC: u8 = 5;
109+ /// const OUTPUT: &str = "output.txt";
110+ ///
111+ /// let jobserver = [b'|'; NUM_SLOT as usize];
112+ /// let mut jobs = vec![];
113+ /// let mut file = File::create_new(OUTPUT)?;
114+ ///
115+ /// let (reader, mut writer) = std::pipe::pipe()?;
116+ ///
117+ /// for j in 0..NUM_PROC {
118+ /// jobs.push(
119+ /// Command::new("python")
120+ /// .args([
121+ /// "-c",
122+ /// &format!(
123+ /// "from sys import stdout, stdin\n\
124+ /// stdin.read(1)\n\
125+ /// with open('{}', 'a') as fp: fp.write('x')\n\
126+ /// stdout.write(b'|'.decode())",
127+ /// OUTPUT
128+ /// ),
129+ /// ])
130+ /// .stdout(writer.try_clone()?)
131+ /// .stdin(reader.try_clone()?)
132+ /// .spawn()?,
133+ /// )
134+ /// }
135+ ///
136+ /// writer.write_all(&jobserver)?;
137+ ///
138+ /// for mut job in jobs {
139+ /// job.wait()?;
140+ /// }
141+ ///
142+ /// let mut buf = String::new();
143+ /// file.read_to_string(&mut buf)?;
144+ ///
145+ /// fs::remove_file(OUTPUT)?;
146+ ///
147+ /// assert_eq!(buf, "x".repeat(NUM_PROC.into()));
148+ /// # Ok(())
149+ /// # }
150+ /// ```
36151 #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
37152 pub fn try_clone ( & self ) -> io:: Result < Self > {
38153 self . 0 . try_clone ( ) . map ( Self )
@@ -41,6 +156,37 @@ impl PipeReader {
41156
42157impl PipeWriter {
43158 /// Create a new [`PipeWriter`] instance that shares the same underlying file description.
159+ ///
160+ /// # Examples
161+ ///
162+ /// ```no_run
163+ /// #![feature(anonymous_pipe)]
164+ /// # #[cfg(miri)] fn main() {}
165+ /// # #[cfg(not(miri))]
166+ /// # use std::process::Command;
167+ /// # use std::io::Read;
168+ /// # fn main() -> std::io::Result<()> {
169+ /// let (mut reader, writer) = std::pipe::pipe()?;
170+ ///
171+ /// let mut peer = Command::new("python")
172+ /// .args([
173+ /// "-c",
174+ /// "from sys import stdout, stderr\n\
175+ /// stdout.write('foo')\n\
176+ /// stderr.write('bar')"
177+ /// ])
178+ /// .stdout(writer.try_clone()?)
179+ /// .stderr(writer)
180+ /// .spawn()?;
181+ ///
182+ /// let mut msg = String::new();
183+ /// reader.read_to_string(&mut msg)?;
184+ /// assert_eq!(&msg, "foobar");
185+ ///
186+ /// peer.wait()?;
187+ /// # Ok(())
188+ /// # }
189+ /// ```
44190 #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
45191 pub fn try_clone ( & self ) -> io:: Result < Self > {
46192 self . 0 . try_clone ( ) . map ( Self )
0 commit comments