Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
003a5c3
support for snapshot
suranjan Feb 24, 2022
d3b340f
Fixed previous offset
Feb 28, 2022
2552b0b
fix for snapshot
suranjan Feb 28, 2022
b983a32
fix for snapshot
suranjan Feb 28, 2022
b99a3b9
Merge branch 'final-connector-ybdb' into iamoncar-final-connector
Mar 1, 2022
07ead05
Addressed review comments
Mar 1, 2022
6674a05
Merge branch 'iamoncar-final-connector' into final-connector-ybdb-sna…
suranjan Mar 2, 2022
064314c
fix
suranjan Mar 2, 2022
52dc984
fix
suranjan Mar 2, 2022
9894a32
fix
suranjan Mar 2, 2022
4762766
fix
suranjan Mar 2, 2022
67a81af
changes to fix snapshot modes
fourpointfour Mar 10, 2022
233dc84
fixed NullPointerException on restart of connector
fourpointfour Mar 16, 2022
c99649f
refactored a comment
fourpointfour Mar 16, 2022
f6ce7f6
fixed NullPointerException across connector restarts
fourpointfour Mar 16, 2022
b61ab24
added initial_only mode of snapshot streaming
fourpointfour Mar 22, 2022
b8d743c
removed extra comments
fourpointfour Mar 22, 2022
a307880
resolved merge conflicts
fourpointfour Mar 22, 2022
8a2f232
fixed a bug in initial_only snapshot mode
fourpointfour Mar 22, 2022
a240a66
Merge branch 'snapshot-branch-local' into merge-with-snapshot
fourpointfour Mar 22, 2022
4775b90
add update record fix
suranjan Mar 23, 2022
b7852cf
fix for record
suranjan Mar 23, 2022
071bd6b
fix for update wrapper
fourpointfour Mar 23, 2022
6d24e01
Merge branch 'update-record-fix' into update-wrapper-final
suranjan Mar 23, 2022
5f4bb9f
revert core changes
suranjan Mar 23, 2022
2ef67b0
added fix for update wrapper
fourpointfour Mar 23, 2022
7d98832
resolved merge conflicts
fourpointfour Mar 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package io.debezium.connector.yugabytedb;

import java.util.*;

import org.yb.client.*;
import java.util.Map;
import java.util.Objects;

import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Collect;

public class YBPartition implements Partition {

private static final String TABLETS_PARTITION_KEY = "tabletids";
private static final String TABLET_PARTITION_KEY = "tabletid";

private final String listOfTablets;
private final String tabletId;

/**
 * Creates a partition scoped to a single YugabyteDB tablet.
 * (The scraped diff interleaved the old listOfTablets constructor with the
 * new one; only the post-change tabletId version is valid.)
 *
 * @param tabletId identifier of the tablet this partition represents
 */
public YBPartition(String tabletId) {
    this.tabletId = tabletId;
}

/**
 * Returns the Kafka Connect source-partition map identifying this tablet.
 * (Diff residue removed: the superseded listOfTablets return is dropped in
 * favor of the single-tablet key.)
 *
 * @return a single-entry map keyed by the tablet-id partition key
 */
@Override
public Map<String, String> getSourcePartition() {
    return Collect.hashMapOf(TABLET_PARTITION_KEY, tabletId);
}

/** Returns the identifier of the tablet backing this partition. */
public String getTabletId() {
    return tabletId;
}

@Override
Expand All @@ -31,12 +34,18 @@ public boolean equals(Object obj) {
return false;
}
final YBPartition other = (YBPartition) obj;
return Objects.equals(listOfTablets, other.listOfTablets);
return Objects.equals(tabletId, other.tabletId);
}

/**
 * Hash code derived solely from the tablet id, consistent with equals().
 * (Diff residue removed: the superseded listOfTablets-based hash is dropped.)
 * NOTE(review): this throws NPE if tabletId is null — confirm the constructor
 * is never passed null before relying on it.
 */
@Override
public int hashCode() {
    return tabletId.hashCode();
}

/** Human-readable representation of this partition for logs and debugging. */
@Override
public String toString() {
    return "YBPartition{tabletId='" + tabletId + "'}";
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@
package io.debezium.connector.yugabytedb;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Struct;
Expand All @@ -29,13 +23,7 @@
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.*;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
Expand Down Expand Up @@ -99,6 +87,8 @@ protected Operation getOperation() {
return Operation.UPDATE;
case DELETE:
return Operation.DELETE;
case READ:
return Operation.READ;
case TRUNCATE:
return Operation.TRUNCATE;
default:
Expand Down Expand Up @@ -128,6 +118,8 @@ protected Object[] getOldColumnValues() {
return null;
case UPDATE:
return null;
case READ:
return null;
// return columnValues(message.getOldTupleList(), tableId, true,
// message.hasTypeMetadata(), true, true);
default:
Expand All @@ -147,7 +139,8 @@ protected Object[] getNewColumnValues() {
case CREATE:
return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false);
case UPDATE:
// todo vaibhav: add scenario for the case of multiple columns being updated
return updatedColumnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false);
case READ:
return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false);
default:
return null;
Expand Down Expand Up @@ -215,7 +208,52 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());
values[position] = value;
// values[position] = value;
values[position] = new Object[]{ value, Boolean.TRUE };
}
}
return values;
}

