Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Added JUnit 5 support to Java SDK testing framework with new TestPipelineExtension class (Java) ([#18733](https://github.com/apache/beam/issues/18733)).
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os
Expand All @@ -106,6 +107,8 @@
## New Features / Improvements

* Added support for Processing time Timer in the Spark Classic runner ([#33633](https://github.com/apache/beam/issues/33633)).
* Added JUnit 5 extension support via TestPipelineExtension class to enable modern testing practices while maintaining backward compatibility with existing JUnit 4 TestRule-based tests (Java) ([#18733](https://github.com/apache/beam/issues/18733)).
* Added minimal JUnit 5 dependencies (junit-jupiter-api, junit-jupiter-engine 5.10.0) for TestPipelineExtension support without affecting existing JUnit 4 tests (Java) ([#18733](https://github.com/apache/beam/issues/18733)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)).
* [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
Expand Down
8 changes: 8 additions & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ dependencies {
shadowTest library.java.everit_json_schema
provided library.java.junit
provided library.java.hamcrest
provided 'org.junit.jupiter:junit-jupiter-api:5.10.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
provided 'io.airlift:aircompressor:0.18'
provided 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1'
provided library.java.zstd_jni
Expand All @@ -130,3 +133,8 @@ project.tasks.compileTestJava {
// TODO: fix other places with warnings in tests and delete this option
options.compilerArgs += ['-Xlint:-rawtypes']
}

// Configure test task to use both JUnit 4 and JUnit 5
test {
useJUnit()
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@
* </ul>
*
* <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
* remote execution modes. For example:
* remote execution modes.
*
* <h3>JUnit 4 Usage</h3>
*
* For JUnit 4 tests, use this class as a TestRule:
*
* <pre><code>
* {@literal @Rule}
Expand All @@ -97,6 +101,23 @@
* }
* </code></pre>
*
* <h3>JUnit 5 Usage</h3>
*
* For JUnit 5 tests, use {@link TestPipelineExtension}:
*
* <pre><code>
* {@literal @ExtendWith}(TestPipelineExtension.class)
* class MyPipelineTest {
* {@literal @Test}
* {@literal @Category}(NeedsRunner.class)
* void myPipelineTest(TestPipeline pipeline) {
* final PCollection&lt;String&gt; pCollection = pipeline.apply(...)
* PAssert.that(pCollection).containsInAnyOrder(...);
* pipeline.run();
* }
* }
* </code></pre>
*
* <p>For pipeline runners, it is required that they must throw an {@link AssertionError} containing
* the message from the {@link PAssert} that failed.
*
Expand All @@ -108,7 +129,7 @@ public class TestPipeline extends Pipeline implements TestRule {

private final PipelineOptions options;

private static class PipelineRunEnforcement {
static class PipelineRunEnforcement {

@SuppressWarnings("WeakerAccess")
protected boolean enableAutoRunIfMissing;
Expand All @@ -117,7 +138,7 @@ private static class PipelineRunEnforcement {

protected boolean runAttempted;

private PipelineRunEnforcement(final Pipeline pipeline) {
PipelineRunEnforcement(final Pipeline pipeline) {
this.pipeline = pipeline;
}

Expand All @@ -138,7 +159,7 @@ protected void afterUserCodeFinished() {
}
}

private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {

// Null until the pipeline has been run
private @MonotonicNonNull List<TransformHierarchy.Node> runVisitedNodes;
Expand All @@ -164,7 +185,7 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
}
}

private PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
super(pipeline);
runVisitedNodes = null;
}
Expand Down Expand Up @@ -574,7 +595,7 @@ public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pip
}
}

private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private boolean empty = true;

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline.PipelineAbandonedNodeEnforcement;
import org.apache.beam.sdk.testing.TestPipeline.PipelineRunEnforcement;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolver;

/**
* JUnit 5 extension for {@link TestPipeline} that provides the same functionality as the JUnit 4
* {@link org.junit.rules.TestRule} implementation.
*
* <p>Use this extension to test pipelines in JUnit 5:
*
* <pre><code>
* {@literal @}ExtendWith(TestPipelineExtension.class)
* class MyPipelineTest {
* {@literal @}Test
* {@literal @}Category(NeedsRunner.class)
* void myPipelineTest(TestPipeline pipeline) {
* final PCollection&lt;String&gt; pCollection = pipeline.apply(...)
* PAssert.that(pCollection).containsInAnyOrder(...);
* pipeline.run();
* }
* }
* </code></pre>
*
* <p>You can also create the extension yourself for more control:
*
* <pre><code>
* class MyPipelineTest {
* {@literal @}RegisterExtension
* final TestPipelineExtension pipeline = TestPipelineExtension.create();
*
* {@literal @}Test
* void testUsingPipeline() {
* pipeline.apply(...);
* pipeline.run();
* }
* }
* </code></pre>
*/
public class TestPipelineExtension
implements BeforeEachCallback, AfterEachCallback, ParameterResolver {

private static final ExtensionContext.Namespace NAMESPACE =
ExtensionContext.Namespace.create(TestPipelineExtension.class);
private static final String PIPELINE_KEY = "testPipeline";
private static final String ENFORCEMENT_KEY = "enforcement";

/** Creates a new TestPipelineExtension with default options. */
public static TestPipelineExtension create() {
return new TestPipelineExtension();
}

/** Creates a new TestPipelineExtension with custom options. */
public static TestPipelineExtension fromOptions(PipelineOptions options) {
return new TestPipelineExtension(options);
}

private TestPipeline testPipeline;

/** Creates a TestPipelineExtension with default options. */
public TestPipelineExtension() {
this.testPipeline = TestPipeline.create();
}

/** Creates a TestPipelineExtension with custom options. */
public TestPipelineExtension(PipelineOptions options) {
this.testPipeline = TestPipeline.fromOptions(options);
}

@Override
public boolean supportsParameter(
ParameterContext parameterContext, ExtensionContext extensionContext) {
return parameterContext.getParameter().getType() == TestPipeline.class;
}

@Override
public Object resolveParameter(
ParameterContext parameterContext, ExtensionContext extensionContext) {
if (this.testPipeline == null) {
return getOrCreateTestPipeline(extensionContext);
} else {
return this.testPipeline;
}
}

@Override
public void beforeEach(ExtensionContext context) throws Exception {
TestPipeline pipeline;

if (this.testPipeline != null) {
pipeline = this.testPipeline;
} else {
pipeline = getOrCreateTestPipeline(context);
}

Optional<PipelineRunEnforcement> enforcement = getOrCreateEnforcement(context);

// Set application name based on test method
String appName = getAppName(context);
pipeline.getOptions().as(ApplicationNameOptions.class).setAppName(appName);

// Set up enforcement based on annotations
setDeducedEnforcementLevel(context, pipeline, enforcement);
}

@Override
public void afterEach(ExtensionContext context) throws Exception {
Optional<PipelineRunEnforcement> enforcement = getEnforcement(context);
if (enforcement.isPresent()) {
enforcement.get().afterUserCodeFinished();
}
}

private TestPipeline getOrCreateTestPipeline(ExtensionContext context) {
return context
.getStore(NAMESPACE)
.getOrComputeIfAbsent(PIPELINE_KEY, key -> TestPipeline.create(), TestPipeline.class);
}

private Optional<PipelineRunEnforcement> getOrCreateEnforcement(ExtensionContext context) {
return context
.getStore(NAMESPACE)
.getOrComputeIfAbsent(
ENFORCEMENT_KEY, key -> Optional.<PipelineRunEnforcement>empty(), Optional.class);
}

private Optional<PipelineRunEnforcement> getEnforcement(ExtensionContext context) {
return context.getStore(NAMESPACE).get(ENFORCEMENT_KEY, Optional.class);
}

private void setEnforcement(
ExtensionContext context, Optional<PipelineRunEnforcement> enforcement) {
context.getStore(NAMESPACE).put(ENFORCEMENT_KEY, enforcement);
}

private String getAppName(ExtensionContext context) {
String className = context.getTestClass().map(Class::getSimpleName).orElse("UnknownClass");
String methodName = context.getTestMethod().map(Method::getName).orElse("unknownMethod");
return className + "-" + methodName;
}

private void setDeducedEnforcementLevel(
ExtensionContext context,
TestPipeline pipeline,
Optional<PipelineRunEnforcement> enforcement) {
// If enforcement level has not been set, do auto-inference
if (!enforcement.isPresent()) {
boolean annotatedWithNeedsRunner = hasNeedsRunnerAnnotation(context);

PipelineOptions options = pipeline.getOptions();
boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());

checkState(
!(annotatedWithNeedsRunner && crashingRunner),
"The test was annotated with a [@%s] / [@%s] while the runner "
+ "was set to [%s]. Please re-check your configuration.",
NeedsRunner.class.getSimpleName(),
ValidatesRunner.class.getSimpleName(),
CrashingRunner.class.getSimpleName());

if (annotatedWithNeedsRunner || !crashingRunner) {
Optional<PipelineRunEnforcement> newEnforcement =
Optional.of(new PipelineAbandonedNodeEnforcement(pipeline));
setEnforcement(context, newEnforcement);
}
}
}

private boolean hasNeedsRunnerAnnotation(ExtensionContext context) {
// Check method annotations
Method testMethod = context.getTestMethod().orElse(null);
if (testMethod != null) {
if (hasNeedsRunnerCategory(testMethod.getAnnotations())) {
return true;
}
}

// Check class annotations
Class<?> testClass = context.getTestClass().orElse(null);
if (testClass != null) {
if (hasNeedsRunnerCategory(testClass.getAnnotations())) {
return true;
}
}

return false;
}

private boolean hasNeedsRunnerCategory(Annotation[] annotations) {
return Arrays.stream(annotations)
.filter(annotation -> annotation instanceof Category)
.map(annotation -> (Category) annotation)
.flatMap(category -> Arrays.stream(category.value()))
.anyMatch(categoryClass -> NeedsRunner.class.isAssignableFrom(categoryClass));
}
}
Loading
Loading