Skip to content

Commit a738a83

Browse files
committed
Add ITs
1 parent d468427 commit a738a83

File tree

5 files changed

+437
-0
lines changed

5 files changed

+437
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates;
17+
18+
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
21+
22+
import com.google.cloud.spanner.Mutation;
23+
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
24+
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
25+
import com.google.common.io.Resources;
26+
import com.google.pubsub.v1.SubscriptionName;
27+
import java.io.IOException;
28+
import java.util.HashSet;
29+
import java.util.List;
30+
import java.util.Map;
31+
import org.apache.beam.it.common.PipelineLauncher;
32+
import org.apache.beam.it.common.PipelineOperator;
33+
import org.apache.beam.it.common.utils.ResourceManagerUtils;
34+
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
35+
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
36+
import org.apache.beam.it.gcp.storage.GcsResourceManager;
37+
import org.apache.beam.it.jdbc.MySQLResourceManager;
38+
import org.junit.AfterClass;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
import org.junit.runner.RunWith;
43+
import org.junit.runners.JUnit4;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
47+
/** Integration test for SpannerToSourceDb Flex template using file-based schema overrides. */
48+
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
49+
@TemplateIntegrationTest(SpannerToSourceDb.class)
50+
@RunWith(JUnit4.class)
51+
public class SpannerToSourceDbFileOverridesSchemaMapperIT extends SpannerToSourceDbITBase {
52+
private static final Logger LOG =
53+
LoggerFactory.getLogger(SpannerToSourceDbFileOverridesSchemaMapperIT.class);
54+
private static final HashSet<SpannerToSourceDbFileOverridesSchemaMapperIT> testInstances =
55+
new HashSet<>();
56+
private static PipelineLauncher.LaunchInfo jobInfo;
57+
public static SpannerResourceManager spannerResourceManager;
58+
private static SpannerResourceManager spannerMetadataResourceManager;
59+
public static MySQLResourceManager mySQLResourceManager;
60+
public static GcsResourceManager gcsResourceManager;
61+
private static PubsubResourceManager pubsubResourceManager;
62+
private SubscriptionName subscriptionName;
63+
64+
private static final String SPANNER_DDL_RESOURCE =
65+
"SpannerToSourceDbOverridesIT/spanner-schema.sql";
66+
private static final String MYSQL_SCHEMA_FILE_RESOURCE =
67+
"SpannerToSourceDbOverridesIT/mysql-schema.sql";
68+
private static final String SCHEMA_OVERRIDE_FILE_RESOURCE =
69+
"SpannerToSourceDbOverridesIT/file-overrides.json";
70+
private static final String SCHEMA_OVERRIDE_GCS_PREFIX = "SpannerToSourceDbOverridesIT";
71+
72+
/**
73+
* Setup resource managers and Launch dataflow job once during the execution of this test class.
74+
*
75+
* @throws IOException
76+
*/
77+
@Before
78+
public void setUp() throws IOException {
79+
skipBaseCleanup = true;
80+
synchronized (SpannerToSourceDbFileOverridesSchemaMapperIT.class) {
81+
testInstances.add(this);
82+
if (jobInfo == null) {
83+
spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE);
84+
spannerMetadataResourceManager = createSpannerMetadataDatabase();
85+
mySQLResourceManager = MySQLResourceManager.builder(testName).build();
86+
createMySQLSchema(mySQLResourceManager, MYSQL_SCHEMA_FILE_RESOURCE);
87+
gcsResourceManager = setUpSpannerITGcsResourceManager();
88+
createAndUploadShardConfigToGcs(gcsResourceManager, mySQLResourceManager);
89+
gcsResourceManager.uploadArtifact(
90+
SCHEMA_OVERRIDE_GCS_PREFIX + "/file-overrides.json",
91+
Resources.getResource(SCHEMA_OVERRIDE_FILE_RESOURCE).getPath());
92+
pubsubResourceManager = setUpPubSubResourceManager();
93+
subscriptionName =
94+
createPubsubResources(
95+
getClass().getSimpleName(),
96+
pubsubResourceManager,
97+
getGcsPath("dlq", gcsResourceManager)
98+
.replace("gs://" + gcsResourceManager.getBucket(), ""),
99+
gcsResourceManager);
100+
ADDITIONAL_JOB_PARAMS.clear();
101+
ADDITIONAL_JOB_PARAMS.put(
102+
"schemaOverridesFilePath",
103+
getGcsPath(SCHEMA_OVERRIDE_GCS_PREFIX + "/file-overrides.json", gcsResourceManager));
104+
jobInfo =
105+
launchDataflowJob(
106+
gcsResourceManager,
107+
spannerResourceManager,
108+
spannerMetadataResourceManager,
109+
subscriptionName.toString(),
110+
null,
111+
null,
112+
null,
113+
null,
114+
null,
115+
MYSQL_SOURCE_TYPE);
116+
}
117+
}
118+
}
119+
120+
/**
121+
* Cleanup dataflow job and all the resources and resource managers.
122+
*
123+
* @throws IOException
124+
*/
125+
@AfterClass
126+
public static void cleanUp() throws IOException {
127+
for (SpannerToSourceDbFileOverridesSchemaMapperIT instance : testInstances) {
128+
instance.tearDownBase();
129+
}
130+
ResourceManagerUtils.cleanResources(
131+
spannerResourceManager,
132+
mySQLResourceManager,
133+
spannerMetadataResourceManager,
134+
gcsResourceManager,
135+
pubsubResourceManager);
136+
}
137+
138+
@Test
139+
public void testSpannerToMySQLWithFileOverrides() throws Exception {
140+
assertThatPipeline(jobInfo).isRunning();
141+
// Insert data into Spanner tables matching the override scenario
142+
spannerResourceManager.write(
143+
Mutation.newInsertOrUpdateBuilder("Target_Table_1")
144+
.set("id_col1")
145+
.to(1)
146+
.set("Target_Name_Col_1")
147+
.to("Name One")
148+
.set("data_col1")
149+
.to("Data for one")
150+
.build());
151+
spannerResourceManager.write(
152+
Mutation.newInsertOrUpdateBuilder("Target_Table_1")
153+
.set("id_col1")
154+
.to(2)
155+
.set("Target_Name_Col_1")
156+
.to("Name Two")
157+
.set("data_col1")
158+
.to("Data for two")
159+
.build());
160+
spannerResourceManager.write(
161+
Mutation.newInsertOrUpdateBuilder("source_table2")
162+
.set("key_col2")
163+
.to("K1")
164+
.set("Target_Category_Col_2")
165+
.to("Category Alpha")
166+
.set("value_col2")
167+
.to("Value Alpha")
168+
.build());
169+
spannerResourceManager.write(
170+
Mutation.newInsertOrUpdateBuilder("source_table2")
171+
.set("key_col2")
172+
.to("K2")
173+
.set("Target_Category_Col_2")
174+
.to("Category Beta")
175+
.set("value_col2")
176+
.to("Value Beta")
177+
.build());
178+
179+
PipelineOperator.Result result = pipelineOperator().waitUntilDone(createConfig(jobInfo));
180+
181+
// Assert MySQL table1 (should be source_table1, with column name_col1 renamed)
182+
List<Map<String, Object>> mysqlTable1 =
183+
mySQLResourceManager.runSQLQuery("SELECT id_col1, name_col1, data_col1 FROM source_table1");
184+
assertThat(mysqlTable1).hasSize(2);
185+
assertThat(mysqlTable1.get(0).get("id_col1")).isEqualTo(1);
186+
assertThat(mysqlTable1.get(0).get("name_col1")).isEqualTo("Name One");
187+
assertThat(mysqlTable1.get(0).get("data_col1")).isEqualTo("Data for one");
188+
assertThat(mysqlTable1.get(1).get("id_col1")).isEqualTo(2);
189+
assertThat(mysqlTable1.get(1).get("name_col1")).isEqualTo("Name Two");
190+
assertThat(mysqlTable1.get(1).get("data_col1")).isEqualTo("Data for two");
191+
192+
// Assert MySQL table2 (should be source_table2, with column category_col2 renamed)
193+
List<Map<String, Object>> mysqlTable2 =
194+
mySQLResourceManager.runSQLQuery(
195+
"SELECT key_col2, category_col2, value_col2 FROM source_table2");
196+
assertThat(mysqlTable2).hasSize(2);
197+
assertThat(mysqlTable2.get(0).get("key_col2")).isEqualTo("K1");
198+
assertThat(mysqlTable2.get(0).get("category_col2")).isEqualTo("Category Alpha");
199+
assertThat(mysqlTable2.get(0).get("value_col2")).isEqualTo("Value Alpha");
200+
assertThat(mysqlTable2.get(1).get("key_col2")).isEqualTo("K2");
201+
assertThat(mysqlTable2.get(1).get("category_col2")).isEqualTo("Category Beta");
202+
assertThat(mysqlTable2.get(1).get("value_col2")).isEqualTo("Value Beta");
203+
}
204+
}

0 commit comments

Comments
 (0)