@@ -25,7 +25,6 @@
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
@@ -125,32 +124,19 @@ private RecordWithMeta buildRecordResponse(
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, split.splitId());
recordResponse.setMeta(offsetRes);
return recordResponse;
}

// set meta for binlog event
if (sourceReader.isBinlogSplit(split)) {
Map<String, String> offsetRes =
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
}
}

// no data in this split, set meta info
if (CollectionUtils.isEmpty(recordResponse.getRecords())) {
if (sourceReader.isBinlogSplit(split)) {
Map<String, String> offsetRes =
sourceReader.extractBinlogOffset(readResult.getSplit());
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
recordResponse.setMeta(offsetRes);
} else {
SnapshotSplit snapshotSplit =
objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class);
Map<String, String> meta = new HashMap<>();
meta.put(SPLIT_ID, snapshotSplit.getSplitId());
// chunk has no data
recordResponse.setMeta(meta);
}
} else {
throw new RuntimeException("split state is null");
}

sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
return recordResponse;
}
@@ -188,7 +174,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
boolean hasData = false;
long scannedRows = 0L;
long scannedBytes = 0L;
SplitReadResult readResult = null;
@@ -215,21 +200,12 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
if (!CollectionUtils.isEmpty(serializedRecords)) {
String database = writeRecordRequest.getTargetDb();
String table = extractTable(element);
hasData = true;
for (String record : serializedRecords) {
scannedRows++;
byte[] dataBytes = record.getBytes();
scannedBytes += dataBytes.length;
batchStreamLoad.writeRecord(database, table, dataBytes);
}

if (sourceReader.isBinlogSplit(readResult.getSplit())) {
// put offset for event
Map<String, String> lastMeta =
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
metaResponse = lastMeta;
}
}
// Check if maxInterval has been exceeded
long elapsedTime = System.currentTimeMillis() - startTime;
@@ -245,29 +221,29 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
}

try {
if (!hasData) {
// todo: need to return the latest heartbeat offset, i.e. the maximum offset that the
// current job can recover from.
if (readResult.getSplitState() != null) {
// Set meta information for hw
if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
Map<String, String> offsetRes =
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
metaResponse = offsetRes;
}

// set meta for binlog event
if (sourceReader.isBinlogSplit(readResult.getSplit())) {
Map<String, String> offsetRes =
sourceReader.extractBinlogOffset(readResult.getSplit());
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes);
return;
} else {
throw new RuntimeException("should not happen");
metaResponse = offsetRes;
}
} else {
throw new RuntimeException("split state is null");
}

// wait for all stream loads to finish
batchStreamLoad.forceFlush();
// update offset meta
if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
Map<String, String> offsetRes =
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
metaResponse = offsetRes;
}

// request fe api
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);

@@ -38,6 +38,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable {
private BlockingQueue<BatchRecordBuffer> flushQueue;
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private final CountDownLatch loadThreadStarted = new CountDownLatch(1);
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private long maxBlockedBytes;
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
@@ -110,6 +112,16 @@ public DorisBatchStreamLoad(long jobId, String targetDb) {
this.loadExecutorService.execute(loadAsyncExecutor);
this.targetDb = targetDb;
this.jobId = jobId;
// Wait for the load thread to start
try {
if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) {
throw new RuntimeException("LoadAsyncExecutor thread startup timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Thread interrupted while waiting for load thread to start", e);
}
}

/**
@@ -310,6 +322,7 @@ public LoadAsyncExecutor(int flushQueueSize, long jobId) {
public void run() {
LOG.info("LoadAsyncExecutor start for jobId {}", jobId);
loadThreadAlive = true;
loadThreadStarted.countDown();
List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
while (started.get()) {
recordList.clear();
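Taken together, the two DorisBatchStreamLoad changes form a startup handshake: the constructor submits LoadAsyncExecutor to the executor service and then blocks on loadThreadStarted until run() counts the latch down (or the 10-second wait times out), so callers can assume the flush thread is alive before any record is written. Below is a minimal, self-contained sketch of the same latch pattern; the Worker class and its member names are illustrative only, not part of the Doris code.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the startup-latch handshake used above; not Doris code.
public class Worker {
    private final CountDownLatch started = new CountDownLatch(1);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public Worker() {
        executor.execute(this::runLoop);
        try {
            // The constructor returns only after the worker thread has signalled readiness.
            if (!started.await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("worker thread startup timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for worker thread to start", e);
        }
    }

    private void runLoop() {
        // Signal readiness first, then enter the normal processing loop
        // (poll the flush queue, write batches, etc.).
        started.countDown();
    }
}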
@@ -166,8 +166,9 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (currentReader == null || baseReq.isReload()) {
LOG.info(
"No current reader or reload {}, create new split reader",
baseReq.isReload());
"No current reader or reload {}, create new split reader for job {}",
baseReq.isReload(),
baseReq.getJobId());
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
@@ -7,6 +7,8 @@ B1 2
A2 1
B2 2

-- !select_snapshot_table3 --

-- !select_binlog_table1 --
B1 10
Doris 18
@@ -25,11 +25,13 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_normal1"
def table2 = "user_info_normal2"
def table3 = "user_info_normal3_empty"
def mysqlDb = "test_cdc_db"

sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
sql """drop table if exists ${currentDb}.${table1} force"""
sql """drop table if exists ${currentDb}.${table2} force"""
sql """drop table if exists ${currentDb}.${table3} force"""

// Pre-create table2
sql """
@@ -71,6 +73,11 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
// mock snapshot data
sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 1);"""
sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 2);"""
sql """CREATE TABLE ${mysqlDb}.${table3} (
`name` varchar(200) NOT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`name`)
) ENGINE=InnoDB"""
}

sql """CREATE JOB ${jobName}
@@ -82,7 +89,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"include_tables" = "${table1},${table2}",
"include_tables" = "${table3},${table1},${table2}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
@@ -96,6 +103,8 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
assert showTables.size() == 1
def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """
assert showTables2.size() == 1
def showTables3 = sql """ show tables from ${currentDb} like '${table3}'; """
assert showTables3.size() == 1

// check table schema correct
def showTbl1 = sql """show create table ${currentDb}.${table1}"""
@@ -113,7 +122,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger than 2
jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0)
}
)
} catch (Exception ex){
@@ -127,6 +136,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
// check snapshot data
qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """
qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """
qt_select_snapshot_table3 """ SELECT * FROM ${table3} order by name asc """

// mock mysql incremental inserts
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {