3131import org .apache .beam .sdk .PipelineResult ;
3232import org .apache .beam .sdk .metrics .Lineage ;
3333import org .apache .beam .sdk .options .PipelineOptionsFactory ;
34+ import org .apache .beam .sdk .testing .NeedsRunner ;
3435import org .apache .beam .sdk .testing .TestPipeline ;
3536import org .apache .beam .sdk .transforms .Create ;
3637import org .apache .beam .sdk .transforms .DoFn ;
3738import org .apache .beam .sdk .transforms .ParDo ;
3839import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableList ;
3940import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Lists ;
4041import org .junit .Before ;
41- import org .junit .Rule ;
4242import org .junit .Test ;
43+ import org .junit .experimental .categories .Category ;
4344import org .junit .runner .RunWith ;
4445import org .junit .runners .JUnit4 ;
4546
4647/** Tests for {@link LineageRegistrar} ServiceLoader discovery and DirectRunner integration. */
4748@ RunWith (JUnit4 .class )
4849public class LineageRegistrarTest {
4950
50- @ Rule public final transient TestPipeline pipeline = TestPipeline .create ();
51-
5251 @ Before
5352 public void setUp () {
5453 // Clear any recorded lineage from previous tests
5554 TestLineage .clearRecorded ();
5655 }
5756
57+ /** Helper to create a TestPipeline with test lineage enabled. */
58+ private TestPipeline createTestPipelineWithLineage () {
59+ TestLineageOptions options = PipelineOptionsFactory .create ().as (TestLineageOptions .class );
60+ options .setEnableTestLineage (true );
61+ TestPipeline pipeline = TestPipeline .fromOptions (options );
62+ // Disable enforcement since we're not using @Rule
63+ pipeline .enableAbandonedNodeEnforcement (false );
64+ return pipeline ;
65+ }
66+
5867 @ Test
5968 public void testServiceLoaderDiscovery () {
6069 // Load all LineageRegistrar implementations via ServiceLoader
@@ -88,11 +97,10 @@ public void testServiceLoaderDiscovery() {
8897 }
8998
9099 @ Test
100+ @ Category (NeedsRunner .class )
91101 public void testLineageIntegrationWithSimpleFQN () {
92- // Enable test lineage plugin
93- TestLineageOptions options = pipeline .getOptions ().as (TestLineageOptions .class );
94- options .setEnableTestLineage (true );
95- Lineage .setDefaultPipelineOptions (pipeline .getOptions ());
102+ // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
103+ TestPipeline pipeline = createTestPipelineWithLineage ();
96104
97105 // Run pipeline that records lineage
98106 pipeline
@@ -108,11 +116,10 @@ public void testLineageIntegrationWithSimpleFQN() {
108116 }
109117
110118 @ Test
119+ @ Category (NeedsRunner .class )
111120 public void testLineageIntegrationWithSubtype () {
112- // Enable test lineage plugin
113- TestLineageOptions options = pipeline .getOptions ().as (TestLineageOptions .class );
114- options .setEnableTestLineage (true );
115- Lineage .setDefaultPipelineOptions (pipeline .getOptions ());
121+ // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
122+ TestPipeline pipeline = createTestPipelineWithLineage ();
116123
117124 // Run pipeline that records lineage with subtype
118125 pipeline
@@ -133,11 +140,10 @@ public void testLineageIntegrationWithSubtype() {
133140 }
134141
135142 @ Test
143+ @ Category (NeedsRunner .class )
136144 public void testLineageIntegrationWithLastSegmentSeparator () {
137- // Enable test lineage plugin
138- TestLineageOptions options = pipeline .getOptions ().as (TestLineageOptions .class );
139- options .setEnableTestLineage (true );
140- Lineage .setDefaultPipelineOptions (pipeline .getOptions ());
145+ // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
146+ TestPipeline pipeline = createTestPipelineWithLineage ();
141147
142148 // Run pipeline that records lineage with custom separator
143149 pipeline
@@ -156,11 +162,10 @@ public void testLineageIntegrationWithLastSegmentSeparator() {
156162 }
157163
158164 @ Test
165+ @ Category (NeedsRunner .class )
159166 public void testLineageIntegrationWithBothSourcesAndSinks () {
160- // Enable test lineage plugin
161- TestLineageOptions options = pipeline .getOptions ().as (TestLineageOptions .class );
162- options .setEnableTestLineage (true );
163- Lineage .setDefaultPipelineOptions (pipeline .getOptions ());
167+ // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
168+ TestPipeline pipeline = createTestPipelineWithLineage ();
164169
165170 // Run pipeline that records both source and sink lineage
166171 pipeline
@@ -179,11 +184,10 @@ public void testLineageIntegrationWithBothSourcesAndSinks() {
179184 }
180185
181186 @ Test
187+ @ Category (NeedsRunner .class )
182188 public void testLineageIntegrationWithMultipleElements () {
183- // Enable test lineage plugin
184- TestLineageOptions options = pipeline .getOptions ().as (TestLineageOptions .class );
185- options .setEnableTestLineage (true );
186- Lineage .setDefaultPipelineOptions (pipeline .getOptions ());
189+ // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
190+ TestPipeline pipeline = createTestPipelineWithLineage ();
187191
188192 // Run pipeline with multiple elements to test thread safety
189193 pipeline
0 commit comments