Skip to content

Commit d74a1f0

Browse files
authored
fix(cubesql): Catch error on TcpStream.peer_addr() (#6300)
1 parent 1f2db32 commit d74a1f0

File tree

10 files changed

+79
-29
lines changed

10 files changed

+79
-29
lines changed

rust/cubesql/cubesql/src/compile/engine/information_schema/postgres/pg_stat_activity.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,13 @@ impl PgStatActivityBuilder {
7878
self.application_name
7979
.append_option(session.application_name)
8080
.unwrap();
81-
self.client_addr.append_option(session.client_addr).unwrap();
81+
self.client_addr.append_value(session.client_addr).unwrap();
8282
self.client_hostname
8383
.append_option(session.client_hostname)
8484
.unwrap();
85-
self.client_port.append_option(session.client_port).unwrap();
85+
self.client_port
86+
.append_value(session.client_port.to_string())
87+
.unwrap();
8688
self.backend_start.append_null().unwrap();
8789
self.xact_start.append_null().unwrap();
8890
self.query_start.append_null().unwrap();

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7322,6 +7322,20 @@ ORDER BY \"COUNT(count)\" DESC"
73227322
Ok(())
73237323
}
73247324

7325+
#[tokio::test]
7326+
async fn test_pgcatalog_pg_stat_activity_postgres() -> Result<(), CubeError> {
7327+
insta::assert_snapshot!(
7328+
"pgcatalog_pg_stat_activity_postgres",
7329+
execute_query(
7330+
"SELECT * FROM pg_catalog.pg_stat_activity".to_string(),
7331+
DatabaseProtocol::PostgreSQL
7332+
)
7333+
.await?
7334+
);
7335+
7336+
Ok(())
7337+
}
7338+
73257339
#[tokio::test]
73267340
async fn test_pgcatalog_pguser_postgres() -> Result<(), CubeError> {
73277341
insta::assert_snapshot!(

rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__information_schema_processlist.snap

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
---
22
source: cubesql/src/compile/mod.rs
3-
assertion_line: 4118
4-
expression: "execute_query(\"SELECT * FROM information_schema.processlist\".to_string()).await?"
5-
3+
assertion_line: 5710
4+
expression: "execute_query(\"SELECT * FROM information_schema.processlist\".to_string(),\n DatabaseProtocol::MySQL).await?"
65
---
76
+----+------+-----------+----+---------+------+------------------------+------+
87
| ID | USER | HOST | DB | COMMAND | TIME | STATE | INFO |
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
source: cubesql/src/compile/mod.rs
3+
assertion_line: 7327
4+
expression: "execute_query(\"SELECT * FROM pg_catalog.pg_stat_activity\".to_string(),\n DatabaseProtocol::PostgreSQL).await?"
5+
---
6+
+-----+---------+-----+------------+----------+---------+------------------+-------------+-----------------+-------------+---------------+------------+-------------+--------------+-----------------+------------+-------+-------------+--------------+----------+-------+----------------+
7+
| oid | datname | pid | leader_pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | xact_start | query_start | state_change | wait_event_type | wait_event | state | backend_xid | backend_xmin | query_id | query | backend_type |
8+
+-----+---------+-----+------------+----------+---------+------------------+-------------+-----------------+-------------+---------------+------------+-------------+--------------+-----------------+------------+-------+-------------+--------------+----------+-------+----------------+
9+
| 1 | cubedb | 1 | NULL | NULL | ovr | NULL | 127.0.0.1 | NULL | 1234 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | client backend |
10+
+-----+---------+-----+------------+----------+---------+------------------+-------------+-----------------+-------------+---------------+------------+-------------+--------------+-----------------+------------+-------+-------------+--------------+----------+-------+----------------+

rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__show_processlist.snap

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
---
22
source: cubesql/src/compile/mod.rs
3-
assertion_line: 4116
4-
expression: "execute_query(\"SHOW processlist\".to_string()).await?"
5-
3+
assertion_line: 5676
4+
expression: "execute_query(\"SHOW processlist\".to_string(), DatabaseProtocol::MySQL).await?"
65
---
76
+----+------+-----------+----+---------+------+------------------------+------+
87
| ID | USER | HOST | DB | COMMAND | TIME | STATE | INFO |

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ pub async fn get_test_session(protocol: DatabaseProtocol) -> Arc<Session> {
189189
};
190190
let session_manager = Arc::new(SessionManager::new(server.clone()));
191191
let session = session_manager
192-
.create_session(protocol, "127.0.0.1".to_string())
192+
.create_session(protocol, "127.0.0.1".to_string(), 1234)
193193
.await;
194194

195195
// Populate like shims

rust/cubesql/cubesql/src/sql/mysql/service.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,12 +501,21 @@ impl ProcessingLoop for MySqlServer {
501501
}
502502
};
503503

504+
let (client_addr, client_port) = match socket.peer_addr() {
505+
Ok(peer_addr) => (peer_addr.ip().to_string(), peer_addr.port()),
506+
Err(e) => {
507+
error!(
508+
"[mysql] Error while calling peer_addr() on TcpStream: {}",
509+
e
510+
);
511+
512+
("127.0.0.1".to_string(), 0000_u16)
513+
}
514+
};
515+
504516
let session = self
505517
.session_manager
506-
.create_session(
507-
DatabaseProtocol::MySQL,
508-
socket.peer_addr().unwrap().to_string(),
509-
)
518+
.create_session(DatabaseProtocol::MySQL, client_addr, client_port)
510519
.await;
511520

512521
let logger = Arc::new(SessionLogger::new(session.state.clone()));

rust/cubesql/cubesql/src/sql/postgres/service.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,18 @@ impl ProcessingLoop for PostgresServer {
5656
}
5757
};
5858

59+
let (client_addr, client_port) = match socket.peer_addr() {
60+
Ok(peer_addr) => (peer_addr.ip().to_string(), peer_addr.port()),
61+
Err(e) => {
62+
error!("[pg] Error while calling peer_addr() on TcpStream: {}", e);
63+
64+
("127.0.0.1".to_string(), 0000_u16)
65+
}
66+
};
67+
5968
let session = self
6069
.session_manager
61-
.create_session(
62-
DatabaseProtocol::PostgreSQL,
63-
socket.peer_addr().unwrap().to_string(),
64-
)
70+
.create_session(DatabaseProtocol::PostgreSQL, client_addr, client_port)
6571
.await;
6672
let logger = Arc::new(SessionLogger::new(session.state.clone()));
6773

@@ -74,7 +80,7 @@ impl ProcessingLoop for PostgresServer {
7480
tokio::spawn(async move {
7581
tx.closed().await;
7682

77-
trace!("[postgres] Removing connection {}", connection_id);
83+
trace!("[pg] Removing connection {}", connection_id);
7884

7985
session_manager.drop_session(connection_id).await;
8086
});

rust/cubesql/cubesql/src/sql/session.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ pub struct SessionState {
8080
pub connection_id: u32,
8181
// secret for this session
8282
pub secret: u32,
83-
// client address, immutable
84-
pub host: String,
83+
// client ip, immutable
84+
pub client_ip: String,
85+
// client port, immutable
86+
pub client_port: u16,
8587
// client protocol, mysql/postgresql, immutable
8688
pub protocol: DatabaseProtocol,
8789

@@ -104,7 +106,8 @@ pub struct SessionState {
104106
impl SessionState {
105107
pub fn new(
106108
connection_id: u32,
107-
host: String,
109+
client_ip: String,
110+
client_port: u16,
108111
protocol: DatabaseProtocol,
109112
auth_context: Option<AuthContextRef>,
110113
) -> Self {
@@ -113,7 +116,8 @@ impl SessionState {
113116
Self {
114117
connection_id,
115118
secret: rng.gen(),
116-
host,
119+
client_ip,
120+
client_port,
117121
protocol,
118122
variables: RwLockSync::new(None),
119123
properties: RwLockSync::new(SessionProperties::new(None, None)),
@@ -395,9 +399,9 @@ impl Session {
395399
usesysid: 0,
396400
usename: self.state.user(),
397401
application_name,
398-
client_addr: None,
402+
client_addr: self.state.client_ip.clone(),
399403
client_hostname: None,
400-
client_port: None,
404+
client_port: self.state.client_port.clone(),
401405
query,
402406
}
403407
}
@@ -406,7 +410,7 @@ impl Session {
406410
pub fn to_process_list(self: &Arc<Self>) -> SessionProcessList {
407411
SessionProcessList {
408412
id: self.state.connection_id,
409-
host: self.state.host.clone(),
413+
host: self.state.client_ip.clone(),
410414
user: self.state.user(),
411415
database: self.state.database(),
412416
}
@@ -430,8 +434,8 @@ pub struct SessionStatActivity {
430434
pub usesysid: u32,
431435
pub usename: Option<String>,
432436
pub application_name: Option<String>,
433-
pub client_addr: Option<String>,
437+
pub client_addr: String,
434438
pub client_hostname: Option<String>,
435-
pub client_port: Option<String>,
439+
pub client_port: u16,
436440
pub query: Option<String>,
437441
}

rust/cubesql/cubesql/src/sql/session_manager.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,21 @@ impl SessionManager {
3535
pub async fn create_session(
3636
self: &Arc<Self>,
3737
protocol: DatabaseProtocol,
38-
host: String,
38+
client_addr: String,
39+
client_port: u16,
3940
) -> Arc<Session> {
4041
let connection_id = self.last_id.fetch_add(1, Ordering::SeqCst);
4142

4243
let sess = Session {
4344
session_manager: self.clone(),
4445
server: self.server.clone(),
45-
state: Arc::new(SessionState::new(connection_id, host, protocol, None)),
46+
state: Arc::new(SessionState::new(
47+
connection_id,
48+
client_addr,
49+
client_port,
50+
protocol,
51+
None,
52+
)),
4653
};
4754

4855
let session_ref = Arc::new(sess);

0 commit comments

Comments
 (0)