Skip to content

Commit 63f36d5

Browse files
author
shitou
committed
[hotfix-50401][restore] Fix the problem of kryo serialization error under multi-threading.
1 parent 4ca398e commit 63f36d5

File tree

1 file changed

+4
-4
lines changed
  • flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker

1 file changed

+4
-4
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/Worker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,11 @@
3939
*/
4040
public class Worker implements Callable<Integer> {
4141

42+
private static final Object LOCK = new Object();
4243
private final QueuesChamberlain queuesChamberlain;
43-
4444
private final Collector<RowData> collector;
45-
4645
/** 任务分片 */
4746
private final Chunk chunk;
48-
4947
/** 队列遍历深度,避免某队列长时间占用线程 */
5048
private final int size;
5149

@@ -87,7 +85,9 @@ private void send() {
8785
private void dealDmL(Deque<RowData> queue) {
8886
// 队列头节点是dml, 将该dml数据发送到sink
8987
RowData rowData = queue.poll();
90-
collector.collect(rowData);
88+
synchronized (LOCK) {
89+
collector.collect(rowData);
90+
}
9191
}
9292

9393
@Override

0 commit comments

Comments
 (0)