Skip to content

Commit 7c28cdd

Browse files
branch-4.0: [Feature](Streaming Job) Extend streaming job to support MySQL synchronization #58898 (#59228)
Cherry-picked from #58898 Co-authored-by: wudi <[email protected]>
1 parent 1c630eb commit 7c28cdd

File tree

94 files changed

+9384
-258
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+9384
-258
lines changed

be/src/common/config.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ DEFINE_Int32(brpc_port, "8060");
6565

6666
DEFINE_Int32(arrow_flight_sql_port, "8050");
6767

68+
// Local HTTP port of the cdc client process (passed to it as --server.port).
DEFINE_Int32(cdc_client_port, "9096");
69+
6870
// If the external client cannot directly access priority_networks, set public_host to be accessible
6971
// to external client.
7072
// There are usually two usage scenarios:
@@ -629,6 +631,9 @@ DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
629631
// The buffer size to store stream table function schema info
630632
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
631633

634+
// Timeout, in milliseconds, of an HTTP request sent to the cdc client
635+
DEFINE_mInt32(request_cdc_client_timeout_ms, "60000");
636+
632637
// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
633638
// You may need to lower the speed when the sink receiver bes are too busy.
634639
DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000");

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ DECLARE_Int32(brpc_port);
100100
// Default -1, do not start arrow flight sql server.
101101
DECLARE_Int32(arrow_flight_sql_port);
102102

103+
// Port of the cdc client, which scans CDC data from OLTP databases
104+
DECLARE_Int32(cdc_client_port);
105+
103106
// If the external client cannot directly access priority_networks, set public_host to be accessible
104107
// to external client.
105108
// There are usually two usage scenarios:
@@ -667,6 +670,9 @@ DECLARE_mBool(enable_stream_load_commit_txn_on_be);
667670
// The buffer size to store stream table function schema info
668671
DECLARE_Int64(stream_tvf_buffer_size);
669672

673+
// Timeout, in milliseconds, of an HTTP request sent to the cdc client
674+
DECLARE_mInt32(request_cdc_client_timeout_ms);
675+
670676
// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
671677
// You may need to lower the speed when the sink receiver bes are too busy.
672678
DECLARE_mInt32(olap_table_sink_send_interval_microseconds);

be/src/runtime/cdc_client_mgr.cpp

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cstdio>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_client.h"
43+
44+
namespace doris {
45+
46+
namespace {
47+
// SIGCHLD handler: reaps every child that has already terminated so that no
// zombie process is left behind. Never blocks (WNOHANG).
//
// A signal handler can run in the middle of arbitrary code, and waitpid()
// overwrites errno (typically with ECHILD once nothing is left to reap), so
// errno is saved on entry and restored on exit.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    // The exit status is not needed, hence the null status pointer.
    while (waitpid(-1, nullptr, WNOHANG) > 0) {
    }
    errno = saved_errno;
}
54+
55+
// Check CDC client health
56+
#ifndef BE_TEST
57+
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
58+
const std::string cdc_health_url =
59+
"http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
60+
"/actuator/health";
61+
62+
auto health_request = [cdc_health_url, &health_response](HttpClient* client) {
63+
RETURN_IF_ERROR(client->init(cdc_health_url));
64+
client->set_timeout_ms(5000);
65+
RETURN_IF_ERROR(client->execute(&health_response));
66+
return Status::OK();
67+
};
68+
69+
Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
70+
71+
if (!status.ok()) {
72+
return Status::InternalError("CDC client health check failed");
73+
}
74+
75+
bool is_up = health_response.find("UP") != std::string::npos;
76+
77+
if (!is_up) {
78+
return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
79+
}
80+
81+
return Status::OK();
82+
}
83+
#endif
84+
85+
} // anonymous namespace
86+
87+
CdcClientMgr::CdcClientMgr() = default;
88+
89+
CdcClientMgr::~CdcClientMgr() {
90+
stop();
91+
}
92+
93+
void CdcClientMgr::stop() {
94+
pid_t pid = _child_pid.load();
95+
if (pid > 0) {
96+
// Check if process is still alive
97+
if (kill(pid, 0) == 0) {
98+
LOG(INFO) << "Stopping CDC client process, pid=" << pid;
99+
// Send SIGTERM for graceful shutdown
100+
kill(pid, SIGTERM);
101+
// Wait a short time for graceful shutdown
102+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
103+
// Force kill if still alive
104+
if (kill(pid, 0) == 0) {
105+
LOG(INFO) << "Force killing CDC client process, pid=" << pid;
106+
kill(pid, SIGKILL);
107+
int status = 0;
108+
waitpid(pid, &status, 0);
109+
}
110+
}
111+
_child_pid.store(0);
112+
}
113+
114+
LOG(INFO) << "CdcClientMgr is stopped";
115+
}
116+
117+
// Make sure the CDC client (a Java process running cdc-client.jar) is up,
// forking it when no live, healthy instance is tracked.
//
// Calls are serialized by _start_mutex.
//
// result: on every failure path the returned error is also written to
//         result->status; on success result is left untouched.
// Returns OK when a healthy client is already running or was started
// successfully, InternalError otherwise (missing DORIS_HOME / LOG_DIR /
// JAVA_HOME, missing jar, fork failure, unresponsive or failed client).
//
// Under BE_TEST no process is forked and no health check is made: a fake PID
// is recorded instead.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    // Build an InternalError and mirror it into the RPC result.
    auto fail = [result](const std::string& msg) {
        Status err = Status::InternalError(msg);
        err.to_protobuf(result->mutable_status());
        return err;
    };

    const pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        if (kill(exist_pid, 0) == 0) {
            // The PID exists; a health check verifies it really is a working CDC client.
            std::string check_response;
            if (check_cdc_client_health(1, 0, check_response).ok()) {
                return Status::OK();
            }
            // Either the PID was reused by another process or the client is
            // unhealthy. The process is deliberately not signalled here since
            // it may not be ours; forget the PID and report the error.
            _child_pid.store(0);
            return fail(fmt::format("CDC client {} unresponsive", exist_pid));
        }
        // Process is dead, reset PID and continue to start
        _child_pid.store(0);
