Skip to content

Commit 10a9f22

Browse files
authored
Bump to Mockito 4 (#34054)
* Split tests that need mockStatic out of DataflowTest, to be able to run DataflowTest internally where mockStatic is not supported
* Remove usage of powermock for GcpIO, KafkaIO, DataflowRunner tests
* Minimize powermock dependency for Flink runner
1 parent d0fbfcd commit 10a9f22

File tree

29 files changed

+343
-240
lines changed

29 files changed

+343
-240
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,6 @@ class BeamModulePlugin implements Plugin<Project> {
629629
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
630630
def netty_version = "4.1.110.Final"
631631
def postgres_version = "42.2.16"
632-
def powermock_version = "2.0.9"
633632
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
634633
def protobuf_version = "4.29.0"
635634
def qpid_jms_client_version = "0.61.0"
@@ -837,8 +836,8 @@ class BeamModulePlugin implements Plugin<Project> {
837836
log4j2_core : "org.apache.logging.log4j:log4j-core:$log4j2_version",
838837
log4j2_to_slf4j : "org.apache.logging.log4j:log4j-to-slf4j:$log4j2_version",
839838
log4j2_slf4j_impl : "org.apache.logging.log4j:log4j-slf4j-impl:$log4j2_version",
840-
mockito_core : "org.mockito:mockito-core:3.7.7",
841-
mockito_inline : "org.mockito:mockito-inline:4.5.1",
839+
mockito_core : "org.mockito:mockito-core:4.11.0",
840+
mockito_inline : "org.mockito:mockito-inline:4.11.0",
842841
mongo_java_driver : "org.mongodb:mongo-java-driver:3.12.11",
843842
nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version",
844843
netty_all : "io.netty:netty-all:$netty_version",
@@ -847,8 +846,6 @@ class BeamModulePlugin implements Plugin<Project> {
847846
netty_transport : "io.netty:netty-transport:$netty_version",
848847
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
849848
postgres : "org.postgresql:postgresql:$postgres_version",
850-
powermock : "org.powermock:powermock-module-junit4:$powermock_version",
851-
powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version",
852849
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
853850
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
854851
proto_google_cloud_bigquery_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1", // google_cloud_platform_libraries_bom sets version

runners/flink/flink_runner.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ dependencies {
201201
testImplementation library.java.hamcrest
202202
testImplementation library.java.junit
203203
testImplementation library.java.mockito_core
204-
testImplementation library.java.powermock
204+
// TODO(https://github.com/apache/beam/issues/34056) remove powermock once remove Whitebox usages
205+
testImplementation "org.powermock:powermock-reflect:2.0.9"
205206
testImplementation library.java.google_api_services_bigquery
206207
testImplementation project(":sdks:java:io:google-cloud-platform")
207208
testImplementation library.java.jackson_dataformat_yaml

runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import static org.hamcrest.CoreMatchers.is;
2222
import static org.hamcrest.MatcherAssert.assertThat;
2323
import static org.junit.Assert.assertNotNull;
24-
import static org.mockito.ArgumentMatchers.anyObject;
24+
import static org.mockito.ArgumentMatchers.any;
2525
import static org.mockito.ArgumentMatchers.anyString;
2626
import static org.mockito.ArgumentMatchers.argThat;
2727
import static org.mockito.ArgumentMatchers.eq;
@@ -101,7 +101,7 @@ public void testCounter() {
101101
public void testGauge() {
102102
FlinkMetricContainer.FlinkGauge flinkGauge =
103103
new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
104-
when(metricGroup.gauge(eq("namespace.name"), anyObject())).thenReturn(flinkGauge);
104+
when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge);
105105

106106
MetricsContainer step = container.getMetricsContainer("step");
107107
MetricName metricName = MetricName.named("namespace", "name");
@@ -251,7 +251,7 @@ public boolean matches(FlinkDistributionGauge argument) {
251251
public void testDistribution() {
252252
FlinkMetricContainer.FlinkDistributionGauge flinkGauge =
253253
new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
254-
when(metricGroup.gauge(eq("namespace.name"), anyObject())).thenReturn(flinkGauge);
254+
when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge);
255255

256256
MetricsContainer step = container.getMetricsContainer("step");
257257
MetricName metricName = MetricName.named("namespace", "name");
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.hasEntry;
22+
import static org.hamcrest.Matchers.hasKey;
23+
import static org.hamcrest.Matchers.hasProperty;
24+
import static org.hamcrest.Matchers.is;
25+
import static org.mockito.ArgumentMatchers.eq;
26+
import static org.mockito.Mockito.CALLS_REAL_METHODS;
27+
import static org.mockito.Mockito.mock;
28+
29+
import com.google.api.services.dataflow.Dataflow;
30+
import com.google.api.services.dataflow.model.Job;
31+
import java.io.File;
32+
import java.io.IOException;
33+
import java.util.ArrayList;
34+
import java.util.Map;
35+
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
36+
import org.apache.beam.runners.dataflow.options.DefaultGcpRegionFactory;
37+
import org.apache.beam.sdk.Pipeline;
38+
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
39+
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
40+
import org.apache.beam.sdk.io.FileSystems;
41+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
42+
import org.apache.beam.sdk.util.MimeTypes;
43+
import org.hamcrest.Matchers;
44+
import org.junit.Before;
45+
import org.junit.Rule;
46+
import org.junit.Test;
47+
import org.junit.rules.ExpectedException;
48+
import org.junit.rules.TemporaryFolder;
49+
import org.junit.runner.RunWith;
50+
import org.junit.runners.JUnit4;
51+
import org.mockito.ArgumentCaptor;
52+
import org.mockito.MockedStatic;
53+
import org.mockito.Mockito;
54+
55+
/**
56+
* Tests for the {@link DataflowRunner} that involves mock static methods.
57+
*
58+
* <p>Separated from {@link DataflowRunnerTest}.
59+
*/
60+
@RunWith(JUnit4.class)
61+
public class DataflowRunnerStaticTest {
62+
private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
63+
private static final String PROJECT_ID = "some-project";
64+
private static final String REGION_ID = "some-region-1";
65+
@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
66+
@Rule public transient ExpectedException thrown = ExpectedException.none();
67+
private transient Dataflow.Projects.Locations.Jobs mockJobs;
68+
private transient GcsUtil mockGcsUtil;
69+
70+
private DataflowPipelineOptions buildPipelineOptions() throws IOException {
71+
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
72+
options.setRunner(DataflowRunner.class);
73+
options.setProject(PROJECT_ID);
74+
options.setTempLocation(VALID_TEMP_BUCKET);
75+
options.setRegion(REGION_ID);
76+
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
77+
options.setFilesToStage(new ArrayList<>());
78+
options.setDataflowClient(DataflowRunnerTest.buildMockDataflow(mockJobs));
79+
options.setGcsUtil(mockGcsUtil);
80+
options.setGcpCredential(new TestCredential());
81+
82+
// Configure the FileSystem registrar to use these options.
83+
FileSystems.setDefaultPipelineOptions(options);
84+
85+
return options;
86+
}
87+
88+
@Before
89+
public void setUp() throws IOException {
90+
mockGcsUtil = DataflowRunnerTest.buildMockGcsUtil();
91+
mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
92+
}
93+
94+
/**
95+
* Test that the region is set in the generated JSON pipeline options even when a default value is
96+
* grabbed from the environment.
97+
*/
98+
@Test
99+
public void testDefaultRegionSet() throws Exception {
100+
try (MockedStatic<DefaultGcpRegionFactory> mocked =
101+
Mockito.mockStatic(DefaultGcpRegionFactory.class)) {
102+
mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(REGION_ID);
103+
Dataflow.Projects.Locations.Jobs mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
104+
105+
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
106+
options.setRunner(DataflowRunner.class);
107+
options.setProject(PROJECT_ID);
108+
options.setTempLocation(VALID_TEMP_BUCKET);
109+
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
110+
options.setFilesToStage(new ArrayList<>());
111+
options.setDataflowClient(DataflowRunnerTest.buildMockDataflow(mockJobs));
112+
options.setGcsUtil(DataflowRunnerTest.buildMockGcsUtil());
113+
options.setGcpCredential(new TestCredential());
114+
115+
Pipeline p = Pipeline.create(options);
116+
p.run();
117+
118+
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
119+
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
120+
Map<String, Object> sdkPipelineOptions =
121+
jobCaptor.getValue().getEnvironment().getSdkPipelineOptions();
122+
123+
assertThat(sdkPipelineOptions, hasKey("options"));
124+
Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
125+
assertThat(optionsMap, hasEntry("region", options.getRegion()));
126+
}
127+
}
128+
129+
/**
130+
* Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate
131+
* exception when an output file throws IOException at close.
132+
*/
133+
@Test
134+
public void testTemplateRunnerLoggedErrorForFileCloseError() throws Exception {
135+
File templateLocation = tmpFolder.newFile();
136+
String closeErrorMessage = "Unable to close";
137+
138+
try (MockedStatic<FileSystems> mocked =
139+
Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) {
140+
mocked
141+
.when(
142+
() ->
143+
FileSystems.create(
144+
FileSystems.matchNewResource(templateLocation.getPath(), false),
145+
MimeTypes.TEXT))
146+
.thenReturn(
147+
DataflowRunnerTest.createWritableByteChannelThrowsIOExceptionAtClose(
148+
closeErrorMessage));
149+
150+
DataflowPipelineOptions options = buildPipelineOptions();
151+
options.setTemplateLocation(templateLocation.getPath());
152+
Pipeline p = Pipeline.create(options);
153+
154+
thrown.expectMessage("Cannot create output file at");
155+
thrown.expect(RuntimeException.class);
156+
thrown.expectCause(Matchers.isA(IOException.class));
157+
thrown.expectCause(hasProperty("message", is(closeErrorMessage)));
158+
159+
p.run();
160+
}
161+
}
162+
163+
/**
164+
* Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate
165+
* exception when an output file throws IOException at close.
166+
*/
167+
@Test
168+
public void testTemplateRunnerLoggedErrorForFileWriteError() throws Exception {
169+
File templateLocation = tmpFolder.newFile();
170+
String closeErrorMessage = "Unable to write";
171+
172+
try (MockedStatic<FileSystems> mocked =
173+
Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) {
174+
mocked
175+
.when(
176+
() ->
177+
FileSystems.create(
178+
FileSystems.matchNewResource(templateLocation.getPath(), false),
179+
MimeTypes.TEXT))
180+
.thenReturn(
181+
DataflowRunnerTest.createWritableByteChannelThrowsIOExceptionAtWrite(
182+
closeErrorMessage));
183+
184+
thrown.expectMessage("Cannot create output file at");
185+
thrown.expect(RuntimeException.class);
186+
thrown.expectCause(Matchers.isA(IOException.class));
187+
thrown.expectCause(hasProperty("message", is(closeErrorMessage)));
188+
189+
DataflowPipelineOptions options = buildPipelineOptions();
190+
options.setTemplateLocation(templateLocation.getPath());
191+
Pipeline p = Pipeline.create(options);
192+
193+
p.run();
194+
}
195+
}
196+
}

0 commit comments

Comments
 (0)