Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,14 @@ DEFINE_Bool(enable_all_http_auth, "false");
// Number of webserver workers
DEFINE_Int32(webserver_num_workers, "128");

// Timeout, in seconds, for waiting on an asynchronous HTTP reply to be sent.
// Asynchronous replies are currently used only by stream load.
// The wait for the reply can only time out when:
//   1. the stream load fragment execution itself times out; the HTTP request is then
//      freed and the stream load is canceled, or
//   2. the client disconnects.
// The validator below requires a value of at least 3.
DEFINE_mInt32(async_reply_timeout_s, "60");
DEFINE_Validator(async_reply_timeout_s, [](const int config) -> bool { return config >= 3; });

DEFINE_Bool(enable_single_replica_load, "true");
// Number of download workers for single replica load
DEFINE_Int32(single_replica_load_download_num_workers, "64");
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,13 @@ DECLARE_Bool(enable_all_http_auth);
// Number of webserver workers
DECLARE_Int32(webserver_num_workers);

// Timeout, in seconds, for waiting on an asynchronous HTTP reply to be sent.
// Asynchronous replies are currently used only by stream load.
// The wait for the reply can only time out when:
//   1. the stream load fragment execution itself times out; the HTTP request is then
//      freed and the stream load is canceled, or
//   2. the client disconnects.
DECLARE_mInt32(async_reply_timeout_s);

DECLARE_Bool(enable_single_replica_load);
// Number of download workers for single replica load
DECLARE_Int32(single_replica_load_download_num_workers);
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
RETURN_IF_ERROR(ctx->body_sink->finish());

// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
RETURN_IF_ERROR(ctx->load_status_future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
Expand Down
151 changes: 88 additions & 63 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
#include <sys/time.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <future>
#include <sstream>
#include <stdexcept>
Expand Down Expand Up @@ -106,17 +108,85 @@ void StreamLoadAction::handle(HttpRequest* req) {
return;
}

{
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
ctx->_can_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
}

// status already set to fail
if (ctx->status.ok()) {
ctx->status = _handle(ctx);
ctx->status = _handle(ctx, req);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_send_reply(ctx, req);
}
}
}

// Finishes the body-receiving phase of a stream load request.
//
// Steps:
//   1. Reject the request if a body size was declared (body_bytes > 0) and the number of
//      received bytes differs from it.
//   2. Finish the body sink.
//   3. For non-streaming loads only: release the body sink and hand the load to the stream
//      load executor, passing a callback that forwards to _on_finish(ctx, req).
//
// @param ctx  stream load context of this request
// @param req  HTTP request; captured as a raw pointer by the callback
// @return     OK on success, NETWORK_ERROR on a body size mismatch, or the first error
//             returned by the sink / executor
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
    const bool body_size_mismatch = ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes;
    if (body_size_mismatch) {
        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
    }

    // In non-streaming mode, MessageBodyFileSink.finish closes the file.
    RETURN_IF_ERROR(ctx->body_sink->finish());

    // Streaming loads have nothing more to do here.
    if (ctx->use_streaming) {
        return Status::OK();
    }

    // The file has to be closed before execute_plan_fragment is called.
    ctx->body_sink.reset();
    TPipelineFragmentParamsList mocked;
    auto on_load_finished = [req, this](std::shared_ptr<StreamLoadContext> finished_ctx) {
        _on_finish(finished_ctx, req);
    };
    RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked,
                                                                             on_load_finished));
    return Status::OK();
}

