Skip to content

Commit 91b8bce

Browse files
authored
refactor: streaming load: use header X-Databend-SQL. (#18410)
1 parent 9c2c5e1 commit 91b8bce

11 files changed

+29
-21
lines changed

src/common/base/src/headers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub const HEADER_SESSION_ID: &str = "X-DATABEND-SESSION-ID";
1717
pub const HEADER_TENANT: &str = "X-DATABEND-TENANT";
1818
pub const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
1919
pub const HEADER_USER: &str = "X-DATABEND-USER";
20+
pub const HEADER_SQL: &str = "X-DATABEND-SQL";
2021

2122
pub const HEADER_FUNCTION: &str = "X-DATABEND-FUNCTION";
2223
pub const HEADER_FUNCTION_HANDLER: &str = "X-DATABEND-FUNCTION-HANDLER";

src/query/service/src/servers/http/v1/streaming_load.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818
use databend_common_base::base::tokio;
1919
use databend_common_base::base::tokio::io::AsyncReadExt;
2020
use databend_common_base::base::ProgressValues;
21+
use databend_common_base::headers::HEADER_SQL;
2122
use databend_common_base::runtime::MemStat;
2223
use databend_common_base::runtime::ThreadTracker;
2324
use databend_common_base::runtime::TrySpawn;
@@ -131,13 +132,19 @@ async fn streaming_load_handler_inner(
131132
.await
132133
.map_err(InternalServerError)?;
133134

134-
let sql = req.headers().get("sql").ok_or(poem::Error::from_string(
135-
"[HTTP-STREAMING-LOAD] Missing required 'sql' header in request",
136-
StatusCode::BAD_REQUEST,
137-
))?;
135+
let sql = req
136+
.headers()
137+
.get(HEADER_SQL)
138+
.ok_or(poem::Error::from_string(
139+
"[HTTP-STREAMING-LOAD] Missing required 'sql' header in request",
140+
StatusCode::BAD_REQUEST,
141+
))?;
138142
let sql = sql.to_str().map_err(|e| {
139143
poem::Error::from_string(
140-
format!("[HTTP-STREAMING-LOAD] Invalid UTF-8 in 'sql' header: {}", e),
144+
format!(
145+
"[HTTP-STREAMING-LOAD] Invalid UTF-8 in value of header {HEADER_SQL}: {}",
146+
e
147+
),
141148
StatusCode::BAD_REQUEST,
142149
)
143150
})?;

tests/suites/1_stateful/01_streaming_load/01_0001_streaming_load_ontime.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ cat $TESTS_DATA_DIR/ddl/ontime.sql | sed 's/ontime/ontime_streaming_load/g' | $B
1010

1111
function run() {
1212
echo "--$2"
13-
curl -sS -H "x-databend-query-id:$2" -H "sql:insert into ontime_streaming_load from @_databend_load file_format = ($1)" -F "upload=@/${TESTS_DATA_DIR}/$2" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"
13+
curl -sS -H "x-databend-query-id:$2" -H "X-Databend-SQL:insert into ontime_streaming_load from @_databend_load file_format = ($1)" -F "upload=@/${TESTS_DATA_DIR}/$2" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"
1414
echo
1515
echo "select count(1), avg(Year), sum(DayOfWeek) from ontime_streaming_load;" | $BENDSQL_CLIENT_CONNECT
1616
echo "truncate table ontime_streaming_load" | $BENDSQL_CLIENT_CONNECT

tests/suites/1_stateful/01_streaming_load/01_0002_streaming_load_variant.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ cat ${TESTS_DATA_DIR}/ddl/variant_test.sql | $BENDSQL_CLIENT_CONNECT
1010

1111
# run format path table
1212
function run() {
13-
curl -sS -H "x-databend-query-id:$2" -H "sql:insert into $3 values from @_databend_load file_format = ($1)" -F "upload=@/${TESTS_DATA_DIR}/$2" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"
13+
curl -sS -H "x-databend-query-id:$2" -H "X-Databend-SQL:insert into $3 values from @_databend_load file_format = ($1)" -F "upload=@/${TESTS_DATA_DIR}/$2" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"
1414
echo
1515
}
1616

tests/suites/1_stateful/01_streaming_load/01_0003_streaming_load_dedup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ function run() {
1010
echo ">>>> load $2 with format ($1), with label $3"
1111
curl -sS \
1212
-H "x-databend-query-id:$2" \
13-
-H "sql:insert /*+ set_var(deduplicate_label='$3') */ into t1 from @_databend_load file_format = ($1)" \
13+
-H "X-Databend-SQL:insert /*+ set_var(deduplicate_label='$3') */ into t1 from @_databend_load file_format = ($1)" \
1414
-F "upload=@/${TESTS_DATA_DIR}/$2" \
1515
-u root: -XPUT \
1616
"http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"

tests/suites/1_stateful/01_streaming_load/01_0004_streaming_load_error.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ function run() {
1010
echo ">>>> load $2 with format ($1)"
1111
curl -sS \
1212
-H "x-databend-query-id:$2" \
13-
-H "sql:insert into t1 values from @_databend_load file_format = ($1)" \
13+
-H "X-Databend-SQL:insert into t1 values from @_databend_load file_format = ($1)" \
1414
-F "upload=@/${TESTS_DATA_DIR}/$2" \
1515
-u root: -XPUT \
1616
"http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | jq .error.code

tests/suites/1_stateful/01_streaming_load/01_0005_streaming_load_compact.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ stmt "drop table if exists t1 all"
1414
stmt "CREATE TABLE t1 ( c0 string );"
1515

1616
curl -sS \
17-
-H "sql:insert /*+ set_var(input_read_buffer_size=100) */into t1 values from @_databend_load file_format = (type = CSV)" \
17+
-H "X-Databend-SQL:insert /*+ set_var(input_read_buffer_size=100) */into t1 values from @_databend_load file_format = (type = CSV)" \
1818
-H "X-databend-query-id: test" \
1919
-F "upload=@${DATA}" \
2020
-u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load"

tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
>>>> copy into @streaming_load_parquet/q1.parquet from (select '2021-01-01' as c3, '1' as c2) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
55
q1.parquet 624 1
66
>>>> streaming load: q1.parquet error :
7-
+ curl -sS -H x-databend-query-id:load-q1 -H 'sql:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q1.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
7+
+ curl -sS -H x-databend-query-id:load-q1 -H 'X-Databend-SQL:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q1.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
88
{"id":"load-q1","stats":{"rows":1,"bytes":25}}
99
<<<<
1010
>>>> select * from streaming_load_parquet;
@@ -15,7 +15,7 @@ ok 1 2021-01-01
1515
>>>> copy into @streaming_load_parquet/q2.parquet from (select '2021-01-01' as c3) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
1616
q2.parquet 426 1
1717
>>>> streaming load: q2.parquet error :
18-
+ curl -sS -H x-databend-query-id:load-q2 -H 'sql:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q2.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
18+
+ curl -sS -H x-databend-query-id:load-q2 -H 'X-Databend-SQL:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q2.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
1919
{"error":{"code":400,"message":"[HTTP-STREAMING-LOAD] Query execution failed: file q2.parquet missing column `c2`"}}
2020
<<<<
2121
>>>> select * from streaming_load_parquet;
@@ -25,7 +25,7 @@ q2.parquet 426 1
2525
>>>> copy into @streaming_load_parquet/q3.parquet from (select '2021-01-01' as c3) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
2626
q3.parquet 426 1
2727
>>>> streaming load: q3.parquet field_default :
28-
+ curl -sS -H x-databend-query-id:load-q3 -H 'sql:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=field_default, null_if=())' -F upload=@/tmp/streaming_load_parquet/q3.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
28+
+ curl -sS -H x-databend-query-id:load-q3 -H 'X-Databend-SQL:insert into streaming_load_parquet(c2,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=field_default, null_if=())' -F upload=@/tmp/streaming_load_parquet/q3.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
2929
{"id":"load-q3","stats":{"rows":1,"bytes":21}}
3030
<<<<
3131
>>>> select * from streaming_load_parquet;
@@ -36,7 +36,7 @@ ok NULL 2021-01-01
3636
>>>> copy into @streaming_load_parquet/q4.parquet from (select '2021-01-01' as c3, 'my_null' as c1) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
3737
q4.parquet 643 1
3838
>>>> streaming load: q4.parquet error :
39-
+ curl -sS -H x-databend-query-id:load-q4 -H 'sql:insert into streaming_load_parquet(c1,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q4.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
39+
+ curl -sS -H x-databend-query-id:load-q4 -H 'X-Databend-SQL:insert into streaming_load_parquet(c1,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q4.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
4040
{"id":"load-q4","stats":{"rows":1,"bytes":26}}
4141
<<<<
4242
>>>> select * from streaming_load_parquet;
@@ -47,7 +47,7 @@ my_null NULL 2021-01-01
4747
>>>> copy into @streaming_load_parquet/q5.parquet from (select '2021-01-01' as c3, 'my_null' as c1) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
4848
q5.parquet 643 1
4949
>>>> streaming load: q5.parquet error 'my_null':
50-
+ curl -sS -H x-databend-query-id:load-q5 -H 'sql:insert into streaming_load_parquet(c1,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=('\''my_null'\''))' -F upload=@/tmp/streaming_load_parquet/q5.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
50+
+ curl -sS -H x-databend-query-id:load-q5 -H 'X-Databend-SQL:insert into streaming_load_parquet(c1,c3) from @_databend_load file_format = (type='\''parquet'\'', missing_field_as=error, null_if=('\''my_null'\''))' -F upload=@/tmp/streaming_load_parquet/q5.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
5151
{"id":"load-q5","stats":{"rows":1,"bytes":7}}
5252
<<<<
5353
>>>> select * from streaming_load_parquet;

tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ function run() {
1414
echo "--$2"
1515
stmt "copy into @streaming_load_parquet/$1.parquet from (select $2) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;"
1616
echo ">>>> streaming load: $1.parquet $4 $5:"
17-
(set -x; curl -sS -H "x-databend-query-id:load-$1" -H "sql:insert into streaming_load_parquet($3) from @_databend_load file_format = (type='parquet', missing_field_as=$4, null_if=($5))" -F "upload=@$DATA/$1.parquet" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load")
17+
(set -x; curl -sS -H "x-databend-query-id:load-$1" -H "X-Databend-SQL:insert into streaming_load_parquet($3) from @_databend_load file_format = (type='parquet', missing_field_as=$4, null_if=($5))" -F "upload=@$DATA/$1.parquet" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load")
1818
echo
1919
echo "<<<<"
2020
query "select * from streaming_load_parquet;"

tests/suites/1_stateful/01_streaming_load/01_0007_streaming_load_placeholder.result

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
--csv
44
>>>> copy into @streaming_load_07/data.csv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='csv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
55
data.csv 17 1
6-
+ curl -sS -H x-databend-query-id:load-csv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=csv)' -F upload=@/tmp/streaming_load_07/data.csv -u root: -XPUT http://localhost:8000/v1/streaming_load
6+
+ curl -sS -H x-databend-query-id:load-csv -H 'X-Databend-SQL:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=csv)' -F upload=@/tmp/streaming_load_07/data.csv -u root: -XPUT http://localhost:8000/v1/streaming_load
77
{"id":"load-csv","stats":{"rows":1,"bytes":39}}
88
<<<<
99
>>>> select * from streaming_load_07;
@@ -13,7 +13,7 @@ ok 110 a 2020-01-02
1313
--tsv
1414
>>>> copy into @streaming_load_07/data.tsv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='tsv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
1515
data.tsv 15 1
16-
+ curl -sS -H x-databend-query-id:load-tsv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=tsv)' -F upload=@/tmp/streaming_load_07/data.tsv -u root: -XPUT http://localhost:8000/v1/streaming_load
16+
+ curl -sS -H x-databend-query-id:load-tsv -H 'X-Databend-SQL:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=tsv)' -F upload=@/tmp/streaming_load_07/data.tsv -u root: -XPUT http://localhost:8000/v1/streaming_load
1717
{"id":"load-tsv","stats":{"rows":1,"bytes":39}}
1818
<<<<
1919
>>>> select * from streaming_load_07;
@@ -23,7 +23,7 @@ ok 110 a 2020-01-02
2323
--ndjson
2424
>>>> copy into @streaming_load_07/data.ndjson from (select '2020-01-02' as c4, 110 as c2) file_format=(type='ndjson') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
2525
data.ndjson 29 1
26-
+ curl -sS -H x-databend-query-id:load-ndjson -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=ndjson)' -F upload=@/tmp/streaming_load_07/data.ndjson -u root: -XPUT http://localhost:8000/v1/streaming_load
26+
+ curl -sS -H x-databend-query-id:load-ndjson -H 'X-Databend-SQL:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=ndjson)' -F upload=@/tmp/streaming_load_07/data.ndjson -u root: -XPUT http://localhost:8000/v1/streaming_load
2727
{"id":"load-ndjson","stats":{"rows":1,"bytes":39}}
2828
<<<<
2929
>>>> select * from streaming_load_07;
@@ -33,7 +33,7 @@ ok 110 a 2020-01-02
3333
--parquet
3434
>>>> copy into @streaming_load_07/data.parquet from (select '2020-01-02' as c4, 110 as c2) file_format=(type='parquet') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
3535
data.parquet 665 1
36-
+ curl -sS -H x-databend-query-id:load-parquet -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=parquet)' -F upload=@/tmp/streaming_load_07/data.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
36+
+ curl -sS -H x-databend-query-id:load-parquet -H 'X-Databend-SQL:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) from @_databend_load file_format = (type=parquet)' -F upload=@/tmp/streaming_load_07/data.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
3737
{"id":"load-parquet","stats":{"rows":1,"bytes":39}}
3838
<<<<
3939
>>>> select * from streaming_load_07;

0 commit comments

Comments (0)