
Commit 5f92656

[Feature](Streaming Job) Extend streaming job to support Postgres synchronization (#59461)
### What problem does this PR solve?

Issue #58896 implemented multi-table synchronization for MySQL. The main purpose of this PR is to extend the supported data sources to Postgres.
1 parent b69e79d commit 5f92656

45 files changed (+2669, -367 lines)


.github/workflows/build-extension.yml

Lines changed: 39 additions & 1 deletion
@@ -40,6 +40,7 @@ jobs:
     outputs:
       broker_changes: ${{ steps.filter.outputs.broker_changes }}
       docs_changes: ${{ steps.filter.outputs.docs_changes }}
+      cdc_client_changes: ${{ steps.filter.outputs.cdc_client_changes }}
     steps:
       - name: Checkout ${{ github.ref }}
         uses: actions/checkout@v3
@@ -53,9 +54,11 @@ jobs:
         with:
           filters: |
             broker_changes:
-              - 'fs_brokers/**'
+              - 'fs_brokers/apache_hdfs_broker/**'
             docs_changes:
               - 'docs/**'
+            cdc_client_changes:
+              - 'fs_brokers/cdc_client/**'
   build-broker:
     name: Build Broker
     needs: changes
@@ -92,6 +95,41 @@ jobs:
       - name: Build broker
         run: |
           cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
+  build-cdc-client:
+    name: Build Cdc Client
+    needs: changes
+    if: ${{ needs.changes.outputs.cdc_client_changes == 'true' }}
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout ${{ github.ref }}
+        uses: actions/checkout@v3
+
+      - name: Setup java
+        uses: actions/setup-java@v2
+        with:
+          distribution: adopt
+          java-version: '17'
+
+      - name: Setup thrift
+        run: |
+          pushd thirdparty
+          branch="${{ github.base_ref }}"
+          if [[ -z "${branch}" ]] || [[ "${branch}" == 'master' || "${branch}" == 'branch-4.0' || "${branch}" == 'branch-3.0' || "${branch}" == 'branch-2.1' ]]; then
+            curl -L https://github.com/apache/doris-thirdparty/releases/download/automation/doris-thirdparty-prebuilt-linux-x86_64.tar.xz \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          else
+            curl -L "https://github.com/apache/doris-thirdparty/releases/download/automation-${branch/branch-/}/doris-thirdparty-prebuilt-linux-x86_64.tar.xz" \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          fi
+          tar -xvf doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          popd
+          export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
+
+          thrift --version
+
+      - name: Build cdc client
+        run: |
+          cd fs_brokers/cdc_client/ && /bin/bash build.sh
 # build-docs:
 #   name: Build Documents
 #   needs: changes
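
The new build-cdc-client job can also be reproduced locally with the same commands; a minimal sketch, assuming the repository root as working directory and the prebuilt thirdparty tarball already extracted so the bundled thrift resolves from thirdparty/installed/bin:

# Mirrors the "Setup thrift" and "Build cdc client" steps above.
export PATH="$(pwd)/thirdparty/installed/bin:${PATH}"
thrift --version                      # sanity check that thrift is on PATH
cd fs_brokers/cdc_client/ && /bin/bash build.sh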

be/src/runtime/cdc_client_mgr.cpp

Lines changed: 17 additions & 3 deletions
@@ -18,12 +18,14 @@
 #include "runtime/cdc_client_mgr.h"

 #include <brpc/closure_guard.h>
+#include <fcntl.h>
 #include <fmt/core.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <google/protobuf/stubs/callback.h>
 #include <signal.h>
 #include <sys/stat.h>
 #include <sys/wait.h>
+#include <unistd.h>

 #include <cstdio>
 #ifndef __APPLE__
@@ -129,24 +131,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
         if (kill(exist_pid, 0) == 0) {
             // Process exists, verify it's actually our CDC client by health check
             std::string check_response;
-            auto check_st = check_cdc_client_health(1, 0, check_response);
+            auto check_st = check_cdc_client_health(3, 1, check_response);
             if (check_st.ok()) {
                 // Process exists and responding, CDC client is running
                 return Status::OK();
             } else {
                 // Process exists but CDC client not responding
                 // Either it's a different process (PID reused) or CDC client is unhealthy
-                // Reset PID and return error
-                _child_pid.store(0);
                 st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid));
                 st.to_protobuf(result->mutable_status());
                 return st;
             }
         } else {
+            LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
             // Process is dead, reset PID and continue to start
             _child_pid.store(0);
         }
 #endif
+    } else {
+        LOG(INFO) << "CDC client has never been started";
     }

     const char* doris_home = getenv("DORIS_HOME");
@@ -199,6 +202,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
 #ifndef __APPLE__
         prctl(PR_SET_PDEATHSIG, SIGKILL);
 #endif
+        // Redirect stdout and stderr to log out file
+        std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
+        int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
+        if (out_fd < 0) {
+            perror("open cdc-client.out file failed");
+            exit(1);
+        }
+        dup2(out_fd, STDOUT_FILENO);
+        dup2(out_fd, STDERR_FILENO);
+        close(out_fd);
+
         // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
         execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
                cdc_jar_port.c_str(), backend_http_port.c_str(), (char*)NULL);
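
For reference, the forked child set up above ends up running roughly the equivalent of the shell command below; a sketch only, where LOG_DIR and CDC_JAR_PATH are placeholders for the values the backend resolves from DORIS_HOME, and the ports come from the backend configuration:

# Illustrative equivalent of the exec above, with output appended to cdc-client.out.
java -Dlog.path="${LOG_DIR}" -jar "${CDC_JAR_PATH}" \
    --server.port=9096 --backend.http.port=8040 \
    >>"${LOG_DIR}/cdc-client.out" 2>&1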

build.sh

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ Usage: $0 <options>
     --broker                build Broker. Default ON.
     --hive-udf              build Hive UDF library for Ingestion Load. Default ON.
     --be-java-extensions    build Backend java extensions. Default ON.
+    --be-cdc-client         build Cdc Client for backend. Default ON.
     --be-extension-ignore   build be-java-extensions package, choose which modules to ignore. Multiple modules separated by commas.
     --clean                 clean and build target
     --output                specify the output directory
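
A quick usage sketch for the new flag (assuming the existing --be option for building the backend; the CDC client defaults to ON):

# Build the backend together with the CDC client module.
sh build.sh --be --be-cdc-client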

docker/thirdparties/docker-compose/postgresql/init/01-create-schema.sql

Lines changed: 1 addition & 0 deletions
@@ -17,3 +17,4 @@

 create schema doris_test;
 create schema catalog_pg_test;
+create schema cdc_test;

docker/thirdparties/docker-compose/postgresql/postgresql-14.yaml.tpl

Lines changed: 8 additions & 0 deletions
@@ -25,6 +25,14 @@ services:
       POSTGRES_PASSWORD: 123456
     ports:
       - ${DOCKER_PG_14_EXTERNAL_PORT}:5432
+    command:
+      - "postgres"
+      - "-c"
+      - "wal_level=logical"
+      - "-c"
+      - "max_wal_senders=30"
+      - "-c"
+      - "max_replication_slots=30"
     healthcheck:
       test: [ "CMD-SHELL", "pg_isready -U postgres && psql -U postgres -c 'SELECT 1 FROM doris_test.deadline;'" ]
       interval: 5s
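
Once the container is up, the logical-replication settings can be verified with psql; a small sketch using the postgres user and external port from this compose template:

# Expect wal_level=logical, max_wal_senders=30, max_replication_slots=30.
psql -h 127.0.0.1 -p "${DOCKER_PG_14_EXTERNAL_PORT}" -U postgres \
    -c "SHOW wal_level;" -c "SHOW max_wal_senders;" -c "SHOW max_replication_slots;"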

fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java

Lines changed: 1 addition & 0 deletions
@@ -24,6 +24,7 @@ public class DataSourceConfigKeys {
     public static final String USER = "user";
     public static final String PASSWORD = "password";
     public static final String DATABASE = "database";
+    public static final String SCHEMA = "schema";
     public static final String INCLUDE_TABLES = "include_tables";
     public static final String EXCLUDE_TABLES = "exclude_tables";
     // initial,earliest,latest,{binlog,postion},\d{13}

fe/fe-core/src/main/java/org/apache/doris/job/common/DataSourceType.java

Lines changed: 2 additions & 1 deletion
@@ -18,5 +18,6 @@
 package org.apache.doris.job.common;

 public enum DataSourceType {
-    MYSQL
+    MYSQL,
+    POSTGRES
 }

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java

Lines changed: 3 additions & 1 deletion
@@ -34,8 +34,10 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.DRIVER_URL,
             DataSourceConfigKeys.DRIVER_CLASS,
             DataSourceConfigKeys.DATABASE,
+            DataSourceConfigKeys.SCHEMA,
             DataSourceConfigKeys.INCLUDE_TABLES,
-            DataSourceConfigKeys.EXCLUDE_TABLES
+            DataSourceConfigKeys.EXCLUDE_TABLES,
+            DataSourceConfigKeys.SPLIT_SIZE
     );

     public static void validateSource(Map<String, String> input) throws IllegalArgumentException {

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 26 additions & 28 deletions
@@ -26,7 +26,6 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -210,6 +209,11 @@ private void initSourceJob() {
         init();
         checkRequiredSourceProperties();
         List<String> createTbls = createTableIfNotExists();
+        if (sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) == null) {
+            // cdc need the final includeTables
+            String includeTables = String.join(",", createTbls);
+            sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, includeTables);
+        }
         this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties);
         JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
         rdsOffsetProvider.splitChunks(createTbls);
@@ -232,9 +236,6 @@ private void checkRequiredSourceProperties() {
                 "password is required property");
         Preconditions.checkArgument(sourceProperties.get(DataSourceConfigKeys.DATABASE) != null,
                 "database is required property");
-        Preconditions.checkArgument(sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) != null
-                || sourceProperties.get(DataSourceConfigKeys.EXCLUDE_TABLES) != null,
-                "Either include_tables or exclude_tables must be specified");
         if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
             sourceProperties.put(DataSourceConfigKeys.OFFSET, DataSourceConfigKeys.OFFSET_LATEST);
         }
@@ -515,6 +516,7 @@ protected void fetchMeta() {
                 offsetProvider.fetchRemoteMeta(new HashMap<>());
             }
         } catch (Exception ex) {
+            //todo: The job status = MANUAL_PAUSE_ERR, No need to set failureReason again
             log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
             failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
                     "Failed to fetch meta, " + ex.getMessage());
@@ -572,6 +574,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException {

     public void onStreamTaskSuccess(AbstractStreamingTask task) {
         try {
+            resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
             Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             AbstractStreamingTask nextTask = createStreamingTask();
@@ -729,7 +732,7 @@ public TRow getTvfInfo() {
         trow.addToColumnValue(new TCell().setStringVal(getJobName()));
         trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
-        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(getShowSQL()));
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -738,30 +741,30 @@ public TRow getTvfInfo() {
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
         trow.addToColumnValue(new TCell().setStringVal(properties != null && !properties.isEmpty()
-                ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));
+                ? GsonUtils.GSON.toJson(properties) : ""));

         if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
             trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowCurrentOffset()));
         } else {
-            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+            trow.addToColumnValue(new TCell().setStringVal(""));
         }

         if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) {
             trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowMaxOffset()));
         } else {
-            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+            trow.addToColumnValue(new TCell().setStringVal(""));
         }
         if (tvfType != null) {
             trow.addToColumnValue(new TCell().setStringVal(
-                    jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
+                    jobStatistic == null ? "" : jobStatistic.toJson()));
         } else {
             trow.addToColumnValue(new TCell().setStringVal(
-                    nonTxnJobStatistic == null ? FeConstants.null_string : nonTxnJobStatistic.toJson()));
+                    nonTxnJobStatistic == null ? "" : nonTxnJobStatistic.toJson()));
         }
         trow.addToColumnValue(new TCell().setStringVal(failureReason == null
-                ? FeConstants.null_string : failureReason.getMsg()));
+                ? "" : failureReason.getMsg()));
         trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
-                ? FeConstants.null_string : jobRuntimeMsg));
+                ? "" : jobRuntimeMsg));
         return trow;
     }

@@ -1064,7 +1067,7 @@ public void gsonPostProcess() throws IOException {
      * The current streamingTask times out; create a new streamingTask.
      * Only applies to StreamingMultiTask.
      */
-    public void processTimeoutTasks() {
+    public void processTimeoutTasks() throws JobException {
         if (!(runningStreamTask instanceof StreamingMultiTblTask)) {
             return;
         }
@@ -1073,16 +1076,8 @@ public void processTimeoutTasks() {
             StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) this.runningStreamTask;
             if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
                     && runningMultiTask.isTimeout()) {
-                runningMultiTask.cancel(false);
-                runningMultiTask.setErrMsg("task cancelled cause timeout");
-
-                // renew streaming multi task
-                this.runningStreamTask = createStreamingMultiTblTask();
-                Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
-                this.runningStreamTask.setStatus(TaskStatus.PENDING);
-                log.info("create new streaming multi tasks due to timeout, for job {}, task {} ",
-                        getJobId(), runningStreamTask.getTaskId());
-                recordTasks(runningStreamTask);
+                runningMultiTask.onFail("task failed cause timeout");
+                // renew streaming task by auto resume
             }
         } finally {
             writeUnlock();
@@ -1096,20 +1091,22 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException
         }
         writeLock();
         try {
-            if (offsetRequest.getScannedRows() == 0 && offsetRequest.getScannedBytes() == 0) {
-                JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
-                op.setHasMoreData(false);
-            }
-            updateNoTxnJobStatisticAndOffset(offsetRequest);
             if (this.runningStreamTask != null
                     && this.runningStreamTask instanceof StreamingMultiTblTask) {
                 if (this.runningStreamTask.getTaskId() != offsetRequest.getTaskId()) {
                     throw new JobException("Task id mismatch when commit offset. expected: "
                             + this.runningStreamTask.getTaskId() + ", actual: " + offsetRequest.getTaskId());
                 }
+                updateNoTxnJobStatisticAndOffset(offsetRequest);
+                if (offsetRequest.getScannedRows() == 0 && offsetRequest.getScannedBytes() == 0) {
+                    JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
+                    op.setHasMoreData(false);
+                }
+
                 persistOffsetProviderIfNeed();
                 ((StreamingMultiTblTask) this.runningStreamTask).successCallback(offsetRequest);
             }
+
         } finally {
             writeUnlock();
         }
@@ -1134,6 +1131,7 @@ public void replayOffsetProviderIfNeed() throws JobException {
      * 2. Clean chunk info in meta table (jdbc)
      */
     public void cleanup() throws JobException {
+        log.info("cleanup streaming job {}", getJobId());
         // s3 tvf clean offset
         if (tvfType != null && Config.isCloudMode()) {
             Cloud.DeleteStreamingJobResponse resp = null;

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ private void handlePendingState() throws JobException {
         streamingInsertJob.setAutoResumeCount(0);
     }

-    private void handleRunningState() {
+    private void handleRunningState() throws JobException {
         streamingInsertJob.processTimeoutTasks();
         streamingInsertJob.fetchMeta();
     }
