Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit 793bb79

Browse files
yzeng1618zengyi
andauthored
[Fix][Iceberg] Fix IllegalMonitorStateException in streaming enumerator (apache#10131)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 82ab344 commit 793bb79

File tree

2 files changed

+150
-5
lines changed

2 files changed

+150
-5
lines changed

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
2021
import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;
2122

2223
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -46,7 +47,8 @@
4647
public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator {
4748

4849
private final ConcurrentMap<TablePath, IcebergEnumeratorPosition> tableOffsets;
49-
private volatile boolean initialized = false;
50+
51+
@VisibleForTesting volatile boolean initialized = false;
5052

5153
public IcebergStreamSplitEnumerator(
5254
Context<IcebergFileScanTaskSplit> context,
@@ -79,9 +81,9 @@ public void run() throws Exception {
7981
Set<Integer> readers = context.registeredReaders();
8082
while (true) {
8183
for (TablePath tablePath : pendingTables) {
82-
checkThrowInterruptedException();
83-
8484
synchronized (stateLock) {
85+
checkThrowInterruptedException();
86+
8587
log.info("Scan table {}.", tablePath);
8688

8789
Collection<IcebergFileScanTaskSplit> splits = loadSplits(tablePath);
@@ -95,7 +97,9 @@ public void run() throws Exception {
9597
initialized = true;
9698
}
9799

98-
stateLock.wait(sourceConfig.getIncrementScanInterval());
100+
synchronized (stateLock) {
101+
stateLock.wait(sourceConfig.getIncrementScanInterval());
102+
}
99103
}
100104
}
101105

@@ -112,7 +116,9 @@ public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Excep
112116
@Override
113117
public void handleSplitRequest(int subtaskId) {
114118
if (initialized) {
115-
stateLock.notifyAll();
119+
synchronized (stateLock) {
120+
stateLock.notifyAll();
121+
}
116122
}
117123
}
118124

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
19+
20+
import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
21+
import org.apache.seatunnel.api.common.metrics.MetricsContext;
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
import org.apache.seatunnel.api.event.Event;
24+
import org.apache.seatunnel.api.event.EventListener;
25+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
26+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
27+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
28+
import org.apache.seatunnel.api.table.catalog.TablePath;
29+
import org.apache.seatunnel.api.table.catalog.TableSchema;
30+
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCommonOptions;
31+
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
32+
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
33+
34+
import org.junit.jupiter.api.Assertions;
35+
import org.junit.jupiter.api.Test;
36+
37+
import java.nio.file.Paths;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.Map;
41+
42+
/** Minimal test for {@link IcebergStreamSplitEnumerator} wait / notify fix. */
43+
class IcebergStreamSplitEnumeratorTest {
44+
45+
@Test
46+
void testHandleSplitRequestDoesNotThrowIllegalMonitorStateException() throws Exception {
47+
SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context =
48+
new DummyEnumeratorContext();
49+
50+
IcebergSourceConfig sourceConfig = createSourceConfig();
51+
52+
// Catalog tables must be non-empty because AbstractSplitEnumerator uses the size as the
53+
// capacity of an ArrayBlockingQueue.
54+
TablePath tablePath = TablePath.of("default", "source");
55+
CatalogTable catalogTable =
56+
CatalogTable.of(
57+
TableIdentifier.of("seatunnel", "default", "source"),
58+
TableSchema.builder().build(),
59+
Collections.emptyMap(),
60+
Collections.emptyList(),
61+
"test table");
62+
Map<TablePath, CatalogTable> catalogTables =
63+
Collections.singletonMap(tablePath, catalogTable);
64+
65+
IcebergStreamSplitEnumerator enumerator =
66+
new IcebergStreamSplitEnumerator(
67+
context, sourceConfig, catalogTables, Collections.emptyMap());
68+
69+
// Force initialized = true so handleSplitRequest executes the notify logic.
70+
enumerator.initialized = true;
71+
72+
// Before the fix, this would throw IllegalMonitorStateException because notifyAll was
73+
// called without holding the monitor.
74+
Assertions.assertDoesNotThrow(() -> enumerator.handleSplitRequest(0));
75+
}
76+
77+
private IcebergSourceConfig createSourceConfig() {
78+
Map<String, Object> configs = new HashMap<>();
79+
Map<String, Object> catalogProps = new HashMap<>();
80+
catalogProps.put("type", "hadoop");
81+
catalogProps.put("warehouse", Paths.get("target", "iceberg", "hadoop").toUri().toString());
82+
83+
configs.put(IcebergCommonOptions.KEY_CATALOG_NAME.key(), "seatunnel");
84+
configs.put(IcebergCommonOptions.KEY_NAMESPACE.key(), "default");
85+
configs.put(IcebergCommonOptions.KEY_TABLE.key(), "source");
86+
configs.put(IcebergCommonOptions.CATALOG_PROPS.key(), catalogProps);
87+
88+
return new IcebergSourceConfig(ReadonlyConfig.fromMap(configs));
89+
}
90+
91+
private static class DummyEnumeratorContext
92+
implements SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> {
93+
94+
private final MetricsContext metricsContext = new AbstractMetricsContext() {};
95+
private final EventListener eventListener =
96+
new EventListener() {
97+
@Override
98+
public void onEvent(Event event) {
99+
// no-op
100+
}
101+
};
102+
103+
@Override
104+
public int currentParallelism() {
105+
return 1;
106+
}
107+
108+
@Override
109+
public java.util.Set<Integer> registeredReaders() {
110+
return Collections.singleton(0);
111+
}
112+
113+
@Override
114+
public void assignSplit(int subtaskId, java.util.List<IcebergFileScanTaskSplit> splits) {
115+
// no-op
116+
}
117+
118+
@Override
119+
public void signalNoMoreSplits(int subtask) {
120+
// no-op
121+
}
122+
123+
@Override
124+
public void sendEventToSourceReader(
125+
int subtaskId, org.apache.seatunnel.api.source.SourceEvent event) {
126+
// no-op
127+
}
128+
129+
@Override
130+
public MetricsContext getMetricsContext() {
131+
return metricsContext;
132+
}
133+
134+
@Override
135+
public EventListener getEventListener() {
136+
return eventListener;
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)