
Commit 4b8a2ac

JNSimba authored and Your Name committed
[Fix](Streamingjob) Fix the issue of synchronization failure under empty tables (#59735)
### What problem does this PR solve?

Fix the issue of synchronization failure under empty tables.

Related PR: #58898
1 parent 00503bc commit 4b8a2ac

File tree

5 files changed (+49, -47 lines)

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java

Lines changed: 19 additions & 43 deletions
@@ -25,7 +25,6 @@
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
-import org.apache.doris.job.cdc.split.SnapshotSplit;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -125,32 +124,19 @@ private RecordWithMeta buildRecordResponse(
                         sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
                 offsetRes.put(SPLIT_ID, split.splitId());
                 recordResponse.setMeta(offsetRes);
-                return recordResponse;
             }
+
             // set meta for binlog event
             if (sourceReader.isBinlogSplit(split)) {
                 Map<String, String> offsetRes =
                         sourceReader.extractBinlogStateOffset(readResult.getSplitState());
                 offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-            }
-        }
-
-        // no data in this split, set meta info
-        if (CollectionUtils.isEmpty(recordResponse.getRecords())) {
-            if (sourceReader.isBinlogSplit(split)) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractBinlogOffset(readResult.getSplit());
-                offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
                 recordResponse.setMeta(offsetRes);
-            } else {
-                SnapshotSplit snapshotSplit =
-                        objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class);
-                Map<String, String> meta = new HashMap<>();
-                meta.put(SPLIT_ID, snapshotSplit.getSplitId());
-                // chunk no data
-                recordResponse.setMeta(meta);
             }
+        } else {
+            throw new RuntimeException("split state is null");
         }
+
         sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
         return recordResponse;
     }
@@ -188,7 +174,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
         SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
        DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
-        boolean hasData = false;
         long scannedRows = 0L;
         long scannedBytes = 0L;
         SplitReadResult readResult = null;
@@ -215,21 +200,12 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                 if (!CollectionUtils.isEmpty(serializedRecords)) {
                     String database = writeRecordRequest.getTargetDb();
                     String table = extractTable(element);
-                    hasData = true;
                     for (String record : serializedRecords) {
                         scannedRows++;
                         byte[] dataBytes = record.getBytes();
                         scannedBytes += dataBytes.length;
                         batchStreamLoad.writeRecord(database, table, dataBytes);
                     }
-
-                    if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                        // put offset for event
-                        Map<String, String> lastMeta =
-                                sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                        lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                        metaResponse = lastMeta;
-                    }
                 }
                 // Check if maxInterval has been exceeded
                 long elapsedTime = System.currentTimeMillis() - startTime;
@@ -245,29 +221,29 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
         }
 
         try {
-            if (!hasData) {
-                // todo: need return the lastest heartbeat offset, means the maximum offset that the
-                // current job can recover.
+            if (readResult.getSplitState() != null) {
+                // Set meta information for hw
+                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
+                    Map<String, String> offsetRes =
+                            sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
+                    metaResponse = offsetRes;
+                }
+
+                // set meta for binlog event
                 if (sourceReader.isBinlogSplit(readResult.getSplit())) {
                     Map<String, String> offsetRes =
-                            sourceReader.extractBinlogOffset(readResult.getSplit());
+                            sourceReader.extractBinlogStateOffset(readResult.getSplitState());
                     offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes);
-                    return;
-                } else {
-                    throw new RuntimeException("should not happen");
+                    metaResponse = offsetRes;
                 }
+            } else {
+                throw new RuntimeException("split state is null");
             }
 
             // wait all stream load finish
             batchStreamLoad.forceFlush();
-            // update offset meta
-            if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                metaResponse = offsetRes;
-            }
+
             // request fe api
             batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
 
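The net effect of the PipelineCoordinator change: offset metadata is now always derived from the split state, whether or not the split produced any records, and a missing split state is an explicit error rather than a silently skipped branch, which is what previously broke synchronization on empty tables. Below is a minimal, self-contained sketch of that decision, assuming simplified stand-in types: ReadResult, SplitKind, buildMeta, the "binlog-split" id, and the "high_watermark" key are hypothetical illustrations, not the actual Doris classes or constants.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the offset-meta handling after this commit:
// meta is always taken from the split state, so an empty table (no records read)
// still reports a committable offset instead of failing the sync.
public class OffsetMetaSketch {

    static final String SPLIT_ID = "split_id";

    enum SplitKind { SNAPSHOT, BINLOG }

    // Stand-in for SplitReadResult: carries the split kind, its id, and its state offset.
    record ReadResult(SplitKind kind, String splitId, Map<String, String> stateOffset) {}

    static Map<String, String> buildMeta(ReadResult result) {
        if (result.stateOffset() == null) {
            // After the fix, a null split state is an explicit error.
            throw new RuntimeException("split state is null");
        }
        Map<String, String> meta = new HashMap<>(result.stateOffset());
        // Snapshot splits keep their own id; binlog splits use a shared binlog id.
        meta.put(SPLIT_ID, result.kind() == SplitKind.BINLOG ? "binlog-split" : result.splitId());
        return meta;
    }

    public static void main(String[] args) {
        // An "empty table" chunk: no records read, but the state offset still exists.
        ReadResult emptyChunk =
                new ReadResult(SplitKind.SNAPSHOT, "chunk-0", Map.of("high_watermark", "0")); // illustrative offset key
        System.out.println(buildMeta(emptyChunk)); // meta is still produced and can be committed
    }
}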

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java

Lines changed: 13 additions & 0 deletions
@@ -38,6 +38,7 @@
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private BlockingQueue<BatchRecordBuffer> flushQueue;
     private final AtomicBoolean started;
     private volatile boolean loadThreadAlive = false;
+    private final CountDownLatch loadThreadStarted = new CountDownLatch(1);
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private long maxBlockedBytes;
     private final AtomicLong currentCacheBytes = new AtomicLong(0L);
@@ -110,6 +112,16 @@ public DorisBatchStreamLoad(long jobId, String targetDb) {
         this.loadExecutorService.execute(loadAsyncExecutor);
         this.targetDb = targetDb;
         this.jobId = jobId;
+        // Wait for the load thread to start
+        try {
+            if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) {
+                throw new RuntimeException("LoadAsyncExecutor thread startup timed out");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Thread interrupted while waiting for load thread to start", e);
+        }
     }
 
     /**
@@ -310,6 +322,7 @@ public LoadAsyncExecutor(int flushQueueSize, long jobId) {
         public void run() {
             LOG.info("LoadAsyncExecutor start for jobId {}", jobId);
             loadThreadAlive = true;
+            loadThreadStarted.countDown();
             List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
             while (started.get()) {
                 recordList.clear();
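The DorisBatchStreamLoad change is an instance of a common startup handshake: the constructor submits the load worker to an executor and then blocks on a CountDownLatch until the worker has actually entered run(), so a write issued right after construction cannot race ahead of the flush thread. Below is a minimal, self-contained sketch of that pattern under the same 10-second timeout assumption; the class and field names are illustrative, not the Doris ones.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: block the constructor until the background worker has started.
public class StartupHandshakeSketch {

    private final CountDownLatch workerStarted = new CountDownLatch(1);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean workerAlive = false;

    public StartupHandshakeSketch() {
        executor.execute(() -> {
            workerAlive = true;
            workerStarted.countDown(); // signal: the worker thread is running
            // ... a real worker would drain a flush queue here ...
        });
        try {
            // Same shape as the patch: bounded wait instead of spinning on a volatile flag.
            if (!workerStarted.await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("worker thread startup timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for worker to start", e);
        }
    }

    public boolean isWorkerAlive() {
        return workerAlive;
    }

    public static void main(String[] args) {
        StartupHandshakeSketch sketch = new StartupHandshakeSketch();
        System.out.println("worker alive: " + sketch.isWorkerAlive()); // true once the constructor returns
        sketch.executor.shutdownNow();
    }
}

Compared with polling the existing volatile loadThreadAlive flag, the latch gives a bounded wait and a clear failure mode if the executor never schedules the worker.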

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 3 additions & 2 deletions
@@ -166,8 +166,9 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
         if (currentReader == null || baseReq.isReload()) {
             LOG.info(
-                    "No current reader or reload {}, create new split reader",
-                    baseReq.isReload());
+                    "No current reader or reload {}, create new split reader for job {}",
+                    baseReq.isReload(),
+                    baseReq.getJobId());
             // build split
             Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
             split = splitFlag.f0;

regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@ B1 2
 A2 1
 B2 2
 
+-- !select_snapshot_table3 --
+
 -- !select_binlog_table1 --
 B1 10
 Doris 18

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy

Lines changed: 12 additions & 2 deletions
@@ -25,11 +25,13 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_normal1"
     def table2 = "user_info_normal2"
+    def table3 = "user_info_normal3_empty"
     def mysqlDb = "test_cdc_db"
 
     sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
     sql """drop table if exists ${currentDb}.${table1} force"""
     sql """drop table if exists ${currentDb}.${table2} force"""
+    sql """drop table if exists ${currentDb}.${table3} force"""
 
     // Pre-create table2
     sql """
@@ -71,6 +73,11 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         // mock snapshot data
         sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 1);"""
         sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 2);"""
+        sql """CREATE TABLE ${mysqlDb}.${table3} (
+            `name` varchar(200) NOT NULL,
+            `age` int DEFAULT NULL,
+            PRIMARY KEY (`name`)
+            ) ENGINE=InnoDB"""
     }
 
     sql """CREATE JOB ${jobName}
@@ -82,7 +89,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
            "user" = "root",
            "password" = "123456",
            "database" = "${mysqlDb}",
-           "include_tables" = "${table1},${table2}",
+           "include_tables" = "${table3},${table1},${table2}",
            "offset" = "initial"
        )
        TO DATABASE ${currentDb} (
@@ -96,6 +103,8 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
     assert showTables.size() == 1
     def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """
     assert showTables2.size() == 1
+    def showTables3 = sql """ show tables from ${currentDb} like '${table3}'; """
+    assert showTables3.size() == 1
 
     // check table schema correct
     def showTbl1 = sql """show create table ${currentDb}.${table1}"""
@@ -113,7 +122,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
            def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
            log.info("jobSuccendCount: " + jobSuccendCount)
            // check job status and succeed task count larger than 2
-           jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+           jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0)
        }
     )
 } catch (Exception ex){
@@ -127,6 +136,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
     // check snapshot data
     qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """
     qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """
+    qt_select_snapshot_table3 """ SELECT * FROM ${table3} order by name asc """
 
     // mock mysql incremental into
     connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
