Skip to content

Commit 1540baa

Browse files
committed
Handle clients that went away unnanounced
1 parent 3511fda commit 1540baa

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

src/bin/server.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,6 @@ fn main() {
294294
addr
295295
);
296296

297-
// @TODO to nagle or not to nagle?
298-
// sock.set_nodelay(true)
299-
300297
let token = {
301298
let entry = connections.vacant_entry();
302299
let token = Token(entry.key());
@@ -331,31 +328,38 @@ fn main() {
331328
while let Ok((query_name, results)) = recv_results.try_recv() {
332329
info!("[WORKER {}] {} {} results", worker.index(), query_name, results.len());
333330

334-
match server.interests.get(&query_name) {
331+
match server.interests.get_mut(&query_name) {
335332
None => {
336-
/* @TODO unregister this flow */
337-
warn!("NO INTEREST FOR THIS RESULT");
333+
trace!("result on query {} w/o interested clients", query_name);
334+
server.shutdown_query(&query_name);
338335
}
339336
Some(tokens) => {
340337
let serialized = serde_json::to_string::<(String, Vec<ResultDiff<T>>)>(
341338
&(query_name, results),
342339
).expect("failed to serialize outputs");
343340
let msg = ws::Message::text(serialized);
344341

345-
for &token in tokens.iter() {
346-
// @TODO check whether connection still exists
347-
let conn = &mut connections[token.into()];
348-
349-
conn.send_message(msg.clone())
350-
.expect("failed to send message");
351-
352-
poll.reregister(
353-
conn.socket(),
354-
conn.token(),
355-
conn.events(),
356-
PollOpt::edge() | PollOpt::oneshot(),
357-
).unwrap();
358-
}
342+
tokens.retain(|token| {
343+
match connections.get_mut((*token).into()) {
344+
None => {
345+
trace!("client {:?} has gone away", token);
346+
false
347+
}
348+
Some(conn) => {
349+
conn.send_message(msg.clone())
350+
.expect("failed to send message");
351+
352+
poll.reregister(
353+
conn.socket(),
354+
conn.token(),
355+
conn.events(),
356+
PollOpt::edge() | PollOpt::oneshot(),
357+
).unwrap();
358+
359+
true
360+
}
361+
}
362+
});
359363
}
360364
}
361365
}
@@ -368,7 +372,7 @@ fn main() {
368372
).unwrap();
369373
}
370374
ERRORS => {
371-
while let Ok((tokens, mut errors)) = recv_errors.try_recv() {
375+
while let Ok((mut tokens, mut errors)) = recv_errors.try_recv() {
372376
error!("[WORKER {}] {:?}", worker.index(), errors);
373377

374378
let serializable = errors.drain(..).map(|(error, time)| {
@@ -384,19 +388,21 @@ fn main() {
384388
).expect("failed to serialize errors");
385389
let msg = ws::Message::text(serialized);
386390

387-
for &token in tokens.iter() {
388-
// @TODO check whether connection still exists
389-
let conn = &mut connections[token.into()];
390-
391-
conn.send_message(msg.clone())
392-
.expect("failed to send message");
391+
for token in tokens.drain(..) {
392+
match connections.get_mut(token.into()) {
393+
None => warn!("sending errors to client that went away"),
394+
Some(conn) => {
395+
conn.send_message(msg.clone())
396+
.expect("failed to send message");
393397

394-
poll.reregister(
395-
conn.socket(),
396-
conn.token(),
397-
conn.events(),
398-
PollOpt::edge() | PollOpt::oneshot(),
399-
).unwrap();
398+
poll.reregister(
399+
conn.socket(),
400+
conn.token(),
401+
conn.events(),
402+
PollOpt::edge() | PollOpt::oneshot(),
403+
).unwrap();
404+
}
405+
}
400406
}
401407
}
402408

@@ -621,9 +627,8 @@ fn main() {
621627
entry.remove(&client_token);
622628

623629
if entry.is_empty() {
624-
info!("Shutting down {}", name);
630+
server.shutdown_query(&name);
625631
server.interests.remove(&name);
626-
server.shutdown_handles.remove(&name);
627632
}
628633
}
629634
}

src/server/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ where
147147
pub context: Context<T>,
148148
/// Mapping from query names to interested client tokens.
149149
pub interests: HashMap<String, HashSet<Token>>,
150-
/// Mapping from query names to their shutdown handles.
151-
pub shutdown_handles: HashMap<String, ShutdownHandle>,
150+
// Mapping from query names to their shutdown handles.
151+
shutdown_handles: HashMap<String, ShutdownHandle>,
152152
/// Probe keeping track of overall dataflow progress.
153153
pub probe: ProbeHandle<T>,
154154
/// Scheduler managing deferred operator activations.
@@ -245,6 +245,13 @@ where
245245
]
246246
}
247247

248+
/// Drops all shutdown handles associated with the specified
249+
/// query, resulting in its dataflow getting cleaned up.
250+
pub fn shutdown_query(&mut self, name: &str) {
251+
info!("Shutting down {}", name);
252+
self.shutdown_handles.remove(name);
253+
}
254+
248255
/// Handle a Transact request.
249256
pub fn transact(
250257
&mut self,

0 commit comments

Comments
 (0)