Skip to content

Commit 1313f64

Browse files
authored
feat(query): support result_scan and last_query_id in http handler (#18043)
* fix(query): support result_scan and last_query_id in http handler Store 64 query id and result meta key in http conf * refactor: add result cache key into HttpSessionStateInternal and only store 1 query id * fix
1 parent cd45b2d commit 1313f64

File tree

6 files changed

+168
-25
lines changed

6 files changed

+168
-25
lines changed

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,23 @@ pub struct ExecutorSessionState {
158158
pub txn_manager: TxnManagerRef,
159159
pub temp_tbl_mgr: TempTblMgrRef,
160160
pub variables: HashMap<String, Scalar>,
161+
pub last_query_ids: Vec<String>,
162+
pub last_query_result_cache_key: String,
161163
}
162164

163165
impl ExecutorSessionState {
164166
pub fn new(session: Arc<Session>) -> Self {
167+
let mut last_query_ids = Vec::with_capacity(64);
168+
let mut last_query_result_cache_key = String::new();
169+
170+
let last_query_id = session.get_last_query_id(-1);
171+
if !last_query_id.is_empty() {
172+
if let Some(meta_key) = session.get_query_result_cache_key(&last_query_id) {
173+
last_query_ids.push(last_query_id);
174+
last_query_result_cache_key = meta_key;
175+
}
176+
}
177+
165178
Self {
166179
current_catalog: session.get_current_catalog(),
167180
current_database: session.get_current_database(),
@@ -171,6 +184,8 @@ impl ExecutorSessionState {
171184
txn_manager: session.txn_mgr(),
172185
temp_tbl_mgr: session.temp_tbl_mgr(),
173186
variables: session.get_all_variables(),
187+
last_query_ids,
188+
last_query_result_cache_key,
174189
}
175190
}
176191
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,11 @@ pub struct ServerInfo {
210210
pub struct HttpSessionStateInternal {
211211
/// value is JSON of Scalar
212212
variables: Vec<(String, String)>,
213+
pub last_query_result_cache_key: String,
213214
}
214215

215216
impl HttpSessionStateInternal {
216-
fn new(variables: &HashMap<String, Scalar>) -> Self {
217+
fn new(variables: &HashMap<String, Scalar>, last_query_result_cache_key: String) -> Self {
217218
let variables = variables
218219
.iter()
219220
.map(|(k, v)| {
@@ -223,7 +224,10 @@ impl HttpSessionStateInternal {
223224
)
224225
})
225226
.collect();
226-
Self { variables }
227+
Self {
228+
variables,
229+
last_query_result_cache_key,
230+
}
227231
}
228232

229233
pub fn get_variables(&self) -> Result<HashMap<String, Scalar>> {
@@ -442,6 +446,7 @@ impl HttpQuery {
442446
// - the current database
443447
// - the current role
444448
// - the session-level settings, like max_threads, http_handler_result_timeout_secs, etc.
449+
// - the session-level query cache.
445450
if let Some(session_conf) = &req.session {
446451
if let Some(catalog) = &session_conf.catalog {
447452
session.set_current_catalog(catalog.clone());
@@ -477,8 +482,18 @@ impl HttpQuery {
477482
if !state.variables.is_empty() {
478483
session.set_all_variables(state.get_variables()?)
479484
}
485+
if let Some(id) = session_conf.last_query_ids.first() {
486+
if !id.is_empty() && !state.last_query_result_cache_key.is_empty() {
487+
session.update_query_ids_results(
488+
id.to_owned(),
489+
state.last_query_result_cache_key.to_owned(),
490+
);
491+
}
492+
}
480493
}
494+
481495
try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?;
496+
482497
if session_conf.need_sticky
483498
&& matches!(session_conf.txn_state, None | Some(TxnState::AutoCommit))
484499
{
@@ -622,6 +637,7 @@ impl HttpQuery {
622637
// - role: updated by SET ROLE;
623638
// - secondary_roles: updated by SET SECONDARY ROLES ALL|NONE;
624639
// - settings: updated by SET XXX = YYY;
640+
// - query cache: last_query_id and result_scan
625641

626642
let (session_state, is_stopped) = {
627643
let executor = self.state.lock();
@@ -644,8 +660,13 @@ impl HttpQuery {
644660
let role = session_state.current_role.clone();
645661
let secondary_roles = session_state.secondary_roles.clone();
646662
let txn_state = session_state.txn_manager.lock().state();
647-
let internal = if !session_state.variables.is_empty() {
648-
Some(HttpSessionStateInternal::new(&session_state.variables))
663+
let internal = if !session_state.variables.is_empty()
664+
|| !session_state.last_query_result_cache_key.is_empty()
665+
{
666+
Some(HttpSessionStateInternal::new(
667+
&session_state.variables,
668+
session_state.last_query_result_cache_key,
669+
))
649670
} else {
650671
None
651672
};
@@ -722,7 +743,11 @@ impl HttpQuery {
722743
need_sticky,
723744
need_keep_alive,
724745
last_server_info: Some(HttpQueryManager::instance().server_info.clone()),
725-
last_query_ids: vec![self.id.clone()],
746+
last_query_ids: if session_state.last_query_ids.is_empty() {
747+
vec![self.id.clone()]
748+
} else {
749+
session_state.last_query_ids
750+
},
726751
internal,
727752
})
728753
}

src/query/service/src/sessions/session.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,10 @@ impl Session {
375375
.update_query_ids_results(query_id, Some(result_cache_key))
376376
}
377377

378+
pub fn get_last_query_id(&self, index: i32) -> String {
379+
self.session_ctx.get_last_query_id(index)
380+
}
381+
378382
pub fn txn_mgr(&self) -> TxnManagerRef {
379383
self.session_ctx.txn_mgr()
380384
}

src/tests/sqlsmith/src/http_client.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use reqwest::header::HeaderValue;
2727
use reqwest::Client;
2828
use reqwest::ClientBuilder;
2929
use serde::Deserialize;
30+
use serde::Deserializer;
3031
use serde::Serialize;
32+
use serde::Serializer;
3133
use url::Url;
3234

3335
struct GlobalCookieStore {
@@ -87,7 +89,54 @@ pub(crate) struct HttpSessionConf {
8789
pub(crate) last_server_info: Option<ServerInfo>,
8890
#[serde(default)]
8991
pub(crate) last_query_ids: Vec<String>,
90-
pub(crate) internal: Option<String>,
92+
/// hide state not useful to clients
93+
/// so client only need to know there is a String field `internal`,
94+
/// which need to carry with session/conn
95+
#[serde(default)]
96+
#[serde(skip_serializing_if = "Option::is_none")]
97+
#[serde(
98+
serialize_with = "serialize_as_json_string",
99+
deserialize_with = "deserialize_from_json_string"
100+
)]
101+
pub(crate) internal: Option<HttpSessionStateInternal>,
102+
}
103+
104+
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
105+
pub struct HttpSessionStateInternal {
106+
/// value is JSON of Scalar
107+
variables: Vec<(String, String)>,
108+
pub last_query_result_cache_key: String,
109+
}
110+
111+
fn serialize_as_json_string<S>(
112+
value: &Option<HttpSessionStateInternal>,
113+
serializer: S,
114+
) -> std::result::Result<S::Ok, S::Error>
115+
where
116+
S: Serializer,
117+
{
118+
match value {
119+
Some(complex_value) => {
120+
let json_string =
121+
serde_json::to_string(complex_value).map_err(serde::ser::Error::custom)?;
122+
serializer.serialize_some(&json_string)
123+
}
124+
None => serializer.serialize_none(),
125+
}
126+
}
127+
128+
fn deserialize_from_json_string<'de, D>(
129+
deserializer: D,
130+
) -> std::result::Result<Option<HttpSessionStateInternal>, D::Error>
131+
where D: Deserializer<'de> {
132+
let json_string: Option<String> = Option::deserialize(deserializer)?;
133+
match json_string {
134+
Some(s) => {
135+
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
136+
Ok(Some(complex_value))
137+
}
138+
None => Ok(None),
139+
}
91140
}
92141

93142
#[derive(serde::Deserialize, Debug)]

tests/sqllogictests/src/util.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use bollard::Docker;
2525
use clap::Parser;
2626
use redis::Commands;
2727
use serde::Deserialize;
28+
use serde::Deserializer;
2829
use serde::Serialize;
30+
use serde::Serializer;
2931
use serde_json::Value;
3032
use testcontainers::core::client::docker_client_instance;
3133
use testcontainers::core::logs::consumer::logging_consumer::LoggingConsumer;
@@ -67,7 +69,54 @@ pub struct HttpSessionConf {
6769
pub last_server_info: Option<ServerInfo>,
6870
#[serde(default)]
6971
pub last_query_ids: Vec<String>,
70-
pub internal: Option<String>,
72+
/// hide state not useful to clients
73+
/// so client only need to know there is a String field `internal`,
74+
/// which need to carry with session/conn
75+
#[serde(default)]
76+
#[serde(skip_serializing_if = "Option::is_none")]
77+
#[serde(
78+
serialize_with = "serialize_as_json_string",
79+
deserialize_with = "deserialize_from_json_string"
80+
)]
81+
pub internal: Option<HttpSessionStateInternal>,
82+
}
83+
84+
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
85+
pub struct HttpSessionStateInternal {
86+
/// value is JSON of Scalar
87+
variables: Vec<(String, String)>,
88+
pub last_query_result_cache_key: String,
89+
}
90+
91+
fn serialize_as_json_string<S>(
92+
value: &Option<HttpSessionStateInternal>,
93+
serializer: S,
94+
) -> std::result::Result<S::Ok, S::Error>
95+
where
96+
S: Serializer,
97+
{
98+
match value {
99+
Some(complex_value) => {
100+
let json_string =
101+
serde_json::to_string(complex_value).map_err(serde::ser::Error::custom)?;
102+
serializer.serialize_some(&json_string)
103+
}
104+
None => serializer.serialize_none(),
105+
}
106+
}
107+
108+
fn deserialize_from_json_string<'de, D>(
109+
deserializer: D,
110+
) -> std::result::Result<Option<HttpSessionStateInternal>, D::Error>
111+
where D: Deserializer<'de> {
112+
let json_string: Option<String> = Option::deserialize(deserializer)?;
113+
match json_string {
114+
Some(s) => {
115+
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
116+
Ok(Some(complex_value))
117+
}
118+
None => Ok(None),
119+
}
71120
}
72121

73122
pub fn parser_rows(rows: &Value) -> Result<Vec<Vec<String>>> {
Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,55 @@
11
# The HTTP interface does not support session state function (last_query_id).
22

3-
onlyif mysql
43
statement ok
54
DROP DATABASE IF EXISTS db20_14;
65

7-
onlyif mysql
86
statement ok
97
CREATE DATABASE db20_14;
108

11-
onlyif mysql
129
statement ok
1310
USE db20_14;
1411

15-
onlyif mysql
12+
1613
statement ok
1714
CREATE TABLE IF NOT EXISTS t1 (a INT);
1815

19-
onlyif mysql
16+
2017
statement ok
2118
INSERT INTO t1 VALUES (1), (2), (3);
2219

23-
onlyif mysql
20+
2421
query I
2522
SELECT * FROM t1 ORDER BY a;
2623
----
2724
1
2825
2
2926
3
3027

31-
onlyif mysql
28+
3229
statement ok
3330
SET enable_query_result_cache = 1;
3431

35-
onlyif mysql
32+
3633
statement ok
3734
SET query_result_cache_min_execute_secs = 0;
3835

36+
3937
onlyif mysql
4038
statement error `RESULT_SCAN` failed: No cache key found in current session for query ID '.*'\.
4139
SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
4240

43-
onlyif mysql
41+
onlyif http
42+
statement error The `RESULT_SCAN` function requires a 'query_id' parameter. Please specify a valid query ID
43+
SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
44+
4445
query I
4546
SELECT * FROM t1 ORDER BY a;
4647
----
4748
1
4849
2
4950
3
5051

51-
onlyif mysql
52+
5253
query I
5354
SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
5455
----
@@ -57,19 +58,19 @@ SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
5758
3
5859

5960
# multiple executions should return the same result
60-
onlyif mysql
61+
6162
query I
6263
SELECT * FROM RESULT_SCAN(last_query_id());
6364
----
6465
1
6566
2
6667
3
6768

68-
onlyif mysql
69+
6970
statement ok
7071
INSERT INTO t1 VALUES (4);
7172

72-
onlyif mysql
73+
7374
query I
7475
SELECT * FROM t1 ORDER BY a;
7576
----
@@ -78,7 +79,7 @@ SELECT * FROM t1 ORDER BY a;
7879
3
7980
4
8081

81-
onlyif mysql
82+
8283
query I
8384
SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
8485
----
@@ -87,7 +88,7 @@ SELECT * FROM RESULT_SCAN(last_query_id()) ORDER BY a;
8788
3
8889
4
8990

90-
onlyif mysql
91+
9192
query I
9293
SELECT * FROM RESULT_SCAN(last_query_id(-1)) ORDER BY a;
9394
----
@@ -96,14 +97,14 @@ SELECT * FROM RESULT_SCAN(last_query_id(-1)) ORDER BY a;
9697
3
9798
4
9899

99-
onlyif mysql
100+
100101
statement ok
101102
SET enable_query_result_cache = 0;
102103

103-
onlyif mysql
104+
104105
statement ok
105106
DROP TABLE t1;
106107

107-
onlyif mysql
108+
108109
statement ok
109110
DROP DATABASE db20_14;

0 commit comments

Comments
 (0)