|
33 | 33 | import org.apache.beam.sdk.schemas.SchemaTranslation; |
34 | 34 | import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; |
35 | 35 | import org.apache.beam.sdk.testing.PAssert; |
36 | | -import org.apache.beam.sdk.testing.TestPipeline; |
37 | 36 | import org.apache.beam.sdk.testing.UsesPythonExpansionService; |
38 | 37 | import org.apache.beam.sdk.testing.ValidatesRunner; |
39 | 38 | import org.apache.beam.sdk.transforms.Create; |
40 | 39 | import org.apache.beam.sdk.transforms.Keys; |
41 | 40 | import org.apache.beam.sdk.util.PythonCallableSource; |
| 41 | +import org.apache.beam.sdk.util.construction.BaseExternalTest; |
42 | 42 | import org.apache.beam.sdk.values.KV; |
43 | 43 | import org.apache.beam.sdk.values.PCollection; |
44 | 44 | import org.apache.beam.sdk.values.Row; |
45 | 45 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; |
46 | 46 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; |
47 | | -import org.junit.Rule; |
48 | 47 | import org.junit.Test; |
49 | 48 | import org.junit.experimental.categories.Category; |
50 | 49 | import org.junit.runner.RunWith; |
51 | 50 | import org.junit.runners.JUnit4; |
52 | 51 |
|
53 | 52 | @RunWith(JUnit4.class) |
54 | 53 | public class PythonExternalTransformTest implements Serializable { |
55 | | - @Rule public transient TestPipeline testPipeline = TestPipeline.create(); |
56 | | - |
57 | | - @Test |
58 | | - @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) |
59 | | - public void trivialPythonTransform() { |
60 | | - PCollection<String> output = |
61 | | - testPipeline |
62 | | - .apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) |
63 | | - .apply( |
64 | | - PythonExternalTransform |
65 | | - .<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>> |
66 | | - from("apache_beam.GroupByKey")) |
67 | | - .apply(Keys.create()); |
68 | | - PAssert.that(output).containsInAnyOrder("A", "B"); |
69 | | - testPipeline.run(); |
70 | | - } |
71 | | - |
72 | | - @Test |
73 | | - @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) |
74 | | - public void pythonTransformWithDependencies() { |
75 | | - PCollection<String> output = |
76 | | - testPipeline |
77 | | - .apply(Create.of("elephant", "mouse", "sheep")) |
78 | | - .apply( |
79 | | - PythonExternalTransform.<PCollection<String>, PCollection<String>>from( |
80 | | - "apache_beam.Map") |
81 | | - .withArgs(PythonCallableSource.of("import inflection\ninflection.pluralize")) |
82 | | - .withExtraPackages(ImmutableList.of("inflection")) |
83 | | - .withOutputCoder(StringUtf8Coder.of())); |
84 | | - PAssert.that(output).containsInAnyOrder("elephants", "mice", "sheep"); |
85 | | - testPipeline.run(); |
| 54 | + @RunWith(JUnit4.class) |
| 55 | + public static class RunPipelineTest extends BaseExternalTest { |
| 56 | + |
| 57 | + @Test |
| 58 | + @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) |
| 59 | + public void trivialPythonTransform() { |
| 60 | + PCollection<String> output = |
| 61 | + testPipeline |
| 62 | + .apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) |
| 63 | + .apply( |
| 64 | + PythonExternalTransform |
| 65 | + .<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>> |
| 66 | + from("apache_beam.GroupByKey")) |
| 67 | + .apply(Keys.create()); |
| 68 | + PAssert.that(output).containsInAnyOrder("A", "B"); |
| 69 | + } |
| 70 | + |
| 71 | + @Test |
| 72 | + @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) |
| 73 | + public void pythonTransformWithDependencies() { |
| 74 | + PCollection<String> output = |
| 75 | + testPipeline |
| 76 | + .apply(Create.of("elephant", "mouse", "sheep")) |
| 77 | + .apply( |
| 78 | + PythonExternalTransform.<PCollection<String>, PCollection<String>>from( |
| 79 | + "apache_beam.Map") |
| 80 | + .withArgs(PythonCallableSource.of("import inflection\ninflection.pluralize")) |
| 81 | + .withExtraPackages(ImmutableList.of("inflection")) |
| 82 | + .withOutputCoder(StringUtf8Coder.of())); |
| 83 | + PAssert.that(output).containsInAnyOrder("elephants", "mice", "sheep"); |
| 84 | + } |
86 | 85 | } |
87 | 86 |
|
88 | 87 | @Test |
|
0 commit comments