private Object[] updatedColumnValues(List<ReplicationMessage.Column> columns, TableId tableId,
boolean refreshSchemaIfChanged, boolean metadataInMessage,
boolean sourceOfToasted, boolean oldValues)
throws SQLException {
if (columns == null || columns.isEmpty()) {
return null;
}
final Table table = schema.tableFor(tableId);
if (table == null) {
schema.dumpTableId();
}
Objects.requireNonNull(table);

// based on the schema columns, create the values on the same position as the columns
List<Column> schemaColumns = table.columns();
// based on the replication message without toasted columns for now
List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn))
.collect(Collectors.toList());
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size()
? schemaColumns.size()
: columnsWithoutToasted.size()];

// initialize to unset

final Set<String> undeliveredToastableColumns = new HashSet<>(schema
.getToastableColumnsForTableId(table.id()));
for (ReplicationMessage.Column column : columns) {
// DBZ-298 Quoted column names will be sent like that in messages,
// but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName());
undeliveredToastableColumns.remove(columnName);

int position = getPosition(columnName, table, values);
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());

values[position] = new Object[]{ value, Boolean.TRUE };
}
}
return values;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;
import org.yb.client.*;
import org.yb.master.MasterDdlOuterClass;
import org.yb.master.MasterTypes;

import com.google.common.net.HostAndPort;

Expand All @@ -31,7 +32,6 @@
import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import org.yb.master.MasterTypes;

/**
* A Kafka Connect source connector that creates tasks which use YugabyteDB CDC API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ public ChangeEventSourceCoordinator<YugabyteDBPartition, YugabyteDBOffsetContext
final Map<YBPartition, YugabyteDBOffsetContext> previousOffsets = getPreviousOffsetss(new YugabyteDBPartition.Provider(connectorConfig),
new YugabyteDBOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
final Set<YugabyteDBOffsetContext> previousOffset = new HashSet<>(previousOffsets.values());
YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffset, connectorConfig);

YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets, connectorConfig);

LoggingContext.PreviousContext previousContext = taskContext
.configureLoggingContext(CONTEXT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,42 +72,56 @@ private YugabyteDBOffsetContext(YugabyteDBConnectorConfig connectorConfig,
this.incrementalSnapshotContext = incrementalSnapshotContext;
}

/**
 * Builds an aggregate offset context from the per-partition offsets restored
 * across connector restarts, seeding per-tablet source info and WAL positions.
 * (The scraped diff interleaved the old Set-based constructor with the new
 * Map-based one; only the post-change version is kept. Also fixes the raw
 * ConcurrentHashMap construction and uses parameterized logging instead of
 * string concatenation.)
 *
 * @param previousOffsets previously committed offsets, keyed by tablet partition
 * @param config          connector configuration
 */
public YugabyteDBOffsetContext(Map<YBPartition, YugabyteDBOffsetContext> previousOffsets,
                               YugabyteDBConnectorConfig config) {
    this.tabletSourceInfo = new ConcurrentHashMap<>();
    this.sourceInfo = new SourceInfo(config);
    this.sourceInfoSchema = sourceInfo.schema();

    for (Map.Entry<YBPartition, YugabyteDBOffsetContext> entry : previousOffsets.entrySet()) {
        YugabyteDBOffsetContext previous = entry.getValue();
        if (previous != null) {
            this.lastCompletelyProcessedLsn = previous.lastCompletelyProcessedLsn;
            this.lastCommitLsn = previous.lastCommitLsn;
            // The source partition map carries a single tablet-id entry, so any
            // value from it is the tablet id for this partition.
            String tabletId = entry.getKey().getSourcePartition().values().stream().findAny().get();
            initSourceInfo(tabletId, config);
            this.updateWalPosition(tabletId,
                    this.lastCommitLsn, lastCompletelyProcessedLsn, null, null, null, null);
        }
    }
    LOGGER.debug("Populating the tabletsourceinfo with {}", this.getTabletSourceInfo());
    this.transactionContext = new TransactionContext();
    this.incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>();
}

/**
 * Creates the initial offset context used when a snapshot is being taken,
 * seeding every partition's checkpoints with the snapshot sentinel position.
 *
 * @param connectorConfig connector configuration
 * @param jdbcConnection  connection to the database
 * @param clock           clock used to stamp initial WAL positions
 * @param partitions      tablet partitions to initialize
 * @return the initial snapshot offset context
 */
public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig,
                                                                YugabyteDBConnection jdbcConnection,
                                                                Clock clock,
                                                                Set<YBPartition> partitions) {
    // Both checkpoints start at the -1 sentinel position for snapshot bootstrap.
    final OpId snapshotCommit = new OpId(-1, -1, "".getBytes(), -1, 0);
    final OpId snapshotProcessed = new OpId(-1, -1, "".getBytes(), -1, 0);
    return initialContext(connectorConfig, jdbcConnection, clock, snapshotCommit, snapshotProcessed, partitions);
}

public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig,
YugabyteDBConnection jdbcConnection,
Clock clock) {
return initialContext(connectorConfig, jdbcConnection, clock, null,
null);
Clock clock,
Set<YBPartition> partitions) {
return initialContext(connectorConfig, jdbcConnection, clock, new OpId(0, 0, "".getBytes(), 0, 0),
new OpId(0, 0, "".getBytes(), 0, 0), partitions);
}

public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig,
YugabyteDBConnection jdbcConnection,
Clock clock,
OpId lastCommitLsn,
OpId lastCompletelyProcessedLsn) {

OpId lastCompletelyProcessedLsn,
Set<YBPartition> partitions) {
LOGGER.info("Creating initial offset context");
final OpId lsn = null; // OpId.valueOf(jdbcConnection.currentXLogLocation());
// TODO:Suranjan read the offset for each of the tablet
final long txId = 0L;// new OpId(0,0,"".getBytes(), 0);
LOGGER.info("Read checkpoint at '{}' ", lsn, txId);
return new YugabyteDBOffsetContext(
YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(
connectorConfig,
lsn,
lastCompletelyProcessedLsn,
Expand All @@ -118,7 +132,13 @@ public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig c
false,
new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<>());

for (YBPartition p : partitions) {
if (context.getTabletSourceInfo().get(p.getTabletId()) == null) {
context.initSourceInfo(p.getTabletId(), connectorConfig);
context.updateWalPosition(p.getTabletId(), lastCommitLsn, lastCompletelyProcessedLsn, clock.currentTimeAsInstant(), String.valueOf(txId), null, null);
}
}
return context;
}

@Override
Expand Down Expand Up @@ -241,6 +261,10 @@ OpId lastCompletelyProcessedLsn() {
return lastCompletelyProcessedLsn;
}

// Per-tablet lookup variant of lastCompletelyProcessedLsn().
// NOTE(review): the tabletId argument is currently ignored and the shared LSN
// is returned — presumably a placeholder until per-tablet offsets are tracked;
// confirm before relying on per-tablet semantics.
OpId lastCompletelyProcessedLsn(String tabletId) {
return lastCompletelyProcessedLsn;
}

OpId lastCommitLsn() {
return lastCommitLsn;
}
Expand Down Expand Up @@ -273,7 +297,8 @@ public String toString() {
+ ", lastCommitLsn=" + lastCommitLsn
+ ", streamingStoppingLsn=" + streamingStoppingLsn
+ ", transactionContext=" + transactionContext
+ ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]";
+ ", incrementalSnapshotContext=" + incrementalSnapshotContext
+ ", tabletSourceInfo=" + tabletSourceInfo + "]";
}

public OffsetState asOffsetState() {
Expand Down Expand Up @@ -333,31 +358,18 @@ private String readOptionalString(Map<String, ?> offset, String key) {
public YugabyteDBOffsetContext load(Map<String, ?> offset) {

LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset);

/*
* final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY));
* final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset,
* LAST_COMPLETELY_PROCESSED_LSN_KEY));
* final OpId lastCommitLsn = OpId.valueOf(readOptionalString(offset,
* LAST_COMPLETELY_PROCESSED_LSN_KEY));
* final String txId = readOptionalString(offset, SourceInfo.TXID_KEY);
*
* final Instant useconds = Conversions.toInstantFromMicros((Long) offset
* .get(SourceInfo.TIMESTAMP_USEC_KEY));
* final boolean snapshot = (boolean) ((Map<String, Object>) offset)
* .getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
* final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset)
* .getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
* return new YugabyteDBOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn,
* lastCommitLsn, txId, useconds, snapshot, lastSnapshotRecord,
* TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext
* .load(offset));
*/
OpId lastCompletelyProcessedLsn;
if (offset != null) {
lastCompletelyProcessedLsn = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
}
else {
lastCompletelyProcessedLsn = new OpId(0, 0, "".getBytes(), 0, 0);
}

return new YugabyteDBOffsetContext(connectorConfig,
new OpId(0, 0, null, 0, 0),
new OpId(0, 0, null, 0, 0),
new OpId(0, 0, null, 0, 0),
lastCompletelyProcessedLsn,
lastCompletelyProcessedLsn,
lastCompletelyProcessedLsn,
"txId", Instant.MIN, false, false,
TransactionContext.load(offset),
SignalBasedIncrementalSnapshotContext.load(offset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected YugabyteDBSchema(YugabyteDBConnectorConfig config, YugabyteDBTypeRegis

private static TableSchemaBuilder getTableSchemaBuilder(YugabyteDBConnectorConfig config,
YugabyteDBValueConverter valueConverter) {
return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(),
return new YBTableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(),
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getSanitizeFieldNames());
}
Expand Down
Loading