Skip to content

Commit 3d69a20

Browse files
authored
test: Add end to end forward migration followed by reverse migration (#2240)
* changes * changes * Changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * Changes * changes * Changes
1 parent 4370331 commit 3d69a20

File tree

5 files changed

+788
-0
lines changed

5 files changed

+788
-0
lines changed
Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.endtoend;
17+
18+
import static java.util.Arrays.stream;
19+
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
22+
import com.google.cloud.datastream.v1.DestinationConfig;
23+
import com.google.cloud.datastream.v1.SourceConfig;
24+
import com.google.cloud.datastream.v1.Stream;
25+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
26+
import com.google.common.io.Resources;
27+
import com.google.gson.Gson;
28+
import com.google.gson.JsonArray;
29+
import com.google.gson.JsonObject;
30+
import com.google.pubsub.v1.SubscriptionName;
31+
import com.google.pubsub.v1.TopicName;
32+
import java.io.BufferedReader;
33+
import java.io.FileNotFoundException;
34+
import java.io.IOException;
35+
import java.io.InputStream;
36+
import java.io.InputStreamReader;
37+
import java.nio.charset.StandardCharsets;
38+
import java.nio.file.Files;
39+
import java.nio.file.Paths;
40+
import java.util.ArrayList;
41+
import java.util.Collections;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.stream.Collectors;
46+
import org.apache.beam.it.common.PipelineLauncher;
47+
import org.apache.beam.it.common.TestProperties;
48+
import org.apache.beam.it.common.utils.PipelineUtils;
49+
import org.apache.beam.it.conditions.ConditionCheck;
50+
import org.apache.beam.it.gcp.TemplateTestBase;
51+
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
52+
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
53+
import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
54+
import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
55+
import org.apache.beam.it.gcp.datastream.JDBCSource;
56+
import org.apache.beam.it.gcp.datastream.MySQLSource;
57+
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
58+
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
59+
import org.apache.beam.it.gcp.storage.GcsResourceManager;
60+
import org.apache.beam.it.jdbc.JDBCResourceManager;
61+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
62+
import org.slf4j.Logger;
63+
import org.slf4j.LoggerFactory;
64+
65+
public abstract class EndToEndTestingITBase extends TemplateTestBase {
66+
67+
private static final Logger LOG = LoggerFactory.getLogger(EndToEndTestingITBase.class);
68+
private static FlexTemplateDataflowJobResourceManager flexTemplateDataflowJobResourceManager;
69+
public DatastreamResourceManager datastreamResourceManager;
70+
protected JDBCSource jdbcSource;
71+
72+
protected SpannerResourceManager createSpannerDatabase(String spannerSchemaFile)
73+
throws IOException {
74+
SpannerResourceManager spannerResourceManager =
75+
SpannerResourceManager.builder("e2e-main-" + testName, PROJECT, REGION)
76+
.maybeUseStaticInstance()
77+
.build();
78+
79+
String ddl;
80+
try (InputStream inputStream =
81+
Thread.currentThread().getContextClassLoader().getResourceAsStream(spannerSchemaFile)) {
82+
if (inputStream == null) {
83+
throw new FileNotFoundException("Resource file not found: " + spannerSchemaFile);
84+
}
85+
try (BufferedReader reader =
86+
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
87+
ddl = reader.lines().collect(Collectors.joining("\n"));
88+
}
89+
}
90+
91+
if (ddl.isBlank()) {
92+
throw new IllegalStateException("DDL file is empty: " + spannerSchemaFile);
93+
}
94+
95+
String[] ddls = ddl.trim().split(";");
96+
for (String d : ddls) {
97+
d = d.trim();
98+
if (!d.isEmpty()) {
99+
spannerResourceManager.executeDdlStatement(d);
100+
}
101+
}
102+
return spannerResourceManager;
103+
}
104+
105+
protected SpannerResourceManager createSpannerMetadataDatabase() {
106+
SpannerResourceManager spannerMetadataResourceManager =
107+
SpannerResourceManager.builder("e2e-meta-" + testName, PROJECT, REGION)
108+
.maybeUseStaticInstance()
109+
.build();
110+
String dummy = "create table t1(id INT64 ) primary key(id)";
111+
spannerMetadataResourceManager.executeDdlStatement(dummy);
112+
return spannerMetadataResourceManager;
113+
}
114+
115+
public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
116+
return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
117+
}
118+
119+
// createPubsubResources generates pubsub topic, subscription and notification for migration.
120+
// It can be run in different modes based on type of migration.
121+
// Modes can be rr for reverse replication and fwd for forward migration
122+
public SubscriptionName createPubsubResources(
123+
String identifierSuffix,
124+
PubsubResourceManager pubsubResourceManager,
125+
String gcsPrefix,
126+
GcsResourceManager gcsResourceManager,
127+
String mode) {
128+
String topicNameSuffix = mode + "-it" + identifierSuffix;
129+
String subscriptionNameSuffix = mode + "-it-sub" + identifierSuffix;
130+
TopicName topic = pubsubResourceManager.createTopic(topicNameSuffix);
131+
SubscriptionName subscription =
132+
pubsubResourceManager.createSubscription(topic, subscriptionNameSuffix);
133+
String prefix = gcsPrefix;
134+
if (prefix.startsWith("/")) {
135+
prefix = prefix.substring(1);
136+
}
137+
// create retry directory for reverse migration
138+
if (mode == "rr") {
139+
prefix += "/retry/";
140+
}
141+
gcsResourceManager.createNotification(topic.toString(), prefix);
142+
return subscription;
143+
}
144+
145+
protected void createAndUploadReverseShardConfigToGcs(
146+
GcsResourceManager gcsResourceManager,
147+
CloudSqlResourceManager cloudSqlResourceManager,
148+
String privateHost) {
149+
Shard shard = new Shard();
150+
shard.setLogicalShardId("Shard1");
151+
shard.setUser(cloudSqlResourceManager.getUsername());
152+
shard.setHost(privateHost);
153+
shard.setPassword(cloudSqlResourceManager.getPassword());
154+
shard.setPort(String.valueOf(cloudSqlResourceManager.getPort()));
155+
shard.setDbName(cloudSqlResourceManager.getDatabaseName());
156+
JsonObject jsObj = new Gson().toJsonTree(shard).getAsJsonObject();
157+
jsObj.remove("secretManagerUri"); // remove field secretManagerUri
158+
JsonArray ja = new JsonArray();
159+
ja.add(jsObj);
160+
String shardFileContents = ja.toString();
161+
LOG.info("Shard file contents: {}", shardFileContents);
162+
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
163+
}
164+
165+
public String getGcsFullPath(
166+
GcsResourceManager gcsResourceManager, String artifactId, String identifierSuffix) {
167+
return ArtifactUtils.getFullGcsPath(
168+
artifactBucketName, identifierSuffix, gcsResourceManager.runId(), artifactId);
169+
}
170+
171+
public PipelineLauncher.LaunchInfo launchRRDataflowJob(
172+
SpannerResourceManager spannerResourceManager,
173+
GcsResourceManager gcsResourceManager,
174+
SpannerResourceManager spannerMetadataResourceManager,
175+
PubsubResourceManager pubsubResourceManager,
176+
String sourceType)
177+
throws IOException {
178+
String rrJobName = PipelineUtils.createJobName("rrev-it" + testName);
179+
180+
// create subscription
181+
SubscriptionName rrSubscriptionName =
182+
createPubsubResources(
183+
getClass().getSimpleName(),
184+
pubsubResourceManager,
185+
getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, ""),
186+
gcsResourceManager,
187+
"rr");
188+
189+
// Launch Dataflow template
190+
flexTemplateDataflowJobResourceManager =
191+
FlexTemplateDataflowJobResourceManager.builder(rrJobName)
192+
.withTemplateName("Spanner_to_SourceDb")
193+
.withTemplateModulePath("v2/spanner-to-sourcedb")
194+
.addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager))
195+
.addParameter("instanceId", spannerResourceManager.getInstanceId())
196+
.addParameter("databaseId", spannerResourceManager.getDatabaseId())
197+
.addParameter("spannerProjectId", PROJECT)
198+
.addParameter("metadataDatabase", spannerMetadataResourceManager.getDatabaseId())
199+
.addParameter("metadataInstance", spannerMetadataResourceManager.getInstanceId())
200+
.addParameter(
201+
"sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager))
202+
.addParameter("changeStreamName", "allstream")
203+
.addParameter("dlqGcsPubSubSubscription", rrSubscriptionName.toString())
204+
.addParameter("deadLetterQueueDirectory", getGcsPath("dlq", gcsResourceManager))
205+
.addParameter("maxShardConnections", "5")
206+
.addParameter("maxNumWorkers", "1")
207+
.addParameter("numWorkers", "1")
208+
.addParameter("sourceType", sourceType)
209+
.addEnvironmentVariable(
210+
"additionalExperiments", Collections.singletonList("use_runner_v2"))
211+
.build();
212+
213+
// Run
214+
PipelineLauncher.LaunchInfo jobInfo = flexTemplateDataflowJobResourceManager.launchJob();
215+
assertThatPipeline(jobInfo).isRunning();
216+
return jobInfo;
217+
}
218+
219+
public String getGcsPath(String... pathParts) {
220+
checkArgument(pathParts.length != 0, "Must provide at least one path part");
221+
checkArgument(
222+
stream(pathParts).noneMatch(Strings::isNullOrEmpty), "No path part can be null or empty");
223+
224+
return String.format("gs://%s", String.join("/", pathParts));
225+
}
226+
227+
public PipelineLauncher.LaunchInfo launchFwdDataflowJob(
228+
SpannerResourceManager spannerResourceManager,
229+
GcsResourceManager gcsResourceManager,
230+
PubsubResourceManager pubsubResourceManager)
231+
throws IOException {
232+
String testRootDir = getClass().getSimpleName();
233+
234+
// create subscriptions
235+
String gcsPrefix =
236+
String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "cdc"});
237+
SubscriptionName subscription =
238+
createPubsubResources(
239+
testRootDir + testName, pubsubResourceManager, gcsPrefix, gcsResourceManager, "fwd");
240+
241+
String dlqGcsPrefix =
242+
String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "dlq"});
243+
SubscriptionName dlqSubscription =
244+
createPubsubResources(
245+
testRootDir + testName + "dlq",
246+
pubsubResourceManager,
247+
dlqGcsPrefix,
248+
gcsResourceManager,
249+
"fwd");
250+
String artifactBucket = TestProperties.artifactBucket();
251+
252+
// launch datastream
253+
datastreamResourceManager =
254+
DatastreamResourceManager.builder(testName, PROJECT, REGION)
255+
.setCredentialsProvider(credentialsProvider)
256+
.setPrivateConnectivity("datastream-private-connect-us-central1")
257+
.build();
258+
Stream stream =
259+
createDataStreamResources(artifactBucket, gcsPrefix, jdbcSource, datastreamResourceManager);
260+
261+
String jobName = PipelineUtils.createJobName("fwd-" + getClass().getSimpleName());
262+
// launch dataflow template
263+
flexTemplateDataflowJobResourceManager =
264+
FlexTemplateDataflowJobResourceManager.builder(jobName)
265+
.withTemplateName("Cloud_Datastream_to_Spanner")
266+
.withTemplateModulePath("v2/datastream-to-spanner")
267+
.addParameter("inputFilePattern", getGcsPath(artifactBucket, gcsPrefix))
268+
.addParameter("streamName", stream.getName())
269+
.addParameter("instanceId", spannerResourceManager.getInstanceId())
270+
.addParameter("databaseId", spannerResourceManager.getDatabaseId())
271+
.addParameter("projectId", PROJECT)
272+
.addParameter("deadLetterQueueDirectory", getGcsPath(artifactBucket, dlqGcsPrefix))
273+
.addParameter("gcsPubSubSubscription", subscription.toString())
274+
.addParameter("dlqGcsPubSubSubscription", dlqSubscription.toString())
275+
.addParameter("datastreamSourceType", "mysql")
276+
.addParameter("inputFileFormat", "avro")
277+
.addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager))
278+
.addEnvironmentVariable(
279+
"additionalExperiments", Collections.singletonList("use_runner_v2"))
280+
.build();
281+
282+
// Run
283+
PipelineLauncher.LaunchInfo jobInfo = flexTemplateDataflowJobResourceManager.launchJob();
284+
assertThatPipeline(jobInfo).isRunning();
285+
return jobInfo;
286+
}
287+
288+
public Stream createDataStreamResources(
289+
String artifactBucketName,
290+
String gcsPrefix,
291+
JDBCSource jdbcSource,
292+
DatastreamResourceManager datastreamResourceManager) {
293+
SourceConfig sourceConfig =
294+
datastreamResourceManager.buildJDBCSourceConfig("mysql", jdbcSource);
295+
296+
// Create DataStream GCS Destination Connection profile and config
297+
DestinationConfig destinationConfig =
298+
datastreamResourceManager.buildGCSDestinationConfig(
299+
"gcs",
300+
artifactBucketName,
301+
gcsPrefix,
302+
DatastreamResourceManager.DestinationOutputFormat.AVRO_FILE_FORMAT);
303+
304+
// Create and start DataStream stream
305+
Stream stream =
306+
datastreamResourceManager.createStream("ds-spanner", sourceConfig, destinationConfig);
307+
datastreamResourceManager.startStream(stream);
308+
return stream;
309+
}
310+
311+
protected ConditionCheck writeJdbcData(
312+
String tableName,
313+
Integer numRows,
314+
Map<String, Object> columns,
315+
Map<String, List<Map<String, Object>>> cdcEvents,
316+
CloudSqlResourceManager cloudSqlResourceManager) {
317+
return new ConditionCheck() {
318+
@Override
319+
protected String getDescription() {
320+
return "Send initial JDBC events.";
321+
}
322+
323+
@Override
324+
protected CheckResult check() {
325+
boolean success = true;
326+
List<String> messages = new ArrayList<>();
327+
List<Map<String, Object>> rows = new ArrayList<>();
328+
for (int i = 0; i < numRows; i++) {
329+
Map<String, Object> values = new HashMap<>();
330+
values.put("id", i);
331+
values.putAll(columns);
332+
rows.add(values);
333+
}
334+
cdcEvents.put(tableName, rows);
335+
success &= cloudSqlResourceManager.write(tableName, rows);
336+
messages.add(String.format("%d rows to %s", rows.size(), tableName));
337+
338+
return new CheckResult(success, "Sent " + String.join(", ", messages) + ".");
339+
}
340+
};
341+
}
342+
343+
protected String generateSessionFile(String srcDb, String spannerDb, String sessionFileResource)
344+
throws IOException {
345+
String sessionFile =
346+
Files.readString(Paths.get(Resources.getResource(sessionFileResource).getPath()));
347+
return sessionFile.replaceAll("SRC_DATABASE", srcDb).replaceAll("SP_DATABASE", spannerDb);
348+
}
349+
350+
protected JDBCSource createMySqlDatabase(
351+
CloudSqlResourceManager cloudSqlResourceManager, Map<String, Map<String, String>> tables) {
352+
for (HashMap.Entry<String, Map<String, String>> entry : tables.entrySet()) {
353+
cloudSqlResourceManager.createTable(
354+
entry.getKey(), new JDBCResourceManager.JDBCSchema(entry.getValue(), "id"));
355+
}
356+
return MySQLSource.builder(
357+
cloudSqlResourceManager.getHost(),
358+
cloudSqlResourceManager.getUsername(),
359+
cloudSqlResourceManager.getPassword(),
360+
cloudSqlResourceManager.getPort())
361+
.setAllowedTables(
362+
Map.of(cloudSqlResourceManager.getDatabaseName(), tables.keySet().stream().toList()))
363+
.build();
364+
}
365+
}

0 commit comments

Comments (0)