From e82554e5b0fdbbec7a640a0f133c4936536d6a1d Mon Sep 17 00:00:00 2001 From: Narfinger Date: Mon, 20 Oct 2025 15:03:20 +0200 Subject: [PATCH] Add new data structure IpcSharedMemoryVec which allows adding more elements to it. This data structure is not allowed to be sent over a channel but can easily get a reader which is allowed to be sent but not allowed to add. Signed-off-by: Narfinger --- Cargo.toml | 2 +- src/ipc.rs | 195 ++++++++++++++++++++++++++----- src/platform/inprocess/mod.rs | 8 +- src/platform/mod.rs | 1 + src/platform/test.rs | 113 +++++++++--------- src/platform/unix/mod.rs | 208 ++++++++++++++++++++++++++++++++-- src/router.rs | 2 +- src/test.rs | 21 +++- 8 files changed, 454 insertions(+), 96 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f6b41faa..159c5538 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ fnv = "1.0.3" futures-channel = { version = "0.3.31", optional = true } futures-core = { version = "0.3.31", optional = true } libc = "0.2.162" -serde_core = "1.0" +serde = { version = "1.0", features = ["derive"] } uuid = { version = "1", features = ["v4"] } [target.'cfg(any(target_os = "linux", target_os = "openbsd", target_os = "freebsd", target_os = "illumos"))'.dependencies] diff --git a/src/ipc.rs b/src/ipc.rs index 2c98b0d0..5a829d74 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -7,13 +7,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. 
-use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender}; +use crate::platform::{ + self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender, OsIpcSharedMemoryIndex, + OsIpcSharedMemoryVec, +}; use crate::platform::{ OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel, }; use bincode; -use serde_core::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use std::cell::RefCell; use std::cmp::min; use std::error::Error as StdError; @@ -33,6 +36,12 @@ thread_local! { static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell> = const { RefCell::new(Vec::new()) }; static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell> = + const { RefCell::new(Vec::new()) }; + + static OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION: + RefCell>> = const { RefCell::new(Vec::new()) }; + + static OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION: RefCell> = const { RefCell::new(Vec::new()) } } @@ -368,15 +377,17 @@ where OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| { OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with( |os_ipc_shared_memory_regions_for_serialization| { - bincode::serialize_into(&mut bytes, &data)?; - let os_ipc_channels = os_ipc_channels_for_serialization.take(); - let os_ipc_shared_memory_regions = - os_ipc_shared_memory_regions_for_serialization.take(); - Ok(self.os_sender.send( - &bytes[..], - os_ipc_channels, - os_ipc_shared_memory_regions, - )?) + OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION.with( + |os_ipc_shared_memory_vec_regions_for_serialization| { + bincode::serialize_into(&mut bytes, &data)?; + Ok(self.os_sender.send( + &bytes[..], + os_ipc_channels_for_serialization.take(), + os_ipc_shared_memory_regions_for_serialization.take(), + os_ipc_shared_memory_vec_regions_for_serialization.take(), + )?) 
+ }, + ) }, ) }) @@ -631,6 +642,117 @@ impl IpcSharedMemory { } } +/// An index to access `IpcSharedMemoryVec` +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct IpcSharedMemoryIndex(OsIpcSharedMemoryIndex); + +/// Shared memory vector that can be made accessible to a receiver via the reader method +/// of an IPC message that contains the descriptor. +/// +/// # Examples +/// ``` +/// # use ipc_channel::ipc::{self, IpcSharedMemoryVec}; +/// # let (tx, rx) = ipc::channel().unwrap(); +/// # let data = [0x76, 0x69, 0x6d, 0x00]; +/// let (shmem, index) = IpcSharedMemoryVec::from_bytes(&data); +/// tx.send(shmem.reader()).unwrap(); +/// # let rx_shmem = rx.recv().unwrap(); +/// # assert_eq!(shmem.get(&index), rx_shmem.get(&index)); +/// ``` +#[derive(Clone)] +pub struct IpcSharedMemoryVec { + os_shared_memory_vec: OsIpcSharedMemoryVec, +} + +unsafe impl Send for IpcSharedMemoryVec {} +unsafe impl Sync for IpcSharedMemoryVec {} + +impl Default for IpcSharedMemoryVec { + fn default() -> Self { + let (memory, _index) = OsIpcSharedMemoryVec::from_bytes(&[0xab]); + IpcSharedMemoryVec { + os_shared_memory_vec: memory, + } + } + } + +impl IpcSharedMemoryVec { + pub fn from_bytes(bytes: &[u8]) -> (IpcSharedMemoryVec, IpcSharedMemoryIndex) { + let (memory, index) = OsIpcSharedMemoryVec::from_bytes(bytes); + ( + IpcSharedMemoryVec { + os_shared_memory_vec: memory, + }, + IpcSharedMemoryIndex(index), + ) + } + + pub fn push(&mut self, bytes: &[u8]) -> IpcSharedMemoryIndex { + IpcSharedMemoryIndex(self.os_shared_memory_vec.push(bytes)) + } + + pub fn get(&self, index: &IpcSharedMemoryIndex) -> Option<&[u8]> { + self.os_shared_memory_vec.get(&index.0) + } + + /// Gives you a reader to access the already stored memory locations. + /// Notice this will not allow you to access memory added _after_ you + /// got this reader. 
+ pub fn reader(&self) -> IpcSharedMemoryReader { + IpcSharedMemoryReader(self.os_shared_memory_vec.clone()) + } +} + +#[derive(Clone)] +pub struct IpcSharedMemoryReader(OsIpcSharedMemoryVec); + +impl IpcSharedMemoryReader { + pub fn get(&self, index: &IpcSharedMemoryIndex) -> Option<&[u8]> { + self.0.get(&index.0) + } +} + +impl<'de> Deserialize<'de> for IpcSharedMemoryReader { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let index: usize = Deserialize::deserialize(deserializer)?; + + let os_shared_memory_vec = OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION.with( + |os_ipc_shared_memory_regions_for_deserialization| { + let mut regions = os_ipc_shared_memory_regions_for_deserialization.borrow_mut(); + let Some(region) = regions.get_mut(index) else { + return Err(format!("Cannot consume shared memory region at index {index}, there are only {} regions available", regions.len())); + }; + + region.take().ok_or_else(|| format!("Shared memory region {index} has already been consumed")) + }, + ).map_err(D::Error::custom)?; + + Ok(IpcSharedMemoryReader(os_shared_memory_vec)) + } +} + +impl Serialize for IpcSharedMemoryReader { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let index = OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION.with( + |os_ipc_shared_memory_vec_regions_for_serialization| { + let mut os_ipc_shared_memory_vec_regions_for_serialization = + os_ipc_shared_memory_vec_regions_for_serialization.borrow_mut(); + let index = os_ipc_shared_memory_vec_regions_for_serialization.len(); + os_ipc_shared_memory_vec_regions_for_serialization.push(self.0.clone()); + index + }, + ); + debug_assert!(index < usize::MAX); + index.serialize(serializer) + } +} + /// Result for readable events returned from [IpcReceiverSet::select]. 
/// /// [IpcReceiverSet::select]: struct.IpcReceiverSet.html#method.select @@ -674,6 +796,7 @@ pub struct IpcMessage { pub(crate) data: Vec, pub(crate) os_ipc_channels: Vec, pub(crate) os_ipc_shared_memory_regions: Vec, + pub(crate) os_ipc_shared_memory_vec: Vec, } impl IpcMessage { @@ -684,6 +807,7 @@ impl IpcMessage { data, os_ipc_channels: vec![], os_ipc_shared_memory_regions: vec![], + os_ipc_shared_memory_vec: vec![], } } } @@ -702,11 +826,13 @@ impl IpcMessage { data: Vec, os_ipc_channels: Vec, os_ipc_shared_memory_regions: Vec, + os_ipc_shared_memory_vec: Vec, ) -> IpcMessage { IpcMessage { data, os_ipc_channels, os_ipc_shared_memory_regions, + os_ipc_shared_memory_vec, } } @@ -718,23 +844,34 @@ impl IpcMessage { OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| { OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with( |os_ipc_shared_memory_regions_for_deserialization| { - // Setup the thread local memory for deserialization to take it. - *os_ipc_channels_for_deserialization.borrow_mut() = self.os_ipc_channels; - *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() = self - .os_ipc_shared_memory_regions - .into_iter() - .map(Some) - .collect(); - - let result = bincode::deserialize(&self.data[..]); - - // Clear the shared memory - let _ = os_ipc_shared_memory_regions_for_deserialization.take(); - let _ = os_ipc_channels_for_deserialization.take(); - - /* Error check comes after doing cleanup, - * since we need the cleanup both in the success and the error cases. */ - result + OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION.with( + |os_ipc_shared_memory_vec_regions_for_deserialization| { + // Setup the thread local memory for deserialization to take it. 
+ *os_ipc_channels_for_deserialization.borrow_mut() = + self.os_ipc_channels; + *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() = self + .os_ipc_shared_memory_regions + .into_iter() + .map(Some) + .collect(); + *os_ipc_shared_memory_vec_regions_for_deserialization.borrow_mut() = + self.os_ipc_shared_memory_vec + .into_iter() + .map(Some) + .collect(); + + let result = bincode::deserialize(&self.data[..]); + + // Clear the shared memory + let _ = os_ipc_shared_memory_regions_for_deserialization.take(); + let _ = os_ipc_channels_for_deserialization.take(); + let _ = os_ipc_shared_memory_vec_regions_for_deserialization.take(); + + /* Error check comes after doing cleanup, + * since we need the cleanup both in the success and the error cases. */ + result + }, + ) }, ) }) @@ -960,7 +1097,7 @@ impl IpcBytesSender { #[inline] pub fn send(&self, data: &[u8]) -> Result<(), io::Error> { self.os_sender - .send(data, vec![], vec![]) + .send(data, vec![], vec![], vec![]) .map_err(io::Error::from) } } diff --git a/src/platform/inprocess/mod.rs b/src/platform/inprocess/mod.rs index b1dd8a1c..4aeb4a6e 100644 --- a/src/platform/inprocess/mod.rs +++ b/src/platform/inprocess/mod.rs @@ -153,9 +153,15 @@ impl OsIpcSender { data: &[u8], ports: Vec, shared_memory_regions: Vec, + shared_memory_vecs: Vec, ) -> Result<(), ChannelError> { let os_ipc_channels = ports.into_iter().map(OsOpaqueIpcChannel::new).collect(); - let ipc_message = IpcMessage::new(data.to_vec(), os_ipc_channels, shared_memory_regions); + let ipc_message = IpcMessage::new( + data.to_vec(), + os_ipc_channels, + shared_memory_regions, + shared_memory_vecs, + ); self.sender .send(ChannelMessage(ipc_message)) .map_err(|_| ChannelError::BrokenPipeError) diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 30bf9011..dcc27fc8 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -66,6 +66,7 @@ mod os { pub use self::os::{channel, OsOpaqueIpcChannel}; pub use self::os::{OsIpcChannel, 
OsIpcOneShotServer, OsIpcReceiver, OsIpcReceiverSet}; pub use self::os::{OsIpcSelectionResult, OsIpcSender, OsIpcSharedMemory}; +pub use self::os::{OsIpcSharedMemoryIndex, OsIpcSharedMemoryVec}; #[cfg(test)] mod test; diff --git a/src/platform/test.rs b/src/platform/test.rs index 42c37d90..bab28dca 100644 --- a/src/platform/test.rs +++ b/src/platform/test.rs @@ -42,7 +42,7 @@ use crate::test::{get_channel_name_arg, spawn_server}; fn simple() { let (tx, rx) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; - tx.send(data, Vec::new(), Vec::new()).unwrap(); + tx.send(data, Vec::new(), Vec::new(), Vec::new()).unwrap(); let ipc_message = rx.recv().unwrap(); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); } @@ -53,12 +53,12 @@ fn sender_transfer() { let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; super_tx - .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) .unwrap(); let mut ipc_message = super_rx.recv().unwrap(); assert_eq!(ipc_message.os_ipc_channels.len(), 1); let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); } @@ -69,12 +69,12 @@ fn receiver_transfer() { let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; super_tx - .send(data, vec![OsIpcChannel::Receiver(sub_rx)], vec![]) + .send(data, vec![OsIpcChannel::Receiver(sub_rx)], vec![], vec![]) .unwrap(); let mut ipc_message = super_rx.recv().unwrap(); assert_eq!(ipc_message.os_ipc_channels.len(), 1); let sub_rx = ipc_message.os_ipc_channels.pop().unwrap().to_receiver(); - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message, 
IpcMessage::from_data(data.to_vec())); } @@ -90,18 +90,19 @@ fn multisender_transfer() { data, vec![OsIpcChannel::Sender(sub0_tx), OsIpcChannel::Sender(sub1_tx)], vec![], + vec![], ) .unwrap(); let mut ipc_message1 = super_rx.recv().unwrap(); assert_eq!(ipc_message1.os_ipc_channels.len(), 2); let sub0_tx = ipc_message1.os_ipc_channels.remove(0).to_sender(); - sub0_tx.send(data, vec![], vec![]).unwrap(); + sub0_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message2 = sub0_rx.recv().unwrap(); assert_eq!(ipc_message2, IpcMessage::from_data(data.to_vec())); let sub1_tx = ipc_message1.os_ipc_channels.remove(0).to_sender(); - sub1_tx.send(data, vec![], vec![]).unwrap(); + sub1_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message3 = sub1_rx.recv().unwrap(); assert_eq!(ipc_message3, IpcMessage::from_data(data.to_vec())); } @@ -113,7 +114,7 @@ fn medium_data() { .collect(); let data: &[u8] = &data[..]; let (tx, rx) = platform::channel().unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = rx.recv().unwrap(); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); } @@ -127,12 +128,12 @@ fn medium_data_with_sender_transfer() { let (super_tx, super_rx) = platform::channel().unwrap(); let (sub_tx, sub_rx) = platform::channel().unwrap(); super_tx - .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) .unwrap(); let mut ipc_message = super_rx.recv().unwrap(); assert_eq!(ipc_message.os_ipc_channels.len(), 1); let sub_tx = ipc_message.os_ipc_channels.pop().unwrap().to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); } @@ -142,7 +143,7 @@ fn check_big_data(size: u32) { let thread = thread::spawn(move || { let data: Vec = (0..size).map(|i| (i % 251) as 
u8).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); let ipc_message = rx.recv().unwrap(); let data: Vec = (0..size).map(|i| (i % 251) as u8).collect(); @@ -174,7 +175,7 @@ fn big_data_with_sender_transfer() { let data: Vec = (0..1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; super_tx - .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) .unwrap(); }); let mut ipc_message = super_rx.recv().unwrap(); @@ -190,7 +191,7 @@ fn big_data_with_sender_transfer() { .collect(); let data: &[u8] = &data[..]; let sub_tx = ipc_message.os_ipc_channels[0].to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); @@ -205,7 +206,9 @@ fn with_n_fds(n: usize, size: usize) { let (super_tx, super_rx) = platform::channel().unwrap(); let data: Vec = (0..size).map(|i| (i % 251) as u8).collect(); - super_tx.send(&data[..], sender_fds, vec![]).unwrap(); + super_tx + .send(&data[..], sender_fds, vec![], vec![]) + .unwrap(); let ipc_message = super_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); @@ -222,7 +225,7 @@ fn with_n_fds(n: usize, size: usize) { .zip(receivers.into_iter()) { let sub_tx = sender_fd.to_sender(); - sub_tx.send(&data, vec![], vec![]).unwrap(); + sub_tx.send(&data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); assert_eq!(ipc_message, IpcMessage::from_data(data.clone())); @@ -324,7 +327,7 @@ macro_rules! 
create_big_data_with_n_fds { let thread = thread::spawn(move || { let data: Vec = (0..1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - super_tx.send(data, sender_fds, vec![]).unwrap(); + super_tx.send(data, sender_fds, vec![], vec![]).unwrap(); }); let ipc_message = super_rx.recv().unwrap(); thread.join().unwrap(); @@ -346,7 +349,7 @@ macro_rules! create_big_data_with_n_fds { .zip(receivers.into_iter()) { let sub_tx = sender_fd.to_sender(); - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let ipc_message = sub_rx.recv().unwrap(); assert_eq!(ipc_message.data.len(), data.len()); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); @@ -375,7 +378,7 @@ fn concurrent_senders() { thread::spawn(move || { let data: Vec = (0..1024 * 1024).map(|j| (j % 13) as u8 | i << 4).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }) }) .collect(); @@ -410,7 +413,7 @@ fn receiver_set() { let rx1_id = rx_set.add(rx1).unwrap(); let data: &[u8] = b"1234567"; - tx0.send(data, vec![], vec![]).unwrap(); + tx0.send(data, vec![], vec![], vec![]).unwrap(); let (received_id, ipc_message) = rx_set .select() .unwrap() @@ -421,7 +424,7 @@ fn receiver_set() { assert_eq!(received_id, rx0_id); assert_eq!(ipc_message.data, data); - tx1.send(data, vec![], vec![]).unwrap(); + tx1.send(data, vec![], vec![], vec![]).unwrap(); let (received_id, ipc_message) = rx_set .select() .unwrap() @@ -432,8 +435,8 @@ fn receiver_set() { assert_eq!(received_id, rx1_id); assert_eq!(ipc_message.data, data); - tx0.send(data, vec![], vec![]).unwrap(); - tx1.send(data, vec![], vec![]).unwrap(); + tx0.send(data, vec![], vec![], vec![]).unwrap(); + tx1.send(data, vec![], vec![], vec![]).unwrap(); let (mut received0, mut received1) = (false, false); while !received0 || !received1 { for result in rx_set.select().unwrap().into_iter() { @@ -467,7 +470,7 @@ fn 
receiver_set_eintr() { let rx_id = rx_set.add(rx0).unwrap(); // Let the parent know we're ready let tx1 = OsIpcSender::connect(name).unwrap(); - tx1.send(b" Ready! ", vec![OsIpcChannel::Sender(tx0)], vec![]) + tx1.send(b" Ready! ", vec![OsIpcChannel::Sender(tx0)], vec![], vec![]) .unwrap(); // Send the result of the select back to the parent let result = rx_set.select().unwrap(); @@ -476,7 +479,7 @@ fn receiver_set_eintr() { assert_eq!(rx_id, id); assert_eq!(b"Test".as_ref(), &*ipc_message.data); assert!(result_iter.next().is_none()); - tx1.send(b"Success!", vec![], vec![]).unwrap(); + tx1.send(b"Success!", vec![], vec![], vec![]).unwrap(); }) }; // Wait until the child is ready @@ -489,7 +492,7 @@ fn receiver_set_eintr() { kill(child_pid, SIGCONT); } // The interrupt shouldn't affect the following send - tx1.send(b"Test", vec![], vec![]).unwrap(); + tx1.send(b"Test", vec![], vec![], vec![]).unwrap(); let ipc_message = server.recv().unwrap(); assert!(ipc_message.data == b"Success!"); child_pid.wait(); @@ -502,7 +505,7 @@ fn receiver_set_empty() { let rx_id = rx_set.add(rx).unwrap(); let data: &[u8] = b""; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); let (received_id, ipc_message) = rx_set .select() .unwrap() @@ -562,8 +565,8 @@ fn receiver_set_medium_data() { .map(|offset| (offset % 127) as u8 | 0x80) .collect(); - tx0.send(&data0, vec![], vec![]).unwrap(); - tx1.send(&data1, vec![], vec![]).unwrap(); + tx0.send(&data0, vec![], vec![], vec![]).unwrap(); + tx1.send(&data1, vec![], vec![], vec![]).unwrap(); let (mut received0, mut received1) = (false, false); while !received0 || !received1 { for result in rx_set.select().unwrap().into_iter() { @@ -600,11 +603,11 @@ fn receiver_set_big_data() { let (reference_data0, reference_data1) = (data0.clone(), data1.clone()); let thread0 = thread::spawn(move || { - tx0.send(&data0, vec![], vec![]).unwrap(); + tx0.send(&data0, vec![], vec![], vec![]).unwrap(); tx0 // Don't close 
just yet -- the receiver-side test code below doesn't expect that... }); let thread1 = thread::spawn(move || { - tx1.send(&data1, vec![], vec![]).unwrap(); + tx1.send(&data1, vec![], vec![], vec![]).unwrap(); tx1 }); @@ -656,7 +659,7 @@ fn receiver_set_concurrent() { // The `macos` back-end won't receive exact size unless it's a multiple of 4... // (See https://github.com/servo/ipc-channel/pull/79 etc. ) let msg_size = msg_size & !3; - tx.send(&data[0..msg_size], vec![], vec![]).unwrap(); + tx.send(&data[0..msg_size], vec![], vec![], vec![]).unwrap(); } }); (thread, (rx_id, (reference_data, chan_index, 0usize))) @@ -696,7 +699,7 @@ fn server_accept_first() { thread::spawn(move || { thread::sleep(Duration::from_millis(30)); let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); let (_, ipc_message) = server.accept().unwrap(); @@ -710,7 +713,7 @@ fn server_connect_first() { thread::spawn(move || { let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); thread::sleep(Duration::from_millis(30)); @@ -727,7 +730,7 @@ fn cross_process_spawn() { let channel_name = get_channel_name_arg("server"); if let Some(channel_name) = channel_name { let tx = OsIpcSender::connect(channel_name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); unsafe { libc::exit(0); @@ -756,7 +759,7 @@ fn cross_process_fork() { let child_pid = unsafe { fork(|| { let tx = OsIpcSender::connect(name).unwrap(); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }) }; @@ -774,11 +777,11 @@ fn cross_process_sender_transfer_spawn() { let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; super_tx - .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) 
.unwrap(); sub_rx.recv().unwrap(); let data: &[u8] = b"bar"; - super_tx.send(data, vec![], vec![]).unwrap(); + super_tx.send(data, vec![], vec![], vec![]).unwrap(); unsafe { libc::exit(0); @@ -792,7 +795,7 @@ fn cross_process_sender_transfer_spawn() { assert_eq!(ipc_message.os_ipc_channels.len(), 1); let sub_tx = ipc_message.os_ipc_channels[0].to_sender(); let data: &[u8] = b"baz"; - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let data: &[u8] = b"bar"; let ipc_message = super_rx.recv().unwrap(); @@ -816,11 +819,11 @@ fn cross_process_sender_transfer_fork() { let (sub_tx, sub_rx) = platform::channel().unwrap(); let data: &[u8] = b"foo"; super_tx - .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) .unwrap(); sub_rx.recv().unwrap(); let data: &[u8] = b"bar"; - super_tx.send(data, vec![], vec![]).unwrap(); + super_tx.send(data, vec![], vec![], vec![]).unwrap(); }) }; @@ -828,7 +831,7 @@ fn cross_process_sender_transfer_fork() { assert_eq!(ipc_message.os_ipc_channels.len(), 1); let sub_tx = ipc_message.os_ipc_channels[0].to_sender(); let data: &[u8] = b"baz"; - sub_tx.send(data, vec![], vec![]).unwrap(); + sub_tx.send(data, vec![], vec![], vec![]).unwrap(); let data: &[u8] = b"bar"; let ipc_message = super_rx.recv().unwrap(); @@ -853,7 +856,7 @@ fn no_receiver_notification() { drop(receiver); let data: &[u8] = b"1234567"; loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. 
assert!(!err.channel_is_closed()); @@ -873,12 +876,12 @@ fn no_receiver_notification_pending() { let (sender, receiver) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; - let result = sender.send(data, vec![], vec![]); + let result = sender.send(data, vec![], vec![], vec![]); assert!(result.is_ok()); drop(receiver); loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. assert!(!err.channel_is_closed()); @@ -901,7 +904,7 @@ fn no_receiver_notification_delayed() { let data: &[u8] = b"1234567"; loop { - if let Err(err) = sender.send(data, vec![], vec![]) { + if let Err(err) = sender.send(data, vec![], vec![], vec![]) { // We don't have an actual method for distinguishing a "broken pipe" error -- // but at least it's not supposed to signal the same condition as closing the sender. 
assert!(!err.channel_is_closed()); @@ -917,7 +920,7 @@ fn shared_memory() { let (tx, rx) = platform::channel().unwrap(); let data: &[u8] = b"1234567"; let shmem_data = OsIpcSharedMemory::from_byte(0xba, 1024 * 1024); - tx.send(data, vec![], vec![shmem_data]).unwrap(); + tx.send(data, vec![], vec![shmem_data], vec![]).unwrap(); let ipc_message = rx.recv().unwrap(); assert_eq!(&ipc_message.data, data); assert!(&ipc_message.os_ipc_channels.is_empty()); @@ -942,7 +945,7 @@ fn try_recv() { let (tx, rx) = platform::channel().unwrap(); assert!(rx.try_recv().is_err()); let data: &[u8] = b"1234567"; - tx.send(data, Vec::new(), Vec::new()).unwrap(); + tx.send(data, Vec::new(), Vec::new(), Vec::new()).unwrap(); let ipc_message = rx.try_recv().unwrap(); assert_eq!(ipc_message, IpcMessage::from_data(data.to_vec())); assert!(rx.try_recv().is_err()); @@ -1003,7 +1006,7 @@ fn try_recv_large() { let thread = thread::spawn(move || { let data: Vec = (0..1024 * 1024).map(|i| (i % 251) as u8).collect(); let data: &[u8] = &data[..]; - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }); let mut result; @@ -1066,7 +1069,7 @@ fn try_recv_large_delayed() { let data: Vec = (0..msg_size).map(|j| (j % 13) as u8 | i << 4).collect(); let data: &[u8] = &data[..]; delay(thread_delay); - tx.send(data, vec![], vec![]).unwrap(); + tx.send(data, vec![], vec![], vec![]).unwrap(); }) }) .collect(); @@ -1137,7 +1140,7 @@ fn cross_process_two_step_transfer_spawn() { // send the other process the tx side, so it can send us the channels super_tx - .send(&[], vec![OsIpcChannel::Sender(sub_tx)], vec![]) + .send(&[], vec![OsIpcChannel::Sender(sub_tx)], vec![], vec![]) .unwrap(); // get two_rx from the other process @@ -1155,7 +1158,9 @@ fn cross_process_two_step_transfer_spawn() { assert_eq!(&ipc_message.data[..], cookie); // finally, send a cookie back - super_tx.send(&ipc_message.data, vec![], vec![]).unwrap(); + super_tx + .send(&ipc_message.data, vec![], vec![], 
vec![]) + .unwrap(); // terminate unsafe { @@ -1166,13 +1171,13 @@ fn cross_process_two_step_transfer_spawn() { // create channel 1 let (one_tx, one_rx) = platform::channel().unwrap(); // put data in channel 1's pipe - one_tx.send(cookie, vec![], vec![]).unwrap(); + one_tx.send(cookie, vec![], vec![], vec![]).unwrap(); // create channel 2 let (two_tx, two_rx) = platform::channel().unwrap(); // put channel 1's rx end in channel 2's pipe two_tx - .send(&[], vec![OsIpcChannel::Receiver(one_rx)], vec![]) + .send(&[], vec![OsIpcChannel::Receiver(one_rx)], vec![], vec![]) .unwrap(); // create a one-shot server, and spawn another process @@ -1190,7 +1195,7 @@ fn cross_process_two_step_transfer_spawn() { // Send the outer payload channel, so the server can use it to // retrieve the inner payload and the cookie sub_tx - .send(&[], vec![OsIpcChannel::Receiver(two_rx)], vec![]) + .send(&[], vec![OsIpcChannel::Receiver(two_rx)], vec![], vec![]) .unwrap(); // Then we wait for the cookie to make its way back to us diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index 47db0364..7c76c363 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -11,8 +11,8 @@ use crate::ipc::{self, IpcMessage}; use bincode; use fnv::FnvHasher; use libc::{ - self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ, - PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, + self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_FIXED, MAP_NORESERVE, + MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET, }; use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK}; use libc::{iovec, msghdr, off_t, recvmsg, sendmsg}; @@ -20,6 +20,7 @@ use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, s use libc::{EAGAIN, EWOULDBLOCK}; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Token}; +use serde::{Deserialize, Serialize}; use std::cell::Cell; use std::cmp; use 
std::collections::HashMap; @@ -230,7 +231,7 @@ impl OsIpcSender { /// /// This one is smaller than regular fragments, because it carries the message (size) header. fn first_fragment_size(sendbuf_size: usize) -> usize { - (Self::fragment_size(sendbuf_size) - mem::size_of::()) & (!8usize + 1) + (Self::fragment_size(sendbuf_size) - 2 * mem::size_of::()) & (!8usize + 1) // Ensure optimal alignment. } @@ -252,6 +253,7 @@ impl OsIpcSender { data: &[u8], channels: Vec, shared_memory_regions: Vec, + shared_memory_vecs: Vec, ) -> Result<(), UnixError> { let mut fds = Vec::new(); for channel in channels.iter() { @@ -260,6 +262,11 @@ impl OsIpcSender { for shared_memory_region in shared_memory_regions.iter() { fds.push(shared_memory_region.store.fd()); } + let number_of_shared_memory_regions = shared_memory_regions.len(); + + for shared_memory_vec in shared_memory_vecs.iter() { + fds.push(shared_memory_vec.store.fd()); + } // `len` is the total length of the message. // Its value will be sent as a message header before the payload data. @@ -272,6 +279,7 @@ impl OsIpcSender { fds: &[c_int], data_buffer: &[u8], len: usize, + number_of_shared_memory_regions: usize, ) -> Result<(), UnixError> { let result = unsafe { let cmsg_length = mem::size_of_val(fds) as c_uint; @@ -305,12 +313,15 @@ impl OsIpcSender { iov_base: &len as *const _ as *mut c_void, iov_len: mem::size_of_val(&len), }, + iovec { + iov_base: &number_of_shared_memory_regions as *const _ as *mut c_void, + iov_len: mem::size_of_val(&number_of_shared_memory_regions), + }, iovec { iov_base: data_buffer.as_ptr() as *mut c_void, iov_len: data_buffer.len(), }, ]; - let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen); let result = sendmsg(sender_fd, &msghdr, 0); libc::free(cmsg_buffer as *mut c_void); @@ -366,7 +377,13 @@ impl OsIpcSender { // If the message is small enough, try sending it in a single fragment. 
if data.len() <= Self::get_max_fragment_size() { - match send_first_fragment(self.fd.0, &fds[..], data, data.len()) { + match send_first_fragment( + self.fd.0, + &fds[..], + &data, + data.len(), + number_of_shared_memory_regions, + ) { Ok(_) => return Ok(()), Err(error) => { // ENOBUFS means the kernel failed to allocate a buffer large enough @@ -406,7 +423,13 @@ impl OsIpcSender { // This fragment always uses the full allowable buffer size. end_byte_position = Self::first_fragment_size(sendbuf_size); - send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len()) + send_first_fragment( + self.fd.0, + &fds[..], + &data[..end_byte_position], + data.len(), + number_of_shared_memory_regions, + ) } else { // Followup fragment. No header; but offset by amount of data already sent. @@ -789,6 +812,31 @@ impl BackingStore { assert!(address != MAP_FAILED); (address as *mut u8, length) } + + pub unsafe fn map_file_noreserve(&self, length: Option) -> (*mut u8, size_t) { + let length = length.unwrap_or_else(|| { + let mut st = mem::MaybeUninit::uninit(); + if libc::fstat(self.fd, st.as_mut_ptr()) != 0 { + panic!("error stating fd {}: {}", self.fd, UnixError::last()); + } + st.assume_init().st_size as size_t + }); + if length == 0 { + // This will cause `mmap` to fail, so handle it explicitly. 
+ return (ptr::null_mut(), length); + } + let address = libc::mmap( + ptr::null_mut(), + length, + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_NORESERVE, + self.fd, + 0, + ); + assert!(!address.is_null()); + assert!(address != MAP_FAILED); + (address as *mut u8, length) + } } impl Drop for BackingStore { @@ -800,6 +848,130 @@ impl Drop for BackingStore { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OsIpcSharedMemoryIndex { + // The offset in bytes + offset: usize, + // The length in bytes + length: usize, +} + +pub struct OsIpcSharedMemoryVec { + ptr: *mut u8, + length: usize, + store: BackingStore, +} + +impl OsIpcSharedMemoryVec { + pub fn from_bytes(bytes: &[u8]) -> (OsIpcSharedMemoryVec, OsIpcSharedMemoryIndex) { + unsafe { + let store = BackingStore::new(bytes.len()); + let (address, _) = store.map_file_noreserve(Some(512_000_000)); + ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len()); + let memory = OsIpcSharedMemoryVec { + ptr: address, + length: bytes.len(), + store, + }; + ( + memory, + OsIpcSharedMemoryIndex { + offset: 0, + length: bytes.len(), + }, + ) + } + } + + unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemoryVec { + let store = BackingStore::from_fd(fd); + let (ptr, length) = store.map_file(None); + OsIpcSharedMemoryVec { ptr, length, store } + } + + pub fn push(&mut self, bytes: &[u8]) -> OsIpcSharedMemoryIndex { + let fd = self.store.fd(); + let index = unsafe { + assert_eq!( + 0, + libc::ftruncate(fd, (self.length + bytes.len()).try_into().unwrap()) + ); + + let new_length = self.length + bytes.len(); + // map more of the file into the address space + let address = libc::mmap( + self.ptr.byte_offset(self.length.try_into().unwrap()) as *mut c_void, + bytes.len(), + PROT_READ | PROT_WRITE, + MAP_SHARED, + self.store.fd, + 0, + ); + + assert!(address != MAP_FAILED); + ptr::copy_nonoverlapping(bytes.as_ptr(), address as *mut u8, bytes.len()); + OsIpcSharedMemoryIndex { + offset: self.length, + length: bytes.len(), + } + 
}; + self.length += bytes.len(); + index + } + + pub fn get(&self, index: &OsIpcSharedMemoryIndex) -> Option<&[u8]> { + // While it would be nice to check index vs length here, we cannot. + // The length in this object might be a reader that was sent a long time ago + // The length does not get updated for the object, only for the main IpcSharedMemoryVec + // This is in general fine as we only produce indices for objects that exists _and_ we + // can never delete objects. + if self.length >= index.offset + index.length { + Some(unsafe { + slice::from_raw_parts( + self.ptr.byte_offset(index.offset.try_into().unwrap()), + index.length, + ) + }) + } else { + None + } + } +} + +unsafe impl Send for OsIpcSharedMemoryVec {} +unsafe impl Sync for OsIpcSharedMemoryVec {} + +impl PartialEq for OsIpcSharedMemoryVec { + fn eq(&self, other: &OsIpcSharedMemoryVec) -> bool { + self.ptr == other.ptr + } +} + +impl Drop for OsIpcSharedMemoryVec { + fn drop(&mut self) { + unsafe { + if !self.ptr.is_null() { + let result = libc::munmap(self.ptr as *mut c_void, self.length); + assert!(thread::panicking() || result == 0); + } + } + } +} + +impl Clone for OsIpcSharedMemoryVec { + fn clone(&self) -> OsIpcSharedMemoryVec { + unsafe { + let store = BackingStore::from_fd(libc::dup(self.store.fd())); + let (address, _) = store.map_file(Some(self.length)); + OsIpcSharedMemoryVec { + ptr: address, + length: self.length, + store, + } + } + } +} + pub struct OsIpcSharedMemory { ptr: *mut u8, length: usize, @@ -988,7 +1160,8 @@ enum BlockingMode { #[allow(clippy::uninit_vec, clippy::type_complexity)] fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result { - let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new()); + let (mut channels, mut shared_memory_regions, mut shared_memory_vec) = + (Vec::new(), Vec::new(), Vec::new()); // First fragments begins with a header recording the total data length. 
// @@ -996,6 +1169,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result // or need to receive additional fragments -- and if so, how much. let mut total_size = 0usize; let mut main_data_buffer; + let mut number_of_regions = 0usize; unsafe { // Allocate a buffer without initialising the memory. main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size()); @@ -1006,6 +1180,10 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result iov_base: &mut total_size as *mut _ as *mut c_void, iov_len: mem::size_of_val(&total_size), }, + iovec { + iov_base: &mut number_of_regions as *mut _ as *mut c_void, + iov_len: mem::size_of_val(&number_of_regions), + }, iovec { iov_base: main_data_buffer.as_mut_ptr() as *mut c_void, iov_len: main_data_buffer.len(), @@ -1014,7 +1192,9 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result let mut cmsg = UnixCmsg::new(&mut iovec)?; let bytes_read = cmsg.recv(fd, blocking_mode)?; - main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size)); + main_data_buffer.set_len( + bytes_read - mem::size_of_val(&total_size) - mem::size_of_val(&number_of_regions), + ); let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int; let cmsg_length = cmsg.msghdr.msg_controllen; @@ -1026,13 +1206,21 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result // exposed by libc. CMSG_SPACE(0) is the portable version of that.) (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::() }; + // The null fd shows the difference between SharedMemory and SharedMemoryVecs. 
+ // Everything after it is SharedMemoryVecs for index in 0..channel_length { let fd = *cmsg_fds.add(index); if is_socket(fd) { channels.push(OsOpaqueIpcChannel::from_fd(fd)); continue; } - shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + + if number_of_regions > 0 { + shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd)); + number_of_regions -= 1; + } else { + shared_memory_vec.push(OsIpcSharedMemoryVec::from_fd(fd)); + } } } @@ -1042,6 +1230,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result main_data_buffer, channels, shared_memory_regions, + shared_memory_vec, )); } @@ -1093,6 +1282,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result main_data_buffer, channels, shared_memory_regions, + shared_memory_vec, )) } diff --git a/src/router.rs b/src/router.rs index 36366330..297097b0 100644 --- a/src/router.rs +++ b/src/router.rs @@ -18,7 +18,7 @@ use std::sync::{LazyLock, Mutex}; use std::thread::{self, JoinHandle}; use crossbeam_channel::{self, Receiver, Sender}; -use serde_core::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use crate::ipc::{ self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver, diff --git a/src/test.rs b/src/test.rs index 3cefecd3..6fac77c7 100644 --- a/src/test.rs +++ b/src/test.rs @@ -42,7 +42,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::ipc::IpcOneShotServer; #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] use crate::ipc::IpcReceiver; -use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory}; +use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory, IpcSharedMemoryVec}; use crate::router::{RouterProxy, ROUTER}; #[cfg(not(any( @@ -542,6 +542,25 @@ fn shared_memory_object_equality() { assert_eq!(received_person_and_shared_memory, person_and_shared_memory); } +#[test] +fn shared_memory_vec() { + let (mut vec, index) = IpcSharedMemoryVec::from_bytes(&[0xba; 
24]); + let (tx, rx) = ipc::channel().unwrap(); + tx.send(vec.reader()).unwrap(); + let received_reader = rx.recv().unwrap(); + assert_eq!(vec.get(&index), received_reader.get(&index)); + + let index2 = vec.push(&[0xbc; 24]); + assert!(vec.get(&index2).is_some()); + assert!(received_reader.get(&index2).is_none()); // The reader is too old + + tx.send(vec.reader()).unwrap(); + let received_reader2 = rx.recv().unwrap(); + + assert_eq!(vec.get(&index), received_reader.get(&index)); + assert_eq!(vec.get(&index2), received_reader2.get(&index2)); +} + #[test] fn opaque_sender() { let person = ("Patrick Walton".to_owned(), 29);