Skip to content

Commit df8653e

Browse files
committed
enable stream load record to audit log system table
1 parent e29025e commit df8653e

File tree

16 files changed

+558
-13
lines changed

16 files changed

+558
-13
lines changed

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,8 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
627627
// Whether to enable stream load record function, the default is false.
628628
// False: disable stream load record
629629
DEFINE_mBool(enable_stream_load_record, "false");
630+
// Whether to enable stream load record to audit log table, the default is true.
631+
DEFINE_mBool(enable_stream_load_record_to_audit_log_table, "true");
630632
// batch size of stream load record reported to FE
631633
DEFINE_mInt32(stream_load_record_batch_size, "50");
632634
// expire time of stream load record in rocksdb.

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,8 @@ DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
648648
// Whether to enable stream load record function, the default is false.
649649
// False: disable stream load record
650650
DECLARE_mBool(enable_stream_load_record);
651+
// Whether to enable stream load record to audit log table, the default is true.
652+
DECLARE_mBool(enable_stream_load_record_to_audit_log_table);
651653
// batch size of stream load record reported to FE
652654
DECLARE_mInt32(stream_load_record_batch_size);
653655
// expire time of stream load record in rocksdb.

be/src/http/action/http_stream.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,11 @@ void HttpStreamAction::handle(HttpRequest* req) {
113113
// add new line at end
114114
str = str + '\n';
115115
HttpChannel::send_reply(req, str);
116-
if (config::enable_stream_load_record) {
117-
str = ctx->prepare_stream_load_record(str);
118-
_save_stream_load_record(ctx, str);
116+
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
117+
if (req->header(HTTP_SKIP_RECORD).empty()) {
118+
str = ctx->prepare_stream_load_record(str);
119+
_save_stream_load_record(ctx, str);
120+
}
119121
}
120122
// update statistics
121123
http_stream_requests_total->increment(1);
@@ -176,9 +178,12 @@ int HttpStreamAction::on_header(HttpRequest* req) {
176178
// add new line at end
177179
str = str + '\n';
178180
HttpChannel::send_reply(req, str);
179-
if (config::enable_stream_load_record) {
180-
str = ctx->prepare_stream_load_record(str);
181-
_save_stream_load_record(ctx, str);
181+
if (config::enable_stream_load_record ||
182+
config::enable_stream_load_record_to_audit_log_table) {
183+
if (req->header(HTTP_SKIP_RECORD).empty()) {
184+
str = ctx->prepare_stream_load_record(str);
185+
_save_stream_load_record(ctx, str);
186+
}
182187
}
183188
return -1;
184189
}

be/src/http/action/stream_load.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,11 @@ void StreamLoadAction::handle(HttpRequest* req) {
131131
str = str + '\n';
132132
HttpChannel::send_reply(req, str);
133133
#ifndef BE_TEST
134-
if (config::enable_stream_load_record) {
135-
str = ctx->prepare_stream_load_record(str);
136-
_save_stream_load_record(ctx, str);
134+
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
135+
if (req->header(HTTP_SKIP_RECORD).empty()) {
136+
str = ctx->prepare_stream_load_record(str);
137+
_save_stream_load_record(ctx, str);
138+
}
137139
}
138140
#endif
139141

@@ -238,9 +240,12 @@ int StreamLoadAction::on_header(HttpRequest* req) {
238240
str = str + '\n';
239241
HttpChannel::send_reply(req, str);
240242
#ifndef BE_TEST
241-
if (config::enable_stream_load_record) {
242-
str = ctx->prepare_stream_load_record(str);
243-
_save_stream_load_record(ctx, str);
243+
if (config::enable_stream_load_record ||
244+
config::enable_stream_load_record_to_audit_log_table) {
245+
if (req->header(HTTP_SKIP_RECORD).empty()) {
246+
str = ctx->prepare_stream_load_record(str);
247+
_save_stream_load_record(ctx, str);
248+
}
244249
}
245250
#endif
246251
return -1;

be/src/http/http_client.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,12 @@ Status HttpClient::execute_post_request(const std::string& payload, std::string*
409409
return execute(response);
410410
}
411411

412+
// Issues an HTTP PUT request carrying `payload`.
//
// Registers `payload` through set_payload(), asks libcurl to use the literal
// request method "PUT" via CURLOPT_CUSTOMREQUEST, then runs the request with
// execute(), which receives `response` and whose Status is returned as-is.
//
// NOTE(review): unlike execute_delete_request below, this does not go through
// set_method(); presumably that is intentional so the payload is sent as a
// regular request body - confirm against set_method()'s PUT handling.
// NOTE(review): the return code of curl_easy_setopt() is not checked here.
Status HttpClient::execute_put_request(const std::string& payload, std::string* response) {
413+
set_payload(payload);
414+
curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "PUT");
415+
return execute(response);
416+
}
417+
412418
Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
413419
set_method(DELETE);
414420
set_payload(payload);

be/src/http/http_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ class HttpClient {
158158

159159
Status execute_post_request(const std::string& payload, std::string* response);
160160

161+
Status execute_put_request(const std::string& payload, std::string* response);
162+
161163
Status execute_delete_request(const std::string& payload, std::string* response);
162164

163165
// execute a simple method, and its response is saved in response argument

be/src/http/http_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,6 @@ static const std::string HTTP_GROUP_COMMIT = "group_commit";
7373
static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
7474
static const std::string HTTP_EMPTY_FIELD_AS_NULL = "empty_field_as_null";
7575
static const std::string HTTP_COMPUTE_GROUP = "compute_group";
76+
static const std::string HTTP_SKIP_RECORD = "skip_record";
7677

7778
} // namespace doris

be/src/runtime/exec_env.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
#include "common/config.h"
3232
#include "common/status.h"
33+
#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
3334
#include "io/cache/fs_file_cache_storage.h"
3435
#include "olap/memtable_memory_limiter.h"
3536
#include "olap/options.h"
@@ -105,6 +106,7 @@ class LoadStreamMgr;
105106
class LoadStreamMapPool;
106107
class StreamLoadExecutor;
107108
class RoutineLoadTaskExecutor;
109+
class StreamLoadRecorderManager;
108110
class SmallFileMgr;
109111
class BackendServiceClient;
110112
class TPaloBrokerServiceClient;
@@ -484,6 +486,7 @@ class ExecEnv {
484486

485487
std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
486488
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
489+
StreamLoadRecorderManager* _stream_load_recorder_manager = nullptr;
487490
SmallFileMgr* _small_file_mgr = nullptr;
488491
HeartbeatFlags* _heartbeat_flags = nullptr;
489492
vectorized::ScannerScheduler* _scanner_scheduler = nullptr;

be/src/runtime/exec_env_init.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
#include "runtime/small_file_mgr.h"
9494
#include "runtime/stream_load/new_load_stream_mgr.h"
9595
#include "runtime/stream_load/stream_load_executor.h"
96+
#include "runtime/stream_load/stream_load_recorder_manager.h"
9697
#include "runtime/thread_context.h"
9798
#include "runtime/user_function_cache.h"
9899
#include "runtime/workload_group/workload_group_manager.h"
@@ -392,6 +393,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
392393
return st;
393394
}
394395

396+
// should start after storage_engine->open()
397+
_stream_load_recorder_manager = new StreamLoadRecorderManager();
398+
_stream_load_recorder_manager->start();
399+
395400
// create internal workload group should be after storage_engine->open()
396401
RETURN_IF_ERROR(_create_internal_workload_group());
397402
_workload_sched_mgr = new WorkloadSchedPolicyMgr();
@@ -765,6 +770,7 @@ void ExecEnv::destroy() {
765770
SAFE_STOP(_group_commit_mgr);
766771
// _routine_load_task_executor should be stopped before _new_load_stream_mgr.
767772
SAFE_STOP(_routine_load_task_executor);
773+
SAFE_STOP(_stream_load_recorder_manager);
768774
// stop workload scheduler
769775
SAFE_STOP(_workload_sched_mgr);
770776
// stop pipeline step 2, cgroup execution
@@ -831,6 +837,7 @@ void ExecEnv::destroy() {
831837
SAFE_DELETE(_file_meta_cache);
832838
SAFE_DELETE(_group_commit_mgr);
833839
SAFE_DELETE(_routine_load_task_executor);
840+
SAFE_DELETE(_stream_load_recorder_manager);
834841
// _stream_load_executor
835842
SAFE_DELETE(_function_client_cache);
836843
SAFE_DELETE(_streaming_client_cache);

be/src/runtime/stream_load/stream_load_recorder.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,21 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value)
100100
return Status::OK();
101101
}
102102

103+
// Point lookup of a single stream load record.
//
// Reads `key` from `_db` using the first column family handle (`_handles[0]`)
// with default-constructed ReadOptions; the result of the lookup is written
// into `*value` by rocksdb.
//
// Returns:
//   - Status::NotFound      when rocksdb reports the key as not found.
//   - Status::InternalError for any other non-ok rocksdb status (a WARNING is
//                           logged with the key and the rocksdb reason first).
//   - Status::OK            otherwise.
//
// NOTE(review): `value` is passed straight to rocksdb without a null check;
// callers are assumed to supply a valid pointer - confirm.
Status StreamLoadRecorder::get(const std::string& key, std::string* value) {
104+
rocksdb::ColumnFamilyHandle* handle = _handles[0];
105+
rocksdb::ReadOptions read_options;
106+
rocksdb::Status s = _db->Get(read_options, handle, rocksdb::Slice(key), value);
107+
// A missing key is reported separately from real storage failures.
if (s.IsNotFound()) {
108+
return Status::NotFound("Key not found: {}", key);
109+
}
110+
if (!s.ok()) {
111+
LOG(WARNING) << "rocks db get key:" << key << " failed, reason:" << s.ToString();
112+
return Status::InternalError("Stream load record rocksdb get failed, reason: {}",
113+
s.ToString());
114+
}
115+
return Status::OK();
116+
}
117+
103118
Status StreamLoadRecorder::get_batch(const std::string& start, int batch_size,
104119
std::map<std::string, std::string>* stream_load_records) {
105120
rocksdb::ColumnFamilyHandle* handle = _handles[0];

0 commit comments

Comments
 (0)