Skip to content

Conversation

@fresh-borzoni
Copy link

@fresh-borzoni fresh-borzoni commented Dec 28, 2025

Purpose

Linked issue: close #2231

Add comprehensive test coverage for Delta Join feature in Flink 2.2 and improve documentation.

Brief change log

  • Added tests for normal PK tables with table.delete.behavior=IGNORE (not just first_row merge engine)
  • Added tests for joining on bucket key (not just full primary key)
  • Added tests verifying LEFT/RIGHT/FULL OUTER joins fail with appropriate validation error
  • Added test verifying cascade joins are not supported
  • Updated documentation for Flink 2.2 Delta Join with new supported features and limitations

Tests

  • testDeltaJoinWithPrimaryKeyTableNoDeletes - normal PK table with delete.behavior=IGNORE
  • testDeltaJoinOnBucketKey - join on bucket key only
  • testDeltaJoinFailsWhenFilterOnNonUpsertKeys - filter on non-upsert-key columns fails
  • testDeltaJoinOnBucketKey - join on bucket key only (not full PK)
  • testDeltaJoinFailsWhenSourceHasDelete - source with DELETE records fails
  • testDeltaJoinFailsWhenJoinKeyNotContainIndex - join key not containing index fail
  • testDeltaJoinFailsWithLeftJoin - LEFT JOIN wouldn't be converted to DeltaJoin
  • testDeltaJoinFailsWithRightJoin - RIGHT JOIN wouldn't be converted to DeltaJoin
  • testDeltaJoinFailsWithFullOuterJoin - FULL OUTER JOIN wouldn't be converted to DeltaJoin
  • testDeltaJoinFailsWithCascadeJoin - cascade join wouldn't be converted to DeltaJoin

API and Format

No

Documentation

Yes - updated docs/engine-flink/delta-joins.md in Flink 2.2 part.

@fresh-borzoni
Copy link
Author

@xuyangzhong @wuchong
While studying the Flink 2.2 changes related to Delta Join, I noticed this issue. I've added some tests and improved the documentation, feel free to use the changes if you find them useful.

@fresh-borzoni fresh-borzoni force-pushed the fluss-delta-joins-tests-docs branch from ac23b6b to 6035422 Compare December 31, 2025 13:31
@xuyangzhong
Copy link
Contributor

Hi, @fresh-borzoni I'm a bit busy these days, I'll try my best to take a look after next Wednesday

Copy link
Contributor

@xuyangzhong xuyangzhong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for driving this! I have added some comments. Furthermore, some methods for table creation could be reused.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT case for {@link FlinkTableSource} in Flink 2.2. */
public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m thinking it might be better to use a separate Flink22DeltaJoinITCase to hold these Delta Join–related tests, since there are now more delta join related tests, and there’s some common logic we could factor out, for example, setting the FORCE strategy, setting the default parallelism to 2, and creating the source and sink tables. WDYT?

+ " 'table.delete.behavior' = 'IGNORE' "
+ ")",
leftTableName));
List<InternalRow> rows1 =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we prepare some data with the same primary key to test it?

"INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2",
sinkTableName, leftTableName, rightTableName);

assertThatThrownBy(() -> tEnv.explainSql(sql))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit Merge testDeltaJoinFailsWithLeftJoin, testDeltaJoinFailsWithRightJoin and testDeltaJoinFailsWithFullOuterJoin to testDeltaJoinFailsWithOuterJoin and assert the thrown exception here three times.


assertThatThrownBy(() -> tEnv.explainSql(sql))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("doesn't support to do delta join optimization");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a more test here.

// Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the upsert key
        String sql2 =
                String.format(
                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2",
                        sinkTableName, leftTableName, rightTableName);

        assertThatThrownBy(() -> tEnv.explainSql(sql2))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("doesn't support to do delta join optimization");

}

@Test
void testDeltaJoinFailsWithCascadeJoin() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding more tests from the doc like:

  1. the join key includes more fields than the primary key.
  2. sink materializer(the sink's primary key is different from the primary keys of the two upstream sources.)
  3. filters or projections contain non-deterministic functions like rand.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Flink 2.2 Delta Join tests and documentation

2 participants