@@ -190,7 +190,7 @@ public void testLineageIntegrationWithLastSegmentSeparator() {
190190
191191 // Verify lineage was recorded with separator
192192 List <String > sources = TestLineage .getRecordedSources ();
193- assertThat (sources , hasItem ("gcs:bucket.path/to/file.txt" ));
193+ assertThat (sources , hasItem ("gcs:bucket.` path/to/file.txt` " ));
194194 }
195195
196196 @ Test
@@ -249,6 +249,7 @@ private static class RecordSourceLineageDoFn<T> extends DoFn<T, T> {
249249
250250 @ ProcessElement
251251 public void processElement (ProcessContext c ) {
252+ // !!! Lineage Caller !!!
252253 Lineage .getSources ().add (system , segments );
253254 c .output (c .element ());
254255 }
@@ -268,6 +269,7 @@ private static class RecordSourceLineageWithSubtypeDoFn extends DoFn<Integer, In
268269
269270 @ ProcessElement
270271 public void processElement (ProcessContext c ) {
272+ // !!! Lineage Caller !!!
271273 Lineage .getSources ().add (system , subtype , segments , null );
272274 c .output (c .element ());
273275 }
@@ -287,6 +289,7 @@ private static class RecordSourceLineageWithSeparatorDoFn extends DoFn<String, S
287289
288290 @ ProcessElement
289291 public void processElement (ProcessContext c ) {
292+ // !!! Lineage Caller !!!
290293 Lineage .getSources ().add (system , segments , separator );
291294 c .output (c .element ());
292295 }
@@ -296,7 +299,9 @@ public void processElement(ProcessContext c) {
296299 private static class RecordBothSourceAndSinkLineageDoFn extends DoFn <String , String > {
297300 @ ProcessElement
298301 public void processElement (ProcessContext c ) {
302+ // !!! Lineage Caller !!!
299303 Lineage .getSources ().add ("input-system" , ImmutableList .of ("input-db" , "input-table" ));
304+ // !!! Lineage Caller !!!
300305 Lineage .getSinks ().add ("output-system" , ImmutableList .of ("output-db" , "output-table" ));
301306 c .output (c .element ());
302307 }
0 commit comments