Skip to content

Commit 60ff65c

Browse files
authored
Spanner-to-SourceDb: Dataflow failure injection test (#2723)
* SpannerToSourceDb - Dataflow failure injection test * Creating schema in source * Adding unit tests for Utils
1 parent 90b3722 commit 60ff65c

File tree

17 files changed

+1281
-103
lines changed

17 files changed

+1281
-103
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.it.gcp.cloudsql.conditions;
19+
20+
import com.google.auto.value.AutoValue;
21+
import javax.annotation.Nullable;
22+
import org.apache.beam.it.conditions.ConditionCheck;
23+
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
24+
25+
/** ConditionCheck to validate if CloudSQL has received a certain amount of rows. */
26+
@AutoValue
27+
public abstract class CloudSQLRowsCheck extends ConditionCheck {
28+
29+
abstract CloudSqlResourceManager resourceManager();
30+
31+
abstract String tableId();
32+
33+
abstract Integer minRows();
34+
35+
@Nullable
36+
abstract Integer maxRows();
37+
38+
@Override
39+
public String getDescription() {
40+
if (maxRows() != null) {
41+
return String.format(
42+
"CloudSQL check if table %s has between %d and %d rows", tableId(), minRows(), maxRows());
43+
}
44+
return String.format("CloudSQL check if table %s has %d rows", tableId(), minRows());
45+
}
46+
47+
@Override
48+
public CheckResult check() {
49+
long totalRows = resourceManager().getRowCount(tableId());
50+
if (totalRows < minRows()) {
51+
return new CheckResult(
52+
false, String.format("Expected %d rows but has only %d", minRows(), totalRows));
53+
}
54+
if (maxRows() != null && totalRows > maxRows()) {
55+
return new CheckResult(
56+
false, String.format("Expected up to %d rows but found %d", maxRows(), totalRows));
57+
}
58+
59+
if (maxRows() != null) {
60+
return new CheckResult(
61+
true,
62+
String.format(
63+
"Expected between %d and %d rows and found %d", minRows(), maxRows(), totalRows));
64+
}
65+
66+
return new CheckResult(
67+
true, String.format("Expected at least %d rows and found %d", minRows(), totalRows));
68+
}
69+
70+
public static Builder builder(CloudSqlResourceManager resourceManager, String tableId) {
71+
return new AutoValue_CloudSQLRowsCheck.Builder()
72+
.setResourceManager(resourceManager)
73+
.setTableId(tableId);
74+
}
75+
76+
/** Builder for {@link CloudSQLRowsCheck}. */
77+
@AutoValue.Builder
78+
public abstract static class Builder {
79+
80+
public abstract Builder setResourceManager(CloudSqlResourceManager resourceManager);
81+
82+
public abstract Builder setTableId(String tableId);
83+
84+
public abstract Builder setMinRows(Integer minRows);
85+
86+
public abstract Builder setMaxRows(Integer maxRows);
87+
88+
abstract CloudSQLRowsCheck autoBuild();
89+
90+
public CloudSQLRowsCheck build() {
91+
return autoBuild();
92+
}
93+
}
94+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/** Package that contains reusable CloudSQL conditions. */
20+
package org.apache.beam.it.gcp.cloudsql.conditions;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.it.gcp.cloudsql.conditions;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
import static org.mockito.Mockito.when;
24+
25+
import org.apache.beam.it.conditions.ConditionCheck;
26+
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
27+
import org.junit.Before;
28+
import org.junit.Rule;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.junit.runners.JUnit4;
32+
import org.mockito.Mock;
33+
import org.mockito.junit.MockitoJUnit;
34+
import org.mockito.junit.MockitoRule;
35+
36+
/** Unit tests for {@link CloudSQLRowsCheck}. */
37+
@RunWith(JUnit4.class)
38+
public class CloudSQLRowsCheckTest {
39+
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
40+
@Mock private CloudSqlResourceManager resourceManager;
41+
private static final String TABLE_ID = "test-table";
42+
43+
@Before
44+
public void setUp() {
45+
when(resourceManager.getRowCount(TABLE_ID)).thenReturn(10L);
46+
}
47+
48+
@Test
49+
public void testCheck_minRowsMet() {
50+
CloudSQLRowsCheck check =
51+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(5).build();
52+
ConditionCheck.CheckResult result = check.check();
53+
assertTrue(result.isSuccess());
54+
assertEquals("Expected at least 5 rows and found 10", result.getMessage());
55+
}
56+
57+
@Test
58+
public void testCheck_minRowsNotMet() {
59+
CloudSQLRowsCheck check =
60+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(15).build();
61+
ConditionCheck.CheckResult result = check.check();
62+
assertFalse(result.isSuccess());
63+
assertEquals("Expected 15 rows but has only 10", result.getMessage());
64+
}
65+
66+
@Test
67+
public void testCheck_maxRowsMet() {
68+
CloudSQLRowsCheck check =
69+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(5).setMaxRows(15).build();
70+
ConditionCheck.CheckResult result = check.check();
71+
assertTrue(result.isSuccess());
72+
assertEquals("Expected between 5 and 15 rows and found 10", result.getMessage());
73+
}
74+
75+
@Test
76+
public void testCheck_maxRowsExceeded() {
77+
CloudSQLRowsCheck check =
78+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(5).setMaxRows(8).build();
79+
ConditionCheck.CheckResult result = check.check();
80+
assertFalse(result.isSuccess());
81+
assertEquals("Expected up to 8 rows but found 10", result.getMessage());
82+
}
83+
84+
@Test
85+
public void testGetDescription_minRowsOnly() {
86+
CloudSQLRowsCheck check =
87+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(10).build();
88+
assertEquals("CloudSQL check if table test-table has 10 rows", check.getDescription());
89+
}
90+
91+
@Test
92+
public void testGetDescription_minAndMaxRows() {
93+
CloudSQLRowsCheck check =
94+
CloudSQLRowsCheck.builder(resourceManager, TABLE_ID).setMinRows(5).setMaxRows(15).build();
95+
assertEquals(
96+
"CloudSQL check if table test-table has between 5 and 15 rows", check.getDescription());
97+
}
98+
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/DataStreamToSpannerMySQLSrcDataflowFT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.failureinjectiontesting;
1717

18-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.AUTHORS_TABLE;
19-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.BOOKS_TABLE;
18+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.AUTHORS_TABLE;
19+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.BOOKS_TABLE;
2020
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
2121
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2222

2323
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2424
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
25+
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.DataflowFailureInjector;
26+
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider;
2527
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
26-
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.DataflowFailureInjector;
27-
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider;
2828
import java.io.IOException;
2929
import java.time.Duration;
3030
import java.util.Arrays;

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/DataStreamToSpannerMySQLSrcGCSFT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.failureinjectiontesting;
1717

18-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.AUTHORS_TABLE;
19-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.BOOKS_TABLE;
18+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.AUTHORS_TABLE;
19+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.BOOKS_TABLE;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
2222
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2323

2424
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2525
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
26+
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider;
2627
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
2728
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.GCSFailureInjector;
28-
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider;
2929
import java.io.IOException;
3030
import java.time.Duration;
3131
import java.util.Arrays;

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/DataStreamToSpannerMySQLSrcPubsubFT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.failureinjectiontesting;
1717

18-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.AUTHORS_TABLE;
19-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.BOOKS_TABLE;
18+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.AUTHORS_TABLE;
19+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.BOOKS_TABLE;
2020
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
2121
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2222
import static org.junit.Assert.assertNotNull;
@@ -28,8 +28,8 @@
2828
import com.google.cloud.logging.Severity;
2929
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
3030
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
31+
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider;
3132
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
32-
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider;
3333
import com.google.common.collect.ImmutableMap;
3434
import com.google.pubsub.v1.PullResponse;
3535
import com.google.pubsub.v1.SubscriptionName;

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/DataStreamToSpannerMySQLSrcSpannerFT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.failureinjectiontesting;
1717

18-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.AUTHORS_TABLE;
19-
import static com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider.BOOKS_TABLE;
18+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.AUTHORS_TABLE;
19+
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.BOOKS_TABLE;
2020
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
2121
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2222

2323
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2424
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
25+
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider;
2526
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
26-
import com.google.cloud.teleport.v2.templates.failureinjectiontesting.utils.MySQLSrcDataProvider;
2727
import java.io.IOException;
2828
import java.time.Duration;
2929
import java.util.Arrays;

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/utils/MySQLSrcDataProvider.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

v2/spanner-common/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@
7171
<scope>compile</scope>
7272
<version>${project.version}</version>
7373
</dependency>
74+
<dependency>
75+
<groupId>com.google.cloud</groupId>
76+
<artifactId>google-cloud-compute</artifactId>
77+
</dependency>
7478
</dependencies>
7579

7680
<build>

0 commit comments

Comments
 (0)