Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ipc-channel"
version = "0.20.1"
version = "0.20.2"
description = "A multiprocess drop-in replacement for Rust channels"
authors = ["The Servo Project Developers"]
license = "MIT OR Apache-2.0"
Expand Down
1 change: 0 additions & 1 deletion src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ impl IpcSelectionResult {
/// Use the [to] method to deserialize the raw result into the requested type.
///
/// [to]: #method.to
#[derive(PartialEq)]
pub struct IpcMessage {
pub(crate) data: Vec<u8>,
pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
Expand Down
29 changes: 4 additions & 25 deletions src/platform/inprocess/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ use std::error::Error as StdError;
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::ops::{Deref, RangeFrom};
use std::ptr::eq;
use std::slice;
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Duration;
use usize;
use uuid::Uuid;

#[derive(Clone)]
Expand Down Expand Up @@ -68,13 +66,6 @@ pub struct OsIpcReceiver {
receiver: RefCell<Option<crossbeam_channel::Receiver<ChannelMessage>>>,
}

impl PartialEq for OsIpcReceiver {
fn eq(&self, other: &OsIpcReceiver) -> bool {
self.receiver.borrow().as_ref().map(|rx| rx as *const _)
== other.receiver.borrow().as_ref().map(|rx| rx as *const _)
}
}

impl OsIpcReceiver {
fn new(receiver: Receiver<ChannelMessage>) -> OsIpcReceiver {
OsIpcReceiver {
Expand Down Expand Up @@ -124,23 +115,12 @@ impl OsIpcReceiver {

#[derive(Clone, Debug)]
pub struct OsIpcSender {
sender: RefCell<Sender<ChannelMessage>>,
}

impl PartialEq for OsIpcSender {
fn eq(&self, other: &OsIpcSender) -> bool {
eq(
&*self.sender.borrow() as *const _,
&*other.sender.borrow() as *const _,
)
}
sender: Sender<ChannelMessage>,
}

impl OsIpcSender {
fn new(sender: Sender<ChannelMessage>) -> OsIpcSender {
OsIpcSender {
sender: RefCell::new(sender),
}
OsIpcSender { sender }
}

pub fn connect(name: String) -> Result<OsIpcSender, ChannelError> {
Expand All @@ -162,7 +142,6 @@ impl OsIpcSender {
let os_ipc_channels = ports.into_iter().map(OsOpaqueIpcChannel::new).collect();
let ipc_message = IpcMessage::new(data.to_vec(), os_ipc_channels, shared_memory_regions);
self.sender
.borrow()
.send(ChannelMessage(ipc_message))
.map_err(|_| ChannelError::BrokenPipeError)
}
Expand Down Expand Up @@ -281,13 +260,13 @@ impl OsIpcOneShotServer {
}
}

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub enum OsIpcChannel {
Sender(OsIpcSender),
Receiver(OsIpcReceiver),
}

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
channel: RefCell<Option<OsIpcChannel>>,
}
Expand Down
24 changes: 5 additions & 19 deletions src/platform/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::error::Error as StdError;
use std::ffi::CString;
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::ptr;
Expand Down Expand Up @@ -125,7 +124,7 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), MachError> {
Ok((sender, receiver))
}

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub struct OsIpcReceiver {
port: Cell<mach_port_t>,
}
Expand Down Expand Up @@ -389,15 +388,9 @@ impl<'a> SendData<'a> {
}
}

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub struct OsIpcSender {
port: mach_port_t,
// Make sure this is `!Sync`, to match `crossbeam_channel::Sender`; and to discourage sharing
// references.
//
// (Rather, senders should just be cloned, as they are shared internally anyway --
// another layer of sharing only adds unnecessary overhead...)
nosync_marker: PhantomData<Cell<()>>,
}

impl Drop for OsIpcSender {
Expand All @@ -419,19 +412,13 @@ impl Clone for OsIpcSender {
Err(error) => panic!("mach_port_mod_refs(1, {}) failed: {:?}", cloned_port, error),
}
}
OsIpcSender {
port: cloned_port,
nosync_marker: PhantomData,
}
OsIpcSender { port: cloned_port }
}
}

impl OsIpcSender {
fn from_name(port: mach_port_t) -> OsIpcSender {
OsIpcSender {
port,
nosync_marker: PhantomData,
}
OsIpcSender { port }
}

pub fn connect(name: String) -> Result<OsIpcSender, MachError> {
Expand Down Expand Up @@ -577,7 +564,7 @@ impl OsIpcChannel {
}
}

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
port: mach_port_t,
}
Expand All @@ -597,7 +584,6 @@ impl OsOpaqueIpcChannel {
pub fn to_sender(&mut self) -> OsIpcSender {
OsIpcSender {
port: mem::replace(&mut self.port, MACH_PORT_NULL),
nosync_marker: PhantomData,
}
}

Expand Down
69 changes: 45 additions & 24 deletions src/platform/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn simple() {
let data: &[u8] = b"1234567";
tx.send(data, Vec::new(), Vec::new()).unwrap();
let ipc_message = rx.recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -60,7 +60,7 @@ fn sender_transfer() {
let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender();
sub_tx.send(data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -76,7 +76,7 @@ fn receiver_transfer() {
let sub_rx = ipc_message.os_ipc_channels.pop().unwrap().to_receiver();
sub_tx.send(data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -98,12 +98,12 @@ fn multisender_transfer() {
let sub0_tx = ipc_message1.os_ipc_channels.remove(0).to_sender();
sub0_tx.send(data, vec![], vec![]).unwrap();
let ipc_message2 = sub0_rx.recv().unwrap();
assert_eq!(ipc_message2, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message2.to::<Vec<u8>>().unwrap(), data);

let sub1_tx = ipc_message1.os_ipc_channels.remove(0).to_sender();
sub1_tx.send(data, vec![], vec![]).unwrap();
let ipc_message3 = sub1_rx.recv().unwrap();
assert_eq!(ipc_message3, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message3.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -115,7 +115,7 @@ fn medium_data() {
let (tx, rx) = platform::channel().unwrap();
tx.send(data, vec![], vec![]).unwrap();
let ipc_message = rx.recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -134,7 +134,7 @@ fn medium_data_with_sender_transfer() {
let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender();
sub_tx.send(data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

fn check_big_data(size: u32) {
Expand All @@ -148,7 +148,7 @@ fn check_big_data(size: u32) {
let data: Vec<u8> = (0..size).map(|i| (i % 251) as u8).collect();
let data: &[u8] = &data[..];
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
thread.join().unwrap();
}

Expand Down Expand Up @@ -193,7 +193,7 @@ fn big_data_with_sender_transfer() {
sub_tx.send(data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
thread.join().unwrap();
}

Expand Down Expand Up @@ -225,7 +225,7 @@ fn with_n_fds(n: usize, size: usize) {
sub_tx.send(&data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.clone()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}
}

Expand Down Expand Up @@ -349,7 +349,7 @@ macro_rules! create_big_data_with_n_fds {
sub_tx.send(data, vec![], vec![]).unwrap();
let ipc_message = sub_rx.recv().unwrap();
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}
}
};
Expand Down Expand Up @@ -390,7 +390,7 @@ fn concurrent_senders() {
.collect();
let data: &[u8] = &data[..];
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}
assert!(rx.try_recv().is_err()); // There should be no further messages pending.
received_vals.sort();
Expand Down Expand Up @@ -700,7 +700,7 @@ fn server_accept_first() {
});

let (_, ipc_message) = server.accept().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand All @@ -716,7 +716,7 @@ fn server_connect_first() {
thread::sleep(Duration::from_millis(30));
let (_, mut ipc_message) = server.accept().unwrap();
ipc_message.data.truncate(7);
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
Expand All @@ -739,7 +739,7 @@ fn cross_process_spawn() {

let (_, ipc_message) = server.accept().unwrap();
child_pid.wait().expect("failed to wait on child");
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[cfg(not(any(
Expand All @@ -762,7 +762,7 @@ fn cross_process_fork() {

let (_, ipc_message) = server.accept().unwrap();
child_pid.wait();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
Expand Down Expand Up @@ -797,7 +797,7 @@ fn cross_process_sender_transfer_spawn() {
let data: &[u8] = b"bar";
let ipc_message = super_rx.recv().unwrap();
child_pid.wait().expect("failed to wait on child");
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[cfg(not(any(
Expand Down Expand Up @@ -833,7 +833,7 @@ fn cross_process_sender_transfer_fork() {
let data: &[u8] = b"bar";
let ipc_message = super_rx.recv().unwrap();
child_pid.wait();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}

#[test]
Expand Down Expand Up @@ -944,7 +944,7 @@ fn try_recv() {
let data: &[u8] = b"1234567";
tx.send(data, Vec::new(), Vec::new()).unwrap();
let ipc_message = rx.try_recv().unwrap();
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
assert!(rx.try_recv().is_err());
}

Expand Down Expand Up @@ -1016,7 +1016,7 @@ fn try_recv_large() {

let data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 251) as u8).collect();
let data: &[u8] = &data[..];
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
assert!(rx.try_recv().is_err());
}

Expand Down Expand Up @@ -1085,7 +1085,7 @@ fn try_recv_large_delayed() {
let data: Vec<u8> = (0..msg_size).map(|j| (j % 13) as u8 | val << 4).collect();
let data: &[u8] = &data[..];
assert_eq!(ipc_message.data.len(), data.len());
assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), data);
}
assert!(rx.try_recv().is_err()); // There should be no further messages pending.
received_vals.sort();
Expand All @@ -1098,11 +1098,32 @@ fn try_recv_large_delayed() {

mod sync_test {
use crate::platform;
use static_assertions::assert_not_impl_any;
use static_assertions::{assert_impl_all, assert_not_impl_any};

#[test]
fn receiver_not_sync() {
assert_not_impl_any!(platform::OsIpcSender : Sync);
// A single-consumer queue is not Sync.
assert_not_impl_any!(platform::OsIpcReceiver : Sync);
}

#[test]
fn sender_is_sync() {
// A multi-producer queue must be Sync.
assert_impl_all!(platform::OsIpcSender : Sync);
}
}

mod partial_eq_test {
use crate::platform;
use static_assertions::assert_not_impl_any;

#[test]
fn receiver_not_partialeq() {
assert_not_impl_any!(platform::OsIpcReceiver : PartialEq);
}
#[test]
fn sender_not_partialeq() {
assert_not_impl_any!(platform::OsIpcSender : PartialEq);
}
}

Expand Down Expand Up @@ -1190,7 +1211,7 @@ fn cross_process_two_step_transfer_spawn() {
let ipc_message = super_rx.recv().unwrap();
let child_exit_code = child_pid.wait().expect("failed to wait on child");
assert!(child_exit_code.success());
assert_eq!(ipc_message, IpcMessage::from_data(cookie.to_vec()));
assert_eq!(ipc_message.to::<Vec<u8>>().unwrap(), cookie);
}

fn get_max_fragment_size() -> usize {
Expand Down
Loading
Loading