Skip to content

Commit fbd8f78

Browse files
authored
feat: streaming load support session. (#18510)
* HttpSessionStateInternal allow missing fields. * refactor: extract fn for initializing session in HTTP handler. * refactor: extract fn for encode/decode header value with json and base64 * feat: streaming load support session. * feat: streaming load support session.
1 parent e73bbe8 commit fbd8f78

File tree

10 files changed

+340
-175
lines changed

10 files changed

+340
-175
lines changed

src/common/base/src/headers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ pub const HEADER_WAREHOUSE: &str = "X-DATABEND-WAREHOUSE";
3434
pub const HEADER_SIGNATURE: &str = "X-DATABEND-SIGNATURE";
3535
pub const HEADER_AUTH_METHOD: &str = "X-DATABEND-AUTH-METHOD";
3636
pub const HEADER_CLIENT_CAPABILITIES: &str = "X-DATABEND-CLIENT-CAPS";
37+
pub const HEADER_QUERY_CONTEXT: &str = "X-DATABEND-QUERY-CONTEXT";
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use base64::engine::general_purpose::URL_SAFE;
16+
use base64::Engine;
17+
use serde::de;
18+
use serde::Serialize;
19+
20+
pub fn encode_json_header<T>(v: &T) -> String
21+
where T: ?Sized + Serialize {
22+
let s = serde_json::to_string(&v).unwrap();
23+
URL_SAFE.encode(&s)
24+
}
25+
26+
// use base64 encode whenever possible for safety
27+
// but also accept raw JSON for test/debug/one-shot operations
28+
pub fn decode_json_header<T>(key: &str, value: &str) -> Result<T, String>
29+
where T: de::DeserializeOwned {
30+
if value.starts_with("{") {
31+
serde_json::from_slice(value.as_bytes())
32+
.map_err(|e| format!("Invalid value {value} for {key} JSON decode error: {e}",))?
33+
} else {
34+
let json = URL_SAFE.decode(value).map_err(|e| {
35+
format!(
36+
"Invalid value {} for {key}, base64 decode error: {}",
37+
value, e
38+
)
39+
})?;
40+
serde_json::from_slice(&json).map_err(|e| {
41+
format!(
42+
"Invalid value {value} for {key}, JSON value {}, decode error: {e}",
43+
String::from_utf8_lossy(&json)
44+
)
45+
})
46+
}
47+
}

src/query/service/src/servers/http/middleware/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod client_capabilities;
16+
pub mod json_header;
1617
mod metrics;
1718
mod panic_handler;
1819
mod session;

src/query/service/src/servers/http/middleware/session_header.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use std::time::Duration;
1616
use std::time::SystemTime;
1717
use std::time::UNIX_EPOCH;
1818

19-
use base64::engine::general_purpose::URL_SAFE;
20-
use base64::Engine as _;
2119
use databend_common_base::headers::HEADER_SESSION;
2220
use databend_common_base::headers::HEADER_SESSION_ID;
2321
use databend_common_meta_app::tenant::Tenant;
@@ -34,6 +32,8 @@ use serde::Deserializer;
3432
use serde::Serializer;
3533
use uuid::Uuid;
3634

35+
use crate::servers::http::middleware::json_header::decode_json_header;
36+
use crate::servers::http::middleware::json_header::encode_json_header;
3737
use crate::servers::http::middleware::ClientCapabilities;
3838
use crate::servers::http::v1::unix_ts;
3939
use crate::servers::http::v1::ClientSessionManager;
@@ -72,8 +72,7 @@ pub struct ClientSessionHeader {
7272

7373
impl ClientSessionHeader {
7474
fn encode(&self) -> String {
75-
let s = serde_json::to_string(&self).unwrap();
76-
URL_SAFE.encode(&s)
75+
encode_json_header(self)
7776
}
7877
}
7978

@@ -125,18 +124,7 @@ impl ClientSession {
125124
// note that curl -H "X-xx:" not work
126125
Self::new_session(false)
127126
} else {
128-
let json = URL_SAFE.decode(&v).map_err(|e| {
129-
format!(
130-
"Invalid value {} for X-DATABEND-SESSION, base64 decode error: {}",
131-
v, e
132-
)
133-
})?;
134-
let header = serde_json::from_slice(&json).map_err(|e| {
135-
format!(
136-
"Invalid value {} for X-DATABEND-SESSION, JSON decode error: {}",
137-
v, e
138-
)
139-
})?;
127+
let header = decode_json_header(HEADER_SESSION, v.as_str())?;
140128
Self::old_session(false, header)
141129
};
142130
Ok(Some(s))

0 commit comments

Comments
 (0)