Skip to content

Commit d83d628

Browse files
authored
Poison call map after listener failure to prevent stale inserts (#288)
* Poison call map after listener failure to prevent stale inserts * Remove redundant test
1 parent 541a4e3 commit d83d628

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

rust/src/hdfs/connection.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub(crate) struct RpcConnection {
119119
user_info: UserInfo,
120120
next_call_id: AtomicI32,
121121
alignment_context: Option<Arc<Mutex<AlignmentContext>>>,
122-
call_map: Arc<Mutex<HashMap<i32, CallResult>>>,
122+
call_map: Arc<Mutex<Option<HashMap<i32, CallResult>>>>,
123123
sender: mpsc::Sender<Vec<u8>>,
124124
listener: Option<JoinHandle<()>>,
125125
}
@@ -134,7 +134,7 @@ impl RpcConnection {
134134
) -> Result<Self> {
135135
let client_id = Uuid::new_v4().to_bytes_le().to_vec();
136136
let next_call_id = AtomicI32::new(0);
137-
let call_map = Arc::new(Mutex::new(HashMap::new()));
137+
let call_map = Arc::new(Mutex::new(Some(HashMap::new())));
138138

139139
let mut stream = connect(url, handle).await?;
140140
stream.write_all("hrpc".as_bytes()).await?;
@@ -300,7 +300,20 @@ impl RpcConnection {
300300

301301
let (sender, receiver) = oneshot::channel::<Result<Bytes>>();
302302

303-
self.call_map.lock().unwrap().insert(call_id, sender);
303+
{
304+
let mut map = self.call_map.lock().unwrap();
305+
match map.as_mut() {
306+
Some(m) => {
307+
m.insert(call_id, sender);
308+
}
309+
None => {
310+
return Err(HdfsError::IOError(std::io::Error::new(
311+
std::io::ErrorKind::ConnectionAborted,
312+
"RPC listener disconnected",
313+
)));
314+
}
315+
}
316+
}
304317

305318
self.write_messages(&[&conn_header_buf, &header_buf, message])
306319
.await?;
@@ -310,15 +323,15 @@ impl RpcConnection {
310323
}
311324

312325
struct RpcListener {
313-
call_map: Arc<Mutex<HashMap<i32, CallResult>>>,
326+
call_map: Arc<Mutex<Option<HashMap<i32, CallResult>>>>,
314327
reader: SaslReader,
315328
alive: bool,
316329
alignment_context: Option<Arc<Mutex<AlignmentContext>>>,
317330
}
318331

319332
impl RpcListener {
320333
fn new(
321-
call_map: Arc<Mutex<HashMap<i32, CallResult>>>,
334+
call_map: Arc<Mutex<Option<HashMap<i32, CallResult>>>>,
322335
reader: SaslReader,
323336
alignment_context: Option<Arc<Mutex<AlignmentContext>>>,
324337
) -> Self {
@@ -344,12 +357,13 @@ impl RpcListener {
344357
}
345358
self.alive = false;
346359

347-
let mut call_map = self.call_map.lock().unwrap();
348-
for (_, call) in call_map.drain() {
349-
let _ = call.send(Err(HdfsError::IOError(std::io::Error::new(
350-
std::io::ErrorKind::ConnectionAborted,
351-
"RPC listener disconnected",
352-
))));
360+
if let Some(map) = self.call_map.lock().unwrap().take() {
361+
for (_, call) in map {
362+
let _ = call.send(Err(HdfsError::IOError(std::io::Error::new(
363+
std::io::ErrorKind::ConnectionAborted,
364+
"RPC listener disconnected",
365+
))));
366+
}
353367
}
354368
}
355369

@@ -370,7 +384,12 @@ impl RpcListener {
370384

371385
let call_id = rpc_response.call_id as i32;
372386

373-
let call = self.call_map.lock().unwrap().remove(&call_id);
387+
let call = self
388+
.call_map
389+
.lock()
390+
.unwrap()
391+
.as_mut()
392+
.and_then(|m| m.remove(&call_id));
374393

375394
if let Some(call) = call {
376395
match rpc_response.status() {

0 commit comments

Comments
 (0)