diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java index b2e8c7fffbc..49cc68c7e63 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java @@ -1,25 +1,28 @@ package io.debezium.connector.yugabytedb; -import java.util.*; - -import org.yb.client.*; +import java.util.Map; +import java.util.Objects; import io.debezium.pipeline.spi.Partition; import io.debezium.util.Collect; public class YBPartition implements Partition { - private static final String TABLETS_PARTITION_KEY = "tabletids"; + private static final String TABLET_PARTITION_KEY = "tabletid"; - private final String listOfTablets; + private final String tabletId; - public YBPartition(String listOfTablets) { - this.listOfTablets = listOfTablets; + public YBPartition(String tabletId) { + this.tabletId = tabletId; } @Override public Map getSourcePartition() { - return Collect.hashMapOf(TABLETS_PARTITION_KEY, listOfTablets); + return Collect.hashMapOf(TABLET_PARTITION_KEY, tabletId); + } + + public String getTabletId() { + return this.tabletId; } @Override @@ -31,12 +34,18 @@ public boolean equals(Object obj) { return false; } final YBPartition other = (YBPartition) obj; - return Objects.equals(listOfTablets, other.listOfTablets); + return Objects.equals(tabletId, other.tabletId); } @Override public int hashCode() { - return listOfTablets.hashCode(); + return tabletId.hashCode(); } + @Override + public String toString() { + return "YBPartition{" + + "tabletId='" + tabletId + '\'' + + '}'; + } } diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java new file mode 100644 index 00000000000..e55545f58bb --- /dev/null +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java @@ -0,0 +1,444 @@ +package io.debezium.connector.yugabytedb; + +/* * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +import java.sql.Types; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.Immutable; +import io.debezium.annotation.ThreadSafe; +import io.debezium.data.Envelope; +import io.debezium.data.SchemaUtil; +import io.debezium.relational.*; +import io.debezium.relational.Key.KeyMapper; +import io.debezium.relational.Tables.ColumnNameFilter; +import io.debezium.relational.mapping.ColumnMapper; +import io.debezium.relational.mapping.ColumnMappers; +import io.debezium.schema.FieldNameSelector; +import io.debezium.schema.FieldNameSelector.FieldNamer; +import io.debezium.util.SchemaNameAdjuster; +import io.debezium.util.Strings; + +/** + * Builder that constructs {@link TableSchema} instances for {@link Table} definitions. + *

+ * This builder is responsible for mapping {@link Column table columns} to {@link Field fields} in Kafka Connect {@link Schema}s, + * and this is necessarily dependent upon the database's supported types. Although mappings are defined for standard types, + * this class may need to be subclassed for each DBMS to add support for DBMS-specific types by overriding any of the + * "{@code add*Field}" methods. + *

+ * See the Java SE Mapping SQL + * and Java Types for details about how JDBC {@link Types types} map to Java value types. + * + * @author Randall Hauch + */ +@ThreadSafe +@Immutable +public class YBTableSchemaBuilder extends TableSchemaBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(YBTableSchemaBuilder.class); + + private final SchemaNameAdjuster schemaNameAdjuster; + private final ValueConverterProvider valueConverterProvider; + private final Schema sourceInfoSchema; + private final FieldNamer fieldNamer; + private final CustomConverterRegistry customConverterRegistry; + + /** + * Create a new instance of the builder. + * + * @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be + * null + * @param schemaNameAdjuster the adjuster for schema names; may not be null + */ + public YBTableSchemaBuilder(ValueConverterProvider valueConverterProvider, SchemaNameAdjuster schemaNameAdjuster, CustomConverterRegistry customConverterRegistry, + Schema sourceInfoSchema, boolean sanitizeFieldNames) { + super(valueConverterProvider, schemaNameAdjuster, customConverterRegistry, sourceInfoSchema, sanitizeFieldNames); + this.schemaNameAdjuster = schemaNameAdjuster; + this.valueConverterProvider = valueConverterProvider; + this.sourceInfoSchema = sourceInfoSchema; + this.fieldNamer = FieldNameSelector.defaultSelector(sanitizeFieldNames); + this.customConverterRegistry = customConverterRegistry; + } + + /** + * Create a {@link TableSchema} from the given {@link Table table definition}. The resulting TableSchema will have a + * {@link TableSchema#keySchema() key schema} that contains all of the columns that make up the table's primary key, + * and a {@link TableSchema#valueSchema() value schema} that contains only those columns that are not in the table's primary + * key. + *

+ * This is equivalent to calling {@code create(table,false)}. + * + * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no + * prefix + * @param envelopSchemaName the name of the schema of the built table's envelope + * @param table the table definition; may not be null + * @param filter the filter that specifies whether columns in the table should be included; may be null if all columns + * are to be included + * @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values + * @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null + */ + public TableSchema create(String schemaPrefix, String envelopSchemaName, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) { + if (schemaPrefix == null) { + schemaPrefix = ""; + } + + // Build the schemas ... + final TableId tableId = table.id(); + final String tableIdStr = tableSchemaName(tableId); + final String schemaNamePrefix = schemaPrefix + tableIdStr; + LOGGER.info("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); + SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value")); + SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key")); + AtomicBoolean hasPrimaryKey = new AtomicBoolean(false); + + Key tableKey = new Key.Builder(table).customKeyMapper(keysMapper).build(); + tableKey.keyColumns().forEach(column -> { + addField(keySchemaBuilder, table, column, null); + hasPrimaryKey.set(true); + }); + + table.columns() + .stream() + .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) + .forEach(column -> { + ColumnMapper mapper = mappers == null ? null : mappers.mapperFor(tableId, column); + addField(valSchemaBuilder, table, column, mapper); + }); + + Schema valSchema = valSchemaBuilder.optional().build(); + Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null; + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Mapped primary key for table '{}' to schema: {}", tableId, SchemaUtil.asDetailedString(keySchema)); + LOGGER.debug("Mapped columns for table '{}' to schema: {}", tableId, SchemaUtil.asDetailedString(valSchema)); + } + + Envelope envelope = Envelope.defineSchema() + .withName(schemaNameAdjuster.adjust(envelopSchemaName)) + .withRecord(valSchema) + .withSource(sourceInfoSchema) + .build(); + + // Create the generators ... + StructGenerator keyGenerator = createKeyGenerator(keySchema, tableId, tableKey.keyColumns()); + StructGenerator valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); + + // And the table schema ... + return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator); + } + + /** + * Returns the type schema name for the given table. + */ + private String tableSchemaName(TableId tableId) { + if (Strings.isNullOrEmpty(tableId.catalog())) { + if (Strings.isNullOrEmpty(tableId.schema())) { + return tableId.table(); + } + else { + return tableId.schema() + "." + tableId.table(); + } + } + else if (Strings.isNullOrEmpty(tableId.schema())) { + return tableId.catalog() + "." + tableId.table(); + } + // When both catalog and schema is present then only schema is used + else { + return tableId.schema() + "." + tableId.table(); + } + } + + /** + * Creates the function that produces a Kafka Connect key object for a row of data. + * + * @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator + * will be null + * @param columnSetName the name for the set of columns, used in error messages; may not be null + * @param columns the column definitions for the table that defines the row; may not be null + * @return the key-generating function, or null if there is no key schema + */ + protected StructGenerator createKeyGenerator(Schema schema, TableId columnSetName, List columns) { + if (schema != null) { + int[] recordIndexes = indexesForColumns(columns); + Field[] fields = fieldsForColumns(schema, columns); + int numFields = recordIndexes.length; + ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null); + return (row) -> { + Struct result = new Struct(schema); + for (int i = 0; i != numFields; ++i) { + validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i); + Object value = row[recordIndexes[i]]; + ValueConverter converter = converters[i]; + if (converter != null) { + // A component of primary key must be not-null. + // It is possible for some databases and values (MySQL and all-zero datetime) + // to be reported as null by JDBC or streaming reader. + // It thus makes sense to convert them to a sensible default replacement value. + value = converter.convert(((Object[]) value)[0]); + // value = converter.convert(value); + try { + Struct cell = new Struct(fields[i].schema()); + cell.put("value", value); + cell.put("set", true); + // valueStruct.put(cdef.getColumnName(), cell); + result.put(fields[i], cell); + } + catch (DataException e) { + Column col = columns.get(i); + LOGGER.error("Failed to properly convert key value for '{}.{}' of type {} for row {}:", + columnSetName, col.name(), col.typeName(), row, e); + } + } + } + return result; + }; + } + return null; + } + + private void validateIncomingRowToInternalMetadata(int[] recordIndexes, Field[] fields, ValueConverter[] converters, + Object[] row, int position) { + if (position >= converters.length) { + LOGGER.error("Error requesting a converter, converters: {}, requested index: {}", converters.length, position); + throw new ConnectException( + "Column indexing array is larger than number of converters, internal schema representation is probably out of sync with real database schema"); + } + if (position >= fields.length) { + LOGGER.error("Error requesting a field, fields: {}, requested index: {}", fields.length, position); + throw new ConnectException("Too few schema fields, internal schema representation is probably out of sync with real database schema"); + } + if (recordIndexes[position] >= row.length) { + LOGGER.error("Error requesting a row value, row: {}, requested index: {} at position {}", row.length, recordIndexes[position], position); + throw new ConnectException("Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema"); + } + } + + /** + * Creates the function that produces a Kafka Connect value object for a row of data. + * + * @param schema the Kafka Connect schema for the value; may be null if there is no known schema, in which case the generator + * will be null + * @param tableId the table identifier; may not be null + * @param columns the column definitions for the table that defines the row; may not be null + * @param filter the filter that specifies whether columns in the table should be included; may be null if all columns + * are to be included + * @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values + * @return the value-generating function, or null if there is no value schema + */ + protected StructGenerator createValueGenerator(Schema schema, TableId tableId, List columns, + ColumnNameFilter filter, ColumnMappers mappers) { + if (schema != null) { + List columnsThatShouldBeAdded = columns.stream() + .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) + .collect(Collectors.toList()); + int[] recordIndexes = indexesForColumns(columnsThatShouldBeAdded); + Field[] fields = fieldsForColumns(schema, columnsThatShouldBeAdded); + int numFields = recordIndexes.length; + ValueConverter[] converters = convertersForColumns(schema, tableId, columnsThatShouldBeAdded, mappers); + return (row) -> { + // columns + // .stream() + // .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) + // .forEach(column -> { + // ColumnMapper mapper = mappers == null ? null : mappers.mapperFor(tableId, column); + // addField(valSchemaBuilder, table, column, mapper); + // }); + // + // Schema valSchema = valSchemaBuilder.optional().build(); + + Struct result = new Struct(schema); + + for (int i = 0; i != numFields; ++i) { + validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i); + Object value = row[recordIndexes[i]]; + + ValueConverter converter = converters[i]; + + if (converter != null) { + LOGGER.trace("converter for value object: *** {} ***", converter); + } + else { + LOGGER.trace("converter is null..."); + } + + if (converter != null) { + try { + if (value != null) { + value = converter.convert(((Object[]) value)[0]); + Struct cell = new Struct(fields[i].schema()); + cell.put("value", value); + cell.put("set", true); + // valueStruct.put(cdef.getColumnName(), cell); + result.put(fields[i], cell); + } + else { + result.put(fields[i], null); + } + // result.put(fields[i], value); + } + catch (DataException | IllegalArgumentException e) { + Column col = columns.get(i); + LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:", + tableId, col.name(), col.typeName(), row, e); + } + catch (final Exception e) { + Column col = columns.get(i); + LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:", + tableId, col.name(), col.typeName(), row, e); + } + } + } + return result; + }; + } + return null; + } + + protected int[] indexesForColumns(List columns) { + int[] recordIndexes = new int[columns.size()]; + AtomicInteger i = new AtomicInteger(0); + columns.forEach(column -> { + recordIndexes[i.getAndIncrement()] = column.position() - 1; // position is 1-based, indexes 0-based + }); + return recordIndexes; + } + + protected Field[] fieldsForColumns(Schema schema, List columns) { + Field[] fields = new Field[columns.size()]; + AtomicInteger i = new AtomicInteger(0); + columns.forEach(column -> { + Field field = schema.field(fieldNamer.fieldNameFor(column)); // may be null if the field is unused ... + fields[i.getAndIncrement()] = field; + }); + return fields; + } + + /** + * Obtain the array of converters for each column in a row. A converter might be null if the column is not be included in + * the records. + * + * @param schema the schema; may not be null + * @param tableId the identifier of the table that contains the columns + * @param columns the columns in the row; may not be null + * @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values + * @return the converters for each column in the rows; never null + */ + protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId, List columns, ColumnMappers mappers) { + + ValueConverter[] converters = new ValueConverter[columns.size()]; + + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + + ValueConverter converter = createValueConverterFor(tableId, column, schema.field(fieldNamer.fieldNameFor(column))); + converter = wrapInMappingConverterIfNeeded(mappers, tableId, column, converter); + + if (converter == null) { + LOGGER.warn( + "No converter found for column {}.{} of type {}. The column will not be part of change events for that table.", + tableId, column.name(), column.typeName()); + } + + // may be null if no converter found + converters[i] = converter; + } + + return converters; + } + + private ValueConverter wrapInMappingConverterIfNeeded(ColumnMappers mappers, TableId tableId, Column column, ValueConverter converter) { + if (mappers == null || converter == null) { + return converter; + } + + ValueConverter mappingConverter = mappers.mappingConverterFor(tableId, column); + if (mappingConverter == null) { + return converter; + } + + return (value) -> mappingConverter.convert(converter.convert(value)); + } + + /** + * Add to the supplied {@link SchemaBuilder} a field for the column with the given information. + * + * @param builder the schema builder; never null + * @param table the table definition; never null + * @param column the column definition + * @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values + */ + protected void addField(SchemaBuilder builder, Table table, Column column, ColumnMapper mapper) { + final SchemaBuilder fieldBuilder = customConverterRegistry.registerConverterFor(table.id(), column) + .orElse(valueConverterProvider.schemaBuilder(column)); + + if (fieldBuilder != null) { + if (mapper != null) { + // Let the mapper add properties to the schema ... + mapper.alterFieldSchema(column, fieldBuilder); + } + if (column.isOptional()) { + fieldBuilder.optional(); + } + + // if the default value is provided + if (column.hasDefaultValue()) { + fieldBuilder + .defaultValue(customConverterRegistry.getValueConverter(table.id(), column).orElse(ValueConverter.passthrough()).convert(column.defaultValue())); + } + Schema optionalCellSchema = cellSchema(fieldNamer.fieldNameFor(column), fieldBuilder.build(), column.isOptional()); + + builder.field(fieldNamer.fieldNameFor(column), optionalCellSchema); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("- field '{}' ({}{}) from column {}", column.name(), builder.isOptional() ? "OPTIONAL " : "", + fieldBuilder.type(), + column); + } + } + else { + LOGGER.warn("Unexpected JDBC type '{}' for column '{}' that will be ignored", column.jdbcType(), column.name()); + } + } + + /** + * Create a {@link ValueConverter} that can be used to convert row values for the given column into the Kafka Connect value + * object described by the {@link Field field definition}. This uses the supplied {@link ValueConverterProvider} object. + * + * @param tableId the id of the table containing the column; never null + * @param column the column describing the input values; never null + * @param fieldDefn the definition for the field in a Kafka Connect {@link Schema} describing the output of the function; + * never null + * @return the value conversion function; may not be null + */ + protected ValueConverter createValueConverterFor(TableId tableId, Column column, Field fieldDefn) { + return customConverterRegistry.getValueConverter(tableId, column).orElse(valueConverterProvider.converter(column, fieldDefn)); + } + + static Schema cellSchema(String name, Schema valueSchema, boolean isOptional) { + if (valueSchema != null) { + SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(name) + .field("value", valueSchema) + .field("set", Schema.BOOLEAN_SCHEMA); + if (isOptional) { + schemaBuilder.optional(); + } + return schemaBuilder.build(); + } + else { + return null; + } + } +} diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index ebc0bf69c10..8b2f5a83ad8 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -6,13 +6,7 @@ package io.debezium.connector.yugabytedb; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Struct; @@ -29,13 +23,7 @@ import io.debezium.pipeline.spi.ChangeRecordEmitter; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnEditor; -import io.debezium.relational.RelationalChangeRecordEmitter; -import io.debezium.relational.Table; -import io.debezium.relational.TableEditor; -import io.debezium.relational.TableId; -import io.debezium.relational.TableSchema; +import io.debezium.relational.*; import io.debezium.schema.DataCollectionSchema; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -99,6 +87,8 @@ protected Operation getOperation() { return Operation.UPDATE; case DELETE: return Operation.DELETE; + case READ: + return Operation.READ; case TRUNCATE: return Operation.TRUNCATE; default: @@ -128,6 +118,8 @@ protected Object[] getOldColumnValues() { return null; case UPDATE: return null; + case READ: + return null; // return columnValues(message.getOldTupleList(), tableId, true, // message.hasTypeMetadata(), true, true); default: @@ -147,7 +139,8 @@ protected Object[] getNewColumnValues() { case CREATE: return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); case UPDATE: - // todo vaibhav: add scenario for the case of multiple columns being updated + return updatedColumnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); + case READ: return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); default: return null; @@ -215,7 +208,52 @@ private Object[] columnValues(List columns, TableId t if (position != -1) { Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); - values[position] = value; + // values[position] = value; + values[position] = new Object[]{ value, Boolean.TRUE }; + } + } + return values; + } + + private Object[] updatedColumnValues(List columns, TableId tableId, + boolean refreshSchemaIfChanged, boolean metadataInMessage, + boolean sourceOfToasted, boolean oldValues) + throws SQLException { + if (columns == null || columns.isEmpty()) { + return null; + } + final Table table = schema.tableFor(tableId); + if (table == null) { + schema.dumpTableId(); + } + Objects.requireNonNull(table); + + // based on the schema columns, create the values on the same position as the columns + List schemaColumns = table.columns(); + // based on the replication message without toasted columns for now + List columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)) + .collect(Collectors.toList()); + // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT + Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() + ? schemaColumns.size() + : columnsWithoutToasted.size()]; + + // initialize to unset + + final Set undeliveredToastableColumns = new HashSet<>(schema + .getToastableColumnsForTableId(table.id())); + for (ReplicationMessage.Column column : columns) { + // DBZ-298 Quoted column names will be sent like that in messages, + // but stored unquoted in the column names + final String columnName = Strings.unquoteIdentifierPart(column.getName()); + undeliveredToastableColumns.remove(columnName); + + int position = getPosition(columnName, table, values); + if (position != -1) { + Object value = column.getValue(() -> (BaseConnection) connection.connection(), + connectorConfig.includeUnknownDatatypes()); + + values[position] = new Object[]{ value, Boolean.TRUE }; } } return values; diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java index ea52a4a48ad..ea0d30e6048 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnector.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.yb.client.*; import org.yb.master.MasterDdlOuterClass; +import org.yb.master.MasterTypes; import com.google.common.net.HostAndPort; @@ -31,7 +32,6 @@ import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; -import org.yb.master.MasterTypes; /** * A Kafka Connect source connector that creates tasks which use YugabyteDB CDC API diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index a7a1a443e85..33f99c4be96 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -158,8 +158,8 @@ public ChangeEventSourceCoordinator previousOffsets = getPreviousOffsetss(new YugabyteDBPartition.Provider(connectorConfig), new YugabyteDBOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); - final Set previousOffset = new HashSet<>(previousOffsets.values()); - YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffset, connectorConfig); + + YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets, connectorConfig); LoggingContext.PreviousContext previousContext = taskContext .configureLoggingContext(CONTEXT_NAME); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 851b02af861..fb4fc151d55 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -72,42 +72,56 @@ private YugabyteDBOffsetContext(YugabyteDBConnectorConfig connectorConfig, this.incrementalSnapshotContext = incrementalSnapshotContext; } - public YugabyteDBOffsetContext(Set s, + public YugabyteDBOffsetContext(Map previousOffsets, YugabyteDBConnectorConfig config) { this.tabletSourceInfo = new ConcurrentHashMap(); this.sourceInfo = new SourceInfo(config); this.sourceInfoSchema = sourceInfo.schema(); - for (YugabyteDBOffsetContext context : s) { - if (context != null) { - LOGGER.debug("Populating the tabletsourceinfo" + context.getTabletSourceInfo()); - if (context.getTabletSourceInfo() != null) { - this.tabletSourceInfo.putAll(context.getTabletSourceInfo()); - } + + for (Map.Entry context : previousOffsets.entrySet()) { + YugabyteDBOffsetContext c = context.getValue(); + if (c != null) { + this.lastCompletelyProcessedLsn = c.lastCompletelyProcessedLsn; + this.lastCommitLsn = c.lastCommitLsn; + String tabletId = context.getKey().getSourcePartition().values().stream().findAny().get(); + initSourceInfo(tabletId, config); + this.updateWalPosition(tabletId, + this.lastCommitLsn, lastCompletelyProcessedLsn, null, null, null, null); } } + LOGGER.debug("Populating the tabletsourceinfo with " + this.getTabletSourceInfo()); this.transactionContext = new TransactionContext(); this.incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>(); } + public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig, + YugabyteDBConnection jdbcConnection, + Clock clock, + Set partitions) { + return initialContext(connectorConfig, jdbcConnection, clock, new OpId(-1, -1, "".getBytes(), -1, 0), + new OpId(-1, -1, "".getBytes(), -1, 0), partitions); + } + public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, YugabyteDBConnection jdbcConnection, - Clock clock) { - return initialContext(connectorConfig, jdbcConnection, clock, null, - null); + Clock clock, + Set partitions) { + return initialContext(connectorConfig, jdbcConnection, clock, new OpId(0, 0, "".getBytes(), 0, 0), + new OpId(0, 0, "".getBytes(), 0, 0), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, YugabyteDBConnection jdbcConnection, Clock clock, OpId lastCommitLsn, - OpId lastCompletelyProcessedLsn) { - + OpId lastCompletelyProcessedLsn, + Set partitions) { LOGGER.info("Creating initial offset context"); final OpId lsn = null; // OpId.valueOf(jdbcConnection.currentXLogLocation()); // TODO:Suranjan read the offset for each of the tablet final long txId = 0L;// new OpId(0,0,"".getBytes(), 0); LOGGER.info("Read checkpoint at '{}' ", lsn, txId); - return new YugabyteDBOffsetContext( + YugabyteDBOffsetContext context = new YugabyteDBOffsetContext( connectorConfig, lsn, lastCompletelyProcessedLsn, @@ -118,7 +132,13 @@ public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig c false, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>()); - + for (YBPartition p : partitions) { + if (context.getTabletSourceInfo().get(p.getTabletId()) == null) { + context.initSourceInfo(p.getTabletId(), connectorConfig); + context.updateWalPosition(p.getTabletId(), lastCommitLsn, lastCompletelyProcessedLsn, clock.currentTimeAsInstant(), String.valueOf(txId), null, null); + } + } + return context; } @Override @@ -241,6 +261,10 @@ OpId lastCompletelyProcessedLsn() { return lastCompletelyProcessedLsn; } + OpId lastCompletelyProcessedLsn(String tabletId) { + return lastCompletelyProcessedLsn; + } + OpId lastCommitLsn() { return lastCommitLsn; } @@ -273,7 +297,8 @@ public String toString() { + ", lastCommitLsn=" + lastCommitLsn + ", streamingStoppingLsn=" + streamingStoppingLsn + ", transactionContext=" + transactionContext - + ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]"; + + ", incrementalSnapshotContext=" + incrementalSnapshotContext + + ", tabletSourceInfo=" + tabletSourceInfo + "]"; } public OffsetState asOffsetState() { @@ -333,31 +358,18 @@ private String readOptionalString(Map offset, String key) { public YugabyteDBOffsetContext load(Map offset) { LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset); - - /* - * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); - * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, - * LAST_COMPLETELY_PROCESSED_LSN_KEY)); - * final OpId lastCommitLsn = OpId.valueOf(readOptionalString(offset, - * LAST_COMPLETELY_PROCESSED_LSN_KEY)); - * final String txId = readOptionalString(offset, SourceInfo.TXID_KEY); - * - * final Instant useconds = Conversions.toInstantFromMicros((Long) offset - * .get(SourceInfo.TIMESTAMP_USEC_KEY)); - * final boolean snapshot = (boolean) ((Map) offset) - * .getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); - * final boolean lastSnapshotRecord = (boolean) ((Map) offset) - * .getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE); - * return new YugabyteDBOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, - * lastCommitLsn, txId, useconds, snapshot, lastSnapshotRecord, - * TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext - * .load(offset)); - */ + OpId lastCompletelyProcessedLsn; + if (offset != null) { + lastCompletelyProcessedLsn = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + } + else { + lastCompletelyProcessedLsn = new OpId(0, 0, "".getBytes(), 0, 0); + } return new YugabyteDBOffsetContext(connectorConfig, - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, "txId", Instant.MIN, false, false, TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java index b4f64c11c86..2ccc7736b03 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java @@ -66,7 +66,7 @@ protected YugabyteDBSchema(YugabyteDBConnectorConfig config, YugabyteDBTypeRegis private static TableSchemaBuilder getTableSchemaBuilder(YugabyteDBConnectorConfig config, YugabyteDBValueConverter valueConverter) { - return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), + return new YBTableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), config.getSanitizeFieldNames()); } diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 2dc68b1886d..4eeb7078ea8 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -21,10 +21,7 @@ import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.RelationalSnapshotChangeEventSource; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -111,166 +108,7 @@ public SnapshotResult doExecute( SnapshotContext snapshotContext, SnapshottingTask snapshottingTask) throws Exception { - final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx = (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext; - - // Connection connection = null; - try { - LOGGER.info("Snapshot step 1 - Preparing"); - - if (previousOffset != null && previousOffset.isSnapshotRunning()) { - LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken."); - } - - // connection = createSnapshotConnection(); - // connectionCreated(ctx); - - LOGGER.info("Snapshot step 2 - Determining captured tables"); - - // Note that there's a minor race condition here: a new table matching the filters could be created between - // this call and the determination of the initial snapshot position below; this seems acceptable, though - determineCapturedTables(ctx); - snapshotProgressListener.monitoredDataCollectionsDetermined(ctx.capturedTables); - - LOGGER.info("Snapshot step 3 - Locking captured tables {}", ctx.capturedTables); - - if (snapshottingTask.snapshotSchema()) { - // lockTablesForSchemaSnapshot(context, ctx); - } - - LOGGER.info("Snapshot step 4 - Determining snapshot offset"); - determineSnapshotOffset(ctx, previousOffset); - - LOGGER.info("Snapshot step 5 - Reading structure of captured tables"); - readTableStructure(context, ctx, previousOffset); - - if (snapshottingTask.snapshotSchema()) { - LOGGER.info("Snapshot step 6 - Persisting schema history"); - - // createSchemaChangeEventsForTables(context, ctx, snapshottingTask); - - // if we've been interrupted before, the TX rollback will cause any locks to be released - releaseSchemaSnapshotLocks(ctx); - } - else { - LOGGER.info("Snapshot step 6 - Skipping persisting of schema history"); - } - - if (snapshottingTask.snapshotData()) { - LOGGER.info("Snapshot step 7 - Snapshotting data"); - createDataEvents(context, ctx); - } - else { - LOGGER.info("Snapshot step 7 - Skipping snapshotting of data"); - // releaseDataSnapshotLocks(ctx); - ctx.offset.preSnapshotCompletion(); - ctx.offset.postSnapshotCompletion(); - } - - // postSnapshot(); - dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset); - return SnapshotResult.completed(ctx.offset); - } - finally { - // rollbackTransaction(connection); - } - } - - private void createDataEvents(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) - throws Exception { - EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); - // tryStartingSnapshot(snapshotContext); - - final int tableCount = snapshotContext.capturedTables.size(); - int tableOrder = 1; - LOGGER.info("Snapshotting contents of {} tables while still in transaction", tableCount); - for (Iterator tableIdIterator = snapshotContext.capturedTables.iterator(); tableIdIterator.hasNext();) { - final TableId tableId = tableIdIterator.next(); - snapshotContext.lastTable = !tableIdIterator.hasNext(); - - if (!sourceContext.isRunning()) { - throw new InterruptedException("Interrupted while snapshotting table " + tableId); - } - - LOGGER.debug("Snapshotting table {}", tableId); - - createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, - snapshotContext.tables.forTable(tableId), tableOrder++, tableCount); - } - - // releaseDataSnapshotLocks(snapshotContext); - snapshotContext.offset.preSnapshotCompletion(); - snapshotReceiver.completeSnapshot(); - snapshotContext.offset.postSnapshotCompletion(); - } - - private void createDataEventsForTable(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, int tableOrder, - int tableCount) - throws InterruptedException { - - long exportStart = clock.currentTimeInMillis(); - LOGGER.info("Exporting data from table '{}' ({} of {} tables)", table.id(), tableOrder, tableCount); - - // final Optional selectStatement = determineSnapshotSelect(snapshotContext, table.id()); - if (true/* !selectStatement.isPresent() */) { - LOGGER.warn("For table '{}' the select statement was not provided, skipping table", table.id()); - snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), 0); - return; - } - // LOGGER.info("\t For table '{}' using select statement: '{}'", table.id(), selectStatement.get()); - final OptionalLong rowCount = OptionalLong.empty();// rowCountForTable(table.id()); - - // try (Statement statement = readTableStatement(rowCount); - // ResultSet rs = statement.executeQuery(selectStatement.get())) { - - // ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table); - long rows = 0; - // Threads.Timer logTimer = getTableScanLogTimer(); - snapshotContext.lastRecordInTable = false; - - // if (rs.next()) { - // while (!snapshotContext.lastRecordInTable) { - // if (!sourceContext.isRunning()) { - // throw new InterruptedException("Interrupted while snapshotting table " + table.id()); - // } - // - // rows++; - // final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray); - // - // snapshotContext.lastRecordInTable = !rs.next(); - // if (logTimer.expired()) { - // long stop = clock.currentTimeInMillis(); - // if (rowCount.isPresent()) { - // LOGGER.info("\t Exported {} of {} records for table '{}' after {}", rows, rowCount.getAsLong(), - // table.id(), Strings.duration(stop - exportStart)); - // } - // else { - // LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(), - // Strings.duration(stop - exportStart)); - // } - // snapshotProgressListener.rowsScanned(table.id(), rows); - // logTimer = getTableScanLogTimer(); - // } - // - // if (snapshotContext.lastTable && snapshotContext.lastRecordInTable) { - // lastSnapshotRecord(snapshotContext); - // } - // dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver); - // } - // } - // else if (snapshotContext.lastTable) { - // lastSnapshotRecord(snapshotContext); - // } - - LOGGER.info("\t Finished exporting {} records for table '{}'; total duration '{}'", rows, - table.id(), Strings.duration(clock.currentTimeInMillis() - exportStart)); - snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), rows); - // } - // catch (SQLException e) { - // throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); - // } + return SnapshotResult.completed(previousOffset); } @Override @@ -296,42 +134,12 @@ protected SnapshotContext prepare( return new PostgresSnapshotContext(partition, connectorConfig.databaseName()); } - // @Override - // protected void connectionCreated(RelationalSnapshotContext snapshotContext) - // throws Exception { - // // If using catch up streaming, the connector opens the transaction that the snapshot will eventually use - // // before the catch up streaming starts. By looking at the current wal location, the transaction can determine - // // where the catch up streaming should stop. The transaction is held open throughout the catch up - // // streaming phase so that the snapshot is performed from a consistent view of the data. Since the isolation - // // level on the transaction used in catch up streaming has already set the isolation level and executed - // // statements, the transaction does not need to get set the level again here. - // if (snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) { - // setSnapshotTransactionIsolationLevel(); - // } - // schema.refresh(jdbcConnection, false); - // } - protected Set getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception { // return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{ "TABLE" }); return new HashSet<>(); } - // @Override - // protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, - // RelationalSnapshotContext snapshotContext) - // throws SQLException, InterruptedException { - // final Duration lockTimeout = connectorConfig.snapshotLockTimeout(); - // final Optional lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables); - // - // if (lockStatement.isPresent()) { - // LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds()); - // jdbcConnection.executeWithoutCommitting(lockStatement.get()); - // // now that we have the locks, refresh the schema - // schema.refresh(jdbcConnection, false); - // } - // } - protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException { } @@ -391,50 +199,6 @@ private OpId getTransactionStartLsn() throws SQLException { return null;// OpId.valueOf(jdbcConnection.currentXLogLocation()); } - protected void readTableStructure(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - YugabyteDBOffsetContext offsetContext) - throws SQLException, InterruptedException { - Set schemas = snapshotContext.capturedTables.stream() - .map(TableId::schema) - .collect(Collectors.toSet()); - - // reading info only for the schemas we're interested in as per the set of captured tables; - // while the passed table name filter alone would skip all non-included tables, reading the schema - // would take much longer that way - // for (String schema : schemas) { - // if (!sourceContext.isRunning()) { - // throw new InterruptedException("Interrupted while reading structure of schema " + schema); - // } - // - // LOGGER.info("Reading structure of schema '{}'", snapshotContext.catalogName); - // jdbcConnection.readSchema( - // snapshotContext.tables, - // snapshotContext.catalogName, - // schema, - // connectorConfig.getTableFilters().dataCollectionFilter(), - // null, - // false); - // } - // schema.refresh(jdbcConnection, false); - } - - protected SchemaChangeEvent getCreateTableEvent( - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - Table table) - throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), - snapshotContext.catalogName, - table.id().schema(), - null, - table, - SchemaChangeEventType.CREATE, - true); - } - @Override protected void complete(SnapshotContext snapshotContext) { snapshotter.snapshotCompleted(); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 6fe0572696e..d9efb6f3386 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -113,19 +113,29 @@ public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorC @Override public void execute(ChangeEventSourceContext context, YugabyteDBPartition partition, YugabyteDBOffsetContext offsetContext) { - if (!snapshotter.shouldStream()) { - LOGGER.info("Streaming is not enabled in correct configuration"); - return; - } - // replication slot could exist at the time of starting Debezium so // we will stream from the position in the slot // instead of the last position in the database - boolean hasStartLsnStoredInContext = offsetContext != null; + // Get all partitions + // Get + Set partitions = new YugabyteDBPartition.Provider(connectorConfig).getPartitions(); + boolean hasStartLsnStoredInContext = offsetContext != null && !offsetContext.getTabletSourceInfo().isEmpty(); + // LOGGER.info("SKSK The offset context is " + offsetContext + " partition is " + partition); if (!hasStartLsnStoredInContext) { - offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock); + LOGGER.info("No start opid found in the context."); + if (snapshotter.shouldSnapshot()) { + offsetContext = YugabyteDBOffsetContext.initialContextForSnapshot(connectorConfig, connection, clock, partitions); + } + else { + offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock, partitions); + } } + /* + * if (snapshotter.shouldSnapshot()) { + * getSnapshotChanges(); + * } + */ try { final WalPositionLocator walPosition; @@ -141,44 +151,12 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit LOGGER.info("No previous LSN found in Kafka, streaming from the latest checkpoint" + " in YugabyteDB"); walPosition = new WalPositionLocator(); + // create snpashot offset. + // replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition)); } - // for large dbs, the refresh of schema can take too much time - // such that the connection times out. We must enable keep - // alive to ensure that it doesn't time out - // ReplicationStream stream = this.replicationStream.get(); - // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); - - // refresh the schema so we have a latest view of the DB tables - // taskContext.refreshSchema(connection, true); - - // If we need to do a pre-snapshot streaming catch up, we should allow the snapshot transaction to persist - // but normally we want to start streaming without any open transactions. - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } - - // this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); - - // if (walPosition.searchingEnabled()) { - // searchWalPosition(context, stream, walPosition); - // try { - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } - // } - // catch (Exception e) { - // LOGGER.info("Commit failed while preparing for reconnect", e); - // } - // walPosition.enableFiltering(); - // stream.stopKeepAlive(); - // replicationConnection.reconnect(); - // replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); - // stream = this.replicationStream.get(); - // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); - // } - // processMessages(context, partition, offsetContext, stream); - getChanges2(context, partition, offsetContext); + + getChanges(context, partition, offsetContext, hasStartLsnStoredInContext); } catch (Throwable e) { errorHandler.setProducerThrowable(e); @@ -186,13 +164,7 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit finally { if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // Need to CDCSDK see what can be done. - // try { - // connection.commit(); - // } - // catch (SQLException throwables) { - // throwables.printStackTrace(); - // } + } if (asyncYBClient != null) { try { @@ -210,39 +182,36 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit e.printStackTrace(); } } - // if (replicationConnection != null) { - // LOGGER.debug("stopping streaming..."); - // // stop the keep alive thread, this also shuts down the - // // executor pool - // ReplicationStream stream = replicationStream.get(); - // if (stream != null) { - // stream.stopKeepAlive(); - // } - // // TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the stream, but it's not reliable atm (see javadoc) - // // replicationStream.close(); - // // close the connection - this should also disconnect the current stream even if it's blocking - // try { - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } - // replicationConnection.close(); - // } - // catch (Exception e) { - // LOGGER.debug("Exception while closing the connection", e); - // } - // replicationStream.set(null); - // } } } private GetChangesResponse getChangeResponse(YugabyteDBOffsetContext offsetContext) throws Exception { - return null; } - private void getChanges2(ChangeEventSourceContext context, - YugabyteDBPartition partitionn, - YugabyteDBOffsetContext offsetContext) + private void getSnapshotChanges(ChangeEventSourceContext context, + YugabyteDBPartition partitionn, + YugabyteDBOffsetContext offsetContext, + boolean previousOffsetPresent) { + + } + + private boolean isSnapshotCompleteForAllTablets(Map snapshotDoneForTablet) { + for (Map.Entry entry : snapshotDoneForTablet.entrySet()) { + if (entry.getValue() == Boolean.FALSE) { + return false; + } + } + + // Returning true would mean that we have captured the snapshot for all the tablets and in case of initial_only snapshotter + // the snapshot can be stopped now. + return true; + } + + private void getChanges(ChangeEventSourceContext context, + YugabyteDBPartition partitionn, + YugabyteDBOffsetContext offsetContext, + boolean previousOffsetPresent) throws Exception { LOGGER.debug("The offset is " + offsetContext.getOffset()); @@ -274,38 +243,48 @@ private void getChanges2(ChangeEventSourceContext context, tableIdToTable.put(tId, table); } - int noMessageIterations = 0; + // todo: rename schemaStreamed to something else + Map schemaStreamed = new HashMap<>(); + Map snapshotDoneForTablet = new HashMap<>(); + + // Initialize all the tabletIds with true signifying we need schemas for all the tablets. + // Also initialize the snapshot flags for each tablet. for (Pair entry : tabletPairList) { - final String tabletId = entry.getValue(); - offsetContext.initSourceInfo(tabletId, this.connectorConfig); + schemaStreamed.put(entry.getValue(), Boolean.TRUE); + snapshotDoneForTablet.put(entry.getValue(), Boolean.FALSE); } - LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); + + LOGGER.info("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); while (context.isRunning() && (offsetContext.getStreamingStoppingLsn() == null || (lastCompletelyProcessedLsn.compareTo(offsetContext.getStreamingStoppingLsn()) < 0))) { + // The following will specify the connector polling interval at which + // yb-client will ask the database for changes + Thread.sleep(connectorConfig.cdcPollIntervalms()); for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); YBPartition part = new YBPartition(tabletId); - // The following will specify the connector polling interval at which - // yb-client will ask the database for changes - Thread.sleep(connectorConfig.cdcPollIntervalms()); + // Ignore this tablet if the snapshot is already complete for it. + if (!snapshotter.shouldStream() && snapshotDoneForTablet.get(tabletId)) { + continue; + } YBTable table = tableIdToTable.get(entry.getKey()); OpId cp = offsetContext.lsn(tabletId); // GetChangesResponse response = getChangeResponse(offsetContext); - LOGGER.debug("Going to fetch for tablet " + tabletId + " from OpId " + cp + " " + + LOGGER.info("Going to fetch for tablet " + tabletId + " from OpId " + cp + " " + "table " + table.getName()); GetChangesResponse response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime()); + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), schemaStreamed.get(tabletId)); - for (CdcService.CDCSDKProtoRecordPB record : response - .getResp() - .getCdcSdkProtoRecordsList()) { + + for (CdcService.CDCSDKProtoRecordPB record : response.getResp().getCdcSdkProtoRecordsList()) { + LOGGER.info("SKSK the recrds are " + record); CdcService.RowMessage m = record.getRowMessage(); YbProtoReplicationMessage message = new YbProtoReplicationMessage( m, this.yugabyteDBTypeRegistry); @@ -359,6 +338,10 @@ else if (message.isDDLMessage()) { LOGGER.debug("Received DDL message {}", message.getSchema().toString() + " the table is " + message.getTable()); + // Set schema received for this tablet ID which means that if a DDL message is received for a tablet, + // we do not need its schema again. + schemaStreamed.put(tabletId, Boolean.FALSE); + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = YugabyteDBSchema.parseWithSchema(message.getTable(), pgSchemaNameInRecord); @@ -401,6 +384,25 @@ else if (message.isDDLMessage()) { } } + + if (!snapshotter.shouldStream() && response.getResp().getCdcSdkCheckpoint().getWriteId() != -1) { + LOGGER.debug("Marking snapshot complete for tablet: " + tabletId); + snapshotDoneForTablet.put(tabletId, Boolean.TRUE); + } + + // End the snapshot in case the snapshot is complete. + if (isSnapshotCompleteForAllTablets(snapshotDoneForTablet)) { + return; + } + + OpId finalOpid = new OpId( + response.getTerm(), + response.getIndex(), + response.getKey(), + response.getWriteId(), + response.getSnapshotTime()); + offsetContext.getSourceInfo(tabletId).updateLastCommit(finalOpid); + probeConnectionIfNeeded(); if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { @@ -509,10 +511,6 @@ public void commitOffset(Map offset) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); } - // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - // CDCSDK yugabyte does it automatically. - // but we may need an API - // replicationStream.flushLsn(lsn); } else { LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java index 8e9e86b7d4f..f75feef0b03 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java @@ -91,7 +91,7 @@ public boolean equals(Object o) { return false; OpId that = (OpId) o; return term == that.term && index == that.index && time == that.time - && write_id == that.write_id && Objects.equal(key, that.key); + && write_id == that.write_id && Arrays.equals(key, that.key); } @Override diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java index 65e4ff5bda9..aa941f49ffe 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java @@ -47,6 +47,7 @@ public enum Operation { BEGIN, COMMIT, DDL, + READ, NOOP } diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java index e2575a56440..1410e0dc1e0 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java @@ -28,7 +28,7 @@ /** * Replication message representing message sent by Postgres Decoderbufs * - * @author Jiri Pechanec + * @author Suranjan Kumar */ public class YbProtoReplicationMessage implements ReplicationMessage { @@ -52,6 +52,8 @@ public Operation getOperation() { return Operation.UPDATE; case DELETE: return Operation.DELETE; + case READ: + return Operation.READ; case BEGIN: return Operation.BEGIN; case COMMIT: diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java new file mode 100644 index 00000000000..c380f5b7d10 --- /dev/null +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java @@ -0,0 +1,144 @@ +package io.debezium.connector.yugabytedb.transforms; + +import java.util.Map; +import java.util.Objects; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yb.util.Pair; + +import io.debezium.transforms.ExtractNewRecordState; + +public class YBExtractNewRecordState> extends ExtractNewRecordState { + private Cache schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(256)); + private static final Logger LOGGER = LoggerFactory.getLogger(YBExtractNewRecordState.class); + + @Override + public R apply(final R record) { + final R ret = super.apply(record); + if (ret == null || (ret.value() != null && !(ret.value() instanceof Struct))) { + return ret; + } + + Pair p = getUpdatedValueAndSchema((Struct) ret.key()); + Schema updatedSchemaForKey = (Schema) p.getFirst(); + Struct updatedValueForKey = (Struct) p.getSecond(); + + Schema updatedSchemaForValue = null; + Struct updatedValueForValue = null; + if (ret.value() != null) { + Pair val = getUpdatedValueAndSchema((Struct) ret.value()); + updatedSchemaForValue = (Schema) val.getFirst(); + updatedValueForValue = (Struct) val.getSecond(); + } + + return ret.newRecord(ret.topic(), ret.kafkaPartition(), updatedSchemaForKey, updatedValueForKey, updatedSchemaForValue, updatedValueForValue, ret.timestamp()); + } + + @Override + public void close() { + super.close(); + schemaUpdateCache = null; + } + + private boolean isSimplifiableField(Field field) { + if (field.schema().type() != Type.STRUCT) { + return false; + } + + if (field.schema().fields().size() != 2 + || (!Objects.equals(field.schema().fields().get(0).name(), "value") + || !Objects.equals(field.schema().fields().get(1).name(), "set"))) { + return false; + } + return true; + } + + // todo: this function can be removed + private Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + + for (Field field : schema.fields()) { + if (isSimplifiableField(field)) { + builder.field(field.name(), field.schema().field("value").schema()); + } + else { + builder.field(field.name(), field.schema()); + } + } + + return builder.build(); + } + + private Schema makeUpdatedSchema(Schema schema, Struct value) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + + for (Field field : schema.fields()) { + if (isSimplifiableField(field)) { + if (value.get(field.name()) != null) { + builder.field(field.name(), field.schema().field("value").schema()); + } + } + else { + builder.field(field.name(), field.schema()); + } + } + + return builder.build(); + } + + private Pair getUpdatedValueAndSchema(Struct obj) { + final Struct value = obj; + Schema updatedSchema = null; + if (updatedSchema == null) { + updatedSchema = makeUpdatedSchema(value.schema(), value); + } + + LOGGER.debug("Updated schema as json: " + io.debezium.data.SchemaUtil.asString(value.schema())); + + final Struct updatedValue = new Struct(updatedSchema); + + for (Field field : value.schema().fields()) { + if (isSimplifiableField(field)) { + Struct fieldValue = (Struct) value.get(field); + if (fieldValue != null) { + updatedValue.put(field.name(), fieldValue.get("value")); + } + } + else { + } + } + + return new org.yb.util.Pair(updatedSchema, updatedValue); + } +} + +class SchemaUtil { + + public static SchemaBuilder copySchemaBasics(Schema source) { + return copySchemaBasics(source, new SchemaBuilder(source.type())); + } + + public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) { + builder.name(source.name()); + builder.version(source.version()); + builder.doc(source.doc()); + + final Map params = source.parameters(); + if (params != null) { + builder.parameters(params); + } + + return builder; + } + +} diff --git a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java index 5b0b09a8b74..0f54ef0ce80 100644 --- a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java +++ b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java @@ -41,6 +41,7 @@ private CompletableFuture insertRecords(long numOfRowsToBeInserted) { for (int i = 0; i < numOfRowsToBeInserted; i++) { TestHelper.execute(String.format(formatInsertString, i)); } + }).exceptionally(throwable -> { throw new RuntimeException(throwable); }); @@ -70,9 +71,11 @@ protected Configuration.Builder getConfigBuilder(String fullTablenameWithSchema) } private void verifyPrimaryKeyOnly(long recordsCount) { + System.out.println("verifyPrimaryKeyOnly "); int totalConsumedRecords = 0; long start = System.currentTimeMillis(); List records = new ArrayList<>(); + recordsCount = 100; while (totalConsumedRecords < recordsCount) { int consumed = super.consumeAvailableRecords(record -> { System.out.println("The record being consumed is " + record); @@ -90,6 +93,7 @@ private void verifyPrimaryKeyOnly(long recordsCount) { // verify the records assertInsert(records.get(i), "id", i); } + } private void verifyValue(long recordsCount) { @@ -145,10 +149,12 @@ public void testRecordConsumption() throws Exception { Configuration.Builder configBuilder = getConfigBuilder("public.t1"); start(YugabyteDBConnector.class, configBuilder.build()); assertConnectorIsRunning(); - final long recordsCount = 2; + final long recordsCount = 1; + System.out.println("testRecordConsumption"); // insert rows in the table t1 with values insertRecords(recordsCount); + System.out.println("testRecordConsumption"); CompletableFuture.runAsync(() -> verifyPrimaryKeyOnly(recordsCount)) .exceptionally(throwable -> { diff --git a/debezium-connector-yugabytedb2/src/test/resources/log4j.properties b/debezium-connector-yugabytedb2/src/test/resources/log4j.properties index f90be99692e..c3487551fda 100644 --- a/debezium-connector-yugabytedb2/src/test/resources/log4j.properties +++ b/debezium-connector-yugabytedb2/src/test/resources/log4j.properties @@ -2,23 +2,23 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n -log4j.appender.stdout.threshold=DEBUG +#log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n +#log4j.appender.stdout.threshold=DEBUG # Root logger option -log4j.rootLogger=TRACE +log4j.rootLogger=DEBUG # Set up the default logging to be INFO level, then override specific units log4j.logger.io.confluent.connect.avro=WARN log4j.logger.io.confluent.kafka.serializers=WARN -log4j.logger.io.debezium=INFO +#log4j.logger.io.debezium=DEBUG log4j.logger.io.debezium.pipeline=DEBUG log4j.logger.io.debezium.connector.postgresql=DEBUG -log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings() +#log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings() log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG #log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=DEBUG #log4j.logger.io.debezium.connector.postgresql.connection.YugabyteDBReplicationConnection=DEBUG #log4j.logger.io.debezium.connector.postgresql.PostgresConnectorTask=DEBUG log4j.logger.org.reflections=ERROR -log4j.logger.org.yb.client = INFO +#log4j.logger.org.yb.client=INFO diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index a88a9de60f8..53945a7f03b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -100,7 +100,7 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t final TableId tableId = table.id(); final String tableIdStr = tableSchemaName(tableId); final String schemaNamePrefix = schemaPrefix + tableIdStr; - LOGGER.info("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); + LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value")); SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key")); AtomicBoolean hasPrimaryKey = new AtomicBoolean(false);