#endif
    }

    // getenv() returns nullptr for an unset variable and constructing a
    // std::string from nullptr is undefined behavior, so check before use.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        return fail("Can not find DORIS_HOME");
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        return fail("Can not find LOG_DIR");
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat jar_stat;
    if (stat(cdc_jar_path.c_str(), &jar_stat) != 0) {
        return fail("Can not find cdc-client.jar.");
    }

    LOG(INFO) << "Ready to start cdc client";
    const char* java_home = getenv("JAVA_HOME");
    if (java_home == nullptr) {
        return fail("Can not find JAVA_HOME");
    }
    const std::string java_home_dir(java_home);
    const std::string java_bin = java_home_dir + "/bin/java";

    // Install the SIGCHLD reaper so that an exited child does not stay a zombie.
    // The struct is fully initialized: an indeterminate sa_mask would block an
    // arbitrary set of signals while the handler runs. SA_RESTART keeps
    // unrelated system calls from failing with EINTR when a child exits;
    // SA_NOCLDSTOP skips notifications for merely stopped children.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
    act.sa_handler = handle_sigchld;
    if (sigaction(SIGCHLD, &act, nullptr) != 0) {
        LOG(WARNING) << "Failed to install SIGCHLD handler, errno=" << errno;
    }
    LOG(INFO) << "Start to fork cdc client process with " << java_home_dir;
#ifdef BE_TEST
    _child_pid.store(99999);
    return Status::OK();
#else
    const pid_t pid = fork();
    if (pid < 0) {
        return fail("Fork cdc client failed.");
    }

    if (pid == 0) {
        // Child process
        // When the parent process is killed, the child process also needs to exit
#ifndef __APPLE__
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        // java -Dlog.path=xx -jar cdc-client.jar --server.port=9096 --backend.http.port=8040
        execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
               cdc_jar_port.c_str(), backend_http_port.c_str(), static_cast<char*>(nullptr));
        // execlp only returns on failure.
        perror("Cdc client child process error");
        // _exit, not exit: the child is a copy of the parent and must not run
        // the parent's atexit handlers or flush its stdio buffers.
        _exit(1);
    }

    // Parent process: save PID and wait for startup
    _child_pid.store(pid);

    std::string health_response;
    const Status health = check_cdc_client_health(3, 10, health_response);
    if (!health.ok()) {
        // Do not leave a half-started child behind: once its PID is forgotten
        // nothing would ever stop it, and it could keep holding the port.
        // waitpid may find the child already reaped by the SIGCHLD handler;
        // that is fine.
        kill(pid, SIGKILL);
        waitpid(pid, nullptr, 0);
        _child_pid.store(0);
        return fail("Start cdc client failed.");
    }

    LOG(INFO) << "Start cdc client success, pid=" << pid << ", status=" << health.to_string()
              << ", response=" << health_response;
    return Status::OK();
