diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index fd646a34148b4..716a1b84098a5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -114,14 +114,37 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // step4: sanity check and return non-empty root if (finalRoot.isEmpty) { - val plan = FlinkRelOptUtil.toString(root, withChangelogTraits = true) - throw new TableException( - "Can't generate a valid execution plan for the given query:\n" + plan) + // Reaching here means no node assignment satisfies the changelog requirements. When the root + // is a sink that cannot consume the upsert changelog produced by its input, point at the + // conflict directly. Any other failure falls back to the full annotated plan. + val errorMessage = createTargetedErrorMessage(rootWithModifyKindSet) + throw new TableException(errorMessage) } else { finalRoot.head } } + private def createTargetedErrorMessage(rootWithModifyKindSet: StreamPhysicalRel) = { + if ( + rootWithModifyKindSet.isInstanceOf[StreamPhysicalSink] + && containsUpdates(rootWithModifyKindSet) + ) { + val conflict = new StringBuilder(describeChangelog(rootWithModifyKindSet)) + rootWithModifyKindSet.getInputs.foreach( + input => conflict.append("\n +- ").append(describeChangelog(input))) + "Can't generate a valid execution plan for the given query.\n\n" + + "There is a changelog mismatch between two operators. One produces an upsert " + + "changelog (UPDATE_AFTER without UPDATE_BEFORE). The other requires a retract " + + "changelog (UPDATE_BEFORE and UPDATE_AFTER), for example a sink without a primary " + + "key. In such cases, ensure that the sink is able to digest upserts where the " + + "PRIMARY KEY serves as the upsert key, or make the input produce UPDATE_BEFORE.\n\n" + + "The conflict is at:\n" + conflict + } else { + val plan = FlinkRelOptUtil.toString(rootWithModifyKindSet, withChangelogTraits = true) + "Can't generate a valid execution plan for the given query:\n" + plan + } + } + /** * A visitor which will try to satisfy the required [[ModifyKindSetTrait]] from root. * @@ -1604,6 +1627,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti modifyKindSetTrait.modifyKindSet } + /** Whether the node or any node in its input subtree produces UPDATE changes. */ + private def containsUpdates(rel: RelNode): Boolean = + getModifyKindSet(rel).contains(ModifyKind.UPDATE) || + rel.getInputs.exists(input => containsUpdates(input)) + + /** + * Renders a node's type and changelog mode, for example + * "Sink(ExpectedChangelogMode=[I,UB,UA,D])". + */ + private def describeChangelog(rel: RelNode): String = rel match { + case sink: StreamPhysicalSink => + // A sink's own changelog mode is empty; show the mode it expects from its input instead. + val expected = + sink.tableSink.getChangelogMode(getModifyKindSet(sink.getInput).toDefaultChangelogMode) + s"Sink(ExpectedChangelogMode=[${ChangelogPlanUtils.stringifyChangelogMode(Some(expected))}])" + case streamRel: StreamPhysicalRel => + val typeName = rel.getRelTypeName.stripPrefix("StreamPhysical") + val mode = + ChangelogPlanUtils.stringifyChangelogMode(ChangelogPlanUtils.getChangelogMode(streamRel)) + s"$typeName(changelogMode=[$mode])" + case _ => rel.getRelTypeName + } + private def getDeleteKind(node: RelNode): DeleteKind = { val deleteKindTrait = node.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE) deleteKindTrait.deleteKind diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index 411b6fc2b75df..08a90824136ee 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -108,6 +108,10 @@ void setup() { .executeSql( "CREATE TABLE t_full_delete_sink (`name` STRING PRIMARY KEY NOT ENFORCED, `name0` STRING, `count` BIGINT, `mode` STRING) " + "WITH ('connector' = 'values')"); + util.tableEnv() + .executeSql( + "CREATE TABLE t_no_pk_sink (`name` STRING, `name0` STRING, `count` BIGINT, `mode` STRING) " + + "WITH ('connector' = 'values', 'sink-insert-only' = 'false')"); } @Test @@ -513,6 +517,35 @@ private static Stream errorSpecs() { "SELECT * FROM f(r => TABLE t_watermarked PARTITION BY name, on_time => DESCRIPTOR(ts))", "Time operations using the `on_time` argument are currently not supported for " + "PTFs that consume or produce updates."), + ErrorSpec.ofInsertInto( + "upsert output into sink without primary key", + UpdatingUpsertFunction.class, + "INSERT INTO t_no_pk_sink SELECT * FROM f(r => TABLE t_updating PARTITION BY name)", + "There is a changelog mismatch between two operators. " + + "One produces an upsert changelog (UPDATE_AFTER without UPDATE_BEFORE). " + + "The other requires a retract changelog (UPDATE_BEFORE and UPDATE_AFTER), for example a sink without a primary key. " + + "In such cases, ensure that the sink is able to digest upserts where the PRIMARY KEY serves as the upsert key, or make the input produce UPDATE_BEFORE.\n\n" + + "The conflict is at:\n" + + "Sink(ExpectedChangelogMode=[I,UB,UA,D])\n" + + " +- ProcessTableFunction(changelogMode=[I,UA,D])"), + ErrorSpec.ofSelect( + "upsert conflict that does not surface at a sink", + UpdatingUpsertFunction.class, + "SELECT name, SUM(`count`) OVER (PARTITION BY name ORDER BY name) " + + "FROM f(r => TABLE t_updating PARTITION BY name)", + "Can't generate a valid execution plan for the given query:\n"), + ErrorSpec.ofInsertInto( + "upsert conflict buried below a calc", + UpdatingUpsertFunction.class, + "INSERT INTO t_no_pk_sink SELECT name, name0, `count`, mode " + + "FROM f(r => TABLE t_updating PARTITION BY name) WHERE name0 <> ''", + "There is a changelog mismatch between two operators. " + + "One produces an upsert changelog (UPDATE_AFTER without UPDATE_BEFORE). " + + "The other requires a retract changelog (UPDATE_BEFORE and UPDATE_AFTER), for example a sink without a primary key. " + + "In such cases, ensure that the sink is able to digest upserts where the PRIMARY KEY serves as the upsert key, or make the input produce UPDATE_BEFORE.\n\n" + + "The conflict is at:\n" + + "Sink(ExpectedChangelogMode=[I,UB,UA,D])\n" + + " +- Calc(changelogMode=[I,UA,D])"), ErrorSpec.ofSelect( "no pass-through for multiple table args", InvalidPassThroughTables.class,