Skip to content

Commit 065df9b

Browse files
committed
feat(graph-size-per-shard): 5KLT
1 parent a099b46 commit 065df9b

File tree

3 files changed

+581
-34
lines changed

3 files changed

+581
-34
lines changed
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.loadtesting;
17+
18+
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.spanner.Struct;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateLoadTest;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
import com.google.cloud.teleport.v2.templates.SourceDbToSpanner;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
56+
/**
57+
* A load test for {@link SourceDbToSpanner} Flex template which tests 5,000 tables migration.
58+
*
59+
* <p>This test verifies the template's ability to handle a massive number of tables by:
60+
*
61+
* <ol>
62+
* <li>Generating 5,000 tables in a source MySQL database.
63+
* <li>Executing 5,000 DDL statements on the destination Spanner database in parallel.
64+
* <li>Migrating the data using the Flex template with a constant-size graph.
65+
* <li>Validating the results using a high-concurrency verification process.
66+
* </ol>
67+
*/
68+
@Category({TemplateLoadTest.class, SkipDirectRunnerTest.class})
69+
@TemplateLoadTest(SourceDbToSpanner.class)
70+
@RunWith(JUnit4.class)
71+
public class MySQL5KTablesLT extends SourceDbToSpannerLTBase {
72+
private static final Logger LOG = LoggerFactory.getLogger(MySQL5KTablesLT.class);
73+
private Instant startTime;
74+
75+
private MySQLResourceManager mySQLResourceManager;
76+
77+
@Before
78+
public void setUp() throws IOException {
79+
LOG.info("Began Setup for 5K Table test");
80+
super.setUp();
81+
startTime = Instant.now();
82+
83+
// Initialize Resource Managers directly to avoid base class constraints
84+
mySQLResourceManager = MySQLResourceManager.builder(testName).build();
85+
spannerResourceManager =
86+
SpannerResourceManager.builder(testName, project, region)
87+
.maybeUseStaticInstance()
88+
.setMonitoringClient(monitoringClient)
89+
.build();
90+
91+
gcsResourceManager = createSpannerLTGcsResourceManager();
92+
93+
this.dialect = SQLDialect.MYSQL;
94+
}
95+
96+
@After
97+
public void cleanUp() {
98+
ResourceManagerUtils.cleanResources(
99+
spannerResourceManager, mySQLResourceManager, gcsResourceManager);
100+
LOG.info(
101+
"CleanupCompleted for 5K Table test. Test took {}",
102+
Duration.between(startTime, Instant.now()));
103+
}
104+
105+
/**
106+
* Tests the bulk migration of 5,000 tables from MySQL to Spanner.
107+
*
108+
* @throws Exception if any part of the test setup or execution fails.
109+
*/
110+
@Test
111+
public void mySQLToSpannerBulkFiveThousandTablesTest() throws Exception {
112+
int numTables = 5000;
113+
114+
HashMap<String, String> columns = new HashMap<>();
115+
columns.put("id", "BIGINT UNSIGNED");
116+
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, "id");
117+
118+
// OPTIMIZE MYSQL:
119+
// We disable synchronous flushing and binary logging to speed up the creation and
120+
// population of 5,000 tables on the source database.
121+
try (Connection jdbcConnection = getJdbcConnection(mySQLResourceManager);
122+
PreparedStatement pstmt =
123+
jdbcConnection.prepareStatement("SET GLOBAL innodb_flush_log_at_trx_commit = 0;")) {
124+
pstmt.executeUpdate();
125+
}
126+
try (Connection jdbcConnection = getJdbcConnection(mySQLResourceManager);
127+
PreparedStatement pstmt = jdbcConnection.prepareStatement("SET GLOBAL sync_binlog = 0;")) {
128+
pstmt.executeUpdate();
129+
}
130+
131+
List<String> spannerDdlStatements = new ArrayList<>();
132+
133+
LOG.info("Creating {} tables on Source and collecting Spanner DDLs", numTables);
134+
for (int i = 0; i < numTables; i++) {
135+
String tableName = "table_" + i;
136+
mySQLResourceManager.createTable(tableName, schema);
137+
138+
try (Connection jdbcConnection = getJdbcConnection(mySQLResourceManager);
139+
PreparedStatement pstmt =
140+
jdbcConnection.prepareStatement("INSERT INTO " + tableName + " (id) VALUES (42)")) {
141+
pstmt.executeUpdate();
142+
}
143+
spannerDdlStatements.add("CREATE TABLE " + tableName + " (id INT64) PRIMARY KEY (id)");
144+
145+
if (i % 500 == 0) {
146+
LOG.info("Created {} tables so far on Source", i);
147+
}
148+
}
149+
150+
// Restore MySQL durability settings to ensure a realistic state for the template.
151+
try (Connection jdbcConnection = getJdbcConnection(mySQLResourceManager);
152+
PreparedStatement pstmt =
153+
jdbcConnection.prepareStatement("SET GLOBAL innodb_flush_log_at_trx_commit = 1;")) {
154+
pstmt.executeUpdate();
155+
}
156+
157+
try (Connection jdbcConnection = getJdbcConnection(mySQLResourceManager);
158+
PreparedStatement pstmt = jdbcConnection.prepareStatement("SET GLOBAL sync_binlog = 1;")) {
159+
pstmt.executeUpdate();
160+
}
161+
162+
// PARALLEL DDL EXECUTION:
163+
// Batch and execute spanner DDL statements in parallel to reduce setup time.
164+
LOG.info("Executing Spanner DDLs in parallel batches");
165+
int batchSize = 100;
166+
ExecutorService ddlExecutor = Executors.newFixedThreadPool(3);
167+
for (int i = 0; i < spannerDdlStatements.size(); i += batchSize) {
168+
int end = Math.min(spannerDdlStatements.size(), i + batchSize);
169+
List<String> batch = spannerDdlStatements.subList(i, end);
170+
ddlExecutor.submit(
171+
() -> {
172+
try {
173+
spannerResourceManager.executeDdlStatements(batch);
174+
} catch (Exception e) {
175+
LOG.error("Failed to execute Spanner DDL batch", e);
176+
}
177+
});
178+
}
179+
ddlExecutor.shutdown();
180+
if (!ddlExecutor.awaitTermination(60, TimeUnit.MINUTES)) {
181+
throw new RuntimeException("Spanner DDL timed out after 60 minutes");
182+
}
183+
184+
// Prepare parameters and launch job
185+
Map<String, String> params = getCommonParameters();
186+
params.putAll(
187+
getJdbcParameters(
188+
mySQLResourceManager.getUri(),
189+
mySQLResourceManager.getUsername(),
190+
mySQLResourceManager.getPassword(),
191+
"com.mysql.jdbc.Driver"));
192+
params.put("maxConnections", "16");
193+
params.put("numWorkers", "16");
194+
params.put("maxNumWorkers", "16");
195+
params.put("workerMachineType", "n2-standard-4");
196+
197+
LaunchConfig.Builder options = LaunchConfig.builder(testName, SPEC_PATH).setParameters(params);
198+
199+
PipelineLauncher.LaunchInfo jobInfo = launchJob(options);
200+
201+
// Wait for completion
202+
PipelineOperator.Result result =
203+
pipelineOperator.waitUntilDone(createConfig(jobInfo, Duration.ofMinutes(60L)));
204+
assertThatResult(result).isLaunchFinished();
205+
206+
// High-concurrency validation
207+
validateResult(spannerResourceManager, numTables);
208+
209+
// Collect and export metrics
210+
collectAndExportMetrics(jobInfo);
211+
}
212+
213+
private static Connection getJdbcConnection(MySQLResourceManager mySQLResourceManager)
214+
throws SQLException {
215+
try {
216+
return DriverManager.getConnection(
217+
mySQLResourceManager.getUri(),
218+
mySQLResourceManager.getUsername(),
219+
mySQLResourceManager.getPassword());
220+
} catch (Exception e) {
221+
LOG.error("Could not open connection to MySql", e);
222+
throw e;
223+
}
224+
}
225+
226+
/**
227+
* Validates that all 5,000 tables were correctly migrated and contain the expected data.
228+
*
229+
* <p>To handle the large number of tables, validation is performed in parallel using a large
230+
* thread pool.
231+
*
232+
* @param resourceManager the Spanner resource manager.
233+
* @param numTables the total number of tables to validate.
234+
* @throws InterruptedException if validation is interrupted.
235+
*/
236+
private void validateResult(SpannerResourceManager resourceManager, int numTables)
237+
throws InterruptedException {
238+
LOG.info("Validating {} tables on Spanner", numTables);
239+
ExecutorService executor = Executors.newFixedThreadPool(100);
240+
for (int i = 0; i < numTables; i++) {
241+
String tableName = "table_" + i;
242+
executor.submit(
243+
() -> {
244+
assertThat(resourceManager.getRowCount(tableName)).isEqualTo(1L);
245+
List<Struct> rows = resourceManager.readTableRecords(tableName, "id");
246+
assertThat(rows.get(0).getLong("id")).isEqualTo(42L);
247+
});
248+
}
249+
executor.shutdown();
250+
if (!executor.awaitTermination(30, TimeUnit.MINUTES)) {
251+
throw new RuntimeException("Validation timed out after 30 minutes");
252+
}
253+
LOG.info("Validation successful for all {} tables", numTables);
254+
}
255+
}

0 commit comments

Comments
 (0)