diff --git a/Cargo.toml b/Cargo.toml index c9bab050..5a3b54e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ipc-channel" -version = "0.20.1" +version = "0.20.2" description = "A multiprocess drop-in replacement for Rust channels" authors = ["The Servo Project Developers"] license = "MIT OR Apache-2.0" diff --git a/src/ipc.rs b/src/ipc.rs index c91a381a..8fc1878f 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -684,7 +684,6 @@ impl IpcSelectionResult { /// Use the [to] method to deserialize the raw result into the requested type. /// /// [to]: #method.to -#[derive(PartialEq)] pub struct IpcMessage { pub(crate) data: Vec, pub(crate) os_ipc_channels: Vec, diff --git a/src/platform/inprocess/mod.rs b/src/platform/inprocess/mod.rs index 1b0ef927..ce5206ba 100644 --- a/src/platform/inprocess/mod.rs +++ b/src/platform/inprocess/mod.rs @@ -17,11 +17,9 @@ use std::error::Error as StdError; use std::fmt::{self, Debug, Formatter}; use std::io; use std::ops::{Deref, RangeFrom}; -use std::ptr::eq; use std::slice; use std::sync::{Arc, LazyLock, Mutex}; use std::time::Duration; -use usize; use uuid::Uuid; #[derive(Clone)] @@ -68,13 +66,6 @@ pub struct OsIpcReceiver { receiver: RefCell>>, } -impl PartialEq for OsIpcReceiver { - fn eq(&self, other: &OsIpcReceiver) -> bool { - self.receiver.borrow().as_ref().map(|rx| rx as *const _) - == other.receiver.borrow().as_ref().map(|rx| rx as *const _) - } -} - impl OsIpcReceiver { fn new(receiver: Receiver) -> OsIpcReceiver { OsIpcReceiver { @@ -124,23 +115,12 @@ impl OsIpcReceiver { #[derive(Clone, Debug)] pub struct OsIpcSender { - sender: RefCell>, -} - -impl PartialEq for OsIpcSender { - fn eq(&self, other: &OsIpcSender) -> bool { - eq( - &*self.sender.borrow() as *const _, - &*other.sender.borrow() as *const _, - ) - } + sender: Sender, } impl OsIpcSender { fn new(sender: Sender) -> OsIpcSender { - OsIpcSender { - sender: RefCell::new(sender), - } + OsIpcSender { sender } } pub fn connect(name: String) -> Result { @@ -162,7 +142,6 @@ impl OsIpcSender { let os_ipc_channels = ports.into_iter().map(OsOpaqueIpcChannel::new).collect(); let ipc_message = IpcMessage::new(data.to_vec(), os_ipc_channels, shared_memory_regions); self.sender - .borrow() .send(ChannelMessage(ipc_message)) .map_err(|_| ChannelError::BrokenPipeError) } @@ -281,13 +260,13 @@ impl OsIpcOneShotServer { } } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub enum OsIpcChannel { Sender(OsIpcSender), Receiver(OsIpcReceiver), } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsOpaqueIpcChannel { channel: RefCell>, } diff --git a/src/platform/macos/mod.rs b/src/platform/macos/mod.rs index 148377dd..ddab111b 100644 --- a/src/platform/macos/mod.rs +++ b/src/platform/macos/mod.rs @@ -23,7 +23,6 @@ use std::error::Error as StdError; use std::ffi::CString; use std::fmt::{self, Debug, Formatter}; use std::io; -use std::marker::PhantomData; use std::mem; use std::ops::Deref; use std::ptr; @@ -125,7 +124,7 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), MachError> { Ok((sender, receiver)) } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsIpcReceiver { port: Cell, } @@ -389,15 +388,9 @@ impl<'a> SendData<'a> { } } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsIpcSender { port: mach_port_t, - // Make sure this is `!Sync`, to match `crossbeam_channel::Sender`; and to discourage sharing - // references. - // - // (Rather, senders should just be cloned, as they are shared internally anyway -- - // another layer of sharing only adds unnecessary overhead...) - nosync_marker: PhantomData>, } impl Drop for OsIpcSender { @@ -419,19 +412,13 @@ impl Clone for OsIpcSender { Err(error) => panic!("mach_port_mod_refs(1, {}) failed: {:?}", cloned_port, error), } } - OsIpcSender { - port: cloned_port, - nosync_marker: PhantomData, - } + OsIpcSender { port: cloned_port } } } impl OsIpcSender { fn from_name(port: mach_port_t) -> OsIpcSender { - OsIpcSender { - port, - nosync_marker: PhantomData, - } + OsIpcSender { port } } pub fn connect(name: String) -> Result { @@ -577,7 +564,7 @@ impl OsIpcChannel { } } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsOpaqueIpcChannel { port: mach_port_t, } @@ -597,7 +584,6 @@ impl OsOpaqueIpcChannel { pub fn to_sender(&mut self) -> OsIpcSender { OsIpcSender { port: mem::replace(&mut self.port, MACH_PORT_NULL), - nosync_marker: PhantomData, } } diff --git a/src/platform/test.rs b/src/platform/test.rs index cf4e5a2b..2845d509 100644 --- a/src/platform/test.rs +++ b/src/platform/test.rs @@ -44,7 +44,7 @@ fn simple() { let data: &[u8] = b"1234567"; tx.send(data, Vec::new(), Vec::new()).unwrap(); let ipc_message = rx.recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -60,7 +60,7 @@ fn sender_transfer() { let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender(); sub_tx.send(data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -76,7 +76,7 @@ fn receiver_transfer() { let sub_rx = ipc_message.os_ipc_channels.pop().unwrap().to_receiver(); sub_tx.send(data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -98,12 +98,12 @@ fn multisender_transfer() { let sub0_tx = ipc_message1.os_ipc_channels.remove(0).to_sender(); sub0_tx.send(data, vec![], vec![]).unwrap(); let ipc_message2 = sub0_rx.recv().unwrap(); - assert_eq!(ipc_message2, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message2.to::>().unwrap(), data); let sub1_tx = ipc_message1.os_ipc_channels.remove(0).to_sender(); sub1_tx.send(data, vec![], vec![]).unwrap(); let ipc_message3 = sub1_rx.recv().unwrap(); - assert_eq!(ipc_message3, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message3.to::>().unwrap(), data); } #[test] @@ -115,7 +115,7 @@ fn medium_data() { let (tx, rx) = platform::channel().unwrap(); tx.send(data, vec![], vec![]).unwrap(); let ipc_message = rx.recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -134,7 +134,7 @@ fn medium_data_with_sender_transfer() { let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender(); sub_tx.send(data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } fn check_big_data(size: u32) { @@ -148,7 +148,7 @@ fn check_big_data(size: u32) { let data: Vec = (0..size).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); thread.join().unwrap(); } @@ -193,7 +193,7 @@ fn big_data_with_sender_transfer() { sub_tx.send(data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); thread.join().unwrap(); } @@ -225,7 +225,7 @@ fn with_n_fds(n: usize, size: usize) { sub_tx.send(&data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.clone())); + assert_eq!(ipc_message.to::>().unwrap(), data); } } @@ -349,7 +349,7 @@ macro_rules! create_big_data_with_n_fds { sub_tx.send(data, vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } } }; @@ -390,7 +390,7 @@ fn concurrent_senders() { .collect(); let data: &[u8] = &data[..]; assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } assert!(rx.try_recv().is_err()); // There should be no further messages pending. received_vals.sort(); @@ -700,7 +700,7 @@ fn server_accept_first() { }); let (_, ipc_message) = server.accept().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -716,7 +716,7 @@ fn server_connect_first() { thread::sleep(Duration::from_millis(30)); let (_, mut ipc_message) = server.accept().unwrap(); ipc_message.data.truncate(7); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] @@ -739,7 +739,7 @@ fn cross_process_spawn() { let (_, ipc_message) = server.accept().unwrap(); child_pid.wait().expect("failed to wait on child"); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[cfg(not(any( @@ -762,7 +762,7 @@ fn cross_process_fork() { let (_, ipc_message) = server.accept().unwrap(); child_pid.wait(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] @@ -797,7 +797,7 @@ fn cross_process_sender_transfer_spawn() { let data: &[u8] = b"bar"; let ipc_message = super_rx.recv().unwrap(); child_pid.wait().expect("failed to wait on child"); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[cfg(not(any( @@ -833,7 +833,7 @@ fn cross_process_sender_transfer_fork() { let data: &[u8] = b"bar"; let ipc_message = super_rx.recv().unwrap(); child_pid.wait(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } #[test] @@ -944,7 +944,7 @@ fn try_recv() { let data: &[u8] = b"1234567"; tx.send(data, Vec::new(), Vec::new()).unwrap(); let ipc_message = rx.try_recv().unwrap(); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); assert!(rx.try_recv().is_err()); } @@ -1016,7 +1016,7 @@ fn try_recv_large() { let data: Vec = (0..1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); assert!(rx.try_recv().is_err()); } @@ -1085,7 +1085,7 @@ fn try_recv_large_delayed() { let data: Vec = (0..msg_size).map(|j| (j % 13) as u8 | val << 4).collect(); let data: &[u8] = &data[..]; assert_eq!(ipc_message.data.len(), data.len()); - assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), data); } assert!(rx.try_recv().is_err()); // There should be no further messages pending. received_vals.sort(); @@ -1098,11 +1098,32 @@ fn try_recv_large_delayed() { mod sync_test { use crate::platform; - use static_assertions::assert_not_impl_any; + use static_assertions::{assert_impl_all, assert_not_impl_any}; #[test] fn receiver_not_sync() { - assert_not_impl_any!(platform::OsIpcSender : Sync); + // A single-consumer queue is not Sync. + assert_not_impl_any!(platform::OsIpcReceiver : Sync); + } + + #[test] + fn sender_is_sync() { + // A multi-producer queue must be Sync. + assert_impl_all!(platform::OsIpcSender : Sync); + } +} + +mod partial_eq_test { + use crate::platform; + use static_assertions::assert_not_impl_any; + + #[test] + fn receiver_not_partialeq() { + assert_not_impl_any!(platform::OsIpcReceiver : PartialEq); + } + #[test] + fn sender_not_partialeq() { + assert_not_impl_any!(platform::OsIpcSender : PartialEq); } } @@ -1190,7 +1211,7 @@ fn cross_process_two_step_transfer_spawn() { let ipc_message = super_rx.recv().unwrap(); let child_exit_code = child_pid.wait().expect("failed to wait on child"); assert!(child_exit_code.success()); - assert_eq!(ipc_message, IpcMessage::from_data(cookie.to_vec())); + assert_eq!(ipc_message.to::>().unwrap(), cookie); } fn get_max_fragment_size() -> usize { diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index c7821366..6cb9170c 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -29,7 +29,6 @@ use std::ffi::{c_uint, CString}; use std::fmt::{self, Debug, Formatter}; use std::hash::BuildHasherDefault; use std::io; -use std::marker::PhantomData; use std::mem; use std::ops::{Deref, RangeFrom}; use std::os::fd::RawFd; @@ -117,7 +116,7 @@ struct PollEntry { pub fd: RawFd, } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsIpcReceiver { fd: Cell, } @@ -182,21 +181,15 @@ impl Drop for SharedFileDescriptor { } } -#[derive(PartialEq, Debug, Clone)] +#[derive(Debug, Clone)] pub struct OsIpcSender { fd: Arc, - // Make sure this is `!Sync`, to match `mpsc::Sender`; and to discourage sharing references. - // - // (Rather, senders should just be cloned, as they are shared internally anyway -- - // another layer of sharing only adds unnecessary overhead...) - nosync_marker: PhantomData>, } impl OsIpcSender { fn from_fd(fd: c_int) -> OsIpcSender { OsIpcSender { fd: Arc::new(SharedFileDescriptor(fd)), - nosync_marker: PhantomData, } } @@ -461,7 +454,7 @@ impl OsIpcSender { } } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub enum OsIpcChannel { Sender(OsIpcSender), Receiver(OsIpcReceiver), @@ -595,7 +588,7 @@ impl OsIpcSelectionResult { } } -#[derive(PartialEq, Debug)] +#[derive(Debug)] pub struct OsOpaqueIpcChannel { fd: c_int, } diff --git a/src/platform/windows/mod.rs b/src/platform/windows/mod.rs index 1faba8b8..ef67b2e0 100644 --- a/src/platform/windows/mod.rs +++ b/src/platform/windows/mod.rs @@ -12,13 +12,13 @@ use bincode; use serde; use std::{ - cell::{Cell, RefCell}, + cell::RefCell, cmp::PartialEq, convert::TryInto, env, ffi::CString, fmt, io, - marker::{PhantomData, Send, Sync}, + marker::{Send, Sync}, mem, ops::{Deref, DerefMut, RangeFrom}, ptr, slice, @@ -1288,14 +1288,9 @@ impl OsIpcReceiver { } } -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub struct OsIpcSender { handle: WinHandle, - // Make sure this is `!Sync`, to match `mpsc::Sender`; and to discourage sharing references. - // - // (Rather, senders should just be cloned, as they are shared internally anyway -- - // another layer of sharing only adds unnecessary overhead...) - nosync_marker: PhantomData>, } impl Clone for OsIpcSender { @@ -1315,10 +1310,7 @@ impl OsIpcSender { } fn from_handle(handle: WinHandle) -> OsIpcSender { - OsIpcSender { - handle, - nosync_marker: PhantomData, - } + OsIpcSender { handle } } /// Connect to a pipe server. @@ -1926,7 +1918,7 @@ pub enum OsIpcChannel { Receiver(OsIpcReceiver), } -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub struct OsOpaqueIpcChannel { handle: WinHandle, } diff --git a/src/test.rs b/src/test.rs index aed0315b..11d2eb06 100644 --- a/src/test.rs +++ b/src/test.rs @@ -750,3 +750,20 @@ fn test_receiver_stream() { _ => panic!("Stream should have 5"), }; } + +mod sync_test { + use crate::ipc::{IpcReceiver, IpcSender}; + use static_assertions::{assert_impl_all, assert_not_impl_any}; + + #[test] + fn ipc_receiver_not_sync() { + // A single-consumer queue is not Sync. + assert_not_impl_any!(IpcReceiver : Sync); + } + + #[test] + fn ipc_sender_is_sync() { + // A multi-producer queue must be Sync. + assert_impl_all!(IpcSender : Sync); + } +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 92d69e1c..ca1bb7c8 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -15,7 +15,7 @@ use std::{env, process}; // These integration tests may be run on their own by issuing: // cargo test --test '*' -/// Test spawing a process which then acts as a client to a +/// Test spawning a process which then acts as a client to a /// one-shot server in the parent process. #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] #[test]