Skip to content

Commit 551c3d7

Browse files
authored
[hotfix][table] Fix incorrect state ttl setting in KeyedLookupJoinWrapper
This closes apache#26970.
1 parent deee02b commit 551c3d7

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ public void open(OpenContext openContext) throws Exception {
9797
} else {
9898
ValueStateDescriptor<List<RowData>> valueStateDescriptor =
9999
new ValueStateDescriptor<>("values", new ListSerializer<>(serializer));
100-
state = getRuntimeContext().getState(valueStateDescriptor);
101100
if (ttlConfig.isEnabled()) {
102101
valueStateDescriptor.enableTimeToLive(ttlConfig);
103102
}
103+
state = getRuntimeContext().getState(valueStateDescriptor);
104104
}
105105
emptyRow = initEmptyRow(lookupJoinRunner.tableFieldsCount);
106106
collectListener = new FetchedRecordListener();

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,41 @@ void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws Exception {
384384
testHarness.close();
385385
}
386386

387+
@Test
388+
void testTemporalLeftJoinWithTtlWithoutPk() throws Exception {
389+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
390+
createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, false, 1_000);
391+
392+
testHarness.open();
393+
// set TtlTimeProvider with 1
394+
testHarness.setStateTtlProcessingTime(1);
395+
testHarness.processElement(insertRecord(1, "a"));
396+
testHarness.processElement(insertRecord(2, "b"));
397+
testHarness.processElement(insertRecord(3, "c"));
398+
399+
// set TtlTimeProvider with 1001 to trigger expired state cleanup
400+
testHarness.setStateTtlProcessingTime(1002);
401+
// should output a delete message (pad null) since it's left join
402+
testHarness.processElement(deleteRecord(2, "b"));
403+
404+
testHarness.processElement(insertRecord(2, "b2"));
405+
testHarness.processElement(updateBeforeRecord(3, "c"));
406+
testHarness.processElement(updateAfterRecord(3, "c2"));
407+
408+
List<Object> expectedOutput = new ArrayList<>();
409+
expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
410+
expectedOutput.add(insertRecord(2, "b", null, null));
411+
expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
412+
expectedOutput.add(deleteRecord(2, "b", null, null));
413+
expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
414+
expectedOutput.add(deleteRecord(3, "c", null, null));
415+
expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
416+
expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
417+
418+
assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
419+
testHarness.close();
420+
}
421+
387422
@Test
388423
void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws Exception {
389424
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =

0 commit comments

Comments
 (0)