From 10f79835f338f8f3a0a63d3e2fff1b2f072eec6b Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 29 May 2026 19:38:24 +0530 Subject: [PATCH] UI: Fix Graph layout for TaskGroup tasks wired to external nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Open ``@task_group`` rendered with vertically-stacked internals and edges crossing the boundary whenever an internal task had a direct dependency on a node outside the group (an "escape edge" that bypassed the group's entry/exit interface). Dag execution was unaffected. Two underlying issues, both in the ELK graph-layout refactor from #65031: 1. ``hasUniformExternalConnectivity`` was too lenient — it fired whenever externally-connected children separately shared the same external sources OR the same external targets, instead of the canonical fan-in/fan-out pattern where every child has the same full ``(sources, targets)`` profile. On mixed-profile groups (entry + exits), it incorrectly fired and collapsed the author's deliberately- wired escape edges into a single group-level edge, hiding the intent. 2. When the optimisation did fire on an open group, ``rewriteGroupEdges`` was tuned for closed groups and dropped the group's internal edges too, leaving ELK with no internal-layout information for the children (the visible symptom in #67714). Fix: tighten ``hasUniformExternalConnectivity`` to require the full profile to match across externally-connected children, and add a ``preserveInternal`` option to ``rewriteGroupEdges`` so the canonical fan-in/fan-out path keeps internals intact. Closes: #67714 --- .../components/Graph/elkGraphUtils.test.ts | 305 ++++++++++++++++++ .../ui/src/components/Graph/elkGraphUtils.ts | 181 +++++++---- 2 files changed, 425 insertions(+), 61 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.test.ts diff --git a/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.test.ts b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.test.ts new file mode 100644 index 0000000000000..02942fef9e412 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.test.ts @@ -0,0 +1,305 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { describe, expect, it } from "vitest"; + +import type { EdgeResponse, NodeResponse } from "openapi/requests/types.gen"; + +import { generateElkGraph, hasUniformExternalConnectivity } from "./elkGraphUtils"; +import type { FormattedNode } from "./elkGraphUtils"; + +// Minimal NodeResponse builder — fills the fields the layout pipeline actually reads. +const buildNode = (overrides: Partial & Pick): NodeResponse => ({ + asset_condition_type: null, + children: null, + is_mapped: null, + operator: null, + setup_teardown_type: null, + tooltip: null, + type: "task", + ...overrides, +}); + +const buildEdge = (sourceId: string, targetId: string): EdgeResponse => ({ + is_setup_teardown: null, + label: null, + source_id: sourceId, + target_id: targetId, +}); + +describe("hasUniformExternalConnectivity", () => { + it("returns false for the vanilla TaskGroup shape (only entry/exit cross the boundary)", () => { + // ``a1`` has external source {start}; ``group_done`` has external target + // {final_task}. Their profiles differ — not canonical fan-in/fan-out. + const edges = [buildEdge("start", "a1"), buildEdge("group_done", "final_task")]; + const result = hasUniformExternalConnectivity(new Set(["a1", "branch", "a2", "a3", "group_done"]), edges); + + expect(result).toBe(false); + }); + + it("returns false when externally-connected children have mixed profiles (entry + exits)", () => { + // The #67714 bug-trigger shape: a1 is the "entry" with external source + // {start}, while a2/a3/group_done are "exits" with external target + // {final_task}. Profiles differ → not canonical → return false so the + // author's explicit escape edges remain visible. + const edges = [ + buildEdge("start", "a1"), + buildEdge("group_done", "final_task"), + buildEdge("a2", "final_task"), + buildEdge("a3", "final_task"), + ]; + const result = hasUniformExternalConnectivity(new Set(["a1", "branch", "a2", "a3", "group_done"]), edges); + + expect(result).toBe(false); + }); + + it("returns true for the canonical fan-in/fan-out shape", () => { + // Every child has the same external source AND the same external target — + // the "cleanup group" pattern that the optimisation is designed for. + const edges = [ + buildEdge("upstream", "T1"), + buildEdge("upstream", "T2"), + buildEdge("upstream", "T3"), + buildEdge("T1", "downstream"), + buildEdge("T2", "downstream"), + buildEdge("T3", "downstream"), + ]; + const result = hasUniformExternalConnectivity(new Set(["T1", "T2", "T3"]), edges); + + expect(result).toBe(true); + }); +}); + +describe("generateElkGraph — open TaskGroup with escape edges (#67714)", () => { + // Mirrors the minimal reproducer from issue #67714: + // + // start ─→ group_a { a1 ─→ branch ─→ [a2, a3] ─→ group_done } ─→ final_task + // ↘──── ↘────────┐ + // ───────→ final_task + // + // ``a1`` is the entry (external source {start}), ``a2``/``a3``/``group_done`` + // are exits (external target {final_task}). Their profiles differ, so + // ``hasUniformExternalConnectivity`` correctly returns false and the + // open-group rewrite branch never runs — every internal and escape edge + // is rendered individually. + const internalChildren: Array = [ + buildNode({ id: "a1", label: "task_a1" }), + buildNode({ id: "branch", label: "branch_a" }), + buildNode({ id: "a2", label: "task_a2" }), + buildNode({ id: "a3", label: "task_a3" }), + buildNode({ id: "group_done", label: "group_done" }), + ]; + + const nodes: Array = [ + buildNode({ id: "start", label: "start" }), + buildNode({ + children: internalChildren, + id: "group_a", + label: "group_a", + }), + buildNode({ id: "final_task", label: "final_task" }), + ]; + + const edges: Array = [ + buildEdge("start", "a1"), + buildEdge("a1", "branch"), + buildEdge("branch", "a2"), + buildEdge("branch", "a3"), + buildEdge("a2", "group_done"), + buildEdge("a3", "group_done"), + buildEdge("group_done", "final_task"), + // The two "escape" edges that trip the bug: + buildEdge("a2", "final_task"), + buildEdge("a3", "final_task"), + ]; + + it("keeps internal group edges so ELK can lay out the children", () => { + const root = generateElkGraph({ + direction: "RIGHT", + edges, + font: "12px sans-serif", + nodes, + openGroupIds: ["group_a"], + }); + + const groupNode = (root.children as Array).find((child) => child.id === "group_a"); + + expect(groupNode).toBeDefined(); + expect(groupNode?.isOpen).toBe(true); + + // All five internal edges must survive — that's what ELK needs to lay out + // a1 → branch → [a2, a3] → group_done correctly inside the group. + const internalEdgeIds = new Set(groupNode?.edges?.map((edge) => edge.id) ?? []); + + expect(internalEdgeIds).toEqual( + new Set(["a1-branch", "branch-a2", "branch-a3", "a2-group_done", "a3-group_done"]), + ); + }); + + it("renders each crossing escape edge individually instead of collapsing them", () => { + const root = generateElkGraph({ + direction: "RIGHT", + edges, + font: "12px sans-serif", + nodes, + openGroupIds: ["group_a"], + }); + + // The deliberately-wired escape edges from a2 and a3 must remain visible + // so the author's explicit dependency intent is preserved in the graph. + const rootEdgeIds = new Set(root.edges?.map((edge) => edge.id) ?? []); + + expect(rootEdgeIds).toEqual( + new Set(["start-a1", "group_done-final_task", "a2-final_task", "a3-final_task"]), + ); + }); +}); + +describe("generateElkGraph — open TaskGroup matching the canonical fan-in/fan-out shape", () => { + // The "cleanup group" pattern the optimisation is designed for: every child + // has the SAME external source AND the SAME external target. The collapse + // optimisation should still fire here, and ``preserveInternal: true`` must + // keep any internal edges intact. + it("collapses crossing edges to a single group-level edge while keeping internal edges", () => { + const nodes: Array = [ + buildNode({ id: "upstream", label: "upstream" }), + buildNode({ + children: [ + buildNode({ id: "T1", label: "T1" }), + buildNode({ id: "T2", label: "T2" }), + buildNode({ id: "T3", label: "T3" }), + buildNode({ id: "T_internal", label: "T_internal" }), + ], + id: "cleanup_group", + label: "cleanup_group", + }), + buildNode({ id: "downstream", label: "downstream" }), + ]; + + const edges: Array = [ + buildEdge("upstream", "T1"), + buildEdge("upstream", "T2"), + buildEdge("upstream", "T3"), + buildEdge("T1", "downstream"), + buildEdge("T2", "downstream"), + buildEdge("T3", "downstream"), + // An internal edge within the group; must survive the optimisation so + // ELK can lay it out inside the open group. + buildEdge("T1", "T_internal"), + ]; + + const root = generateElkGraph({ + direction: "RIGHT", + edges, + font: "12px sans-serif", + nodes, + openGroupIds: ["cleanup_group"], + }); + + const groupNode = (root.children as Array).find((child) => child.id === "cleanup_group"); + + // The internal T1 → T_internal edge must still be present in the group's + // edges array (this is what the ``preserveInternal: true`` flag protects). + const internalEdgeIds = new Set(groupNode?.edges?.map((edge) => edge.id) ?? []); + + expect(internalEdgeIds).toEqual(new Set(["T1-T_internal"])); + + // Six crossing edges (3 fan-in + 3 fan-out) collapse to one each. + const rootEdgeIds = new Set(root.edges?.map((edge) => edge.id) ?? []); + + expect(rootEdgeIds).toEqual(new Set(["upstream-cleanup_group", "cleanup_group-downstream"])); + }); +}); + +describe("generateElkGraph — open TaskGroup without escape edges", () => { + // Regression guard for the simple TaskGroup shape (only entry/exit cross the + // boundary). ``hasUniformExternalConnectivity`` returns false here, so the + // open-group rewrite branch never runs — internal edges should already survive. + it("keeps internal group edges intact", () => { + const nodes: Array = [ + buildNode({ id: "start", label: "start" }), + buildNode({ + children: [ + buildNode({ id: "a1", label: "task_a1" }), + buildNode({ id: "a2", label: "task_a2" }), + buildNode({ id: "group_done", label: "group_done" }), + ], + id: "group_a", + label: "group_a", + }), + buildNode({ id: "final_task", label: "final_task" }), + ]; + + const edges: Array = [ + buildEdge("start", "a1"), + buildEdge("a1", "a2"), + buildEdge("a2", "group_done"), + buildEdge("group_done", "final_task"), + ]; + + const root = generateElkGraph({ + direction: "RIGHT", + edges, + font: "12px sans-serif", + nodes, + openGroupIds: ["group_a"], + }); + + const groupNode = (root.children as Array).find((child) => child.id === "group_a"); + + const internalEdgeIds = new Set(groupNode?.edges?.map((edge) => edge.id) ?? []); + + expect(internalEdgeIds).toEqual(new Set(["a1-a2", "a2-group_done"])); + }); +}); + +describe("generateElkGraph — closed TaskGroup", () => { + // Closed-group behaviour must still drop internal edges (they're not laid out + // when the group is collapsed) and rewrite crossings to point at the group. + it("drops internal edges and rewrites crossings to the group", () => { + const nodes: Array = [ + buildNode({ id: "start", label: "start" }), + buildNode({ + children: [buildNode({ id: "a1", label: "task_a1" }), buildNode({ id: "a2", label: "task_a2" })], + id: "group_a", + label: "group_a", + }), + buildNode({ id: "final_task", label: "final_task" }), + ]; + + const edges: Array = [ + buildEdge("start", "a1"), + buildEdge("a1", "a2"), + buildEdge("a2", "final_task"), + ]; + + const root = generateElkGraph({ + direction: "RIGHT", + edges, + font: "12px sans-serif", + nodes, + openGroupIds: [], + }); + + const rootEdgeIds = new Set(root.edges?.map((edge) => edge.id) ?? []); + + // Internal a1 → a2 must not appear at the root level; crossings collapse to + // single group-level edges. + expect(rootEdgeIds).toEqual(new Set(["start-group_a", "group_a-final_task"])); + }); +}); diff --git a/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts index b307482ba75dc..ac460c29cb798 100644 --- a/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts +++ b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts @@ -101,21 +101,30 @@ const formatElkEdge = (edge: EdgeResponse, font: string, node?: NodeResponse): F }); /** - * Returns true when every child task that has at least one external connection - * shares exactly the same set of external sources AND the same set of external - * targets as every other externally-connected child. + * Returns true when every child task with at least one external connection + * shares the **exact same full external profile** — the same set of external + * sources AND the same set of external targets — as every other externally- + * connected child. * - * Example — a "cleanup" group where every task fans out from one upstream node - * and funnels into the same downstream node: + * Canonical pattern — a "cleanup" group where every task fans out from one + * upstream node and funnels into the same downstream node: * * upstream → T1 ─┐ * upstream → T2 ─┼→ downstream * upstream → T3 ─┘ * - * Rendering N individual crossing edges adds visual noise without conveying - * any extra information beyond "the group connects upstream → downstream". - * When this returns true, the caller collapses those N edges to a single - * group-level edge while still rendering the children inside the group. + * Here all three children have profile ``({upstream}, {downstream})`` — same + * sources, same targets. Rendering N individual crossing edges adds visual + * noise without conveying any extra information beyond "the group connects + * upstream → downstream". When this returns true, the caller collapses those + * N edges to a single group-level edge while still rendering the children. + * + * Mixed profiles — e.g. one child is the group's "entry" (external sources + * only, no external targets) while others are "exits" (external targets only, + * no external sources) — are NOT canonical. The author has expressed + * deliberately different external connectivity per child, and collapsing + * those edges would hide that intent. The check returns false in that case, + * and the caller renders each crossing edge individually. See #67714. * * Uses the original, unmodified edge list so that prior sibling group * transformations do not affect the connectivity check. @@ -124,89 +133,123 @@ export const hasUniformExternalConnectivity = ( childIdSet: Set, edges: Array, ): boolean => { - const sourcesPerChild = new Map>(); - const targetsPerChild = new Map>(); + // For each externally-connected child, build the full ``(sources, targets)`` + // profile in a single map (rather than tracking sources and targets in + // independent maps — which loses the per-child correlation). + const profileByChild = new Map; targets: Set }>(); + const getOrInitProfile = (childId: string) => { + let profile = profileByChild.get(childId); + + if (profile === undefined) { + profile = { sources: new Set(), targets: new Set() }; + profileByChild.set(childId, profile); + } + + return profile; + }; for (const edge of edges) { const sourceIsChild = childIdSet.has(edge.source_id); const targetIsChild = childIdSet.has(edge.target_id); if (!sourceIsChild && targetIsChild) { - const existing = sourcesPerChild.get(edge.target_id) ?? new Set(); - - existing.add(edge.source_id); - sourcesPerChild.set(edge.target_id, existing); + getOrInitProfile(edge.target_id).sources.add(edge.source_id); } if (sourceIsChild && !targetIsChild) { - const existing = targetsPerChild.get(edge.source_id) ?? new Set(); - - existing.add(edge.target_id); - targetsPerChild.set(edge.source_id, existing); + getOrInitProfile(edge.source_id).targets.add(edge.target_id); } } - // Need at least 2 children with external connections on at least one side - // for the optimisation to be worthwhile. - if (sourcesPerChild.size < 2 && targetsPerChild.size < 2) { + // Need at least 2 externally-connected children for the optimisation to be + // worthwhile — one child has nothing to collapse against. + if (profileByChild.size < 2) { return false; } - // Build the union of all external sources / targets across all children. - const allSources = new Set(); - const allTargets = new Set(); + // All externally-connected children must share the exact same profile. + const [reference, ...rest] = [...profileByChild.values()]; - for (const sources of sourcesPerChild.values()) { - for (const source of sources) { - allSources.add(source); - } - } - for (const targets of targetsPerChild.values()) { - for (const target of targets) { - allTargets.add(target); - } + // The early-return above on ``profileByChild.size < 2`` guarantees that the + // destructure produced a defined ``reference``, but TypeScript can't see + // through the map-size guard. This explicit check both narrows the type and + // documents the invariant. + if (reference === undefined) { + return false; } - // Every child's external sources must equal allSources (same size sufficient - // given allSources is already the union — a child with fewer differs in size). - for (const sources of sourcesPerChild.values()) { - if (sources.size !== allSources.size) { + const setsEqual = (left: Set, right: Set) => { + if (left.size !== right.size) { return false; } - } - for (const targets of targetsPerChild.values()) { - if (targets.size !== allTargets.size) { - return false; + for (const value of left) { + if (!right.has(value)) { + return false; + } } - } - return true; + return true; + }; + + return rest.every( + (profile) => + setsEqual(profile.sources, reference.sources) && setsEqual(profile.targets, reference.targets), + ); }; // --------------------------------------------------------------------------- // Edge rewriting helper // --------------------------------------------------------------------------- +type RewriteGroupEdgesProps = { + childIdSet: Set; + edges: Array; + groupId: string; + /** + * When false (the default, used for *closed* groups), purely-internal edges + * are dropped — the collapsed group does not need its internal layout. + * + * When true (used when applying the uniform-external optimisation to an + * *open* group), internal edges pass through unchanged so the caller can + * still extract them as the group's internal edges; only crossing edges get + * rewritten and deduplicated. + */ + preserveInternal?: boolean; +}; + /** - * Given the current working edge list, drops purely-internal edges, rewrites - * crossing edges so both endpoints reference `groupId` instead of a child node, - * then deduplicates the result so N rewritten edges collapse to one per - * (source, target) pair. + * Rewrites crossing edges so both endpoints reference `groupId` instead of a + * child node, then deduplicates the result so N rewritten edges collapse to + * one per (source, target) pair. */ -const rewriteGroupEdges = ( - edges: Array, - childIdSet: Set, - groupId: string, -): Array => { +const rewriteGroupEdges = ({ + childIdSet, + edges, + groupId, + preserveInternal = false, +}: RewriteGroupEdgesProps): Array => { const seen = new Set(); return edges - .filter((fe) => !(childIdSet.has(fe.source_id) && childIdSet.has(fe.target_id))) - .map((fe) => ({ - ...fe, - source_id: childIdSet.has(fe.source_id) ? groupId : fe.source_id, - target_id: childIdSet.has(fe.target_id) ? groupId : fe.target_id, - })) + .filter((fe) => preserveInternal || !(childIdSet.has(fe.source_id) && childIdSet.has(fe.target_id))) + .map((fe) => { + const sourceIsChild = childIdSet.has(fe.source_id); + const targetIsChild = childIdSet.has(fe.target_id); + + // Internal edges of an open group must pass through unchanged so the + // caller can recognise and extract them. Rewriting both endpoints to + // ``groupId`` would (a) collapse them to a self-loop and (b) hide them + // from the subsequent internal-edge extraction loop. + if (preserveInternal && sourceIsChild && targetIsChild) { + return fe; + } + + return { + ...fe, + source_id: sourceIsChild ? groupId : fe.source_id, + target_id: targetIsChild ? groupId : fe.target_id, + }; + }) .filter((fe) => { const key = `${fe.source_id}-${fe.target_id}`; @@ -266,8 +309,20 @@ export const generateElkGraph = ({ // and downstream target(s), collapse N crossing edges to one group-level // edge (same as a closed group) while keeping the children visible. // Checked against unformattedEdges so prior sibling transforms don't interfere. + // + // ``preserveInternal: true`` is required because the group is *open* — its + // internal edges must survive past the rewrite so the extraction loop + // below can pull them into the group's ``edges`` array. Without it, ELK + // would receive an open group with no internal edges and fail to lay out + // the children in a sensible left-to-right order whenever an internal + // task has a direct dependency on a node outside the group (see #67714). if (hasUniformExternalConnectivity(childIdSet, unformattedEdges)) { - filteredEdges = rewriteGroupEdges(filteredEdges, childIdSet, node.id); + filteredEdges = rewriteGroupEdges({ + childIdSet, + edges: filteredEdges, + groupId: node.id, + preserveInternal: true, + }); } // Extract any remaining internal edges (both endpoints inside this group). @@ -302,7 +357,11 @@ export const generateElkGraph = ({ if (!isOpen && node.children !== undefined) { // Use a Set for O(1) membership checks — childIds.includes() would be // O(n) per edge, turning the filter/map into O(n × E) for large groups. - filteredEdges = rewriteGroupEdges(filteredEdges, new Set(childIds), node.id); + filteredEdges = rewriteGroupEdges({ + childIdSet: new Set(childIds), + edges: filteredEdges, + groupId: node.id, + }); } const label = `${node.label}${node.is_mapped ? "[1000]" : ""}${node.children ? ` + ${node.children.length} tasks` : ""}`;