40 changes: 39 additions & 1 deletion .github/workflows/build-extension.yml
@@ -40,6 +40,7 @@ jobs:
outputs:
broker_changes: ${{ steps.filter.outputs.broker_changes }}
docs_changes: ${{ steps.filter.outputs.docs_changes }}
cdc_client_changes: ${{ steps.filter.outputs.cdc_client_changes }}
steps:
- name: Checkout ${{ github.ref }}
uses: actions/checkout@v3
@@ -53,9 +54,11 @@ jobs:
with:
filters: |
broker_changes:
- 'fs_brokers/**'
- 'fs_brokers/apache_hdfs_broker/**'
docs_changes:
- 'docs/**'
cdc_client_changes:
- 'fs_brokers/cdc_client/**'
build-broker:
name: Build Broker
needs: changes
@@ -92,6 +95,41 @@ jobs:
- name: Build broker
run: |
cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
build-cdc-client:
name: Build Cdc Client
needs: changes
if: ${{ needs.changes.outputs.cdc_client_changes == 'true' }}
runs-on: ubuntu-latest
steps:
- name: Checkout ${{ github.ref }}
uses: actions/checkout@v3

- name: Setup java
uses: actions/setup-java@v2
with:
distribution: adopt
java-version: '17'

- name: Setup thrift
run: |
pushd thirdparty
branch="${{ github.base_ref }}"
if [[ -z "${branch}" ]] || [[ "${branch}" == 'master' || "${branch}" == 'branch-4.0' || "${branch}" == 'branch-3.0' || "${branch}" == 'branch-2.1' ]]; then
curl -L https://github.com/apache/doris-thirdparty/releases/download/automation/doris-thirdparty-prebuilt-linux-x86_64.tar.xz \
-o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
else
curl -L "https://github.com/apache/doris-thirdparty/releases/download/automation-${branch/branch-/}/doris-thirdparty-prebuilt-linux-x86_64.tar.xz" \
-o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
fi
tar -xvf doris-thirdparty-prebuilt-linux-x86_64.tar.xz
popd
export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"

thrift --version

- name: Build cdc client
run: |
cd fs_brokers/cdc_client/ && /bin/bash build.sh
# build-docs:
# name: Build Documents
# needs: changes
20 changes: 17 additions & 3 deletions be/src/runtime/cdc_client_mgr.cpp
@@ -18,12 +18,14 @@
#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cstdio>
#ifndef __APPLE__
@@ -129,24 +131,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
if (kill(exist_pid, 0) == 0) {
// Process exists, verify it's actually our CDC client by health check
std::string check_response;
auto check_st = check_cdc_client_health(1, 0, check_response);
auto check_st = check_cdc_client_health(3, 1, check_response);
if (check_st.ok()) {
// Process exists and responding, CDC client is running
return Status::OK();
} else {
// Process exists but CDC client not responding
// Either it's a different process (PID reused) or CDC client is unhealthy
// Reset PID and return error
_child_pid.store(0);
st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid));
st.to_protobuf(result->mutable_status());
return st;
}
} else {
LOG(INFO) << "CDC client is dead, pid=" << exist_pid;
// Process is dead, reset PID and continue to start
_child_pid.store(0);
}
#endif
} else {
LOG(INFO) << "CDC client has never been started";
}

const char* doris_home = getenv("DORIS_HOME");
@@ -199,6 +202,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
#ifndef __APPLE__
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
// Redirect stdout and stderr to the cdc-client.out log file
std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
if (out_fd < 0) {
perror("open cdc-client.out file failed");
exit(1);
}
dup2(out_fd, STDOUT_FILENO);
dup2(out_fd, STDERR_FILENO);
close(out_fd);

// java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
cdc_jar_port.c_str(), backend_http_port.c_str(), (char*)NULL);
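The redirection above is done in C++ with open/dup2 before execlp, so everything the spawned Java CDC client writes to stdout/stderr is appended to cdc-client.out in the BE log directory. Purely as an illustration (not part of this PR), the same append-to-log redirection expressed with Java's ProcessBuilder would look roughly like the sketch below; the launcher class and the javaBin/jarPath/logDir parameters are assumptions, while the port arguments mirror the comment in the diff.

```java
import java.io.File;
import java.io.IOException;

// Illustrative sketch only: the actual launch happens in the BE in C++ (fork + dup2 + execlp).
// This shows the equivalent append-to-log redirection using Java's ProcessBuilder.
public class CdcClientLauncherSketch {
    public static Process launch(String javaBin, String jarPath, String logDir) throws IOException {
        File out = new File(logDir, "cdc-client.out"); // same log file name as in the diff
        ProcessBuilder pb = new ProcessBuilder(
                javaBin, "-jar", jarPath,
                "--server.port=9096", "--backend.http.port=8040");
        pb.redirectErrorStream(true);                             // merge stderr into stdout
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(out)); // append, like O_APPEND
        return pb.start();
    }
}
```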
1 change: 1 addition & 0 deletions build.sh
@@ -55,6 +55,7 @@ Usage: $0 <options>
--broker build Broker. Default ON.
--hive-udf build Hive UDF library for Ingestion Load. Default ON.
--be-java-extensions build Backend java extensions. Default ON.
--be-cdc-client build Cdc Client for backend. Default ON.
--be-extension-ignore build be-java-extensions package, choose which modules to ignore. Multiple modules separated by commas.
--clean clean and build target
--output specify the output directory
@@ -17,3 +17,4 @@

create schema doris_test;
create schema catalog_pg_test;
create schema cdc_test;
@@ -25,6 +25,14 @@ services:
POSTGRES_PASSWORD: 123456
ports:
- ${DOCKER_PG_14_EXTERNAL_PORT}:5432
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_wal_senders=30"
- "-c"
- "max_replication_slots=30"
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres && psql -U postgres -c 'SELECT 1 FROM doris_test.deadline;'" ]
interval: 5s
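The extra `command` flags switch the test Postgres container to `wal_level=logical` and raise the WAL sender and replication slot limits, which logical-decoding-based CDC needs. As a quick sanity check (not part of this PR), the setting can be verified over JDBC; the sketch below assumes the container is reachable on localhost:5432 (substitute the mapped `DOCKER_PG_14_EXTERNAL_PORT`) and requires the PostgreSQL JDBC driver on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Verify that the test container really runs with logical WAL level.
public class CheckWalLevel {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/postgres"; // assumed host/port
        try (Connection conn = DriverManager.getConnection(url, "postgres", "123456");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
            if (rs.next()) {
                System.out.println("wal_level = " + rs.getString(1)); // expect "logical"
            }
        }
    }
}
```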
@@ -24,6 +24,7 @@ public class DataSourceConfigKeys {
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DATABASE = "database";
public static final String SCHEMA = "schema";
public static final String INCLUDE_TABLES = "include_tables";
public static final String EXCLUDE_TABLES = "exclude_tables";
// initial,earliest,latest,{binlog,position},\d{13}
@@ -18,5 +18,6 @@
package org.apache.doris.job.common;

public enum DataSourceType {
MYSQL
MYSQL,
POSTGRES
}
@@ -34,8 +34,10 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.DRIVER_URL,
DataSourceConfigKeys.DRIVER_CLASS,
DataSourceConfigKeys.DATABASE,
DataSourceConfigKeys.SCHEMA,
DataSourceConfigKeys.INCLUDE_TABLES,
DataSourceConfigKeys.EXCLUDE_TABLES
DataSourceConfigKeys.EXCLUDE_TABLES,
DataSourceConfigKeys.SPLIT_SIZE
);

public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
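The validator change simply widens the whitelist of accepted source properties to include `schema` and `split_size`. A minimal, self-contained sketch of this kind of key-whitelist validation is shown below; the class name and the reduced key set are illustrative, not the repository's actual definitions.

```java
import java.util.Map;
import java.util.Set;

// Illustrative whitelist validation; only a subset of the real keys is listed here.
public final class SourcePropertyWhitelistSketch {
    private static final Set<String> ALLOWED_SOURCE_KEYS = Set.of(
            "user", "password", "database", "schema",
            "include_tables", "exclude_tables", "split_size", "offset");

    public static void validateSource(Map<String, String> input) {
        for (String key : input.keySet()) {
            if (!ALLOWED_SOURCE_KEYS.contains(key)) {
                throw new IllegalArgumentException("Unsupported source property: " + key);
            }
        }
    }
}
```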
@@ -26,7 +26,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -210,6 +209,11 @@ private void initSourceJob() {
init();
checkRequiredSourceProperties();
List<String> createTbls = createTableIfNotExists();
if (sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) == null) {
// CDC needs the final include_tables list
String includeTables = String.join(",", createTbls);
sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, includeTables);
}
this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties);
JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
@@ -232,9 +236,6 @@ private void checkRequiredSourceProperties() {
"password is required property");
Preconditions.checkArgument(sourceProperties.get(DataSourceConfigKeys.DATABASE) != null,
"database is required property");
Preconditions.checkArgument(sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) != null
|| sourceProperties.get(DataSourceConfigKeys.EXCLUDE_TABLES) != null,
"Either include_tables or exclude_tables must be specified");
if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
sourceProperties.put(DataSourceConfigKeys.OFFSET, DataSourceConfigKeys.OFFSET_LATEST);
}
@@ -515,6 +516,7 @@ protected void fetchMeta() {
offsetProvider.fetchRemoteMeta(new HashMap<>());
}
} catch (Exception ex) {
// TODO: when the job status is MANUAL_PAUSE_ERR, there is no need to set failureReason again
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to fetch meta, " + ex.getMessage());
@@ -572,6 +574,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException {

public void onStreamTaskSuccess(AbstractStreamingTask task) {
try {
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
AbstractStreamingTask nextTask = createStreamingTask();
@@ -729,7 +732,7 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getJobName()));
trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(getShowSQL()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -738,30 +741,30 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(properties != null && !properties.isEmpty()
? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));
? GsonUtils.GSON.toJson(properties) : ""));

if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowCurrentOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
}

if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowMaxOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
}
if (tvfType != null) {
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
jobStatistic == null ? "" : jobStatistic.toJson()));
} else {
trow.addToColumnValue(new TCell().setStringVal(
nonTxnJobStatistic == null ? FeConstants.null_string : nonTxnJobStatistic.toJson()));
nonTxnJobStatistic == null ? "" : nonTxnJobStatistic.toJson()));
}
trow.addToColumnValue(new TCell().setStringVal(failureReason == null
? FeConstants.null_string : failureReason.getMsg()));
? "" : failureReason.getMsg()));
trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
? FeConstants.null_string : jobRuntimeMsg));
? "" : jobRuntimeMsg));
return trow;
}

@@ -1064,7 +1067,7 @@ public void gsonPostProcess() throws IOException {
* The current streamingTask times out; create a new streamingTask.
* Only applies to StreamingMultiTask.
*/
public void processTimeoutTasks() {
public void processTimeoutTasks() throws JobException {
if (!(runningStreamTask instanceof StreamingMultiTblTask)) {
return;
}
@@ -1073,16 +1076,8 @@ public void processTimeoutTasks() {
StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) this.runningStreamTask;
if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
&& runningMultiTask.isTimeout()) {
runningMultiTask.cancel(false);
runningMultiTask.setErrMsg("task cancelled cause timeout");

// renew streaming multi task
this.runningStreamTask = createStreamingMultiTblTask();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
log.info("create new streaming multi tasks due to timeout, for job {}, task {} ",
getJobId(), runningStreamTask.getTaskId());
recordTasks(runningStreamTask);
runningMultiTask.onFail("task failed due to timeout");
// the streaming task will be renewed by auto-resume
}
} finally {
writeUnlock();
@@ -1096,20 +1091,22 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException
}
writeLock();
try {
if (offsetRequest.getScannedRows() == 0 && offsetRequest.getScannedBytes() == 0) {
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
op.setHasMoreData(false);
}
updateNoTxnJobStatisticAndOffset(offsetRequest);
if (this.runningStreamTask != null
&& this.runningStreamTask instanceof StreamingMultiTblTask) {
if (this.runningStreamTask.getTaskId() != offsetRequest.getTaskId()) {
throw new JobException("Task id mismatch when commit offset. expected: "
+ this.runningStreamTask.getTaskId() + ", actual: " + offsetRequest.getTaskId());
}
updateNoTxnJobStatisticAndOffset(offsetRequest);
if (offsetRequest.getScannedRows() == 0 && offsetRequest.getScannedBytes() == 0) {
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
op.setHasMoreData(false);
}

persistOffsetProviderIfNeed();
((StreamingMultiTblTask) this.runningStreamTask).successCallback(offsetRequest);
}

} finally {
writeUnlock();
}
@@ -1134,6 +1131,7 @@ public void replayOffsetProviderIfNeed() throws JobException {
* 2. Clean chunk info in meta table (jdbc)
*/
public void cleanup() throws JobException {
log.info("cleanup streaming job {}", getJobId());
// s3 tvf clean offset
if (tvfType != null && Config.isCloudMode()) {
Cloud.DeleteStreamingJobResponse resp = null;
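Two behavioral points in the StreamingInsertJob changes are worth calling out: `commitOffset` now only updates statistics, offsets, and the has-more-data flag after the committing task id has been verified against the current running multi-table task, and an empty commit (zero scanned rows and bytes) marks the source as drained for now. A condensed, self-contained sketch of that guarded flow follows; field and helper names are illustrative, not the repository's own, and the real method additionally takes the job write lock, persists the offset provider, and notifies the task.

```java
// Condensed sketch of the guarded offset-commit flow shown in the diff.
public class OffsetCommitSketch {
    private final long runningTaskId;
    private boolean hasMoreData = true;
    private long committedRows;
    private long committedBytes;

    public OffsetCommitSketch(long runningTaskId) {
        this.runningTaskId = runningTaskId;
    }

    public synchronized void commitOffset(long taskId, long scannedRows, long scannedBytes) {
        if (taskId != runningTaskId) {
            // A stale or foreign task must not advance the job's offset.
            throw new IllegalStateException("Task id mismatch when commit offset. expected: "
                    + runningTaskId + ", actual: " + taskId);
        }
        committedRows += scannedRows;
        committedBytes += scannedBytes;
        if (scannedRows == 0 && scannedBytes == 0) {
            // An empty commit means the source has been drained for now.
            hasMoreData = false;
        }
        // The real job also persists the offset provider and calls the task's success callback here.
    }
}
```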
@@ -71,7 +71,7 @@ private void handlePendingState() throws JobException {
streamingInsertJob.setAutoResumeCount(0);
}

private void handleRunningState() {
private void handleRunningState() throws JobException {
streamingInsertJob.processTimeoutTasks();
streamingInsertJob.fetchMeta();
}
@@ -125,9 +125,9 @@ private void sendWriteRequest() throws JobException {
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.error("Failed to get split from backend, {}", result.getStatus().getErrorMsgs(0));
log.error("Failed to send write records request, {}", result.getStatus().getErrorMsgs(0));
throw new JobException(
"Failed to get split from backend," + result.getStatus().getErrorMsgs(0) + ", response: "
"Failed to send write records request," + result.getStatus().getErrorMsgs(0) + ", response: "
+ result.getResponse());
}
String response = result.getResponse();
@@ -142,7 +142,7 @@ private void sendWriteRequest() throws JobException {
return;
}
} catch (JsonProcessingException e) {
log.error("Failed to parse write records response: {}", response, e);
log.warn("Failed to parse write records response: {}", response);
throw new JobException("Failed to parse write records response: " + response);
}
throw new JobException("Failed to send write records request , error message: " + response);
@@ -257,7 +257,11 @@ public void closeOrReleaseResources() {
}

public boolean isTimeout() {
return (System.currentTimeMillis() - createTimeMs) > timeoutMs;
if (startTimeMs == null) {
// It's still pending, waiting for scheduling.
return false;
}
return (System.currentTimeMillis() - startTimeMs) > timeoutMs;
}

@Override
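`isTimeout()` now measures from the task's start time instead of its creation time, and a task that is still pending never counts as timed out. The small demo below (illustrative names only, not repository code) shows why that matters when a task waits in the queue longer than its timeout.

```java
// Tiny demonstration of why the timeout is measured from start time rather than
// creation time: a task that sits in the queue longer than timeoutMs would
// otherwise be flagged as timed out the moment it starts.
public class TimeoutSemanticsDemo {
    static boolean timedOutFromCreate(long createTimeMs, long timeoutMs, long nowMs) {
        return (nowMs - createTimeMs) > timeoutMs;
    }

    static boolean timedOutFromStart(Long startTimeMs, long timeoutMs, long nowMs) {
        if (startTimeMs == null) {
            return false; // still pending, waiting for scheduling
        }
        return (nowMs - startTimeMs) > timeoutMs;
    }

    public static void main(String[] args) {
        long timeoutMs = 10_000L;
        long createTimeMs = 0L;     // created at t=0
        Long startTimeMs = 15_000L; // scheduled late, started at t=15s
        long nowMs = 16_000L;       // 1s into execution

        System.out.println(timedOutFromCreate(createTimeMs, timeoutMs, nowMs)); // true  (old check)
        System.out.println(timedOutFromStart(startTimeMs, timeoutMs, nowMs));   // false (new check)
    }
}
```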
@@ -74,4 +74,12 @@ public String showRange() {
return new Gson().toJson(showMap);
}
}

@Override
public String toString() {
return "JdbcOffset{"
+ "split="
+ split
+ '}';
}
}