// Completion handler for a stream load: collects the load result, commits the transaction
// when appropriate and then sends the reply.
//
// The load status is taken from ctx->load_status_future. If it is OK:
//   - group commit:      nothing is committed here (only logged);
//   - two phase commit:  the transaction is pre-committed and the elapsed time recorded;
//   - otherwise:         the transaction is committed, and the elapsed time is recorded and
//                        reported to g_stream_load_commit_and_publish_latency_ms.
// In every case _send_reply(ctx, req) is called last, with the final status in ctx->status.
void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
    ctx->status = ctx->load_status_future.get();

    // Load failed: skip the commit step and reply right away.
    if (!ctx->status.ok()) {
        _send_reply(ctx, req);
        return;
    }

    if (ctx->group_commit) {
        LOG(INFO) << "skip commit because this is group commit, pipe_id="
                  << ctx->id.to_string();
    } else if (ctx->two_phase_commit) {
        const int64_t pre_commit_begin_ns = MonotonicNanos();
        ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_begin_ns;
    } else {
        // The data was loaded successfully, so this load has to be committed.
        const int64_t commit_begin_ns = MonotonicNanos();
        ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_begin_ns;
        g_stream_load_commit_and_publish_latency_ms
                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
    }

    _send_reply(ctx, req);
}

void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
// 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle complete,
// avoid client errors (e.g., broken pipe).
// 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled
// due to long import execution time.
while (!ctx->_finish_send_reply && !ctx->_can_send_reply) {
ctx->_can_send_reply_cv.wait(lock1);
}
if (ctx->_finish_send_reply) {
return;
}
DCHECK(ctx->_can_send_reply);
ctx->_finish_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
Expand All @@ -129,7 +199,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
auto str = ctx->to_json();
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);

#ifndef BE_TEST
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
Expand All @@ -139,6 +209,8 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
#endif

HttpChannel::send_reply(req, str);

LOG(INFO) << "finished to execute stream load. label=" << ctx->label
<< ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
<< ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
Expand All @@ -160,46 +232,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
}

// Finishes receiving the request body, waits for the load to complete and commits it.
//
// Steps:
//   1. Reject the request if a body size was declared (body_bytes > 0) and the number of
//      received bytes differs from it.
//   2. Finish the body sink; for non-streaming loads also release the sink and call
//      execute_plan_fragment.
//   3. Wait on ctx->future for the load result.
//   4. Group commit: return without committing. Two phase commit: pre-commit the
//      transaction. Otherwise: commit the transaction and report the latency.
//
// @return OK on success, otherwise the first error encountered. The cost counters are only
//         written when the corresponding (pre-)commit call succeeds.
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
    const bool body_size_mismatch = ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes;
    if (body_size_mismatch) {
        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
    }

    // In non-streaming mode, MessageBodyFileSink.finish closes the file.
    RETURN_IF_ERROR(ctx->body_sink->finish());
    if (!ctx->use_streaming) {
        // The file has to be closed before execute_plan_fragment is called.
        ctx->body_sink.reset();
        TPipelineFragmentParamsList mocked;
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
    }

    // Block until the stream load has finished.
    RETURN_IF_ERROR(ctx->future.get());

    if (ctx->group_commit) {
        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
        return Status::OK();
    }

    if (!ctx->two_phase_commit) {
        // The data was loaded successfully, so this load has to be committed.
        const int64_t commit_begin_ns = MonotonicNanos();
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_begin_ns;
        g_stream_load_commit_and_publish_latency_ms
                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
        return Status::OK();
    }

    const int64_t pre_commit_begin_ns = MonotonicNanos();
    RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
    ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_begin_ns;
    return Status::OK();
}

int StreamLoadAction::on_header(HttpRequest* req) {
req->mark_send_reply();

streaming_load_current_processing->increment(1);

std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
Expand Down Expand Up @@ -228,26 +263,12 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
{
std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
ctx->_can_send_reply = true;
ctx->_can_send_reply_cv.notify_all();
}
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
auto str = ctx->to_json();
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
if (config::enable_stream_load_record ||
config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
#endif
_send_reply(ctx, req);
return -1;
}
return 0;
Expand Down Expand Up @@ -821,8 +842,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!ctx->use_streaming) {
return Status::OK();
}

TPipelineFragmentParamsList mocked;
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
return _exec_env->stream_load_executor()->execute_plan_fragment(
ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
_on_finish(ctx, http_req);
});
}

Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,
Expand Down
5 changes: 4 additions & 1 deletion be/src/http/action/stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <memory>
#include <mutex>
#include <string>

