Skip to content

Commit 079e07f

Browse files
committed
Await on multiple addresses
1 parent 70a2837 commit 079e07f

File tree

1 file changed

+31
-15
lines changed

1 file changed

+31
-15
lines changed

communication/src/networking.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,38 @@ pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bo
148148
/// Result contains connections `[my_index + 1, addresses.len() - 1]`.
149149
pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
150150
let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
151-
let listener = TcpListener::bind(&addresses[my_index][..])?;
152-
153-
for _ in (my_index + 1) .. addresses.len() {
154-
let mut stream = listener.accept()?.0;
155-
stream.set_nodelay(true).expect("set_nodelay call failed");
156-
let mut buffer = [0u8;16];
157-
stream.read_exact(&mut buffer)?;
158-
let mut cursor = io::Cursor::new(buffer);
159-
let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
160-
if magic != HANDSHAKE_MAGIC {
161-
return Err(io::Error::new(io::ErrorKind::InvalidData,
162-
"received incorrect timely handshake"));
151+
152+
// We may have multiple addresses to bind to, and will listen on each of them until all received.
153+
let listeners = addresses[my_index].split_whitespace().map(|addr| TcpListener::bind(addr)).collect::<Result<Vec<_>>>()?;
154+
for listener in listeners.iter() { listener.set_nonblocking(true).expect("Couldn't set nonblocking"); }
155+
156+
// Until we have all intended connections, poll each listener, sleeping briefly if none have accepted a new stream.
157+
while results.iter().any(Option::is_none) {
158+
let mut received = false;
159+
for listener in listeners.iter() {
160+
match listener.accept() {
161+
Ok((mut stream, _)) => {
162+
stream.set_nodelay(true).expect("set_nodelay call failed");
163+
let mut buffer = [0u8;16];
164+
stream.read_exact(&mut buffer)?;
165+
let mut cursor = io::Cursor::new(buffer);
166+
let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
167+
if magic != HANDSHAKE_MAGIC {
168+
return Err(io::Error::new(io::ErrorKind::InvalidData,
169+
"received incorrect timely handshake"));
170+
}
171+
let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
172+
results[identifier - my_index - 1] = Some(stream);
173+
if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
174+
received = true;
175+
}
176+
Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } }
177+
}
178+
}
179+
if !received {
180+
println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len());
181+
sleep(Duration::from_secs(1));
163182
}
164-
let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
165-
results[identifier - my_index - 1] = Some(stream);
166-
if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
167183
}
168184

169185
Ok(results)

0 commit comments

Comments
 (0)