From 4b8a2ac3aeeb9cc6d52e27ce2d98f891b1df03e6 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 12 Jan 2026 18:52:46 +0800 Subject: [PATCH] [Fix](Streamingjob) Fix the issue of synchronization failure under empty tables (#59735) ### What problem does this PR solve? Fix the issue of synchronization failure under empty tables Related PR: https://github.com/apache/doris/pull/58898 --- .../service/PipelineCoordinator.java | 62 ++++++------------- .../cdcclient/sink/DorisBatchStreamLoad.java | 13 ++++ .../reader/JdbcIncrementalSourceReader.java | 5 +- .../cdc/test_streaming_mysql_job.out | 2 + .../cdc/test_streaming_mysql_job.groovy | 14 ++++- 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 591c4790e6ca42..187003ad0e6754 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -25,7 +25,6 @@ import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; import org.apache.doris.job.cdc.split.BinlogSplit; -import org.apache.doris.job.cdc.split.SnapshotSplit; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.connector.source.SourceSplit; @@ -125,32 +124,19 @@ private RecordWithMeta buildRecordResponse( sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); offsetRes.put(SPLIT_ID, split.splitId()); recordResponse.setMeta(offsetRes); - return recordResponse; } + // set meta for binlog event if (sourceReader.isBinlogSplit(split)) { Map offsetRes = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - } - } - - // no data in this split, set meta info - if (CollectionUtils.isEmpty(recordResponse.getRecords())) { - if (sourceReader.isBinlogSplit(split)) { - Map offsetRes = - sourceReader.extractBinlogOffset(readResult.getSplit()); - offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); recordResponse.setMeta(offsetRes); - } else { - SnapshotSplit snapshotSplit = - objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class); - Map meta = new HashMap<>(); - meta.put(SPLIT_ID, snapshotSplit.getSplitId()); - // chunk no data - recordResponse.setMeta(meta); } + } else { + throw new RuntimeException("split state is null"); } + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); return recordResponse; } @@ -188,7 +174,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest); DorisBatchStreamLoad batchStreamLoad = null; Map metaResponse = new HashMap<>(); - boolean hasData = false; long scannedRows = 0L; long scannedBytes = 0L; SplitReadResult readResult = null; @@ -215,21 +200,12 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception if (!CollectionUtils.isEmpty(serializedRecords)) { String database = writeRecordRequest.getTargetDb(); String table = extractTable(element); - hasData = true; for (String record : serializedRecords) { scannedRows++; byte[] dataBytes = record.getBytes(); scannedBytes += dataBytes.length; batchStreamLoad.writeRecord(database, table, dataBytes); } - - if (sourceReader.isBinlogSplit(readResult.getSplit())) { - // put offset for event - Map lastMeta = - sourceReader.extractBinlogStateOffset(readResult.getSplitState()); - lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - metaResponse = lastMeta; - } } // Check if maxInterval has been exceeded long elapsedTime = System.currentTimeMillis() - startTime; @@ -245,29 +221,29 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } try { - if (!hasData) { - // todo: need return the lastest heartbeat offset, means the maximum offset that the - // current job can recover. + if (readResult.getSplitState() != null) { + // Set meta information for hw + if (sourceReader.isSnapshotSplit(readResult.getSplit())) { + Map offsetRes = + sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + offsetRes.put(SPLIT_ID, readResult.getSplit().splitId()); + metaResponse = offsetRes; + } + + // set meta for binlog event if (sourceReader.isBinlogSplit(readResult.getSplit())) { Map offsetRes = - sourceReader.extractBinlogOffset(readResult.getSplit()); + sourceReader.extractBinlogStateOffset(readResult.getSplitState()); offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes); - return; - } else { - throw new RuntimeException("should not happen"); + metaResponse = offsetRes; } + } else { + throw new RuntimeException("split state is null"); } // wait all stream load finish batchStreamLoad.forceFlush(); - // update offset meta - if (sourceReader.isSnapshotSplit(readResult.getSplit())) { - Map offsetRes = - sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); - offsetRes.put(SPLIT_ID, readResult.getSplit().splitId()); - metaResponse = offsetRes; - } + // request fe api batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java index bf6a4102801059..1604b1c030539e 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; @@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable { private BlockingQueue flushQueue; private final AtomicBoolean started; private volatile boolean loadThreadAlive = false; + private final CountDownLatch loadThreadStarted = new CountDownLatch(1); private AtomicReference exception = new AtomicReference<>(null); private long maxBlockedBytes; private final AtomicLong currentCacheBytes = new AtomicLong(0L); @@ -110,6 +112,16 @@ public DorisBatchStreamLoad(long jobId, String targetDb) { this.loadExecutorService.execute(loadAsyncExecutor); this.targetDb = targetDb; this.jobId = jobId; + // Wait for the load thread to start + try { + if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) { + throw new RuntimeException("LoadAsyncExecutor thread startup timed out"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Thread interrupted while waiting for load thread to start", e); + } } /** @@ -310,6 +322,7 @@ public LoadAsyncExecutor(int flushQueueSize, long jobId) { public void run() { LOG.info("LoadAsyncExecutor start for jobId {}", jobId); loadThreadAlive = true; + loadThreadStarted.countDown(); List recordList = new ArrayList<>(flushQueueSize); while (started.get()) { recordList.clear(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index f9e11f6b029aa3..541e3354828820 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -166,8 +166,9 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc Fetcher currentReader = this.getCurrentReader(); if (currentReader == null || baseReq.isReload()) { LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); + "No current reader or reload {}, create new split reader for job {}", + baseReq.isReload(), + baseReq.getJobId()); // build split Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); split = splitFlag.f0; diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out index aebbb6815e3952..f5e148123ae93d 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out @@ -7,6 +7,8 @@ B1 2 A2 1 B2 2 +-- !select_snapshot_table3 -- + -- !select_binlog_table1 -- B1 10 Doris 18 diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy index d77e2b769bb82a..a6bc1d174315e3 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy @@ -25,11 +25,13 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do def currentDb = (sql "select database()")[0][0] def table1 = "user_info_normal1" def table2 = "user_info_normal2" + def table3 = "user_info_normal3_empty" def mysqlDb = "test_cdc_db" sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" sql """drop table if exists ${currentDb}.${table1} force""" sql """drop table if exists ${currentDb}.${table2} force""" + sql """drop table if exists ${currentDb}.${table3} force""" // Pre-create table2 sql """ @@ -71,6 +73,11 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do // mock snapshot data sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 1);""" sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 2);""" + sql """CREATE TABLE ${mysqlDb}.${table3} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" } sql """CREATE JOB ${jobName} @@ -82,7 +89,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do "user" = "root", "password" = "123456", "database" = "${mysqlDb}", - "include_tables" = "${table1},${table2}", + "include_tables" = "${table3},${table1},${table2}", "offset" = "initial" ) TO DATABASE ${currentDb} ( @@ -96,6 +103,8 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do assert showTables.size() == 1 def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """ assert showTables2.size() == 1 + def showTables3 = sql """ show tables from ${currentDb} like '${table3}'; """ + assert showTables3.size() == 1 // check table schema correct def showTbl1 = sql """show create table ${currentDb}.${table1}""" @@ -113,7 +122,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobSuccendCount: " + jobSuccendCount) // check job status and succeed task count larger than 2 - jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) + jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0) } ) } catch (Exception ex){ @@ -127,6 +136,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do // check snapshot data qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """ qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """ + qt_select_snapshot_table3 """ SELECT * FROM ${table3} order by name asc """ // mock mysql incremental into connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {