Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeValidator;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
Expand Down Expand Up @@ -649,7 +649,7 @@ public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
String workerMachineType =
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType();
DataflowWorkerMachineTypeValidator.validateMachineSpecs(workerMachineType, 4);
DataflowWorkerMachineTypeUtils.validateMachineSpecs(workerMachineType, 4);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
// Ingest session file into schema object.
Schema schema = SessionFileReader.read(options.getSessionFilePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.defaults.MySqlConfigDefaults;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.re2j.Matcher;
Expand All @@ -33,6 +34,8 @@
import java.util.List;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -43,6 +46,21 @@ public final class OptionsToConfigBuilder {
private static final Logger LOG = LoggerFactory.getLogger(OptionsToConfigBuilder.class);
public static final String DEFAULT_POSTGRESQL_NAMESPACE = "public";

/**
 * Extracts the worker zone from the options.
 *
 * @param options Pipeline options.
 * @return The worker zone or null if not found.
 */
public static String extractWorkerZone(PipelineOptions options) {
  try {
    DataflowPipelineWorkerPoolOptions workerPoolOptions =
        options.as(DataflowPipelineWorkerPoolOptions.class);
    return workerPoolOptions.getWorkerZone();
  } catch (Exception e) {
    // Best-effort lookup: zone is optional metadata, so fall back to null rather than fail.
    LOG.warn("Could not extract worker zone from options. Defaulting to null.", e);
    return null;
  }
}

public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
SourceDbToSpannerOptions options,
List<String> tables,
Expand All @@ -60,6 +78,12 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
long maxConnections =
options.getMaxConnections() > 0 ? (long) (options.getMaxConnections()) : 0;
Integer numPartitions = options.getNumPartitions();
String workerZone = extractWorkerZone(options);

Integer fetchSize = options.getFetchSize();
if (fetchSize != null && fetchSize < 0) {
fetchSize = null;
}

return getJdbcIOWrapperConfig(
sqlDialect,
Expand All @@ -78,8 +102,11 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
maxConnections,
numPartitions,
waitOn,
options.getFetchSize(),
options.getUniformizationStageCountHint());
fetchSize,
options.getUniformizationStageCountHint(),
options.getProjectId(),
workerZone,
options.as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType());
}

public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
Expand All @@ -100,7 +127,10 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
Integer numPartitions,
Wait.OnSignal<?> waitOn,
Integer fetchSize,
Long uniformizationStageCountHint) {
Long uniformizationStageCountHint,
String projectId,
String workerZone,
String workerMachineType) {
JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect);
SourceSchemaReference sourceSchemaReference =
sourceSchemaReferenceFrom(sqlDialect, dbName, namespace);
Expand All @@ -115,6 +145,14 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
.build())
.setJdbcDriverClassName(jdbcDriverClassName)
.setJdbcDriverJars(jdbcDriverJars);

if (workerMachineType != null && !workerMachineType.isEmpty()) {
builder.setWorkerMemoryGB(
DataflowWorkerMachineTypeUtils.getWorkerMemoryGB(
projectId, workerZone, workerMachineType));
builder.setWorkerCores(
DataflowWorkerMachineTypeUtils.getWorkerCores(projectId, workerZone, workerMachineType));
}
if (maxConnections != 0) {
builder = builder.setMaxConnections(maxConnections);
}
Expand Down Expand Up @@ -161,27 +199,34 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
}

