Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ public Table.Builder endColumn() {
}

public abstract Builder columnOptions(ImmutableList<String> options);

public Builder array(Type t) {
return type(Type.array(t));
}
}

private static class SizedType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public Schema(
this.spSchema = spSchema;
this.syntheticPKeys = syntheticPKeys;
this.srcSchema = srcSchema;
this.toSpanner = new HashMap<String, NameAndCols>();
this.toSource = new HashMap<String, NameAndCols>();
this.srcToID = new HashMap<String, NameAndCols>();
this.spannerToID = new HashMap<String, NameAndCols>();
this.empty = (spSchema == null || srcSchema == null);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.sourceddl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Scanner for reading Cassandra database schema information. Implements SourceSchemaScanner to
* provide schema scanning functionality for Cassandra.
*/
public class CassandraInformationSchemaScanner implements SourceSchemaScanner {
private static final Logger LOG =
LoggerFactory.getLogger(CassandraInformationSchemaScanner.class);

private final CqlSession session;
private final String keyspaceName;
private final SourceDatabaseType sourceType = SourceDatabaseType.CASSANDRA;

public CassandraInformationSchemaScanner(CqlSession session, String keyspaceName) {
this.session = session;
this.keyspaceName = keyspaceName;
}

@Override
public SourceSchema scan() {
SourceSchema.Builder schemaBuilder =
SourceSchema.builder(sourceType).databaseName(keyspaceName);

try {
Map<String, SourceTable> tables = scanTables();
schemaBuilder.tables(ImmutableMap.copyOf(tables));
} catch (Exception e) {
throw new RuntimeException("Error scanning Cassandra schema", e);
}

return schemaBuilder.build();
}

private Map<String, SourceTable> scanTables() throws Exception {
Map<String, SourceTable> tables = new HashMap<>();

// Query to get all tables in the keyspace
String tableQuery = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?";
PreparedStatement tableStmt = session.prepare(tableQuery);
BoundStatement tableBoundStmt = tableStmt.bind(keyspaceName);
ResultSet tableResult = session.execute(tableBoundStmt);

// For each table, get its columns
while (tableResult.iterator().hasNext()) {
String tableName = tableResult.iterator().next().getString("table_name");
if (tableName == null) {
continue;
}
tables.put(tableName, scanTableColumns(tableName));
}

return tables;
}

private SourceTable scanTableColumns(String tableName) throws Exception {
// Query to get column information for the table
String columnQuery =
"SELECT column_name, type, kind FROM system_schema.columns "
+ "WHERE keyspace_name = ? AND table_name = ?";
PreparedStatement columnStmt = session.prepare(columnQuery);
BoundStatement columnBoundStmt = columnStmt.bind(keyspaceName, tableName);
ResultSet columnResult = session.execute(columnBoundStmt);

List<SourceColumn> columns = new ArrayList<>();
List<String> primaryKeyColumns = new ArrayList<>();

while (columnResult.iterator().hasNext()) {
var row = columnResult.iterator().next();
String columnName = row.getString("column_name");
String type = row.getString("type");
String kind = row.getString("kind");

// Create SourceColumn with appropriate type information
SourceColumn.Builder columnBuilder =
SourceColumn.builder(sourceType)
.name(columnName)
.type(type.toUpperCase().replaceAll("\\s+", "")) // Normalize type name
.isNullable(true) // Cassandra columns are nullable by default
.isPrimaryKey("partition_key".equals(kind) || "clustering".equals(kind));

if ("partition_key".equals(kind) || "clustering".equals(kind)) {
primaryKeyColumns.add(columnName);
}

columns.add(columnBuilder.build());
}

return SourceTable.builder(sourceType)
.name(tableName)
.schema(keyspaceName)
.columns(ImmutableList.copyOf(columns))
.primaryKeyColumns(ImmutableList.copyOf(primaryKeyColumns))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.sourceddl;

import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** MySQL implementation of {@link SourceSchemaScanner}. */
public class MySqlInformationSchemaScanner implements SourceSchemaScanner {

private static final Logger LOG = LoggerFactory.getLogger(MySqlInformationSchemaScanner.class);

private final Connection connection;
private final String databaseName;
private final SourceDatabaseType sourceType = SourceDatabaseType.MYSQL;

public MySqlInformationSchemaScanner(Connection connection, String databaseName) {
this.connection = connection;
this.databaseName = databaseName;
}

@Override
public SourceSchema scan() {
SourceSchema.Builder schemaBuilder =
SourceSchema.builder(sourceType).databaseName(databaseName);

try {
Map<String, SourceTable> tables = scanTables();
schemaBuilder.tables(com.google.common.collect.ImmutableMap.copyOf(tables));
} catch (SQLException e) {
throw new RuntimeException("Error scanning database schema", e);
}

return schemaBuilder.build();
}

private Map<String, SourceTable> scanTables() throws SQLException {
Map<String, SourceTable> tables = new java.util.HashMap<>();
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(getTablesQuery())) {
while (rs.next()) {
String tableName = rs.getString(1);
String schema = rs.getString(2);
if (tableName == null) {
continue;
}
SourceTable table = scanTable(tableName, schema);
tables.put(tableName, table);
}
}
return tables;
}

private SourceTable scanTable(String tableName, String schema) throws SQLException {
SourceTable.Builder tableBuilder =
SourceTable.builder(sourceType).name(tableName).schema(schema);

// Scan columns
List<SourceColumn> columns = scanColumns(tableName, schema);
tableBuilder.columns(com.google.common.collect.ImmutableList.copyOf(columns));

// Scan primary keys
List<String> primaryKeys = scanPrimaryKeys(tableName, schema);
tableBuilder.primaryKeyColumns(com.google.common.collect.ImmutableList.copyOf(primaryKeys));

return tableBuilder.build();
}

private String getTablesQuery() {
return String.format(
"SELECT table_name, table_schema "
+ "FROM information_schema.tables "
+ "WHERE table_schema = '%s' "
+ "AND table_type = 'BASE TABLE'",
databaseName);
}

private List<SourceColumn> scanColumns(String tableName, String schema) throws SQLException {
List<SourceColumn> columns = new ArrayList<>();
String query =
String.format(
"SELECT column_name, data_type, character_maximum_length, "
+ "numeric_precision, numeric_scale, is_nullable, column_key "
+ "FROM information_schema.columns "
+ "WHERE table_schema = '%s' AND table_name = '%s' "
+ "ORDER BY ordinal_position",
schema, tableName);

try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query)) {
while (rs.next()) {
SourceColumn.Builder columnBuilder =
SourceColumn.builder(sourceType)
.name(rs.getString("column_name"))
.type(rs.getString("data_type"))
.isNullable("YES".equals(rs.getString("is_nullable")))
.isPrimaryKey("PRI".equals(rs.getString("column_key")));

// Handle size/precision/scale
String maxLength = rs.getString("character_maximum_length");
if (maxLength != null) {
columnBuilder.size(Long.parseLong(maxLength));
}

String precision = rs.getString("numeric_precision");
if (precision != null) {
columnBuilder.precision(Integer.parseInt(precision));
}

String scale = rs.getString("numeric_scale");
if (scale != null) {
columnBuilder.scale(Integer.parseInt(scale));
}

columnBuilder.columnOptions(ImmutableList.of());
columns.add(columnBuilder.build());
}
}
return columns;
}

private List<String> scanPrimaryKeys(String tableName, String schema) throws SQLException {
List<String> primaryKeys = new ArrayList<>();
String query =
String.format(
"SELECT column_name "
+ "FROM information_schema.key_column_usage "
+ "WHERE table_schema = '%s' AND table_name = '%s' "
+ "AND constraint_name = 'PRIMARY' "
+ "ORDER BY ordinal_position",
schema, tableName);

try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query)) {
while (rs.next()) {
primaryKeys.add(rs.getString("column_name"));
}
}
return primaryKeys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.sourceddl;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import javax.annotation.Nullable;

/** Represents a column in a source database table. */
@AutoValue
public abstract class SourceColumn implements Serializable {

private static final long serialVersionUID = 1L;

public abstract String name();

public abstract String type();

public abstract boolean isNullable();

public abstract boolean isPrimaryKey();

@Nullable
public abstract Long size();

@Nullable
public abstract Integer precision();

@Nullable
public abstract Integer scale();

public abstract ImmutableList<String> columnOptions();

public abstract SourceDatabaseType sourceType();

public static Builder builder(SourceDatabaseType sourceType) {
return new AutoValue_SourceColumn.Builder()
.sourceType(sourceType)
.isNullable(true)
.isPrimaryKey(false)
.columnOptions(ImmutableList.of());
}

public abstract Builder toBuilder();

/** A builder for {@link SourceColumn}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder name(String name);

public abstract Builder type(String type);

public abstract Builder isNullable(boolean isNullable);

public abstract Builder isPrimaryKey(boolean isPrimaryKey);

public abstract Builder size(Long size);

public abstract Builder precision(Integer precision);

public abstract Builder scale(Integer scale);

public abstract Builder columnOptions(ImmutableList<String> columnOptions);

public abstract Builder sourceType(SourceDatabaseType sourceType);

public abstract SourceColumn build();
}
}
Loading
Loading