#endif //BE_TEST
}
228+
229+
void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
230+
PRequestCdcClientResult* result,
231+
google::protobuf::Closure* done) {
232+
brpc::ClosureGuard closure_guard(done);
233+
234+
// Start CDC client if not started
235+
Status start_st = start_cdc_client(result);
236+
if (!start_st.ok()) {
237+
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
238+
start_st.to_protobuf(result->mutable_status());
239+
return;
240+
}
241+
242+
std::string cdc_response;
243+
Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
244+
result->set_response(cdc_response);
245+
st.to_protobuf(result->mutable_status());
246+
}
247+
248+
// POST a JSON request to the local CDC client.
//
// api:         path appended to http://127.0.0.1:<cdc_client_port>.
// params_body: request payload; no payload is set when it is empty.
// response:    receives the HTTP response body.
// Returns the status produced by HttpClient::execute_with_retry(3, 1, ...).
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string target_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    // A single attempt; the per-request timeout is configurable.
    const auto post_once = [&target_url, response, &params_body](HttpClient* http) -> Status {
        RETURN_IF_ERROR(http->init(target_url));
        http->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (!params_body.empty()) {
            http->set_payload(params_body);
        }
        http->set_content_type("application/json");
        http->set_method(POST);
        return http->execute(response);
    };

    return HttpClient::execute_with_retry(3, 1, post_once);
}
268+
269+
} // namespace doris

be/src/runtime/cdc_client_mgr.h

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <gen_cpp/internal_service.pb.h>
21+
22+
#include <atomic>
23+
#include <mutex>
24+
#include <string>
25+
26+
#include "common/status.h"
27+
28+
namespace google::protobuf {
29+
class Closure;
30+
class RpcController;
31+
} // namespace google::protobuf
32+
33+
namespace doris {
34+
35+
class CdcClientMgr {
36+
public:
37+
CdcClientMgr();
38+
~CdcClientMgr();
39+
40+
void stop();
41+
42+
// Request CDC client to handle a request
43+
void request_cdc_client_impl(const PRequestCdcClientRequest* request,
44+
PRequestCdcClientResult* result, google::protobuf::Closure* done);
45+
46+
Status send_request_to_cdc_client(const std::string& api, const std::string& params_body,
47+
std::string* response);
48+
49+
Status start_cdc_client(PRequestCdcClientResult* result);
50+
51+
#ifdef BE_TEST
52+
// For testing only: get current child PID
53+
pid_t get_child_pid() const { return _child_pid.load(); }
54+
// For testing only: set child PID directly
55+
void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); }
56+
#endif
57+
58+
private:
59+
std::mutex _start_mutex;
60+
std::atomic<pid_t> _child_pid {0};
61+
};
62+
63+
} // namespace doris

be/src/runtime/exec_env.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class HeartbeatFlags;
116116
class FrontendServiceClient;
117117
class FileMetaCache;
118118
class GroupCommitMgr;
119+
class CdcClientMgr;
119120
class TabletSchemaCache;
120121
class TabletColumnObjectPool;
121122
class UserFunctionCache;
@@ -275,6 +276,7 @@ class ExecEnv {
275276
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
276277
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
277278
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }
279+
CdcClientMgr* cdc_client_mgr() { return _cdc_client_mgr; }
278280

279281
const std::vector<StorePath>& store_paths() const { return _store_paths; }
280282

@@ -505,6 +507,7 @@ class ExecEnv {
505507
// ip:brpc_port -> frontend_info
506508
std::map<TNetworkAddress, FrontendInfo> _frontends;
507509
GroupCommitMgr* _group_commit_mgr = nullptr;
510+
CdcClientMgr* _cdc_client_mgr = nullptr;
508511

509512
// Maybe we should use unique_ptr, but it needs a complete type, which means we need
510513
// to include many headers, and for some cpp file that do not need class like TabletSchemaCache,

0 commit comments

Comments
 (0)