Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 6 additions & 65 deletions src/components/lineage/nodes/dataset_node/DatasetNode.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { NodeProps, Node, useReactFlow } from "@xyflow/react";
import { NodeProps, Node } from "@xyflow/react";
import { useCreatePath, useTranslate } from "react-admin";
import { ReactElement, memo } from "react";
import OpenInNewIcon from "@mui/icons-material/OpenInNew";

import {
DatasetResponseV1,
InputRelationLineageResponseV1,
IORelationSchemaV1,
OutputRelationLineageResponseV1,
} from "@/dataProvider/types";
import { DatasetResponseV1 } from "@/dataProvider/types";
import { LocationIconWithType } from "@/components/location";
import { Button, CardHeader, Typography } from "@mui/material";
import BaseNode from "../base_node/BaseNode";
Expand All @@ -18,7 +13,6 @@ export type DatasetNode = Node<DatasetResponseV1, "datasetNode">;

const DatasetNode = (props: NodeProps<DatasetNode>): ReactElement => {
const translate = useTranslate();
const reactFlow = useReactFlow();

let title = props.data.name;
const subheader = `${props.data.location.type}://${props.data.location.name}`;
Expand All @@ -33,59 +27,6 @@ const DatasetNode = (props: NodeProps<DatasetNode>): ReactElement => {
id: props.data.id,
});

const outputSchemas = reactFlow
.getEdges()
.filter(
(edge) => edge.data?.kind == "OUTPUT" && edge.target === props.id,
)
// @ts-expect-error Explicit cast
.map((edge) => edge.data as OutputRelationLineageResponseV1)
// sort by last_interaction_at descending
.toSorted((a, b) =>
a.last_interaction_at < b.last_interaction_at ? 1 : -1,
)
.map((output) => output.schema)
.filter((schema) => schema !== null)
// keep unique schemas only
.filter(
(schema, index, array) =>
array.findIndex((item) => item.id == schema.id) == index,
);

const inputSchemas = reactFlow
.getEdges()
.filter(
(edge) => edge.data?.kind == "INPUT" && edge.source === props.id,
)
// @ts-expect-error Explicit cast
.map((edge) => edge.data as InputRelationLineageResponseV1)
// sort by last_interaction_at descending
.toSorted((a, b) =>
a.last_interaction_at < b.last_interaction_at ? 1 : -1,
)
.map((input) => input.schema)
.filter((schema) => schema !== null)
// keep unique schemas only
.filter(
(schema, index, array) =>
array.findIndex((item) => item.id == schema.id) == index,
);

// prefer output schema as there is high chance that it describes all the columns properly.
// read interactions may select only a subset of columns.
let schema: IORelationSchemaV1 | undefined = undefined;
let schemaFrom: string = "input";
let schemaCount = 0;
if (outputSchemas.length > 0) {
schema = outputSchemas.at(0);
schemaFrom = "output";
schemaCount = outputSchemas.length;
} else if (inputSchemas.length > 0) {
schema = inputSchemas.at(0);
schemaFrom = "input";
schemaCount = inputSchemas.length;
}

