 #include <sys/time.h>
 #include <thrift/protocol/TDebugProtocol.h>

+#include <algorithm>
 #include <cstdint>
 #include <cstdlib>
 #include <ctime>
+#include <functional>
 #include <future>
 #include <sstream>
 #include <stdexcept>
@@ -106,17 +108,85 @@ void StreamLoadAction::handle(HttpRequest* req) {
         return;
     }

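+    // Body receipt has finished (successfully or not); open the reply gate so _send_reply()
+    // can proceed when the executor's finish callback or the error path below runs.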
+    {
+        std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
+        ctx->_can_send_reply = true;
+        ctx->_can_send_reply_cv.notify_all();
+    }
+
     // status already set to fail
     if (ctx->status.ok()) {
-        ctx->status = _handle(ctx);
+        ctx->status = _handle(ctx, req);
         if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
-            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
-                         << ", errmsg=" << ctx->status;
+            _send_reply(ctx, req);
         }
     }
+}
+
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
+        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
+    }
+
+    // if we use non-streaming, MessageBodyFileSink.finish will close the file
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+    if (!ctx->use_streaming) {
+        // we need to close file first, then execute_plan_fragment here
+        ctx->body_sink.reset();
+        TPipelineFragmentParamsList mocked;
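+        // The fragment now runs asynchronously; _on_finish() commits the txn and sends the
+        // HTTP reply once it completes (the old code waited on ctx->future here instead).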
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
+                ctx, mocked,
+                [req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); }));
+    }
+
+    return Status::OK();
+}
+
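+// Finish callback passed to the stream load executor: runs once the plan fragment completes,
+// commits (or pre-commits) the transaction depending on the load mode, then sends the reply.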
+void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
+    ctx->status = ctx->load_status_future.get();
+    if (ctx->status.ok()) {
+        if (ctx->group_commit) {
+            LOG(INFO) << "skip commit because this is group commit, pipe_id="
+                      << ctx->id.to_string();
+        } else if (ctx->two_phase_commit) {
+            int64_t pre_commit_start_time = MonotonicNanos();
+            ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+        } else {
+            // If put file success we need commit this load
+            int64_t commit_and_publish_start_time = MonotonicNanos();
+            ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+            ctx->commit_and_publish_txn_cost_nanos =
+                    MonotonicNanos() - commit_and_publish_start_time;
+            g_stream_load_commit_and_publish_latency_ms
+                    << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+        }
+    }
+    _send_reply(ctx, req);
+}
+
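+// Sends the HTTP reply at most once. Several call sites can reach this point (handle(),
+// the on_header() failure path, and _on_finish()), so the send is serialized under
+// _send_reply_lock and guarded by _finish_send_reply.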
+void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
+    std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
+    // 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle complete,
+    //    avoid client errors (e.g., broken pipe).
+    // 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled
+    //    due to long import execution time.
+    while (!ctx->_finish_send_reply && !ctx->_can_send_reply) {
+        ctx->_can_send_reply_cv.wait(lock1);
+    }
+    if (ctx->_finish_send_reply) {
+        return;
+    }
+    DCHECK(ctx->_can_send_reply);
+    ctx->_finish_send_reply = true;
+    ctx->_can_send_reply_cv.notify_all();
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

     if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                     << ", errmsg=" << ctx->status;
         if (ctx->need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
@@ -129,7 +199,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
     auto str = ctx->to_json();
     // add new line at end
     str = str + '\n';
-    HttpChannel::send_reply(req, str);
+
 #ifndef BE_TEST
     if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
         if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
@@ -139,6 +209,8 @@ void StreamLoadAction::handle(HttpRequest* req) {
     }
 #endif

+    HttpChannel::send_reply(req, str);
+
     LOG(INFO) << "finished to execute stream load. label=" << ctx->label
               << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
               << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
@@ -160,46 +232,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
     }
 }

-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
-    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
-        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
-                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
-    }
-
-    // if we use non-streaming, MessageBodyFileSink.finish will close the file
-    RETURN_IF_ERROR(ctx->body_sink->finish());
-    if (!ctx->use_streaming) {
-        // we need to close file first, then execute_plan_fragment here
-        ctx->body_sink.reset();
-        TPipelineFragmentParamsList mocked;
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
-    }
-
-    // wait stream load finish
-    RETURN_IF_ERROR(ctx->future.get());
-
-    if (ctx->group_commit) {
-        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
-        return Status::OK();
-    }
-
-    if (ctx->two_phase_commit) {
-        int64_t pre_commit_start_time = MonotonicNanos();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
-        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
-    } else {
-        // If put file success we need commit this load
-        int64_t commit_and_publish_start_time = MonotonicNanos();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
-        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
-        g_stream_load_commit_and_publish_latency_ms
-                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
-    }
-    return Status::OK();
-}
-
 int StreamLoadAction::on_header(HttpRequest* req) {
+    req->mark_send_reply();
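+    // Note: the reply for this request is sent explicitly via _send_reply() rather than by
+    // the default path.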
+
     streaming_load_current_processing->increment(1);

     std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
@@ -228,26 +263,12 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     }
     if (!st.ok()) {
         ctx->status = std::move(st);
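+        // on_header failed: open the reply gate and send the error reply right away.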
-        if (ctx->need_rollback) {
-            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
-            ctx->need_rollback = false;
+        {
+            std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
+            ctx->_can_send_reply = true;
+            ctx->_can_send_reply_cv.notify_all();
         }
-        if (ctx->body_sink != nullptr) {
-            ctx->body_sink->cancel(ctx->status.to_string());
-        }
-        auto str = ctx->to_json();
-        // add new line at end
-        str = str + '\n';
-        HttpChannel::send_reply(req, str);
-#ifndef BE_TEST
-        if (config::enable_stream_load_record ||
-            config::enable_stream_load_record_to_audit_log_table) {
-            if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
-                str = ctx->prepare_stream_load_record(str);
-                _save_stream_load_record(ctx, str);
-            }
-        }
-#endif
+        _send_reply(ctx, req);
         return -1;
     }
     return 0;
@@ -821,8 +842,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!ctx->use_streaming) {
         return Status::OK();
     }
+
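+    // Streaming case: the reply is sent from _on_finish() once the fragment completes.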
     TPipelineFragmentParamsList mocked;
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
+    return _exec_env->stream_load_executor()->execute_plan_fragment(
+            ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
+                _on_finish(ctx, http_req);
+            });
 }

 Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,