Skip to content

Commit 768c1b2

Browse files
authored
feat: Re-introduce Streaming Load API (#17921)
* feat: support streaming load API.
1 parent e2db2b4 commit 768c1b2

34 files changed

+887
-61
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/ast/src/ast/statements/insert.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use derive_visitor::DriveMut;
2121
use crate::ast::write_comma_separated_list;
2222
use crate::ast::write_dot_separated_list;
2323
use crate::ast::Expr;
24+
use crate::ast::FileFormatOptions;
2425
use crate::ast::Hint;
2526
use crate::ast::Identifier;
2627
use crate::ast::Query;
@@ -71,9 +72,20 @@ impl Display for InsertStmt {
7172

7273
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
7374
pub enum InsertSource {
74-
Values { rows: Vec<Vec<Expr>> },
75-
RawValues { rest_str: String, start: usize },
76-
Select { query: Box<Query> },
75+
Values {
76+
rows: Vec<Vec<Expr>>,
77+
},
78+
RawValues {
79+
rest_str: String,
80+
start: usize,
81+
},
82+
Select {
83+
query: Box<Query>,
84+
},
85+
StreamingLoad {
86+
format_options: FileFormatOptions,
87+
on_error_mode: Option<String>,
88+
},
7789
}
7890

7991
impl Display for InsertSource {
@@ -93,6 +105,17 @@ impl Display for InsertSource {
93105
}
94106
InsertSource::RawValues { rest_str, .. } => write!(f, "VALUES {rest_str}"),
95107
InsertSource::Select { query } => write!(f, "{query}"),
108+
InsertSource::StreamingLoad {
109+
format_options,
110+
on_error_mode,
111+
} => {
112+
write!(f, " FILE_FORMAT = ({})", format_options)?;
113+
write!(
114+
f,
115+
" ON_ERROR = '{}'",
116+
on_error_mode.as_ref().unwrap_or(&"Abort".to_string())
117+
)
118+
}
96119
}
97120
}
98121
}