return (
<BaseNode
nodeId={props.id}
Expand All @@ -107,15 +48,15 @@ const DatasetNode = (props: NodeProps<DatasetNode>): ReactElement => {
/>
}
expandableContent={
schema ? (
props.data && props.data.schema ? (
<>
<Typography sx={{ textAlign: "center" }}>
{translate(
`resources.datasets.fields.schema.${schemaFrom}`,
{ smart_count: schemaCount },
`resources.datasets.fields.schema.${props.data.schemaFrom}`,
{ smart_count: props.data.schemaCount },
)}
</Typography>
<DatasetSchemaTable fields={schema.fields} />
<DatasetSchemaTable fields={props.data.schema.fields} />
</>
) : null
}
Expand Down
78 changes: 40 additions & 38 deletions src/components/lineage/utils/getGraphEdges.tsx
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import {
LineageRelationResponseV1,
OutputRelationLineageResponseV1,
SymlinkRelationLineageResponseV1,
InputRelationLineageResponseV1,
ParentRelationLineageResponseV1,
BaseRelationLineageResponseV1,
LineageResponseV1,
RelationEndpointLineageResponseV1,
} from "@/dataProvider/types";
import { Edge, MarkerType } from "@xyflow/react";
import { getNodeId } from "./getGraphNodes";

const STOKE_THICK = 3;
const STOKE_THIN = 1;

export const getEdgeId = (relation: BaseRelationLineageResponseV1): string => {
const getNodeId = (node: RelationEndpointLineageResponseV1): string => {
return node.kind + "-" + node.id;
};

const getEdgeId = (relation: BaseRelationLineageResponseV1): string => {
// @ts-expect-error Type field may be present in some relation types
const type = relation.type ?? "";
return `${getNodeId(relation.from)}-${type}->${getNodeId(relation.to)}`;
Expand All @@ -25,8 +28,6 @@ const getMinimalEdge = (relation: BaseRelationLineageResponseV1): Edge => {
source: getNodeId(relation.from),
target: getNodeId(relation.to),
type: "baseEdge",
// @ts-expect-error For some reason, TS thinks that BaseRelation does not satisfy Record
data: relation,
markerEnd: {
type: MarkerType.ArrowClosed,
},
Expand All @@ -40,9 +41,13 @@ const getMinimalEdge = (relation: BaseRelationLineageResponseV1): Edge => {
const getParentEdge = (relation: ParentRelationLineageResponseV1): Edge => {
return {
...getMinimalEdge(relation),
label: "PARENT",
data: {
...relation,
kind: "PARENT",
},
sourceHandle: "bottom",
targetHandle: "top",
label: "PARENT",
};
};

Expand All @@ -64,6 +69,10 @@ const getOutputEdge = (relation: OutputRelationLineageResponseV1): Edge => {
return {
...getMinimalEdge(relation),
type: "ioEdge",
data: {
...relation,
kind: "OUTPUT",
},
animated: true,
markerEnd: {
type: MarkerType.ArrowClosed,
Expand All @@ -84,6 +93,10 @@ const getInputEdge = (relation: InputRelationLineageResponseV1): Edge => {
return {
...getMinimalEdge(relation),
type: "ioEdge",
data: {
...relation,
kind: "INPUT",
},
animated: true,
markerEnd: {
type: MarkerType.ArrowClosed,
Expand Down Expand Up @@ -128,12 +141,17 @@ const getSymlinkEdge = (
// with more simple graphs, like these:
//
// HDFS <-> Hive -> Job
const targetRelations = raw_response.relations.filter(
(r) => r.to.id == relation.to.id || r.from.id == relation.to.id,
const anyTargetInput = raw_response.relations.inputs.find(
(r) => r.from.id == relation.to.id,
);
const targetHasOnlyOutputs = targetRelations.every(
(r) => r.kind == "OUTPUT" || r.kind == "SYMLINK",
const anyTargetOutput = raw_response.relations.outputs.find(
(r) => r.to.id == relation.to.id,
);
const anyTargetSymlink = raw_response.relations.symlinks.find(
(r) => r.to.id == relation.to.id || r.from.id == relation.to.id,
);
const targetHasOnlyOutputs =
(!!anyTargetOutput || !!anyTargetSymlink) && !anyTargetInput;

const color = "purple";

Expand All @@ -146,6 +164,10 @@ const getSymlinkEdge = (
target: targetHasOnlyOutputs
? getNodeId(relation.from)
: getNodeId(relation.to),
data: {
...relation,
kind: "SYMLINK",
},
markerStart: {
type: MarkerType.ArrowClosed,
color: color,
Expand All @@ -164,35 +186,15 @@ const getSymlinkEdge = (
};
};

const getGraphEdge = (
relation: LineageRelationResponseV1,
raw_response: LineageResponseV1,
): Edge | null => {
switch (relation.kind) {
case "PARENT":
return getParentEdge(relation);
case "OUTPUT":
return getOutputEdge(relation);
case "INPUT":
return getInputEdge(relation);
case "SYMLINK":
return getSymlinkEdge(relation, raw_response);
default:
return getMinimalEdge(relation);
}
};

const getGraphEdges = (raw_response: LineageResponseV1): Edge[] => {
const result: Edge[] = [];

raw_response.relations.forEach((relation) => {
const edge = getGraphEdge(relation, raw_response);
if (edge) {
result.push(edge);
}
});

return result;
return [
...raw_response.relations.parents.map(getParentEdge),
...raw_response.relations.inputs.map(getInputEdge),
...raw_response.relations.outputs.map(getOutputEdge),
...raw_response.relations.symlinks
.map((relation) => getSymlinkEdge(relation, raw_response))
.filter((edge) => edge !== null),
];
};

export default getGraphEdges;
Loading
Loading