Commit db9ac0a

don't drop user fd until node reads a synthetic EOF (#93)

turns out #51 reduced the race rate but did not eliminate it completely. we still see the data race bug very rarely, and it is exacerbated by high repeat counts in the tests and/or high nodejs event loop utilization.

my old theory was:

- the user fd has O_NONBLOCK clear, so any writes there should block until the data has made its way into the 4kb intermediary buffer
- as a result, when the .wait returns, we know that the program output has at least made it fully into the user fd input buffer
- because we poll until `controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0`, in theory we should never call the js side exit code until there's truly nothing left in the pipe (i.e. the readstream on the nodejs side has it in its own buffer)

HOWEVER, with some logging i saw a few cases where in fact `controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0`, the program had exited, yet the nodejs side still missed some data.

what i think is _actually_ happening is that:

1. one of libuv's io threads reads the data from the controller side and queues a data event on the stream
2. polling exits as now there's no more data in the queues. we immediately `drop(user_fd);` which [sets TTY_OTHER_CLOSED synchronously](https://github.com/torvalds/linux/blob/4ff71af020ae59ae2d83b174646fc2ad9fcd4dc4/drivers/tty/pty.c#L66) and we get an error event queued on the stream
3. if the nodejs event loop happens to read error before data, we emit 'end' and mark the data as read even though technically nodejs hasn't processed the data event yet, so we drop data :(

how we fix it:

1. axe poll_pty_fds_until_read
2. make Pty struct own user_fd and expose a method for the js side to drop this fd when it's done
3. when child.wait finishes, write a synthetic 'EOF' that is actually a custom OSC terminal control sequence (`\x1B]7878\x1B\\`, 7878 is RUST on the phonepad :)) to the user fd (a cursory search shows no results, it seems _very_ unlikely for this sequence to appear randomly)
4. on the nodejs wrapper side, create a transform stream that parses out the synthetic EOF and emits it as a custom event when it happens
5. when the nodejs side hits this EOF, we know we are actually at the end of the data stream and can safely drop user_fd

node-pty had the [same problem](microsoft/node-pty#72) and did the :grug: brain thing and [added a 250ms wait](microsoft/vscode@9464b54) so im calling it slightly more ok
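A minimal sketch of steps 4 and 5 in the fix list, assuming the detector stream emits a `synthetic-eof` event and the native object exposes `dropUserFd()` as in this commit. The helper name `dropUserFdOnEof`, the `UserFdOwner` interface, and the 1000 ms fallback are hypothetical and chosen only for illustration; the timeout stands in for the nodejs-side timeout the Rust comment mentions for the case where the marker write fails.

```typescript
import { EventEmitter } from 'node:events';

// Event name taken from syntheticEof.ts in this commit.
const EOF_EVENT = 'synthetic-eof';

// Hypothetical stand-in for the native Pty object.
interface UserFdOwner {
  dropUserFd(): void;
}

/**
 * Calls pty.dropUserFd() exactly once: when `source` emits the synthetic EOF
 * event, or after `timeoutMs` if the marker never arrives.
 * Returns a function that cancels the pending listener and timer.
 */
function dropUserFdOnEof(
  source: EventEmitter,
  pty: UserFdOwner,
  timeoutMs = 1000, // assumed value, not taken from the commit
): () => void {
  let done = false;

  const finish = () => {
    if (done) return;
    done = true;
    clearTimeout(timer);
    source.off(EOF_EVENT, finish);
    pty.dropUserFd();
  };

  const timer = setTimeout(finish, timeoutMs);
  source.once(EOF_EVENT, finish);

  return () => {
    if (done) return;
    done = true;
    clearTimeout(timer);
    source.off(EOF_EVENT, finish);
  };
}

// Usage with stand-ins for the detector stream and the native Pty object.
const detector = new EventEmitter();
let drops = 0;
const fakePty: UserFdOwner = { dropUserFd: () => { drops += 1; } };

dropUserFdOnEof(detector, fakePty);
detector.emit(EOF_EVENT); // all output has been seen, safe to close the fd
detector.emit(EOF_EVENT); // ignored: the fd is only dropped once
console.log(drops); // prints 1
```

Because the fd is only dropped after the marker has travelled through the same stream as the program output, the error event can no longer overtake pending data.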
1 parent 9f8c550 commit db9ac0a

13 files changed: +393 -112 lines changed

index.d.ts

Lines changed: 7 additions & 1 deletion
@@ -45,6 +45,7 @@ export interface Size {
 }
 export const MAX_U16_VALUE: number;
 export const MIN_U16_VALUE: number;
+export declare function getSyntheticEofSequence(): Buffer;
 /** Resize the terminal. */
 export declare function ptyResize(fd: number, size: Size): void;
 /**
@@ -66,5 +67,10 @@ export declare class Pty {
    * once (it will error the second time). The caller is responsible for closing the file
    * descriptor.
    */
-  takeFd(): c_int;
+  takeControllerFd(): c_int;
+  /**
+   * Closes the owned file descriptor for the PTY controller. The Nodejs side must call this
+   * when it is done with the file descriptor to avoid leaking FDs.
+   */
+  dropUserFd(): void;
 }

index.js

Lines changed: 2 additions & 0 deletions
@@ -326,6 +326,7 @@ const {
   Operation,
   MAX_U16_VALUE,
   MIN_U16_VALUE,
+  getSyntheticEofSequence,
   ptyResize,
   setCloseOnExec,
   getCloseOnExec,
@@ -335,6 +336,7 @@ module.exports.Pty = Pty;
 module.exports.Operation = Operation;
 module.exports.MAX_U16_VALUE = MAX_U16_VALUE;
 module.exports.MIN_U16_VALUE = MIN_U16_VALUE;
+module.exports.getSyntheticEofSequence = getSyntheticEofSequence;
 module.exports.ptyResize = ptyResize;
 module.exports.setCloseOnExec = setCloseOnExec;
 module.exports.getCloseOnExec = getCloseOnExec;

npm/darwin-arm64/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@replit/ruspty-darwin-arm64",
-  "version": "3.5.3",
+  "version": "3.6.0",
   "os": [
     "darwin"
   ],

npm/darwin-x64/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@replit/ruspty-darwin-x64",
-  "version": "3.5.3",
+  "version": "3.6.0",
   "os": [
     "darwin"
   ],

npm/linux-x64-gnu/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@replit/ruspty-linux-x64-gnu",
-  "version": "3.5.3",
+  "version": "3.6.0",
   "os": [
     "linux"
   ],

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

package.json

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "name": "@replit/ruspty",
-  "version": "3.5.3",
+  "version": "3.6.0",
   "main": "dist/wrapper.js",
   "types": "dist/wrapper.d.ts",
   "author": "Szymon Kaliski <hi@szymonkaliski.com>",
@@ -40,7 +40,7 @@
     "build:wrapper": "tsup",
     "prepublishOnly": "napi prepublish -t npm",
     "test": "vitest run",
-    "test:ci": "vitest --reporter=verbose --reporter=github-actions run",
+    "test:ci": "vitest --reporter=verbose --reporter=github-actions --allowOnly run",
     "test:hang": "vitest run --reporter=hanging-process",
     "universal": "napi universal",
     "version": "napi version",

src/lib.rs

Lines changed: 57 additions & 62 deletions
@@ -7,17 +7,14 @@ use std::os::fd::{FromRawFd, IntoRawFd, RawFd};
 use std::os::unix::process::CommandExt;
 use std::process::{Command, Stdio};
 use std::thread;
-use std::time::Duration;
 
-use backoff::backoff::Backoff;
-use backoff::ExponentialBackoffBuilder;
-use napi::bindgen_prelude::JsFunction;
+use napi::bindgen_prelude::{Buffer, JsFunction};
 use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
 use napi::Status::GenericFailure;
 use napi::{self, Env};
 use nix::errno::Errno;
 use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
-use nix::libc::{self, c_int, ioctl, FIONREAD, TIOCOUTQ, TIOCSCTTY, TIOCSWINSZ};
+use nix::libc::{self, c_int, TIOCSCTTY, TIOCSWINSZ};
 use nix::pty::{openpty, Winsize};
 use nix::sys::termios::{self, SetArg};
 
@@ -31,6 +28,7 @@ mod sandbox;
 #[allow(dead_code)]
 struct Pty {
   controller_fd: Option<OwnedFd>,
+  user_fd: Option<OwnedFd>,
   /// The pid of the forked process.
   pub pid: u32,
 }
@@ -89,61 +87,15 @@ pub const MAX_U16_VALUE: u16 = u16::MAX;
 #[napi]
 pub const MIN_U16_VALUE: u16 = u16::MIN;
 
-fn cast_to_napi_error(err: Errno) -> napi::Error {
-  napi::Error::new(GenericFailure, err)
-}
-
-// if the child process exits before the controller fd is fully read or the user fd is fully
-// flushed, we might accidentally end in a case where onExit is called but js hasn't had
-// the chance to fully read the controller fd
-// let's wait until the controller fd is fully read before we call onExit
-fn poll_pty_fds_until_read(controller_fd: RawFd, user_fd: RawFd) {
-  let mut backoff = ExponentialBackoffBuilder::default()
-    .with_initial_interval(Duration::from_millis(1))
-    .with_max_interval(Duration::from_millis(100))
-    .with_max_elapsed_time(Some(Duration::from_secs(1)))
-    .build();
-
-  loop {
-    // check both input and output queues for both FDs
-    let mut controller_inq: i32 = 0;
-    let mut controller_outq: i32 = 0;
-    let mut user_inq: i32 = 0;
-    let mut user_outq: i32 = 0;
-
-    // safe because we're passing valid file descriptors and properly sized integers
-    unsafe {
-      // check bytes waiting to be read (FIONREAD, equivalent to TIOCINQ on Linux)
-      if ioctl(controller_fd, FIONREAD, &mut controller_inq) == -1
-        || ioctl(user_fd, FIONREAD, &mut user_inq) == -1
-      {
-        // break if we can't read
-        break;
-      }
-
-      // check bytes waiting to be written (TIOCOUTQ)
-      if ioctl(controller_fd, TIOCOUTQ, &mut controller_outq) == -1
-        || ioctl(user_fd, TIOCOUTQ, &mut user_outq) == -1
-      {
-        // break if we can't read
-        break;
-      }
-    }
+const SYNTHETIC_EOF: &[u8] = b"\x1B]7878\x1B\\";
 
-    // if all queues are empty, we're done
-    if controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0 {
-      break;
-    }
+#[napi]
+pub fn get_synthetic_eof_sequence() -> Buffer {
+  SYNTHETIC_EOF.into()
+}
 
-    // apply backoff strategy
-    if let Some(d) = backoff.next_backoff() {
-      thread::sleep(d);
-      continue;
-    } else {
-      // we have exhausted our attempts
-      break;
-    }
-  }
+fn cast_to_napi_error(err: Errno) -> napi::Error {
+  napi::Error::new(GenericFailure, err)
 }
 
 #[napi]
@@ -347,9 +299,10 @@ impl Pty {
     thread::spawn(move || {
       let wait_result = child.wait();
 
-      // try to wait for the controller fd to be fully read
-      poll_pty_fds_until_read(raw_controller_fd, raw_user_fd);
-      drop(user_fd);
+      // by this point, child has closed its copy of the user_fd
+      // lets inject our synthetic EOF OSC into the user_fd
+      // its ok to ignore the result here as we have a timeout on the nodejs side to handle if this write fails
+      let _ = write_syn_eof_to_fd(raw_user_fd);
 
       match wait_result {
         Ok(status) => {
@@ -379,6 +332,7 @@ impl Pty {
 
     Ok(Pty {
       controller_fd: Some(controller_fd),
+      user_fd: Some(user_fd),
       pid,
     })
   }
@@ -388,7 +342,7 @@ impl Pty {
   /// descriptor.
   #[napi]
   #[allow(dead_code)]
-  pub fn take_fd(&mut self) -> Result<c_int, napi::Error> {
+  pub fn take_controller_fd(&mut self) -> Result<c_int, napi::Error> {
     if let Some(fd) = self.controller_fd.take() {
       Ok(fd.into_raw_fd())
     } else {
@@ -398,6 +352,15 @@ impl Pty {
       ))
     }
   }
+
+  /// Closes the owned file descriptor for the PTY controller. The Nodejs side must call this
+  /// when it is done with the file descriptor to avoid leaking FDs.
+  #[napi]
+  #[allow(dead_code)]
+  pub fn drop_user_fd(&mut self) -> Result<(), napi::Error> {
+    self.user_fd.take();
+    Ok(())
+  }
 }
 
 /// Resize the terminal.
@@ -492,3 +455,35 @@ fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
   }
   Ok(())
 }
+
+fn write_syn_eof_to_fd(fd: libc::c_int) -> std::io::Result<()> {
+  let mut remaining = SYNTHETIC_EOF;
+  while !remaining.is_empty() {
+    match unsafe {
+      libc::write(
+        fd,
+        remaining.as_ptr() as *const libc::c_void,
+        remaining.len(),
+      )
+    } {
+      -1 => {
+        let err = std::io::Error::last_os_error();
+        if err.kind() == std::io::ErrorKind::Interrupted {
+          continue;
+        }
+
+        return Err(err);
+      }
+      0 => {
+        return Err(std::io::Error::new(
+          std::io::ErrorKind::WriteZero,
+          "write returned 0",
+        ));
+      }
+      n => {
+        remaining = &remaining[n as usize..];
+      }
+    }
+  }
+  Ok(())
+}
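The short-write loop in `write_syn_eof_to_fd` can be sketched in TypeScript for readers more at home on the wrapper side. This is an illustration only: `WriteFn`, `writeAll`, and the fake 3-byte sink are hypothetical names, and the EINTR retry from the Rust version is left out because the fake writer never gets interrupted. The byte values are the ones in `SYNTHETIC_EOF` (`ESC ] 7 8 7 8 ESC \`).

```typescript
// Hypothetical writer type: returns how many bytes it accepted, like write(2).
type WriteFn = (bytes: Uint8Array) => number;

// ESC ] 7 8 7 8 ESC \  (the same 8 bytes as SYNTHETIC_EOF in lib.rs)
const SYNTHETIC_EOF_BYTES = Uint8Array.from([
  0x1b, 0x5d, 0x37, 0x38, 0x37, 0x38, 0x1b, 0x5c,
]);

// Keeps calling `write` with the unwritten tail until everything is accepted.
// Returns the number of write calls that were needed.
function writeAll(write: WriteFn, data: Uint8Array): number {
  let remaining = data;
  let calls = 0;
  while (remaining.length > 0) {
    const n = write(remaining);
    calls += 1;
    if (n <= 0) {
      // Mirrors the WriteZero error in the Rust version.
      throw new Error('write returned 0');
    }
    remaining = remaining.subarray(n);
  }
  return calls;
}

// A fake sink that accepts at most 3 bytes per call.
const received: number[] = [];
const shortWriter: WriteFn = (bytes) => {
  const accepted = bytes.subarray(0, 3);
  received.push(...Array.from(accepted));
  return accepted.length;
};

const calls = writeAll(shortWriter, SYNTHETIC_EOF_BYTES);
console.log(calls, received.length); // prints 3 8
```

Looping on the tail matters here because a partially written marker would never match on the nodejs side and the wrapper would fall back to its timeout.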

syntheticEof.ts

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+import { Transform } from 'node:stream';
+import { getSyntheticEofSequence } from './index.js';
+
+// keep in sync with lib.rs::SYNTHETIC_EOF
+export const SYNTHETIC_EOF = getSyntheticEofSequence();
+export const EOF_EVENT = 'synthetic-eof';
+
+// get the longest suffix of buffer that is a prefix of SYNTHETIC_EOF
+function getBufferEndPrefixLength(buffer: Buffer) {
+  const maxLen = Math.min(buffer.length, SYNTHETIC_EOF.length);
+  for (let len = maxLen; len > 0; len--) {
+    let match = true;
+    for (let i = 0; i < len; i++) {
+      if (buffer[buffer.length - len + i] !== SYNTHETIC_EOF[i]) {
+        match = false;
+        break;
+      }
+    }
+
+    if (match) {
+      return len;
+    }
+  }
+
+  return 0;
+}
+
+export class SyntheticEOFDetector extends Transform {
+  buffer: Buffer;
+
+  constructor(options = {}) {
+    super(options);
+    this.buffer = Buffer.alloc(0);
+  }
+
+  _transform(chunk: Buffer, _encoding: string, callback: () => void) {
+    const searchData = Buffer.concat([this.buffer, chunk]);
+    const eofIndex = searchData.indexOf(SYNTHETIC_EOF);
+
+    if (eofIndex !== -1) {
+      // found EOF - emit everything before it
+      if (eofIndex > 0) {
+        this.push(searchData.subarray(0, eofIndex));
+      }
+
+      this.emit(EOF_EVENT);
+
+      // emit everything after EOF (if any) and clear buffer
+      const afterEOF = searchData.subarray(eofIndex + SYNTHETIC_EOF.length);
+      if (afterEOF.length > 0) {
+        this.push(afterEOF);
+      }
+
+      this.buffer = Buffer.alloc(0);
+    } else {
+      // no EOF - buffer potential partial match at end
+
+      // get the longest suffix of buffer that is a prefix of SYNTHETIC_EOF
+      // and emit everything before it
+      // this is done for the case which the eof happened to be split across multiple chunks
+      const commonPrefixLen = getBufferEndPrefixLength(searchData);
+
+      if (commonPrefixLen > 0) {
+        const emitSize = searchData.length - commonPrefixLen;
+        if (emitSize > 0) {
+          this.push(searchData.subarray(0, emitSize));
+        }
+        this.buffer = searchData.subarray(emitSize);
+      } else {
+        this.push(searchData);
+        this.buffer = Buffer.alloc(0);
+      }
+    }
+
+    callback();
+  }
+
+  _flush(callback: () => void) {
+    if (this.buffer.length > 0) {
+      this.push(this.buffer);
+    }
+
+    callback();
+  }
+}
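To see why the detector holds back a trailing partial match, here is a simplified, string-based sketch of the same idea with the marker split across several chunks. It is not the class from this commit: `scanChunks` and `trailingPartialMarker` are hypothetical helpers, and strings stand in for the `Buffer`s the real transform works on.

```typescript
// Same byte sequence as SYNTHETIC_EOF, written as a string for brevity.
const MARKER = '\x1b]7878\x1b\\';

// Length of the longest suffix of `data` that is also a proper prefix of MARKER.
function trailingPartialMarker(data: string): number {
  const maxLen = Math.min(data.length, MARKER.length - 1);
  for (let len = maxLen; len > 0; len--) {
    if (data.endsWith(MARKER.slice(0, len))) return len;
  }
  return 0;
}

interface ScanResult {
  output: string; // everything except the marker
  sawEof: boolean; // whether the marker was found
}

function scanChunks(chunks: string[]): ScanResult {
  let held = ''; // possible start of a marker, carried over to the next chunk
  let output = '';
  let sawEof = false;

  for (const chunk of chunks) {
    const data = held + chunk;
    const at = data.indexOf(MARKER);
    if (at !== -1) {
      // Marker found: pass through what surrounds it and reset the carry-over.
      sawEof = true;
      output += data.slice(0, at) + data.slice(at + MARKER.length);
      held = '';
    } else {
      // No marker yet: emit all but a trailing partial match.
      const keep = trailingPartialMarker(data);
      output += data.slice(0, data.length - keep);
      held = data.slice(data.length - keep);
    }
  }

  // Like _flush: whatever is still held was ordinary data after all.
  return { output: output + held, sawEof };
}

// The marker arrives in three pieces, interleaved with real output.
const result = scanChunks(['hello\x1b]78', '78\x1b', '\\', ' world']);
console.log(result.sawEof, JSON.stringify(result.output)); // prints true "hello world"
```

Without the carry-over, the first chunk would leak `\x1b]78` to the consumer and the marker would never be recognized.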
