@@ -7,8 +7,11 @@ import {
77 IndirectColumnLineageRelationLineageResponseV1 ,
88 LineageResponseV1 ,
99 RelationEndpointLineageResponseV1 ,
10+ IORelationSchemaV1 ,
11+ IORelationSchemaFieldV1 ,
1012} from "@/dataProvider/types" ;
1113import { Edge , MarkerType } from "@xyflow/react" ;
14+ import { flattenFields } from "../nodes/dataset_node/utils" ;
1215
// Stroke width constants for graph edges. STOKE_MEDIUM is used as `strokeWidth`
// of SYMLINK edges below; a STOKE_THIN constant is also referenced by the
// column lineage edges (its definition is outside this hunk).
1316const STOKE_THICK = 3 ;
1417const STOKE_MEDIUM = 1 ;
@@ -186,9 +189,45 @@ const getInputEdge = (
186189} ;
187190
// Builds the single dataset-level "SYMLINK" edge for one symlink relation.
//
// The edge starts from getMinimalEdge(relation) and then overrides label,
// source, target, data, markers and styles.
//
// Parameters:
//   relation             - symlink relation the edge is built from; all of its
//                          properties are copied into `data` together with
//                          kind "SYMLINK".
//   targetHasOnlyOutputs - when true, the endpoints are swapped: the edge goes
//                          from relation.to to relation.from instead of
//                          from relation.from to relation.to.
//
// Returns an Edge with closed-arrow markers at BOTH ends (markerStart and
// markerEnd), drawn in purple with STOKE_MEDIUM stroke width.
188191const getSymlinkEdge = (
192+ relation : SymlinkRelationLineageResponseV1 ,
193+ targetHasOnlyOutputs : boolean ,
194+ ) : Edge => {
195+ const color = "purple" ;
196+ return {
197+ ...getMinimalEdge ( relation ) ,
198+ label : "SYMLINK" ,
199+ source : targetHasOnlyOutputs
200+ ? getNodeId ( relation . to )
201+ : getNodeId ( relation . from ) ,
202+ target : targetHasOnlyOutputs
203+ ? getNodeId ( relation . from )
204+ : getNodeId ( relation . to ) ,
205+ data : {
206+ ...relation ,
207+ kind : "SYMLINK" ,
208+ } ,
209+ markerStart : {
210+ type : MarkerType . ArrowClosed ,
211+ color : color ,
212+ } ,
213+ markerEnd : {
214+ type : MarkerType . ArrowClosed ,
215+ color : color ,
216+ } ,
217+ style : {
218+ strokeWidth : STOKE_MEDIUM ,
219+ stroke : color ,
220+ } ,
221+ labelStyle : {
222+ backgroundColor : color ,
223+ } ,
224+ } ;
225+ } ;
226+
// Builds every edge produced by one symlink relation (post-change "+" side of
// this diff): the dataset-level SYMLINK edge plus one column-level edge per
// known field name.
//
// Behaviour, as visible in the added lines:
//   * relations with type "WAREHOUSE" produce no edges (empty array), so only
//     one symlink of a pair is drawn;
//   * targetInputs  = inputs  whose `from.id` equals relation.to.id,
//     targetOutputs = outputs whose `to.id`   equals relation.to.id,
//     targetHasOnlyOutputs =
//       (targetInputs non-empty OR anyTargetSymlink found) AND targetOutputs empty;
//     this flag is passed to getSymlinkEdge to choose the edge direction;
//   * field names are collected from the non-null `schema` of the input/output
//     relations attached to either end of the symlink, flattened with
//     flattenFields and de-duplicated through a Set;
//   * each field name yields getColumnLineageEdge(relation,
//     "DIRECT_COLUMN_LINEAGE", ["IDENTITY"], field, field), i.e. a same-name
//     column mapping across the symlink.
//
// NOTE(review): the anyTargetSymlink predicate (`r.to.id == relation.to.id`) is
// satisfied by `relation` itself whenever `relation` is an element of
// raw_response.relations.symlinks, so the symlink term is presumably always
// truthy and the flag reduces to "targetOutputs is empty" - confirm intent.
// NOTE(review): compared with the removed lines, the roles of inputs and
// outputs in targetHasOnlyOutputs are swapped - confirm this is intentional.
227+ const getSymlinkEdges = (
189228 relation : SymlinkRelationLineageResponseV1 ,
190229 raw_response : LineageResponseV1 ,
191- ) : Edge | null => {
230+ ) : Edge [ ] => {
192231 if ( relation . type == "WAREHOUSE" ) {
193232 // having 2 edges between same nodes leads to confusing cross links like those:
194233 //
@@ -200,7 +239,7 @@ const getSymlinkEdge = (
200239 // >Hive<
201240 //
202241 // To avoid that, keep only one symlink, at least for now.
203- return null ;
242+ return [ ] ;
204243 }
205244
206245 // if target node has only outputs (e.g. Hive is a source for all jobs/runs/... in this graph),
@@ -214,117 +253,140 @@ const getSymlinkEdge = (
214253 // with more simple graphs, like these:
215254 //
216255 // HDFS <-> Hive -> Job
217- const anyTargetInput = raw_response . relations . inputs . find (
256+ const anyTargetSymlink = raw_response . relations . symlinks . find (
257+ ( r ) => r . to . id == relation . to . id || r . from . id == relation . to . id ,
258+ ) ;
259+ const targetInputs = raw_response . relations . inputs . filter (
218260 ( r ) => r . from . id == relation . to . id ,
219261 ) ;
220- const anyTargetOutput = raw_response . relations . outputs . find (
262+ const targetOutputs = raw_response . relations . outputs . filter (
221263 ( r ) => r . to . id == relation . to . id ,
222264 ) ;
223- const anyTargetSymlink = raw_response . relations . symlinks . find (
224- ( r ) => r . to . id == relation . to . id || r . from . id == relation . to . id ,
225- ) ;
265+
226266 const targetHasOnlyOutputs =
227- ( ! ! anyTargetOutput || ! ! anyTargetSymlink ) && ! anyTargetInput ;
267+ ( ! ! targetInputs . length || ! ! anyTargetSymlink ) && ! targetOutputs . length ;
228268
229- const color = "purple" ;
// The dataset-level SYMLINK edge is always the first element of the result.
269+ const results : Edge [ ] = [ getSymlinkEdge ( relation , targetHasOnlyOutputs ) ] ;
270+
271+ /*
272+ Connect each field of SYMLINK source to each field of target:
273+
274+ dataset1 dataset2 - SYMLINK - dataset3 - dataset4
275+ [column1] - [column1] ------------ [column1] - [column1]
276+ [column2] ------------ [column2] - [column2]
277+
278+ Without this column lineage is fractured:
279+
280+ dataset1 dataset2 - SYMLINK - dataset3 - dataset4
281+ [column1] - [column1] [column1] - [column1]
282+ [column2] [column2] - [column2]
283+ */
284+ const sourceInputs = raw_response . relations . inputs . filter (
285+ ( r ) => r . from . id == relation . from . id ,
286+ ) ;
287+ const sourceOutputs = raw_response . relations . outputs . filter (
288+ ( r ) => r . to . id == relation . from . id ,
289+ ) ;
290+
// Schemas of every input/output relation touching either end of the symlink;
// relations without a schema (null) are dropped.
291+ const allSchemas = [
292+ ...targetOutputs ,
293+ ...sourceOutputs ,
294+ ...targetInputs ,
295+ ...sourceInputs ,
296+ ]
297+ . map ( ( io ) => io . schema )
298+ . filter ( ( schema ) => schema !== null ) ;
230299
// Set de-duplicates field names that appear in more than one schema.
300+ const fieldsSet = new Set < string > ( ) ;
301+ for ( const schema of allSchemas ) {
302+ for ( const field of flattenFields ( schema . fields ) ) {
303+ fieldsSet . add ( field . name ) ;
304+ }
305+ }
306+ const fields = Array . from ( fieldsSet ) ;
307+
308+ if ( fields . length ) {
309+ results . push (
310+ ...fields . map ( ( field ) =>
311+ getColumnLineageEdge (
312+ relation ,
313+ "DIRECT_COLUMN_LINEAGE" ,
314+ [ "IDENTITY" ] ,
315+ field ,
316+ field ,
317+ ) ,
318+ ) ,
319+ ) ;
320+ }
321+
322+ return results ;
323+ } ;
324+
// Shared builder for one column-level lineage edge (post-change "+" side of
// this diff; the "-" lines interleaved below belong to the old SYMLINK edge
// literal that moved into getSymlinkEdge).
//
// Parameters:
//   relation        - relation the edge belongs to; passed to getMinimalEdge
//                     and used (via getNodeId on from/to) to build the edge id.
//   kind            - "DIRECT_COLUMN_LINEAGE" or "INDIRECT_COLUMN_LINEAGE",
//                     stored in `data.kind`.
//   types           - list of type strings stored in `data.types`.
//   sourceFieldName - source column name; used as `sourceHandle`.
//   targetFieldName - target column name or null; used as `targetHandle`.
//                     When null, the id uses "*" in place of the field name.
//
// Returns an Edge of type "columnLineageEdge" with a gray STOKE_THIN stroke and
// a closed arrow marker at the end only. The id combines the from-node id, the
// source field, the "--COLUMN-LINEAGE-->" separator, the to-node id and the
// target field.
325+ const getColumnLineageEdge = (
326+ relation : BaseRelationLineageResponseV1 ,
327+ kind : "DIRECT_COLUMN_LINEAGE" | "INDIRECT_COLUMN_LINEAGE" ,
328+ types : string [ ] ,
329+ sourceFieldName : string ,
330+ targetFieldName : string | null ,
331+ ) : Edge => {
332+ const color = "gray" ;
231333 return {
232334 ...getMinimalEdge ( relation ) ,
233- label : "SYMLINK" ,
234- source : targetHasOnlyOutputs
235- ? getNodeId ( relation . to )
236- : getNodeId ( relation . from ) ,
237- target : targetHasOnlyOutputs
238- ? getNodeId ( relation . from )
239- : getNodeId ( relation . to ) ,
335+ id : `${ getNodeId ( relation . from ) } :${ sourceFieldName } --COLUMN-LINEAGE-->${ getNodeId ( relation . to ) } :${ targetFieldName ?? "*" } ` ,
336+ sourceHandle : sourceFieldName ,
337+ targetHandle : targetFieldName ,
338+ type : "columnLineageEdge" ,
240339 data : {
241- ...relation ,
242- kind : "SYMLINK" ,
243- } ,
244- markerStart : {
245- type : MarkerType . ArrowClosed ,
246- color : color ,
340+ source_field : sourceFieldName ,
341+ target_field : targetFieldName ,
342+ types : types ,
343+ kind : kind ,
247344 } ,
248345 markerEnd : {
249346 type : MarkerType . ArrowClosed ,
250347 color : color ,
251348 } ,
252349 style : {
253- strokeWidth : STOKE_MEDIUM ,
350+ strokeWidth : STOKE_THIN ,
254351 stroke : color ,
255352 } ,
256353 labelStyle : {
257- backgroundColor : color ,
354+ // label is shown only when the edge is selected, so color is different from stroke
355+ backgroundColor : "#89b2f3" ,
258356 } ,
259357 } ;
260358} ;
261359
// Expands one direct column lineage relation into edges (post-change "+" side
// of this diff; the "-" lines are the inline edge literal it replaced).
//
// `relation.fields` is iterated by key: each key is a target field name, and
// its value is a list of source field entries. Every (target field, source
// field entry) pair becomes one edge built by getColumnLineageEdge with kind
// "DIRECT_COLUMN_LINEAGE", the entry's `types`, the entry's `field` as source
// column and the key as target column.
262361const getDirectColumnLineageEdges = (
263362 relation : DirectColumnLineageRelationLineageResponseV1 ,
264363) : Edge [ ] => {
265- const color = "gray" ;
266364 return Object . keys ( relation . fields ) . flatMap ( ( target_field_name ) => {
267- return relation . fields [ target_field_name ] . map ( ( source_field ) => {
268- return {
269- ...getMinimalEdge ( relation ) ,
270- id : `${ getNodeId ( relation . from ) } :${ source_field . field } --COLUMN-LINEAGE-->${ getNodeId ( relation . to ) } :${ target_field_name } ` ,
271- sourceHandle : source_field . field ,
272- targetHandle : target_field_name ,
273- type : "columnLineageEdge" ,
274- data : {
275- source_field : source_field . field ,
276- target_field : target_field_name ,
277- types : source_field . types ,
278- kind : "DIRECT_COLUMN_LINEAGE" ,
279- } ,
280- markerEnd : {
281- type : MarkerType . ArrowClosed ,
282- color : color ,
283- } ,
284- style : {
285- strokeWidth : STOKE_THIN ,
286- stroke : color ,
287- } ,
288- labelStyle : {
289- // label is shown only then edge is selected, so color is different from stroke
290- backgroundColor : "#89b2f3" ,
291- } ,
292- } ;
293- } ) ;
364+ return relation . fields [ target_field_name ] . map ( ( source_field ) =>
365+ getColumnLineageEdge (
366+ relation ,
367+ "DIRECT_COLUMN_LINEAGE" ,
368+ source_field . types ,
369+ source_field . field ,
370+ target_field_name ,
371+ ) ,
372+ ) ;
294373 } ) ;
295374} ;
296375
// Expands one indirect column lineage relation into edges (post-change "+"
// side of this diff; the "-" lines are the inline edge literal it replaced).
//
// `relation.fields` is a list of source field entries; each entry becomes one
// edge built by getColumnLineageEdge with kind "INDIRECT_COLUMN_LINEAGE", the
// entry's `types`, the entry's `field` as source column and null as target
// column (no specific target field).
297376const getIndirectColumnLineageEdges = (
298377 relation : IndirectColumnLineageRelationLineageResponseV1 ,
299378) : Edge [ ] => {
300- const color = "gray" ;
301379 return relation . fields . map ( ( source_field ) => {
302- return {
303- ...getMinimalEdge ( relation ) ,
304- id : `${ getNodeId ( relation . from ) } :${ source_field . field } --COLUMN-LINEAGE-->${ getNodeId ( relation . to ) } :*` ,
305- sourceHandle : source_field . field ,
306- type : "columnLineageEdge" ,
307- data : {
308- source_field : source_field . field ,
309- target_field : null ,
310- types : source_field . types ,
311- kind : "INDIRECT_COLUMN_LINEAGE" ,
312- } ,
313- markerEnd : {
314- type : MarkerType . ArrowClosed ,
315- color : color ,
316- } ,
317- style : {
318- strokeWidth : STOKE_THIN ,
319- stroke : color ,
320- } ,
321- labelStyle : {
322- // label is shown only then edge is selected, so color is different from stroke
323- backgroundColor : "#89b2f3" ,
324- } ,
325- } ;
380+ return getColumnLineageEdge (
381+ relation ,
382+ "INDIRECT_COLUMN_LINEAGE" ,
383+ source_field . types ,
384+ source_field . field ,
385+ null ,
386+ ) ;
326387 } ) ;
327388} ;
389+
328390const getGraphEdges = ( raw_response : LineageResponseV1 ) : Edge [ ] => {
329391 return [
330392 ...raw_response . relations . inputs . map ( ( relation ) =>
@@ -333,13 +395,13 @@ const getGraphEdges = (raw_response: LineageResponseV1): Edge[] => {
333395 ...raw_response . relations . outputs . map ( ( relation ) =>
334396 getOutputEdge ( relation , raw_response ) ,
335397 ) ,
336- ...raw_response . relations . symlinks
337- . map ( ( relation ) => getSymlinkEdge ( relation , raw_response ) )
338- . filter ( ( edge ) => edge !== null ) ,
339- ...( raw_response . relations . direct_column_lineage ?? [ ] ) . flatMap (
398+ ...raw_response . relations . symlinks . flatMap ( ( relation ) =>
399+ getSymlinkEdges ( relation , raw_response ) ,
400+ ) ,
401+ ...raw_response . relations . direct_column_lineage . flatMap (
340402 getDirectColumnLineageEdges ,
341403 ) ,
342- ...( raw_response . relations . indirect_column_lineage ?? [ ] ) . flatMap (
404+ ...raw_response . relations . indirect_column_lineage . flatMap (
343405 getIndirectColumnLineageEdges ,
344406 ) ,
345407 ] ;
0 commit comments