src/query/ast/src/ast/statements/merge_into.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use derive_visitor::DriveMut;
2121
use crate::ast::write_comma_separated_list;
2222
use crate::ast::write_dot_separated_list;
2323
use crate::ast::Expr;
24-
use crate::ast::FileFormatOptions;
2524
use crate::ast::Hint;
2625
use crate::ast::Identifier;
2726
use crate::ast::Query;
@@ -168,12 +167,6 @@ impl Display for MergeIntoStmt {
168167

169168
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
170169
pub enum MutationSource {
171-
StreamingV2 {
172-
settings: FileFormatOptions,
173-
on_error_mode: Option<String>,
174-
start: usize,
175-
},
176-
177170
Select {
178171
query: Box<Query>,
179172
source_alias: TableAlias,
@@ -190,12 +183,6 @@ pub enum MutationSource {
190183
impl MutationSource {
191184
pub fn transform_table_reference(&self) -> TableReference {
192185
match self {
193-
Self::StreamingV2 {
194-
settings: _,
195-
on_error_mode: _,
196-
start: _,
197-
} => unimplemented!(),
198-
199186
Self::Select {
200187
query,
201188
source_alias,
@@ -232,19 +219,6 @@ impl MutationSource {
232219
impl Display for MutationSource {
233220
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
234221
match self {
235-
MutationSource::StreamingV2 {
236-
settings,
237-
on_error_mode,
238-
start: _,
239-
} => {
240-
write!(f, " FILE_FORMAT = ({})", settings)?;
241-
write!(
242-
f,
243-
" ON_ERROR = '{}'",
244-
on_error_mode.as_ref().unwrap_or(&"Abort".to_string())
245-
)
246-
}
247-
248222
MutationSource::Select {
249223
query,
250224
source_alias,

src/query/ast/src/parser/statement.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2988,6 +2988,15 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
29882988
//
29892989
// This is a hack to parse large insert statements.
29902990
pub fn raw_insert_source(i: Input) -> IResult<InsertSource> {
2991+
let streaming = map(
2992+
rule! {
2993+
#file_format_clause ~ (ON_ERROR ~ ^"=" ~ ^#ident)?
2994+
},
2995+
|(options, on_error_opt)| InsertSource::StreamingLoad {
2996+
format_options: options,
2997+
on_error_mode: on_error_opt.map(|v| v.2.to_string()),
2998+
},
2999+
);
29913000
let values = map(
29923001
rule! {
29933002
VALUES ~ #rest_str
@@ -3006,21 +3015,11 @@ pub fn raw_insert_source(i: Input) -> IResult<InsertSource> {
30063015
rule!(
30073016
#values
30083017
| #query
3018+
| #streaming
30093019
)(i)
30103020
}
30113021

30123022
pub fn mutation_source(i: Input) -> IResult<MutationSource> {
3013-
let streaming_v2 = map(
3014-
rule! {
3015-
#file_format_clause ~ (ON_ERROR ~ ^"=" ~ ^#ident)? ~ #rest_str
3016-
},
3017-
|(options, on_error_opt, (_, start))| MutationSource::StreamingV2 {
3018-
settings: options,
3019-
on_error_mode: on_error_opt.map(|v| v.2.to_string()),
3020-
start,
3021-
},
3022-
);
3023-
30243023
let query = map(rule! {#query ~ #table_alias}, |(query, source_alias)| {
30253024
MutationSource::Select {
30263025
query: Box::new(query),
@@ -3040,8 +3039,7 @@ pub fn mutation_source(i: Input) -> IResult<MutationSource> {
30403039
);
30413040

30423041
rule!(
3043-
#streaming_v2
3044-
| #query
3042+
#query
30453043
| #source_table
30463044
)(i)
30473045
}

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ impl PrivilegeAccess {
694694
self.check(ctx, plan).await?;
695695
}
696696
InsertInputSource::Values(_) => {}
697+
InsertInputSource::StreamingLoad { .. } => {}
697698
}
698699
Ok(())
699700
}

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_sql::plans::InsertInputSource;
3333
use databend_common_sql::plans::InsertValue;
3434
use databend_common_sql::plans::Plan;
3535
use databend_common_sql::NameResolutionContext;
36+
use databend_common_storages_stage::build_streaming_load_pipeline;
3637
use log::info;
3738

3839
use crate::interpreters::common::check_deduplicate_label;
@@ -247,6 +248,25 @@ impl Interpreter for InsertInterpreter {
247248

248249
return Ok(build_res);
249250
}
251+
InsertInputSource::StreamingLoad {
252+
file_format,
253+
on_error_mode,
254+
schema,
255+
default_exprs,
256+
block_thresholds,
257+
receiver,
258+
} => {
259+
build_streaming_load_pipeline(
260+
self.ctx.clone(),
261+
&mut build_res.main_pipeline,
262+
file_format,
263+
receiver.clone(),
264+
schema.clone(),
265+
default_exprs.clone(),
266+
*block_thresholds,
267+
on_error_mode.clone(),
268+
)?;
269+
}
250270
};
251271

252272
PipelineBuilder::build_append2table_with_commit_pipeline(

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,9 @@ impl ReplaceInterpreter {
432432
}
433433
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
434434
},
435+
InsertInputSource::StreamingLoad { .. } => {
436+
unreachable!("replace with streaming not supported yet")
437+
}
435438
}
436439
}
437440

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub enum EndpointKind {
9696
SystemInfo,
9797
Catalog,
9898
Metadata,
99+
StreamingLoad,
99100
}
100101

101102
impl EndpointKind {
@@ -122,6 +123,7 @@ impl EndpointKind {
122123
| EndpointKind::Logout
123124
| EndpointKind::SystemInfo
124125
| EndpointKind::HeartBeat
126+
| EndpointKind::StreamingLoad
125127
| EndpointKind::UploadToStage
126128
| EndpointKind::Metadata
127129
| EndpointKind::Catalog => {

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer;
7878
use crate::servers::http::v1::query::Progresses;
7979
use crate::servers::http::v1::refresh_handler;
8080
use crate::servers::http::v1::roles::list_roles_handler;
81+
use crate::servers::http::v1::streaming_load_handler;
8182
use crate::servers::http::v1::upload_to_stage;
8283
use crate::servers::http::v1::users::create_user_handler;
8384
use crate::servers::http::v1::users::list_users_handler;
@@ -703,6 +704,11 @@ pub fn query_route() -> Route {
703704
get(list_users_handler).post(create_user_handler),
704705
EndpointKind::Metadata,
705706
),
707+
(
708+
"/streaming_load",
709+
put(streaming_load_handler),
710+
EndpointKind::StreamingLoad,
711+
),
706712
("/roles", get(list_roles_handler), EndpointKind::Metadata),
707713
];
708714

@@ -742,14 +748,14 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
742748
/// The HTTP query endpoints are expected to be responses within 60 seconds.
743749
/// If it exceeds far from 60 seconds, there might be something wrong, we should
744750
/// log it.
745-
struct SlowRequestLogTracker {
751+
pub(crate) struct SlowRequestLogTracker {
746752
started_at: std::time::Instant,
747753
method: String,
748754
uri: String,
749755
}
750756

751757
impl SlowRequestLogTracker {
752-
fn new(ctx: &HttpQueryContext) -> Self {
758+
pub(crate) fn new(ctx: &HttpQueryContext) -> Self {
753759
Self {
754760
started_at: std::time::Instant::now(),
755761
method: ctx.http_method.clone(),
@@ -776,7 +782,11 @@ impl Drop for SlowRequestLogTracker {
776782

777783
// get_http_tracing_span always return a valid span for tracing
778784
// it will try to decode w3 traceparent and if empty or failed, it will create a new root span and throw a warning
779-
fn get_http_tracing_span(name: &'static str, ctx: &HttpQueryContext, query_id: &str) -> Span {
785+
pub(crate) fn get_http_tracing_span(
786+
name: &'static str,
787+
ctx: &HttpQueryContext,
788+
query_id: &str,
789+
) -> Span {
780790
if let Some(parent) = ctx.trace_parent.as_ref() {
781791
let trace = parent.as_str();
782792
match SpanContext::decode_w3c_traceparent(trace) {

src/query/service/src/servers/http/v1/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod query;
1919
pub mod roles;
2020
mod session;
2121
mod stage;
22+
mod streaming_load;
2223
pub mod users;
2324
mod verify;
2425

@@ -48,6 +49,8 @@ pub use session::ClientSessionManager;
4849
pub(crate) use session::SessionClaim;
4950
pub use stage::upload_to_stage;
5051
pub use stage::UploadToStageResponse;
52+
pub use streaming_load::streaming_load_handler;
53+
pub use streaming_load::LoadResponse;
5154
pub use users::create_user_handler;
5255
pub use users::list_users_handler;
5356
pub use verify::verify_handler;

0 commit comments

Comments
 (0)