Skip to content

Commit 6eb05dd

Browse files
committed
fix
1 parent 6d82f8d commit 6eb05dd

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.UUID;
3939
import java.util.concurrent.BlockingQueue;
4040
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.ExecutorService;
4243
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable {
8081
private BlockingQueue<BatchRecordBuffer> flushQueue;
8182
private final AtomicBoolean started;
8283
private volatile boolean loadThreadAlive = false;
84+
private final CountDownLatch loadThreadStarted = new CountDownLatch(1);
8385
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
8486
private long maxBlockedBytes;
8587
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
@@ -110,6 +112,16 @@ public DorisBatchStreamLoad(long jobId, String targetDb) {
110112
this.loadExecutorService.execute(loadAsyncExecutor);
111113
this.targetDb = targetDb;
112114
this.jobId = jobId;
115+
// Wait for the load thread to start
116+
try {
117+
if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) {
118+
throw new RuntimeException("LoadAsyncExecutor thread startup timed out");
119+
}
120+
} catch (InterruptedException e) {
121+
Thread.currentThread().interrupt();
122+
throw new RuntimeException(
123+
"Thread interrupted while waiting for load thread to start", e);
124+
}
113125
}
114126

115127
/**
@@ -310,6 +322,7 @@ public LoadAsyncExecutor(int flushQueueSize, long jobId) {
310322
public void run() {
311323
LOG.info("LoadAsyncExecutor start for jobId {}", jobId);
312324
loadThreadAlive = true;
325+
loadThreadStarted.countDown();
313326
List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
314327
while (started.get()) {
315328
recordList.clear();

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
166166
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
167167
if (currentReader == null || baseReq.isReload()) {
168168
LOG.info(
169-
"No current reader or reload {}, create new split reader",
170-
baseReq.isReload());
169+
"No current reader or reload {}, create new split reader for job {}",
170+
baseReq.isReload(),
171+
baseReq.getJobId());
171172
// build split
172173
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
173174
split = splitFlag.f0;

0 commit comments

Comments
 (0)