Skip to content

Commit a783625

Browse files
author
宋光璠
committed
fix
1 parent 08ada7b commit a783625

File tree

4 files changed

+60
-32
lines changed

4 files changed

+60
-32
lines changed

.idea/vcs.xml

Lines changed: 14 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,8 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
597597
// Whether to enable stream load record function, the default is false.
598598
// False: disable stream load record
599599
DEFINE_mBool(enable_stream_load_record, "false");
600+
// false: disables http_stream load support
601+
DEFINE_mBool(enable_http_stream_load, "false");
600602
// batch size of stream load record reported to FE
601603
DEFINE_mInt32(stream_load_record_batch_size, "50");
602604
// expire time of stream load record in rocksdb.

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,8 @@ DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
632632
// Whether to enable stream load record function, the default is false.
633633
// False: disable stream load record
634634
DECLARE_mBool(enable_stream_load_record);
635+
// Whether to enable http_stream load support. Default is false.
636+
DECLARE_mBool(enable_http_stream_load);
635637
// batch size of stream load record reported to FE
636638
DECLARE_mInt32(stream_load_record_batch_size);
637639
// expire time of stream load record in rocksdb.

be/src/http/action/stream_load.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,48 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
724724
request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
725725
}
726726

727+
if (config::enable_http_stream_load) {
728+
std::string table_name = ctx->table;
729+
std::string database_name = request.db;
730+
731+
std::string columns = http_req->header(HTTP_COLUMNS);
732+
if (columns.empty()) {
733+
return Status::InvalidArgument("Missing required HTTP header: columns");
734+
}
735+
736+
std::string column_separator = http_req->header(HTTP_COLUMN_SEPARATOR).empty()
737+
? ","
738+
: http_req->header(HTTP_COLUMN_SEPARATOR);
739+
740+
std::string format =
741+
http_req->header("format").empty() ? "csv" : http_req->header("format");
742+
743+
std::vector<std::string> column_vector = split(columns, std::string(","));
744+
std::string column_list = "(" + join(column_vector, ", ") + ")";
745+
746+
std::vector<std::string> sql_parts;
747+
sql_parts.emplace_back("SELECT " + join(column_vector, ", "));
748+
749+
std::string http_stream_params = "http_stream('format' = '" + format +
750+
"', 'column_separator' = '" + column_separator + "')";
751+
752+
sql_parts.emplace_back("FROM " + http_stream_params);
753+
754+
if (!http_req->header(HTTP_WHERE).empty()) {
755+
sql_parts.emplace_back("WHERE " + http_req->header(HTTP_WHERE));
756+
}
757+
758+
std::string full_sql =
759+
"INSERT INTO " + database_name + "." + table_name + " " + column_list + " ";
760+
for (size_t i = 0; i < sql_parts.size(); i++) {
761+
full_sql += sql_parts[i];
762+
if (i != sql_parts.size() - 1) full_sql += " ";
763+
}
764+
765+
ctx->sql_str = full_sql;
766+
LOG(INFO) << "Generated SQL: " << full_sql << ctx->to_json();
767+
}
768+
727769
#ifndef BE_TEST
728770
// plan this load
729771
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;

0 commit comments

Comments
 (0)