#include "http/http_handler.h"
Expand Down Expand Up @@ -46,11 +47,13 @@ class StreamLoadAction : public HttpHandler {

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);

private:
ExecEnv* _exec_env;
Expand Down
1 change: 1 addition & 0 deletions be/src/http/ev_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static void on_chunked(struct evhttp_request* ev_req, void* param) {

static void on_free(struct evhttp_request* ev_req, void* arg) {
HttpRequest* request = (HttpRequest*)arg;
request->wait_finish_send_reply();
delete request;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/http/http_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {
// Sends a body-less reply carrying `status` and its default reason phrase, then notifies the
// request that the reply has been sent.
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {
    const auto& reason = default_reason(status);
    evhttp_send_reply(request->get_evhttp_request(), status, reason.c_str(), nullptr);
    request->finish_send_reply();
}

void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) {
Expand All @@ -66,6 +67,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
evbuffer_add(evb, content.c_str(), content.size());
}
evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb);
request->finish_send_reply();
evbuffer_free(evb);
}

Expand All @@ -80,6 +82,7 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz
bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
}
evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
request->finish_send_reply();
evbuffer_free(evb);
}

Expand Down
47 changes: 47 additions & 0 deletions be/src/http/http_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <event2/http_struct.h>
#include <event2/keyvalq_struct.h>

#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

#include "http/http_handler.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/stack_util.h"

namespace doris {

Expand Down Expand Up @@ -141,4 +144,48 @@ const char* HttpRequest::remote_host() const {
return _ev_req->remote_host;
}

// Signals that the reply for this request has been sent.
//
// Does nothing for synchronous replies (REPLY_SYNC). Otherwise fulfils _http_reply_promise.
// NOTE(review): the future waited on in wait_finish_send_reply() is presumably obtained from
// this promise — confirm in the class declaration.
//
// The previous version also built a brief() description of the handler context into a local
// string that was never used; that dead code has been removed.
void HttpRequest::finish_send_reply() {
    if (_send_reply_type == REPLY_SYNC) {
        return;
    }

    _http_reply_promise.set_value(true);
}

// Waits until the asynchronous reply for this request has been sent, or until
// config::async_reply_timeout_s seconds have elapsed.
//
// Does nothing for synchronous replies (REPLY_SYNC). If a handler context is attached, it is
// passed to the handler's free_handler_ctx() before waiting, and is released at the end.
//
// On timeout the stream load context (if any) is marked, under its _send_reply_lock, as
// "reply finished / must not send", and waiters on _can_send_reply_cv are woken, so that no
// reply is attempted on this request afterwards.
void HttpRequest::wait_finish_send_reply() {
    if (_send_reply_type == REPLY_SYNC) {
        return;
    }

    std::string infos;
    StreamLoadContext* ctx = nullptr;
    if (_handler_ctx != nullptr) {
        ctx = reinterpret_cast<StreamLoadContext*>(_handler_ctx.get());
        infos = ctx->brief();
        _handler->free_handler_ctx(_handler_ctx);
    }

    VLOG_NOTICE << "start to wait send reply, infos=" << infos;
    auto status = _http_reply_future.wait_for(std::chrono::seconds(config::async_reply_timeout_s));
    // If the request timed out and its fragment cannot be cancelled in time, new requests
    // could be blocked behind it, so the cancelled request is released without waiting longer.
    if (status != std::future_status::ready) {
        LOG(WARNING) << "wait for send reply timeout, " << this->debug_string();
        // ctx stays null when no handler context was attached; there is then nothing to mark,
        // and dereferencing it would crash.
        if (ctx != nullptr) {
            std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
            // No reply may be sent once the current request has been freed.
            ctx->_can_send_reply = false;
            ctx->_finish_send_reply = true;
            ctx->_can_send_reply_cv.notify_all();
        }
    } else {
        VLOG_NOTICE << "wait send reply finished";
    }

    // Release _handler_ctx only at the very end, so that finish_send_reply can still read it
    // while the wait is in progress.
    _handler_ctx = nullptr;
}

} // namespace doris
Loading
Loading