Skip to content

Commit 473202d

Browse files
authored
feat: add header X-DATABEND-CLIENT-CAPS. (#18475)
* fix: missing 'values' when displaying insert stmt. * feat: add header X-DATABEND-CLIENT-CAPS. * feat: add header X-DATABEND-CLIENT-CAPS.
1 parent e570d28 commit 473202d

File tree

13 files changed

+110
-40
lines changed

13 files changed

+110
-40
lines changed

src/common/base/src/headers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ pub const HEADER_WAREHOUSE: &str = "X-DATABEND-WAREHOUSE";
3333

3434
pub const HEADER_SIGNATURE: &str = "X-DATABEND-SIGNATURE";
3535
pub const HEADER_AUTH_METHOD: &str = "X-DATABEND-AUTH-METHOD";
36+
pub const HEADER_CLIENT_CAPABILITIES: &str = "X-DATABEND-CLIENT-CAPS";

src/query/ast/src/ast/statements/insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl Display for InsertSource {
115115
location,
116116
} => {
117117
if let Some(value) = value {
118-
write!(f, "(")?;
118+
write!(f, "VALUES (")?;
119119
write_comma_separated_list(f, value)?;
120120
write!(f, ")")?;
121121
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashSet;
16+
17+
/// client should carry header X-DATABEND-CLIENT-CAPS in each request.
18+
/// the value should be capability name separated by ';'
19+
#[derive(Debug, Default, Clone, Copy)]
20+
pub struct ClientCapabilities {
21+
// client use one of session_header/session_cookie to carry session id and related info.
22+
// client connection write back X-DATABEND-SESSION as it is for each request.
23+
pub session_header: bool,
24+
// client connection use a global Cookie store
25+
pub session_cookie: bool,
26+
}
27+
28+
impl ClientCapabilities {
29+
pub fn parse(header_value: &str) -> Self {
30+
let cap_set: HashSet<String> = header_value
31+
.split(';')
32+
.map(|cap| cap.trim())
33+
.filter(|cap| !cap.is_empty())
34+
.map(|cap| cap.to_lowercase())
35+
.collect();
36+
ClientCapabilities {
37+
session_header: cap_set.contains("session_header"),
38+
session_cookie: cap_set.contains("session_cookie"),
39+
}
40+
}
41+
}

src/query/service/src/servers/http/middleware/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod client_capabilities;
1516
mod metrics;
1617
mod panic_handler;
1718
mod session;
1819
pub mod session_header;
1920

21+
pub(crate) use client_capabilities::ClientCapabilities;
2022
pub(crate) use metrics::MetricsMiddleware;
2123
pub(crate) use panic_handler::PanicHandler;
2224
pub(crate) use session::forward_request_with_body;

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::collections::HashMap;
1616
use std::sync::Arc;
1717

1818
use databend_common_base::base::GlobalInstance;
19+
use databend_common_base::headers::HEADER_CLIENT_CAPABILITIES;
1920
use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
2021
use databend_common_base::headers::HEADER_NODE_ID;
2122
use databend_common_base::headers::HEADER_QUERY_ID;
@@ -68,6 +69,7 @@ use crate::servers::http::error::HttpErrorCode;
6869
use crate::servers::http::error::JsonErrorOnly;
6970
use crate::servers::http::error::QueryError;
7071
use crate::servers::http::middleware::session_header::ClientSession;
72+
use crate::servers::http::middleware::ClientCapabilities;
7173
use crate::servers::http::v1::HttpQueryContext;
7274
use crate::servers::http::v1::SessionClaim;
7375
use crate::servers::login_history::LoginEventType;
@@ -364,6 +366,13 @@ impl<E> HTTPSessionEndpoint<E> {
364366
.get(USER_AGENT)
365367
.map(|id| id.to_str().unwrap().to_string());
366368

369+
let mut client_caps = req
370+
.headers()
371+
.get(HEADER_CLIENT_CAPABILITIES)
372+
.map(|caps| caps.to_str().unwrap().to_string())
373+
.map(|caps| ClientCapabilities::parse(&caps))
374+
.unwrap_or_default();
375+
367376
let is_worksheet = user_agent
368377
.as_ref()
369378
.map(|ua_str| {
@@ -406,7 +415,7 @@ impl<E> HTTPSessionEndpoint<E> {
406415
.await?;
407416
login_history.user_name = user_name.clone();
408417

409-
let mut client_session = ClientSession::try_decode(req)?;
418+
let mut client_session = ClientSession::try_decode(req, &mut client_caps)?;
410419
if client_session.is_none() && !matches!(self.endpoint_kind, EndpointKind::PollQuery) {
411420
info!(
412421
"[HTTP-SESSION] got request without session, url={}, headers={:?}",
@@ -480,6 +489,7 @@ impl<E> HTTPSessionEndpoint<E> {
480489
is_sticky_node,
481490
client_session,
482491
fixed_coordinator_node: is_worksheet,
492+
client_caps,
483493
})
484494
}
485495
}

src/query/service/src/servers/http/middleware/session_header.rs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use base64::engine::general_purpose::URL_SAFE;
2020
use base64::Engine as _;
2121
use databend_common_base::headers::HEADER_SESSION;
2222
use databend_common_base::headers::HEADER_SESSION_ID;
23-
use databend_common_exception::ErrorCode;
2423
use databend_common_meta_app::tenant::Tenant;
2524
use http::HeaderMap;
2625
use jwt_simple::prelude::Deserialize;
@@ -35,6 +34,7 @@ use serde::Deserializer;
3534
use serde::Serializer;
3635
use uuid::Uuid;
3736

37+
use crate::servers::http::middleware::ClientCapabilities;
3838
use crate::servers::http::v1::unix_ts;
3939
use crate::servers::http::v1::ClientSessionManager;
4040

@@ -78,10 +78,13 @@ impl ClientSessionHeader {
7878
}
7979

8080
impl ClientSession {
81-
pub fn try_decode(req: &Request) -> Result<Option<ClientSession>, String> {
82-
if let Some(s) = Self::from_custom_header(req.headers())? {
81+
pub fn try_decode(
82+
req: &Request,
83+
caps: &mut ClientCapabilities,
84+
) -> Result<Option<ClientSession>, String> {
85+
if let Some(s) = Self::from_custom_header(req.headers(), caps)? {
8386
Ok(Some(s))
84-
} else if let Some(s) = Self::from_cookie(req.cookie())? {
87+
} else if let Some(s) = Self::from_cookie(req.cookie(), caps)? {
8588
Ok(Some(s))
8689
} else {
8790
Ok(None)
@@ -111,11 +114,15 @@ impl ClientSession {
111114
}
112115
}
113116

114-
fn from_custom_header(headers: &HeaderMap) -> Result<Option<ClientSession>, String> {
117+
fn from_custom_header(
118+
headers: &HeaderMap,
119+
caps: &mut ClientCapabilities,
120+
) -> Result<Option<ClientSession>, String> {
115121
if let Some(v) = headers.get(HEADER_SESSION) {
122+
caps.session_header = true;
116123
let v = v.to_str().unwrap().to_string().trim().to_owned();
117-
// curl -H "X-xx:" not work
118-
let s = if v.is_empty() || v.to_lowercase() == "new" {
124+
let s = if v.is_empty() {
125+
// note that curl -H "X-xx:" not work
119126
Self::new_session(false)
120127
} else {
121128
let json = URL_SAFE.decode(&v).map_err(|e| {
@@ -133,13 +140,18 @@ impl ClientSession {
133140
Self::old_session(false, header)
134141
};
135142
Ok(Some(s))
143+
} else if caps.session_header {
144+
Ok(Some(Self::new_session(false)))
136145
} else {
137146
Ok(None)
138147
}
139148
}
140149

141-
fn from_cookie(cookie: &CookieJar) -> Result<Option<ClientSession>, String> {
142-
let cookie_enabled = cookie.get(COOKIE_COOKIE_ENABLED).is_some();
150+
fn from_cookie(
151+
cookie: &CookieJar,
152+
caps: &mut ClientCapabilities,
153+
) -> Result<Option<ClientSession>, String> {
154+
let cookie_enabled = cookie.get(COOKIE_COOKIE_ENABLED).is_some() || caps.session_cookie;
143155
if cookie_enabled {
144156
let s = if let Some(sid) = cookie.get(COOKIE_SESSION_ID) {
145157
let id = sid.value_str().to_string();
@@ -188,25 +200,17 @@ impl ClientSession {
188200
let client_session_mgr = ClientSessionManager::instance();
189201
match self.header.last_refresh_time.elapsed() {
190202
Ok(elapsed) => {
191-
if is_worksheet
192-
&& elapsed
193-
> client_session_mgr.max_idle_time + client_session_mgr.min_refresh_interval
194-
{
195-
return Err(ErrorCode::SessionTimeout(format!(
196-
"session expired after idle for more than {} seconds",
197-
client_session_mgr.max_idle_time.as_secs()
198-
)));
199-
}
200-
if elapsed > client_session_mgr.min_refresh_interval {
201-
info!(
202-
"[HTTP-SESSION] refreshing session {} after {} seconds",
203-
self.header.id,
204-
elapsed.as_secs(),
205-
);
203+
// worksheet
204+
if is_worksheet || elapsed > client_session_mgr.min_refresh_interval {
206205
if client_session_mgr.refresh_in_memory_states(&self.header.id, user_name) {
207206
client_session_mgr
208207
.refresh_session_handle(tenant, user_name.to_string(), &self.header.id)
209208
.await?;
209+
info!(
210+
"[HTTP-SESSION] refreshing session {} after {} seconds",
211+
self.header.id,
212+
elapsed.as_secs(),
213+
);
210214
}
211215
self.refreshed = true;
212216
if self.use_cookie {

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ impl HttpQuery {
562562
Some(s) => {
563563
return Err(ErrorCode::SessionTimeout(format!(
564564
"temporary tables in session {} expired after idle for more than {} seconds, when starting query {}",
565-
s.header.id, s.header.last_refresh_time.elapsed().unwrap_or_default().as_secs(), query_id,
565+
s.header.id, ClientSessionManager::instance().max_idle_time.as_secs(), query_id,
566566
)));
567567
}
568568
}

src/query/service/src/servers/http/v1/query/http_query_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use poem::RequestBody;
2525

2626
use crate::auth::Credential;
2727
use crate::servers::http::middleware::session_header::ClientSession;
28+
use crate::servers::http::middleware::ClientCapabilities;
2829
use crate::servers::http::v1::HttpQueryManager;
2930
use crate::sessions::Session;
3031
use crate::sessions::SessionManager;
@@ -49,6 +50,7 @@ pub struct HttpQueryContext {
4950
pub client_session: Option<ClientSession>,
5051
// for now only used for worksheet
5152
pub fixed_coordinator_node: bool,
53+
pub client_caps: ClientCapabilities,
5254
}
5355

5456
impl HttpQueryContext {

src/query/service/src/servers/http/v1/session/client_session_manager.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn hash_token(token: &[u8]) -> String {
5858
}
5959

6060
struct SessionState {
61-
pub last_access: Instant,
61+
pub last_refresh_time: Instant,
6262
pub temp_tbl_mgr: TempTblMgrRef,
6363
}
6464

@@ -134,7 +134,7 @@ impl ClientSessionManager {
134134
{
135135
let guard = self.session_state.lock();
136136
for (key, session_state) in &*guard {
137-
if (now - session_state.last_access)
137+
if (now - session_state.last_refresh_time)
138138
> self.max_idle_time + self.min_refresh_interval
139139
{
140140
expired.push((key.clone(), session_state.temp_tbl_mgr.clone()));
@@ -375,13 +375,16 @@ impl ClientSessionManager {
375375
pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) -> bool {
376376
let key = Self::state_key(client_session_id, user_name);
377377
let mut guard = self.session_state.lock();
378-
match guard.entry(key) {
379-
Entry::Occupied(mut entry) => {
380-
entry.get_mut().last_access = Instant::now();
381-
true
378+
if let Entry::Occupied(mut entry) = guard.entry(key) {
379+
let now = Instant::now();
380+
if now.duration_since(entry.get().last_refresh_time)
381+
> ClientSessionManager::instance().min_refresh_interval
382+
{
383+
entry.get_mut().last_refresh_time = Instant::now();
384+
return true;
382385
}
383-
Entry::Vacant(_) => false,
384386
}
387+
false
385388
}
386389

387390
pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc<Session>) {
@@ -400,7 +403,7 @@ impl ClientSessionManager {
400403
pub fn add_temp_tbl_mgr(&self, prefix: String, temp_tbl_mgr: TempTblMgrRef) {
401404
let mut guard = self.session_state.lock();
402405
let state = SessionState {
403-
last_access: Instant::now(),
406+
last_refresh_time: Instant::now(),
404407
temp_tbl_mgr,
405408
};
406409
if guard.insert(prefix.clone(), state).is_none() {

tests/logging/check_logs_table.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ echo "Create VIEW Query ID: $create_view_query_id"
4646
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "insert into t values (1),(2),(3)"}')
4747
insert_query_id=$(echo $response | jq -r '.id')
4848
echo "Insert Query ID: $insert_query_id"
49-
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -H 'X-Databend-Session:new' -d '{"sql": "select * from t"}')
49+
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -H 'X-Databend-Client-Caps: session_cookie' -d '{"sql": "select * from t"}')
5050
select_query_id=$(echo $response | jq -r '.id')
5151
select_session_id=$(echo $response | jq -r '.session_id')
5252
echo "Select Query ID: $select_query_id"

0 commit comments

Comments
 (0)