Skip to content

Commit f2db0c2

Browse files
committed
fix and add tests generated by AI
1 parent b7fad99 commit f2db0c2

File tree

2 files changed

+192
-3
lines changed

2 files changed

+192
-3
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.options.Options;
2424
import org.apache.paimon.schema.TableSchema;
2525
import org.apache.paimon.table.FileStoreTable;
26+
import org.apache.paimon.utils.StringUtils;
2627

2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
@@ -53,15 +54,15 @@ public WriterRefresher(FileStoreTable table, T write, Refresher<T> refresher) {
5354
String refreshDetectors =
5455
Options.fromMap(table.options())
5556
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS);
56-
if (refreshDetectors == null) {
57+
if (StringUtils.isNullOrWhitespaceOnly(refreshDetectors)) {
5758
configGroups = null;
5859
} else {
5960
configGroups = Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
6061
}
6162
}
6263

6364
public void tryRefresh() {
64-
if (configGroups == null) {
65+
if (configGroups == null || configGroups.isEmpty()) {
6566
return;
6667
}
6768

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,192 @@ public void testDoRefresh() throws Exception {
8686
assertThat(refreshedOptions).isEqualTo(table2.coreOptions().configGroups(groups));
8787
}
8888

89+
@Test
90+
public void testRefreshWithNullConfigGroups() throws Exception {
91+
// Create table without SINK_WRITER_REFRESH_DETECTORS option
92+
Map<String, String> options = new HashMap<>();
93+
createTable(options);
94+
95+
FileStoreTable table1 = getTable();
96+
97+
// Make schema changes
98+
table1.schemaManager()
99+
.commitChanges(
100+
SchemaChange.setOption(
101+
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"));
102+
103+
Map<String, String> refreshedOptions = new HashMap<>();
104+
refreshedOptions.put("initial", "value");
105+
106+
WriterRefresher<?> writerRefresher =
107+
new WriterRefresher<>(table1, refreshedOptions, new TestWriteRefresher(null));
108+
109+
// Should not refresh when configGroups is null
110+
writerRefresher.tryRefresh();
111+
112+
// Options should remain unchanged
113+
assertThat(refreshedOptions).containsEntry("initial", "value");
114+
assertThat(refreshedOptions).hasSize(1);
115+
}
116+
117+
@Test
118+
public void testRefreshWithEmptyConfigGroups() throws Exception {
119+
// Create table with empty SINK_WRITER_REFRESH_DETECTORS option
120+
Map<String, String> options = new HashMap<>();
121+
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "");
122+
createTable(options);
123+
124+
FileStoreTable table1 = getTable();
125+
126+
// Make schema changes
127+
table1.schemaManager()
128+
.commitChanges(
129+
SchemaChange.setOption(
130+
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"));
131+
132+
Map<String, String> refreshedOptions = new HashMap<>();
133+
refreshedOptions.put("initial", "value");
134+
135+
WriterRefresher<?> writerRefresher =
136+
new WriterRefresher<>(table1, refreshedOptions, new TestWriteRefresher(null));
137+
138+
// Should not refresh when configGroups is empty
139+
writerRefresher.tryRefresh();
140+
141+
// Options should remain unchanged since empty configGroups should trigger early return
142+
assertThat(refreshedOptions).containsEntry("initial", "value");
143+
assertThat(refreshedOptions).hasSize(1);
144+
}
145+
146+
@Test
147+
public void testRefreshWithCommaOnlyConfigGroups() throws Exception {
148+
// Create table with comma-only SINK_WRITER_REFRESH_DETECTORS option
149+
Map<String, String> options = new HashMap<>();
150+
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), ",,,");
151+
createTable(options);
152+
153+
FileStoreTable table1 = getTable();
154+
155+
// Make schema changes
156+
table1.schemaManager()
157+
.commitChanges(
158+
SchemaChange.setOption(
159+
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"));
160+
161+
Map<String, String> refreshedOptions = new HashMap<>();
162+
refreshedOptions.put("initial", "value");
163+
164+
Set<String> emptyGroups =
165+
Arrays.stream(",,,".split(","))
166+
.filter(s -> !s.trim().isEmpty())
167+
.collect(Collectors.toSet());
168+
169+
WriterRefresher<?> writerRefresher =
170+
new WriterRefresher<>(
171+
table1, refreshedOptions, new TestWriteRefresher(emptyGroups));
172+
173+
// Should not refresh when configGroups is effectively empty
174+
writerRefresher.tryRefresh();
175+
176+
// Options should remain unchanged
177+
assertThat(refreshedOptions).containsEntry("initial", "value");
178+
assertThat(refreshedOptions).hasSize(1);
179+
}
180+
181+
@Test
182+
public void testNoRefreshWhenNoSchemaChange() throws Exception {
183+
String detectGroups = "external-paths";
184+
Map<String, String> options = new HashMap<>();
185+
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
186+
createTable(options);
187+
188+
FileStoreTable table1 = getTable();
189+
190+
Map<String, String> refreshedOptions = new HashMap<>();
191+
refreshedOptions.put("initial", "value");
192+
193+
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
194+
WriterRefresher<?> writerRefresher =
195+
new WriterRefresher<>(table1, refreshedOptions, new TestWriteRefresher(groups));
196+
197+
// No schema changes made, should not refresh
198+
writerRefresher.tryRefresh();
199+
200+
// Options should remain unchanged
201+
assertThat(refreshedOptions).containsEntry("initial", "value");
202+
assertThat(refreshedOptions).hasSize(1);
203+
}
204+
205+
@Test
206+
public void testNoRefreshWhenConfigGroupsNotChanged() throws Exception {
207+
String detectGroups = "external-paths";
208+
Map<String, String> options = new HashMap<>();
209+
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
210+
// Set initial external paths option
211+
options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1");
212+
createTable(options);
213+
214+
FileStoreTable table1 = getTable();
215+
216+
// Make schema changes but keep the same external paths value
217+
table1.schemaManager()
218+
.commitChanges(
219+
SchemaChange.setOption(
220+
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"),
221+
SchemaChange.setOption(
222+
CoreOptions.DATA_FILE_PREFIX.key(),
223+
"data1")); // Change different option
224+
225+
Map<String, String> refreshedOptions = new HashMap<>();
226+
refreshedOptions.put("initial", "value");
227+
228+
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
229+
WriterRefresher<?> writerRefresher =
230+
new WriterRefresher<>(table1, refreshedOptions, new TestWriteRefresher(groups));
231+
232+
// Should not refresh when monitored config groups haven't changed
233+
writerRefresher.tryRefresh();
234+
235+
// Options should remain unchanged
236+
assertThat(refreshedOptions).containsEntry("initial", "value");
237+
assertThat(refreshedOptions).hasSize(1);
238+
}
239+
240+
@Test
241+
public void testUpdatedTable() throws Exception {
242+
String detectGroups = "external-paths";
243+
Map<String, String> options = new HashMap<>();
244+
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
245+
createTable(options);
246+
247+
FileStoreTable table1 = getTable();
248+
249+
table1.schemaManager()
250+
.commitChanges(
251+
SchemaChange.setOption(
252+
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "external-path1"));
253+
254+
Map<String, String> refreshedOptions = new HashMap<>();
255+
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
256+
WriterRefresher<?> writerRefresher =
257+
new WriterRefresher<>(table1, refreshedOptions, new TestWriteRefresher(groups));
258+
259+
// Before refresh, should return original table
260+
assertThat(writerRefresher.updatedTable()).isSameAs(table1);
261+
262+
writerRefresher.tryRefresh();
263+
264+
// After refresh, should return updated table with new options
265+
FileStoreTable updatedTable = writerRefresher.updatedTable();
266+
assertThat(updatedTable).isNotSameAs(table1);
267+
assertThat(
268+
updatedTable
269+
.coreOptions()
270+
.toConfiguration()
271+
.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS))
272+
.isEqualTo("external-path1");
273+
}
274+
89275
private void createTable(Map<String, String> options) throws Exception {
90276
catalog.createTable(
91277
Identifier.create("default", "T"),
@@ -112,7 +298,9 @@ private static class TestWriteRefresher
112298
@Override
113299
public void refresh(FileStoreTable table, Map<String, String> options) throws Exception {
114300
options.clear();
115-
options.putAll(table.coreOptions().configGroups(groups));
301+
if (groups != null) {
302+
options.putAll(table.coreOptions().configGroups(groups));
303+
}
116304
}
117305
}
118306
}

0 commit comments

Comments
 (0)