Skip to content

Commit f0f7a98

Browse files
authored
refactor: move some fields of session to internal. (#18442)
1 parent a2c6d0a commit f0f7a98

File tree

5 files changed

+58
-111
lines changed

5 files changed

+58
-111
lines changed

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -208,20 +208,36 @@ pub struct ServerInfo {
208208
pub start_time: String,
209209
}
210210

211+
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
212+
*t == T::default()
213+
}
214+
211215
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
212216
pub struct HttpSessionStateInternal {
213217
/// value is JSON of Scalar
218+
#[serde(default)]
219+
#[serde(skip_serializing_if = "is_default")]
214220
variables: Vec<(String, String)>,
215221
pub last_query_result_cache_key: String,
216222
#[serde(default)]
223+
#[serde(skip_serializing_if = "is_default")]
217224
pub has_temp_table: bool,
225+
#[serde(default)]
226+
#[serde(skip_serializing_if = "Option::is_none")]
227+
pub last_node_id: Option<String>,
228+
/// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
229+
#[serde(default)]
230+
#[serde(skip_serializing_if = "is_default")]
231+
pub last_query_ids: Vec<String>,
218232
}
219233

220234
impl HttpSessionStateInternal {
221235
fn new(
222236
variables: &HashMap<String, Scalar>,
223237
last_query_result_cache_key: String,
224238
has_temp_table: bool,
239+
last_node_id: Option<String>,
240+
last_query_ids: Vec<String>,
225241
) -> Self {
226242
let variables = variables
227243
.iter()
@@ -236,6 +252,8 @@ impl HttpSessionStateInternal {
236252
variables,
237253
last_query_result_cache_key,
238254
has_temp_table,
255+
last_node_id,
256+
last_query_ids,
239257
}
240258
}
241259

@@ -306,17 +324,6 @@ pub struct HttpSessionConf {
306324
pub need_sticky: bool,
307325
#[serde(default)]
308326
pub need_keep_alive: bool,
309-
310-
// TODO: move to internal later
311-
// used to check if the session is still on the same server
312-
#[serde(skip_serializing_if = "Option::is_none")]
313-
pub last_server_info: Option<ServerInfo>,
314-
315-
// TODO: move to internal later
316-
/// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
317-
#[serde(default)]
318-
pub last_query_ids: Vec<String>,
319-
320327
/// hide state not useful to clients
321328
/// so client only need to know there is a String field `internal`,
322329
/// which need to carry with session/conn
@@ -409,13 +416,14 @@ pub struct HttpQuery {
409416
fn try_set_txn(
410417
query_id: &str,
411418
session: &Arc<Session>,
412-
session_conf: &HttpSessionConf,
419+
txn_state: &Option<TxnState>,
420+
internal_state: &HttpSessionStateInternal,
413421
http_query_manager: &Arc<HttpQueryManager>,
414422
) -> Result<()> {
415-
match &session_conf.txn_state {
423+
match txn_state {
416424
Some(TxnState::Active) => {
417-
http_query_manager.check_sticky_for_txn(&session_conf.last_server_info)?;
418-
let last_query_id = session_conf.last_query_ids.first().ok_or_else(|| {
425+
http_query_manager.check_sticky_for_txn(&internal_state.last_node_id)?;
426+
let last_query_id = internal_state.last_query_ids.first().ok_or_else(|| {
419427
ErrorCode::InvalidSessionState(
420428
"[HTTP-QUERY] Invalid transaction state: transaction is active but last_query_ids is empty".to_string(),
421429
)
@@ -500,7 +508,7 @@ impl HttpQuery {
500508
if !state.variables.is_empty() {
501509
session.set_all_variables(state.get_variables()?)
502510
}
503-
if let Some(id) = session_conf.last_query_ids.first() {
511+
if let Some(id) = state.last_query_ids.first() {
504512
if let Some(last_query) = http_query_manager.queries.get(id) {
505513
let state = *last_query.state.lock();
506514
if !matches!(
@@ -521,7 +529,7 @@ impl HttpQuery {
521529
}
522530
let has_temp_table = !session.temp_tbl_mgr().lock().is_empty();
523531
if state.has_temp_table {
524-
if let Some(ServerInfo { id, .. }) = &session_conf.last_server_info {
532+
if let Some(id) = &state.last_node_id {
525533
if http_query_manager.server_info.id != *id {
526534
if http_ctx.fixed_coordinator_node {
527535
return Err(ErrorCode::SessionLost(
@@ -563,11 +571,11 @@ impl HttpQuery {
563571
warn!("[TEMP-TABLE] Found unexpected Temp table.");
564572
}
565573
}
566-
567574
try_set_txn(
568575
&http_ctx.query_id,
569576
&session,
570-
session_conf,
577+
&session_conf.txn_state,
578+
&session_conf.internal.clone().unwrap_or_default(),
571579
&http_query_manager,
572580
)?;
573581
};
@@ -731,12 +739,13 @@ impl HttpQuery {
731739
&session_state.variables,
732740
session_state.last_query_result_cache_key,
733741
has_temp_table,
742+
Some(HttpQueryManager::instance().server_info.id.clone()),
743+
if session_state.last_query_ids.is_empty() {
744+
vec![self.id.clone()]
745+
} else {
746+
session_state.last_query_ids
747+
},
734748
);
735-
let internal = if internal == HttpSessionStateInternal::default() {
736-
None
737-
} else {
738-
Some(internal)
739-
};
740749

741750
if is_stopped
742751
&& txn_state != TxnState::AutoCommit
@@ -767,13 +776,8 @@ impl HttpQuery {
767776
txn_state: Some(txn_state),
768777
need_sticky,
769778
need_keep_alive,
770-
last_server_info: Some(HttpQueryManager::instance().server_info.clone()),
771-
last_query_ids: if session_state.last_query_ids.is_empty() {
772-
vec![self.id.clone()]
773-
} else {
774-
session_state.last_query_ids
775-
},
776-
internal,
779+
780+
internal: Some(internal),
777781
})
778782
}
779783

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ impl HttpQueryManager {
221221
}
222222
}
223223

224-
pub(crate) fn check_sticky_for_txn(&self, last_server_info: &Option<ServerInfo>) -> Result<()> {
225-
if let Some(ServerInfo { id, .. }) = last_server_info {
224+
pub(crate) fn check_sticky_for_txn(&self, last_node_id: &Option<String>) -> Result<()> {
225+
if let Some(id) = last_node_id {
226226
if self.server_info.id != *id {
227227
return Err(ErrorCode::SessionLost(format!(
228228
"[HTTP-QUERY] Transaction aborted because server restart or route error: expecting server {}, current one is {} started at {} ",

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,8 +1470,6 @@ async fn test_affect() -> Result<()> {
14701470
txn_state: Some(TxnState::AutoCommit),
14711471
need_sticky: false,
14721472
need_keep_alive: false,
1473-
last_server_info: None,
1474-
last_query_ids: vec![],
14751473
internal: None,
14761474
}),
14771475
),
@@ -1495,8 +1493,6 @@ async fn test_affect() -> Result<()> {
14951493
txn_state: Some(TxnState::AutoCommit),
14961494
need_sticky: false,
14971495
need_keep_alive: false,
1498-
last_server_info: None,
1499-
last_query_ids: vec![],
15001496
internal: None,
15011497
}),
15021498
),
@@ -1515,8 +1511,6 @@ async fn test_affect() -> Result<()> {
15151511
txn_state: Some(TxnState::AutoCommit),
15161512
need_sticky: false,
15171513
need_keep_alive: false,
1518-
last_server_info: None,
1519-
last_query_ids: vec![],
15201514
internal: None,
15211515
}),
15221516
),
@@ -1537,8 +1531,6 @@ async fn test_affect() -> Result<()> {
15371531
txn_state: Some(TxnState::AutoCommit),
15381532
need_sticky: false,
15391533
need_keep_alive: false,
1540-
last_server_info: None,
1541-
last_query_ids: vec![],
15421534
internal: None,
15431535
}),
15441536
),
@@ -1561,27 +1553,24 @@ async fn test_affect() -> Result<()> {
15611553
txn_state: Some(TxnState::AutoCommit),
15621554
need_sticky: false,
15631555
need_keep_alive: false,
1564-
last_server_info: None,
1565-
last_query_ids: vec![],
15661556
internal: None,
15671557
}),
15681558
),
15691559
];
15701560

15711561
for (json, affect, session_conf) in sqls {
1572-
let result = TestHttpQueryRequest::new(json.clone())
1562+
let mut result = TestHttpQueryRequest::new(json.clone())
15731563
.fetch_total()
15741564
.await?
15751565
.last();
1566+
if let Some(s) = result.1.session.as_mut() {
1567+
s.internal = None;
1568+
}
15761569
assert_eq!(result.0, StatusCode::OK, "{} {:?}", json, result.1.error);
15771570
assert!(result.1.error.is_none(), "{} {:?}", json, result.1.error);
15781571
assert_eq!(result.1.state, ExecuteStateKind::Succeeded, "{}", json);
15791572
assert_eq!(result.1.affect, affect, "{}", json);
1580-
let session = result.1.session.map(|s| HttpSessionConf {
1581-
last_server_info: None,
1582-
last_query_ids: vec![],
1583-
..s
1584-
});
1573+
let session = result.1.session.map(|s| HttpSessionConf { ..s });
15851574

15861575
assert_eq!(session, session_conf, "{}", json);
15871576
}
@@ -1667,8 +1656,8 @@ async fn test_txn_error() -> Result<()> {
16671656

16681657
{
16691658
let mut session = session.clone();
1670-
if let Some(info) = &mut session.last_server_info {
1671-
info.id = "abc".to_string();
1659+
if let Some(info) = &mut session.internal.as_mut() {
1660+
info.last_node_id = Some("abc".to_string());
16721661
}
16731662
let json = serde_json::json!({
16741663
"sql": "select 1",
@@ -1682,8 +1671,8 @@ async fn test_txn_error() -> Result<()> {
16821671

16831672
{
16841673
let mut session = session.clone();
1685-
if let Some(s) = &mut session.last_server_info {
1686-
s.id = "abc".to_string()
1674+
if let Some(s) = &mut session.internal.as_mut() {
1675+
s.last_node_id = Some("abc".to_string())
16871676
}
16881677
let json = serde_json::json!({
16891678
"sql": "select 1",
@@ -1715,7 +1704,14 @@ async fn test_txn_timeout() -> Result<()> {
17151704
sleep(Duration::from_secs(3)).await;
17161705

17171706
let session = session.clone();
1718-
let last_query_id = session.last_query_ids.first().unwrap().to_string();
1707+
let last_query_id = session
1708+
.internal
1709+
.as_ref()
1710+
.unwrap()
1711+
.last_query_ids
1712+
.first()
1713+
.unwrap()
1714+
.to_string();
17191715
let json = serde_json::json!({
17201716
"sql": "select 1",
17211717
"session": session,

tests/sqllogictests/src/util.rs

Lines changed: 3 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use bollard::Docker;
2525
use clap::Parser;
2626
use redis::Commands;
2727
use serde::Deserialize;
28-
use serde::Deserializer;
2928
use serde::Serialize;
30-
use serde::Serializer;
3129
use serde_json::Value;
3230
use testcontainers::core::client::docker_client_instance;
3331
use testcontainers::core::logs::consumer::logging_consumer::LoggingConsumer;
@@ -51,72 +49,21 @@ const CONTAINER_RETRY_TIMES: usize = 3;
5149
const CONTAINER_STARTUP_TIMEOUT_SECONDS: u64 = 60;
5250
const CONTAINER_TIMEOUT_SECONDS: u64 = 300;
5351

54-
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
52+
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
5553
pub struct ServerInfo {
5654
pub id: String,
5755
pub start_time: String,
5856
}
5957

6058
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
6159
pub struct HttpSessionConf {
60+
pub catalog: Option<String>,
6261
pub database: Option<String>,
6362
pub role: Option<String>,
64-
pub catalog: Option<String>,
65-
6663
pub secondary_roles: Option<Vec<String>>,
6764
pub settings: Option<BTreeMap<String, String>>,
6865
pub txn_state: Option<String>,
69-
pub last_server_info: Option<ServerInfo>,
70-
#[serde(default)]
71-
pub last_query_ids: Vec<String>,
72-
/// hide state not useful to clients
73-
/// so client only need to know there is a String field `internal`,
74-
/// which need to carry with session/conn
75-
#[serde(default)]
76-
#[serde(skip_serializing_if = "Option::is_none")]
77-
#[serde(
78-
serialize_with = "serialize_as_json_string",
79-
deserialize_with = "deserialize_from_json_string"
80-
)]
81-
pub internal: Option<HttpSessionStateInternal>,
82-
}
83-
84-
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
85-
pub struct HttpSessionStateInternal {
86-
/// value is JSON of Scalar
87-
variables: Vec<(String, String)>,
88-
pub last_query_result_cache_key: String,
89-
}
90-
91-
fn serialize_as_json_string<S>(
92-
value: &Option<HttpSessionStateInternal>,
93-
serializer: S,
94-
) -> std::result::Result<S::Ok, S::Error>
95-
where
96-
S: Serializer,
97-
{
98-
match value {
99-
Some(complex_value) => {
100-
let json_string =
101-
serde_json::to_string(complex_value).map_err(serde::ser::Error::custom)?;
102-
serializer.serialize_some(&json_string)
103-
}
104-
None => serializer.serialize_none(),
105-
}
106-
}
107-
108-
fn deserialize_from_json_string<'de, D>(
109-
deserializer: D,
110-
) -> std::result::Result<Option<HttpSessionStateInternal>, D::Error>
111-
where D: Deserializer<'de> {
112-
let json_string: Option<String> = Option::deserialize(deserializer)?;
113-
match json_string {
114-
Some(s) => {
115-
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
116-
Ok(Some(complex_value))
117-
}
118-
None => Ok(None),
119-
}
66+
pub internal: String,
12067
}
12168

12269
pub fn parser_rows(rows: &Value) -> Result<Vec<Vec<String>>> {

tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
44
. "$CURDIR"/../../../shell_env.sh
55

66
QID="my_query_for_route_${RANDOM}"
7-
NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id")
7+
NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.internal" | jq -r ".last_node_id")
88
echo "# error"
99
echo "## page"
1010
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'

0 commit comments

Comments
 (0)