Skip to content

Commit 7fb4434

Browse files
manitguptaaasthabharill
authored andcommitted
feat: Denylist small worker sizes in reverse replication (#3044)
* Add check for small machines * Remove log * Apply spotless * Use Preconditions * Update IT and LT base classes with workerMachineType * Add to params instead of env
1 parent e138380 commit 7fb4434

File tree

5 files changed

+193
-0
lines changed

5 files changed

+193
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import com.google.common.base.Preconditions;
19+
import org.apache.commons.lang3.StringUtils;
20+
21+
public class DataflowWorkerMachineTypeValidator {
22+
23+
public static void validateMachineSpecs(String workerMachineType, Integer minCPUs) {
24+
Preconditions.checkArgument(
25+
workerMachineType != null && !StringUtils.isBlank(workerMachineType),
26+
"Policy Violation: You must specify a workerMachineType with at least %s vCPUs.",
27+
minCPUs);
28+
29+
// Handle custom machine types first, format is custom-{vCPU}-{RAM}
30+
if (workerMachineType.startsWith("custom-")) {
31+
String[] parts = workerMachineType.split("-");
32+
Preconditions.checkArgument(
33+
parts.length == 3,
34+
"Invalid custom machine type format: '%s'. Expected format: custom-{vCPU}-{RAM}.",
35+
workerMachineType);
36+
Integer vCpus = null;
37+
try {
38+
vCpus = Integer.parseInt(parts[1]);
39+
} catch (NumberFormatException e) {
40+
Preconditions.checkArgument(
41+
false, "Invalid vCPU number in custom machine type: '%s'", workerMachineType);
42+
}
43+
Preconditions.checkArgument(
44+
vCpus >= minCPUs,
45+
"Policy Violation: Custom machine type '%s' has %s vCPUs. Minimum allowed is %s. Please use a higher machine type.",
46+
workerMachineType,
47+
vCpus,
48+
minCPUs);
49+
} else {
50+
// Handle standard machine types.
51+
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(".*-(\\d+)$");
52+
java.util.regex.Matcher matcher = pattern.matcher(workerMachineType);
53+
54+
if (matcher.find()) {
55+
Integer vCpus = null;
56+
try {
57+
vCpus = Integer.parseInt(matcher.group(1));
58+
} catch (NumberFormatException e) {
59+
Preconditions.checkArgument(
60+
false, "Invalid vCPU number in machine type: '%s'", workerMachineType);
61+
}
62+
Preconditions.checkArgument(
63+
vCpus >= minCPUs,
64+
"Policy Violation: Machine type '%s' has %s vCPUs. Minimum allowed is %s.",
65+
workerMachineType,
66+
vCpus,
67+
minCPUs);
68+
} else {
69+
Preconditions.checkArgument(
70+
false,
71+
"Unknown machine type format: '%s'. Please use a standard machine type (e.g., n1-standard-4) or a custom machine type (e.g., custom-4-4096) with at least %s vCPUs.",
72+
workerMachineType,
73+
minCPUs);
74+
}
75+
}
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import static org.junit.Assert.assertThrows;
19+
20+
import org.junit.Test;
21+
import org.junit.runner.RunWith;
22+
import org.junit.runners.JUnit4;
23+
24+
@RunWith(JUnit4.class)
25+
public class DataflowWorkerMachineTypeValidatorTest {
26+
27+
@Test
28+
public void testValidMachineType() {
29+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("n1-standard-4", 4);
30+
}
31+
32+
@Test
33+
public void testValidMachineTypeHighCpu() {
34+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("n1-standard-8", 4);
35+
}
36+
37+
@Test
38+
public void testInvalidMachineTypeLowCpu() {
39+
assertThrows(
40+
IllegalArgumentException.class,
41+
() -> {
42+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("n1-standard-2", 4);
43+
});
44+
}
45+
46+
@Test
47+
public void testNullMachineType() {
48+
assertThrows(
49+
IllegalArgumentException.class,
50+
() -> {
51+
DataflowWorkerMachineTypeValidator.validateMachineSpecs(null, 4);
52+
});
53+
}
54+
55+
@Test
56+
public void testEmptyMachineType() {
57+
assertThrows(
58+
IllegalArgumentException.class,
59+
() -> {
60+
DataflowWorkerMachineTypeValidator.validateMachineSpecs(" ", 4);
61+
});
62+
}
63+
64+
@Test
65+
public void testValidCustomMachineType() {
66+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("custom-8-12345", 4);
67+
}
68+
69+
@Test
70+
public void testValidCustomMachineTypeMinCpu() {
71+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("custom-4-12345", 4);
72+
}
73+
74+
@Test
75+
public void testInvalidCustomMachineTypeLowCpu() {
76+
assertThrows(
77+
IllegalArgumentException.class,
78+
() -> {
79+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("custom-2-12345", 4);
80+
});
81+
}
82+
83+
@Test
84+
public void testInvalidCustomMachineTypeFormat() {
85+
assertThrows(
86+
IllegalArgumentException.class,
87+
() -> {
88+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("custom-2", 4);
89+
});
90+
}
91+
92+
@Test
93+
public void testInvalidCustomMachineTypeNonNumericCpu() {
94+
assertThrows(
95+
IllegalArgumentException.class,
96+
() -> {
97+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("custom-abc-12345", 4);
98+
});
99+
}
100+
101+
@Test
102+
public void testUnknownMachineType() {
103+
assertThrows(
104+
IllegalArgumentException.class,
105+
() -> {
106+
DataflowWorkerMachineTypeValidator.validateMachineSpecs("unknown-machine-type", 4);
107+
});
108+
}
109+
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4444
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
4545
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
46+
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeValidator;
4647
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
4748
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
4849
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
@@ -578,6 +579,10 @@ public static PipelineResult run(Options options) {
578579
+ " incease the max shard connections");
579580
}
580581

582+
String workerMachineType =
583+
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType();
584+
DataflowWorkerMachineTypeValidator.validateMachineSpecs(workerMachineType, 4);
585+
581586
// Prepare Spanner config
582587
SpannerConfig spannerConfig =
583588
SpannerConfig.create()

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
231231
put("maxNumWorkers", "1");
232232
put("numWorkers", "1");
233233
put("sourceType", sourceType);
234+
put("workerMachineType", "n2-standard-4");
234235
}
235236
};
236237
if (jobParameters != null) {

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLTBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
216216
put("deadLetterQueueDirectory", getGcsPath(artifactBucket, "dlq", gcsResourceManager));
217217
put("maxShardConnections", "100");
218218
put("sourceType", sourceType);
219+
put("workerMachineType", "n2-standard-4");
219220
}
220221
};
221222

0 commit comments

Comments
 (0)