/**
* For MySQL Dialect, if Fetchsize is expecitly set by the user, enables `useCursorFetch`.
* For MySQL Dialect, if Fetchsize is explicitly set by the user or if it's auto-inferred (null),
* enables `useCursorFetch`. It is disabled only if user explicitly sets FetchSize to 0.
*
* @param sqlDialect Sql Dialect.
* @param url DB Url from passed configs.
* @param fetchSize FetchSize Setting (Null if user has not explicitly set)
* @return Updated URL with `useCursorFetch` only if dialect is MySql and Fetchsize is not null.
* Same as input URL in all other cases.
* @return Updated URL with `useCursorFetch` only if dialect is MySql and Fetchsize is not 0. Same
* as input URL in all other cases.
*/
@VisibleForTesting
@Nullable
protected static String mysqlSetCursorModeIfNeeded(
SQLDialect sqlDialect, String url, @Nullable Integer fetchSize) {
if (fetchSize == null) {
LOG.info(
"FetchSize is not explicitly configured. In case of out of memory errors, please set `FetchSize` according to the available memory and maximum size of a row.");
if (sqlDialect != SQLDialect.MYSQL) {
return url;
}
if (sqlDialect != SQLDialect.MYSQL) {
// For MySQL, to enable streaming/cursor mode, useCursorFetch must be true.
// We enable it if fetchSize is NULL (Auto-infer) or > 0.
// We only disable it if fetchSize is explicitly 0 (Fetch All).
if (fetchSize != null && fetchSize == 0) {
LOG.info(
"FetchSize is explicitly 0. MySQL cursor mode (useCursorFetch) will not be enabled explicitly.");
return url;
}
LOG.info("For Mysql, Fetchsize is explicitly configured. So setting `useCursorMode=true`.");

LOG.info(
"FetchSize is {}. Setting MySQL `useCursorFetch=true`.",
fetchSize == null ? "Auto" : fetchSize);
String updatedUrl = addParamToJdbcUrl(url, "useCursorFetch", "true");
return updatedUrl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.JdbcSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcValueMappingsProvider;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
import com.google.cloud.teleport.v2.source.reader.io.schema.RetriableSchemaDiscovery;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference.Kind;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

/**
* Interface to support various dialects of JDBC databases.
Expand Down Expand Up @@ -111,4 +114,21 @@ ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableIndexes(
JdbcSchemaReference sourceSchemaReference,
ImmutableList<String> tables)
throws SchemaDiscoveryException, RetriableSchemaDiscoveryException;

/**
 * Estimates the serialized size of one row of the given table, in bytes.
 *
 * <p>Convenience overload that delegates to {@link #estimateRowSize(Map,
 * JdbcValueMappingsProvider)} using the table's column-name-to-type map.
 *
 * @param sourceTableSchema schema whose columns are sized.
 * @param jdbcValueMappingsProvider provides per-column size estimates.
 * @return estimated row size in bytes.
 */
default long estimateRowSize(
    SourceTableSchema sourceTableSchema, JdbcValueMappingsProvider jdbcValueMappingsProvider) {
  return estimateRowSize(
      sourceTableSchema.sourceColumnNameToSourceColumnType(), jdbcValueMappingsProvider);
}

/**
 * Estimates the serialized size of one row, in bytes, by summing the per-column size estimates.
 *
 * @param sourceColumnNameToSourceColumnType map of column name to column type; only the types
 *     are consulted for sizing.
 * @param jdbcValueMappingsProvider provides per-column size estimates.
 * @return estimated row size in bytes (0 for an empty column map).
 */
default long estimateRowSize(
    Map<String, SourceColumnType> sourceColumnNameToSourceColumnType,
    JdbcValueMappingsProvider jdbcValueMappingsProvider) {
  long estimatedRowSize = 0;
  // Only the column types matter for sizing, so iterate values() directly instead of entrySet().
  for (SourceColumnType columnType : sourceColumnNameToSourceColumnType.values()) {
    estimatedRowSize += jdbcValueMappingsProvider.estimateColumnSize(columnType);
  }
  return estimatedRowSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper;

import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Calculates the fetch size for JDBC readers based on worker resources and row size estimation.
 * Formula: FetchSize = (WorkerMemory) / (4 * WorkerCores * MaxRowSize)
 */
public final class FetchSizeCalculator {
  private static final Logger LOG = LoggerFactory.getLogger(FetchSizeCalculator.class);

  private static final int MIN_FETCH_SIZE = 1;
  private static final int MAX_FETCH_SIZE = Integer.MAX_VALUE;

  /** Static utility class; not instantiable. */
  private FetchSizeCalculator() {}

  /**
   * Returns the fetch size for a table: the explicitly configured value if present, otherwise a
   * value inferred from worker resources and the estimated row size.
   *
   * @param tableConfig table configuration; an explicitly set fetch size short-circuits inference.
   * @param estimatedRowSize Estimated size of a row in bytes.
   * @param workerMemoryGB The Dataflow worker memory in GB (may be null if unknown).
   * @param workerCores The Dataflow worker cores (may be null if unknown).
   * @return The calculated fetch size, or 0 if it cannot be calculated.
   */
  public static Integer getFetchSize(
      TableConfig tableConfig, long estimatedRowSize, Double workerMemoryGB, Integer workerCores) {
    if (tableConfig.fetchSize() != null) {
      LOG.info(
          "Explicitly configured fetch size for table {}: {}",
          tableConfig.tableName(),
          tableConfig.fetchSize());
      return tableConfig.fetchSize();
    }

    try {
      // A non-positive row size makes the formula meaningless; bail out to "cannot calculate".
      if (estimatedRowSize <= 0) {
        LOG.warn(
            "Estimated row size is 0 for table {}. FetchSize cannot be calculated. Cursor mode will not be enabled.",
            tableConfig.tableName());
        return 0;
      }

      if (workerMemoryGB == null || workerCores == null) {
        LOG.warn(
            "Worker memory or cores unavailable. FetchSize cannot be calculated. Cursor mode will not be enabled.");
        return 0;
      }

      long workerMemoryBytes = (long) (workerMemoryGB * 1024 * 1024 * 1024);

      // Formula: (Memory of Dataflow worker VM) / (2 * 2 * (Number of cores on the
      // Dataflow worker VM) * (Maximum row size))
      // 2 * 2 = 4 (Safety factor)
      long denominator = 4L * workerCores * estimatedRowSize;

      // Guard <= 0 (not just == 0): a non-positive core count would otherwise produce a
      // negative quotient that the MIN clamp below would silently turn into 1.
      if (denominator <= 0) {
        LOG.warn(
            "Denominator for fetch size calculation is zero for table {}. FetchSize cannot be calculated. Cursor mode will not be enabled.",
            tableConfig.tableName());
        return 0;
      }

      long calculatedFetchSize = workerMemoryBytes / denominator;

      LOG.info(
          "Auto-inferred fetchSize for table {}: {} (Memory: {} bytes, Cores: {}, RowSize: {} bytes)",
          tableConfig.tableName(),
          calculatedFetchSize,
          workerMemoryBytes,
          workerCores,
          estimatedRowSize);

      // Clamp to [MIN_FETCH_SIZE, MAX_FETCH_SIZE] before narrowing to int.
      if (calculatedFetchSize < MIN_FETCH_SIZE) {
        return MIN_FETCH_SIZE;
      }
      if (calculatedFetchSize > MAX_FETCH_SIZE) {
        return MAX_FETCH_SIZE;
      }

      return (int) calculatedFetchSize;

    } catch (Exception e) {
      // Pass the throwable as the last SLF4J argument so the stack trace is preserved,
      // rather than logging only e.getMessage().
      LOG.warn(
          "Failed to auto-infer fetch size for table {}, error: {}. Cursor mode will not be enabled.",
          tableConfig.tableName(),
          e.getMessage(),
          e);
      return 0;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public SourceSchema discoverTableSchema() {
tableConfig -> {
SourceTableSchema sourceTableSchema =
findSourceTableSchema(sourceSchema, tableConfig);
long estimatedRowSize = sourceTableSchema.estimatedRowSize();
Integer calculatedFetchSize =
FetchSizeCalculator.getFetchSize(
tableConfig, estimatedRowSize, config.workerMemoryGB(), config.workerCores());
int fetchSize = calculatedFetchSize;
return Map.entry(
SourceTableReference.builder()
.setSourceSchemaReference(sourceSchema.schemaReference())
Expand All @@ -193,13 +198,15 @@ public SourceSchema discoverTableSchema() {
dataSourceConfiguration,
sourceSchema.schemaReference(),
tableConfig,
sourceTableSchema)
sourceTableSchema,
fetchSize)
: getJdbcIO(
config,
dataSourceConfiguration,
sourceSchema.schemaReference(),
tableConfig,
sourceTableSchema));
sourceTableSchema,
fetchSize));
})
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}
Expand Down Expand Up @@ -237,6 +244,11 @@ static SourceSchema getSourceSchema(
colEntry ->
sourceTableSchemaBuilder.addSourceColumnNameToSourceColumnType(
colEntry.getKey(), colEntry.getValue()));
long estimatedRowSize =
config
.dialectAdapter()
.estimateRowSize(tableEntry.getValue(), config.valueMappingsProvider());
sourceTableSchemaBuilder.setEstimatedRowSize(estimatedRowSize);
return sourceTableSchemaBuilder.build();
})
.forEach(sourceSchemaBuilder::addTableSchema);
Expand Down Expand Up @@ -280,6 +292,10 @@ private static TableConfig getTableConfig(
if (config.maxPartitions() != null && config.maxPartitions() != 0) {
tableConfigBuilder.setMaxPartitions(config.maxPartitions());
}
// Set fetch size for the table from global fetch size if configured
if (config.maxFetchSize() != null) {
tableConfigBuilder.setFetchSize(config.maxFetchSize());
}
/*
* TODO(vardhanvthigle): Add optional support for non-primary indexes.
* Note: most of the implementation is generic for any unique index.
Expand Down Expand Up @@ -413,7 +429,8 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
DataSourceConfiguration dataSourceConfiguration,
SourceSchemaReference sourceSchemaReference,
TableConfig tableConfig,
SourceTableSchema sourceTableSchema) {
SourceTableSchema sourceTableSchema,
int fetchSize) {
ReadWithPartitions<SourceRow, @UnknownKeyFor @NonNull @Initialized Long> jdbcIO =
JdbcIO.<SourceRow>readWithPartitions()
.withTable(delimitIdentifier(tableConfig.tableName()))
Expand All @@ -428,9 +445,6 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
if (tableConfig.maxPartitions() != null) {
jdbcIO = jdbcIO.withNumPartitions(tableConfig.maxPartitions());
}
if (config.maxFetchSize() != null) {
jdbcIO = jdbcIO.withFetchSize(config.maxFetchSize());
}
return jdbcIO;
}

Expand All @@ -449,7 +463,8 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
DataSourceConfiguration dataSourceConfiguration,
SourceSchemaReference sourceSchemaReference,
TableConfig tableConfig,
SourceTableSchema sourceTableSchema) {
SourceTableSchema sourceTableSchema,
int fetchSize) {

ReadWithUniformPartitions.Builder<SourceRow> readWithUniformPartitionsBuilder =
ReadWithUniformPartitions.<SourceRow>builder()
Expand All @@ -458,7 +473,7 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
.setDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration))
.setDbAdapter(config.dialectAdapter())
.setApproxTotalRowCount(tableConfig.approxRowCount())
.setFetchSize(config.maxFetchSize())
.setFetchSize(fetchSize)
.setRowMapper(
new JdbcSourceRowMapper(
config.valueMappingsProvider(),
Expand Down
Loading
Loading