Commit 920809f

[flink] Flink Source needs to check tableId on recovery in case the table is recreated. (#2154)
1 parent 08c9648 commit 920809f
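
In short: the split reader now pins the table ID from the table metadata when it is constructed, and every split it receives, including splits restored from a savepoint or checkpoint, must carry that same ID; a mismatch means the table was dropped and recreated, so the job fails fast instead of silently reading the recreated table with stale state. A condensed sketch of the new check, using the names from the diff below (an illustration, not the full class):

    // Condensed sketch of the check introduced in FlinkSourceSplitReader.
    // The table ID is resolved once from the table metadata (in the constructor) ...
    private final Long tableId; // = table.getTableInfo().getTableId();

    public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChanges) {
        for (SourceSplitBase split : splitsChanges.splits()) {
            long splitTableId = split.getTableBucket().getTableId();
            // ... and every incoming split, e.g. one restored from a savepoint, must match it.
            if (!tableId.equals(splitTableId)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Table ID mismatch: expected %s, but split contains %s for table '%s'.",
                                tableId, splitTableId, table.getTableInfo().getTablePath()));
            }
            // ... continue adding the split as before
        }
    }

The actual code expresses this with checkArgument, which throws IllegalArgumentException carrying the longer formatted message shown in the diff.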

File tree: 3 files changed (+163, -102 lines)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java

Lines changed: 11 additions & 11 deletions
@@ -104,8 +104,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
 
     @Nullable private final LakeSource<LakeSplit> lakeSource;
 
-    // table id, will be null when haven't received any split
-    private Long tableId;
+    private final Long tableId;
 
     private final Map<TableBucket, Long> stoppingOffsets;
     private LakeSplitReaderGenerator lakeSplitReaderGenerator;
@@ -127,6 +126,7 @@ public FlinkSourceSplitReader(
                 new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
         this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
         this.table = connection.getTable(tablePath);
+        this.tableId = table.getTableInfo().getTableId();
         this.sourceOutputType = sourceOutputType;
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
@@ -187,15 +187,15 @@ public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChanges) {
         }
         for (SourceSplitBase sourceSplitBase : splitsChanges.splits()) {
             LOG.info("add split {}", sourceSplitBase.splitId());
-            // init table id
-            if (tableId == null) {
-                tableId = sourceSplitBase.getTableBucket().getTableId();
-            } else {
-                checkArgument(
-                        tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
-                        "table id not equal across splits {}",
-                        splitsChanges.splits());
-            }
+            checkArgument(
+                    tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
+                    "Table ID mismatch: expected %s, but split contains %s for table '%s'. "
+                            + "This usually happens when a table with the same name was dropped and recreated "
+                            + "between job runs, causing metadata inconsistency. "
+                            + "To resolve this, please restart the job **without** using the previous savepoint or checkpoint.",
+                    tableId,
+                    sourceSplitBase.getTableBucket().getTableId(),
+                    table.getTableInfo().getTablePath());
 
             if (sourceSplitBase.isHybridSnapshotLogSplit()) {
                 HybridSnapshotLogSplit hybridSnapshotLogSplit =

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java

Lines changed: 129 additions & 91 deletions
@@ -25,6 +25,7 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +61,7 @@
 import static org.apache.fluss.flink.utils.FlinkTestBase.dropPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for flink table source fail over. */
 abstract class FlinkTableSourceFailOverITCase {
@@ -86,16 +88,27 @@ abstract class FlinkTableSourceFailOverITCase {
     org.apache.fluss.config.Configuration clientConf;
     ZooKeeperClient zkClient;
     Connection conn;
+    MiniClusterWithClientResource cluster;
 
     @BeforeEach
-    protected void beforeEach() {
+    protected void beforeEach() throws Exception {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
         conn = ConnectionFactory.createConnection(clientConf);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
     }
 
     @AfterEach
     protected void afterEach() throws Exception {
+        cluster.after();
         conn.close();
     }
 
@@ -121,100 +134,125 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa
 
     @Test
     void testRestore() throws Exception {
-        final int numTaskManagers = 2;
-        final int numSlotsPerTaskManager = 2;
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);
+        CloseableIterator<Row> results = savepointPathAndResults.f1;
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // append a new row again to check if the source can restore the state correctly
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        writer.append(row(5, "5000")).get();
+        List<String> expected = new ArrayList<>();
+        expected.add("+I[5, 5000]");
+        assertResultsIgnoreOrder(results, expected, true);
+        // cancel the insert job
+        insertResult.getJobClient().get().cancel().get();
+    }
 
-        // Start Flink
-        MiniClusterWithClientResource cluster =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
-                                .setNumberTaskManagers(numTaskManagers)
-                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-                                .build());
+    @Test
+    void testRestoreWithRecreateTable() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);
 
-        cluster.before();
+        // drop and recreate the table.
+        tEnv.executeSql(String.format("drop table %s", tablePath.getTableName()));
+        tEnv.executeSql(
+                String.format(
+                        "create table %s (" + "a int, b varchar" + ") partitioned by (b) ",
+                        tablePath.getTableName()));
+
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        assertThatThrownBy(() -> insertResult.getJobClient().get().getJobExecutionResult().get())
+                .rootCause()
+                .hasMessageContaining(
+                        "Table ID mismatch: expected 2, but split contains 0 for table 'fluss.test_recreate_table'. "
+                                + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                + "causing metadata inconsistency. To resolve this, please restart the job **without** "
+                                + "using the previous savepoint or checkpoint.");
+    }
+
+    private Tuple2<String, CloseableIterator<Row>> runWithSavepoint(TablePath tablePath)
+            throws Exception {
+        StreamTableEnvironment tEnv = initTableEnvironment(null);
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "a int, b varchar"
+                                + ") partitioned by (b) "
+                                + "with ("
+                                + "'table.auto-partition.enabled' = 'true',"
+                                + "'table.auto-partition.time-unit' = 'year',"
+                                + "'scan.partition.discovery.interval' = '100ms',"
+                                + "'table.auto-partition.num-precreate' = '1')",
+                        tablePath.getTableName()));
+        tEnv.executeSql("create table result_table (a int, b varchar)");
+
+        // create a partition manually
+        createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
 
-        try {
-            StreamTableEnvironment tEnv = initTableEnvironment(null);
-            tEnv.executeSql(
-                    "create table test_partitioned ("
-                            + "a int, b varchar"
-                            + ") partitioned by (b) "
-                            + "with ("
-                            + "'table.auto-partition.enabled' = 'true',"
-                            + "'table.auto-partition.time-unit' = 'year',"
-                            + "'scan.partition.discovery.interval' = '100ms',"
-                            + "'table.auto-partition.num-precreate' = '1')");
-            tEnv.executeSql("create table result_table (a int, b varchar)");
-
-            TablePath tablePath = TablePath.of("fluss", "test_partitioned");
-
-            // create a partition manually
-            createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-
-            // append 3 records for each partition
-            Table table = conn.getTable(tablePath);
-            AppendWriter writer = table.newAppend().createWriter();
-            String thisYear = String.valueOf(Year.now().getValue());
-            List<String> expected = new ArrayList<>();
-            for (int i = 0; i < 3; i++) {
-                writer.append(row(i, thisYear));
-                writer.append(row(i, "4000"));
-                expected.add("+I[" + i + ", " + thisYear + "]");
-                expected.add("+I[" + i + ", 4000]");
-            }
-            writer.flush();
-
-            // execute the query to fetch logs from the table
-            TableResult insertResult =
-                    tEnv.executeSql("insert into result_table select * from test_partitioned");
-            // we have to create a intermediate table to collect result,
-            // because CollectSink can't be restored from savepoint
-            CloseableIterator<Row> results =
-                    tEnv.executeSql("select * from result_table").collect();
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // drop the partition manually
-            dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
-            waitUntilPartitions(zkClient, tablePath, 1);
-
-            // create a new partition again and append records into it
-            createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-            writer.append(row(4, "5000")).get();
-            expected.add("+I[4, 5000]");
-            // if the source subscribes the new partition successfully,
-            // it should have removed the old partition successfully
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // now, stop the job with save point
-            String savepointPath =
-                    insertResult
-                            .getJobClient()
-                            .get()
-                            .stopWithSavepoint(
-                                    false,
-                                    savepointDir.getAbsolutePath(),
-                                    SavepointFormatType.CANONICAL)
-                            .get();
-
-            tEnv = initTableEnvironment(savepointPath);
-            insertResult =
-                    tEnv.executeSql("insert into result_table select * from test_partitioned");
-            // append a new row again to check if the source can restore the state correctly
-            writer.append(row(5, "5000")).get();
-            expected.add("+I[5, 5000]");
-            assertResultsIgnoreOrder(results, expected, true);
-            // cancel the insert job
-            insertResult.getJobClient().get().cancel().get();
-        } finally {
-            // stop the cluster and thereby cancel the job
-            cluster.after();
+        // append 3 records for each partition
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        String thisYear = String.valueOf(Year.now().getValue());
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            writer.append(row(i, thisYear));
+            writer.append(row(i, "4000"));
+            expected.add("+I[" + i + ", " + thisYear + "]");
+            expected.add("+I[" + i + ", 4000]");
         }
+        writer.flush();
+
+        // execute the query to fetch logs from the table
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // we have to create an intermediate table to collect result,
+        // because CollectSink can't be restored from savepoint
+        CloseableIterator<Row> results = tEnv.executeSql("select * from result_table").collect();
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // drop the partition manually
+        dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
+        waitUntilPartitions(zkClient, tablePath, 1);
+
+        // create a new partition again and append records into it
+        createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
+        writer.append(row(4, "5000")).get();
+        expected.add("+I[4, 5000]");
+        // if the source subscribes the new partition successfully,
+        // it should have removed the old partition successfully
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        return Tuple2.of(savepointPath, results);
     }
 
     private static Configuration getFileBasedCheckpointsConfig(File savepointDir) {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java

Lines changed: 23 additions & 0 deletions
@@ -158,6 +158,29 @@ void testHandleHybridSnapshotLogSplitChangesAndFetch() throws Exception {
         }
     }
 
+    @Test
+    void testTableIdChange() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "test-only-snapshot-table");
+        long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+        try (FlinkSourceSplitReader splitReader =
+                createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.getRowType())) {
+            assertThatThrownBy(
+                            () ->
+                                    splitReader.handleSplitsChanges(
+                                            new SplitsAddition<>(
+                                                    Collections.singletonList(
+                                                            new LogSplit(
+                                                                    new TableBucket(tableId + 1, 0),
+                                                                    null,
+                                                                    0)))))
+                    .hasMessageContaining(
+                            "Table ID mismatch: expected 0, but split contains 1 for table 'test-flink-db.test-only-snapshot-table'. "
+                                    + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                    + "causing metadata inconsistency. To resolve this, please restart the job **without** using "
+                                    + "the previous savepoint or checkpoint.");
+        }
+    }
+
     private Map<String, List<RecordAndPos>> constructRecords(
             Map<TableBucket, List<InternalRow>> rows) {
         Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();
