From 003a5c31c1ab01eb03cc2be84a6f5942c1f851ac Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Thu, 24 Feb 2022 11:27:42 +0530 Subject: [PATCH 01/21] support for snapshot --- .../YugabyteDBChangeRecordEmitter.java | 2 + .../YugabyteDBSnapshotChangeEventSource.java | 238 +----------------- .../YugabyteDBStreamingChangeEventSource.java | 51 ++-- .../connector/yugabytedb/connection/OpId.java | 2 +- .../connection/ReplicationMessage.java | 1 + .../pgproto/YbProtoReplicationMessage.java | 4 +- .../yugabytedb/YugabyteDBConnectorIT.java | 29 +++ 7 files changed, 65 insertions(+), 262 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index ebc0bf69c10..e5e08705468 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -99,6 +99,8 @@ protected Operation getOperation() { return Operation.UPDATE; case DELETE: return Operation.DELETE; + case READ: + return Operation.READ; case TRUNCATE: return Operation.TRUNCATE; default: diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 2dc68b1886d..4eeb7078ea8 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -21,10 +21,7 @@ import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.RelationalSnapshotChangeEventSource; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.schema.SchemaChangeEvent; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -111,166 +108,7 @@ public SnapshotResult doExecute( SnapshotContext snapshotContext, SnapshottingTask snapshottingTask) throws Exception { - final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx = (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext; - - // Connection connection = null; - try { - LOGGER.info("Snapshot step 1 - Preparing"); - - if (previousOffset != null && previousOffset.isSnapshotRunning()) { - LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken."); - } - - // connection = createSnapshotConnection(); - // connectionCreated(ctx); - - LOGGER.info("Snapshot step 2 - Determining captured tables"); - - // Note that there's a minor race condition here: a new table matching the filters could be created between - // this call and the determination of the initial snapshot position below; this seems acceptable, though - determineCapturedTables(ctx); - snapshotProgressListener.monitoredDataCollectionsDetermined(ctx.capturedTables); - - LOGGER.info("Snapshot step 3 - Locking captured tables {}", ctx.capturedTables); - - if (snapshottingTask.snapshotSchema()) { - // lockTablesForSchemaSnapshot(context, ctx); - } - - LOGGER.info("Snapshot step 4 - Determining snapshot offset"); - determineSnapshotOffset(ctx, previousOffset); - - LOGGER.info("Snapshot step 5 - Reading structure of captured tables"); - readTableStructure(context, ctx, previousOffset); - - if (snapshottingTask.snapshotSchema()) { - LOGGER.info("Snapshot step 6 - Persisting schema history"); - - // createSchemaChangeEventsForTables(context, ctx, snapshottingTask); - - // if we've been interrupted before, the TX rollback will cause any locks to be released - releaseSchemaSnapshotLocks(ctx); - } - else { - LOGGER.info("Snapshot step 6 - Skipping persisting of schema history"); - } - - if (snapshottingTask.snapshotData()) { - LOGGER.info("Snapshot step 7 - Snapshotting data"); - createDataEvents(context, ctx); - } - else { - LOGGER.info("Snapshot step 7 - Skipping snapshotting of data"); - // releaseDataSnapshotLocks(ctx); - ctx.offset.preSnapshotCompletion(); - ctx.offset.postSnapshotCompletion(); - } - - // postSnapshot(); - dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset); - return SnapshotResult.completed(ctx.offset); - } - finally { - // rollbackTransaction(connection); - } - } - - private void createDataEvents(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) - throws Exception { - EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); - // tryStartingSnapshot(snapshotContext); - - final int tableCount = snapshotContext.capturedTables.size(); - int tableOrder = 1; - LOGGER.info("Snapshotting contents of {} tables while still in transaction", tableCount); - for (Iterator tableIdIterator = snapshotContext.capturedTables.iterator(); tableIdIterator.hasNext();) { - final TableId tableId = tableIdIterator.next(); - snapshotContext.lastTable = !tableIdIterator.hasNext(); - - if (!sourceContext.isRunning()) { - throw new InterruptedException("Interrupted while snapshotting table " + tableId); - } - - LOGGER.debug("Snapshotting table {}", tableId); - - createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, - snapshotContext.tables.forTable(tableId), tableOrder++, tableCount); - } - - // releaseDataSnapshotLocks(snapshotContext); - snapshotContext.offset.preSnapshotCompletion(); - snapshotReceiver.completeSnapshot(); - snapshotContext.offset.postSnapshotCompletion(); - } - - private void createDataEventsForTable(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, int tableOrder, - int tableCount) - throws InterruptedException { - - long exportStart = clock.currentTimeInMillis(); - LOGGER.info("Exporting data from table '{}' ({} of {} tables)", table.id(), tableOrder, tableCount); - - // final Optional selectStatement = determineSnapshotSelect(snapshotContext, table.id()); - if (true/* !selectStatement.isPresent() */) { - LOGGER.warn("For table '{}' the select statement was not provided, skipping table", table.id()); - snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), 0); - return; - } - // LOGGER.info("\t For table '{}' using select statement: '{}'", table.id(), selectStatement.get()); - final OptionalLong rowCount = OptionalLong.empty();// rowCountForTable(table.id()); - - // try (Statement statement = readTableStatement(rowCount); - // ResultSet rs = statement.executeQuery(selectStatement.get())) { - - // ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table); - long rows = 0; - // Threads.Timer logTimer = getTableScanLogTimer(); - snapshotContext.lastRecordInTable = false; - - // if (rs.next()) { - // while (!snapshotContext.lastRecordInTable) { - // if (!sourceContext.isRunning()) { - // throw new InterruptedException("Interrupted while snapshotting table " + table.id()); - // } - // - // rows++; - // final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray); - // - // snapshotContext.lastRecordInTable = !rs.next(); - // if (logTimer.expired()) { - // long stop = clock.currentTimeInMillis(); - // if (rowCount.isPresent()) { - // LOGGER.info("\t Exported {} of {} records for table '{}' after {}", rows, rowCount.getAsLong(), - // table.id(), Strings.duration(stop - exportStart)); - // } - // else { - // LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(), - // Strings.duration(stop - exportStart)); - // } - // snapshotProgressListener.rowsScanned(table.id(), rows); - // logTimer = getTableScanLogTimer(); - // } - // - // if (snapshotContext.lastTable && snapshotContext.lastRecordInTable) { - // lastSnapshotRecord(snapshotContext); - // } - // dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver); - // } - // } - // else if (snapshotContext.lastTable) { - // lastSnapshotRecord(snapshotContext); - // } - - LOGGER.info("\t Finished exporting {} records for table '{}'; total duration '{}'", rows, - table.id(), Strings.duration(clock.currentTimeInMillis() - exportStart)); - snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), rows); - // } - // catch (SQLException e) { - // throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); - // } + return SnapshotResult.completed(previousOffset); } @Override @@ -296,42 +134,12 @@ protected SnapshotContext prepare( return new PostgresSnapshotContext(partition, connectorConfig.databaseName()); } - // @Override - // protected void connectionCreated(RelationalSnapshotContext snapshotContext) - // throws Exception { - // // If using catch up streaming, the connector opens the transaction that the snapshot will eventually use - // // before the catch up streaming starts. By looking at the current wal location, the transaction can determine - // // where the catch up streaming should stop. The transaction is held open throughout the catch up - // // streaming phase so that the snapshot is performed from a consistent view of the data. Since the isolation - // // level on the transaction used in catch up streaming has already set the isolation level and executed - // // statements, the transaction does not need to get set the level again here. - // if (snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) { - // setSnapshotTransactionIsolationLevel(); - // } - // schema.refresh(jdbcConnection, false); - // } - protected Set getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception { // return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{ "TABLE" }); return new HashSet<>(); } - // @Override - // protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, - // RelationalSnapshotContext snapshotContext) - // throws SQLException, InterruptedException { - // final Duration lockTimeout = connectorConfig.snapshotLockTimeout(); - // final Optional lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables); - // - // if (lockStatement.isPresent()) { - // LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds()); - // jdbcConnection.executeWithoutCommitting(lockStatement.get()); - // // now that we have the locks, refresh the schema - // schema.refresh(jdbcConnection, false); - // } - // } - protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException { } @@ -391,50 +199,6 @@ private OpId getTransactionStartLsn() throws SQLException { return null;// OpId.valueOf(jdbcConnection.currentXLogLocation()); } - protected void readTableStructure(ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - YugabyteDBOffsetContext offsetContext) - throws SQLException, InterruptedException { - Set schemas = snapshotContext.capturedTables.stream() - .map(TableId::schema) - .collect(Collectors.toSet()); - - // reading info only for the schemas we're interested in as per the set of captured tables; - // while the passed table name filter alone would skip all non-included tables, reading the schema - // would take much longer that way - // for (String schema : schemas) { - // if (!sourceContext.isRunning()) { - // throw new InterruptedException("Interrupted while reading structure of schema " + schema); - // } - // - // LOGGER.info("Reading structure of schema '{}'", snapshotContext.catalogName); - // jdbcConnection.readSchema( - // snapshotContext.tables, - // snapshotContext.catalogName, - // schema, - // connectorConfig.getTableFilters().dataCollectionFilter(), - // null, - // false); - // } - // schema.refresh(jdbcConnection, false); - } - - protected SchemaChangeEvent getCreateTableEvent( - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - Table table) - throws SQLException { - return new SchemaChangeEvent( - snapshotContext.partition.getSourcePartition(), - snapshotContext.offset.getOffset(), - snapshotContext.offset.getSourceInfo(), - snapshotContext.catalogName, - table.id().schema(), - null, - table, - SchemaChangeEventType.CREATE, - true); - } - @Override protected void complete(SnapshotContext snapshotContext) { snapshotter.snapshotCompleted(); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 78f9bd402bc..52a1071c1e0 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -121,7 +121,7 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // replication slot could exist at the time of starting Debezium so // we will stream from the position in the slot // instead of the last position in the database - boolean hasStartLsnStoredInContext = offsetContext != null; + boolean hasStartLsnStoredInContext = offsetContext != null && !offsetContext.getTabletSourceInfo().isEmpty(); if (!hasStartLsnStoredInContext) { offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock); @@ -141,22 +141,10 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit LOGGER.info("No previous LSN found in Kafka, streaming from the latest checkpoint" + " in YugabyteDB"); walPosition = new WalPositionLocator(); + // create snpashot offset. + // replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition)); } - // for large dbs, the refresh of schema can take too much time - // such that the connection times out. We must enable keep - // alive to ensure that it doesn't time out - // ReplicationStream stream = this.replicationStream.get(); - // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); - - // refresh the schema so we have a latest view of the DB tables - // taskContext.refreshSchema(connection, true); - - // If we need to do a pre-snapshot streaming catch up, we should allow the snapshot transaction to persist - // but normally we want to start streaming without any open transactions. - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } // this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); @@ -178,7 +166,7 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); // } // processMessages(context, partition, offsetContext, stream); - getChanges2(context, partition, offsetContext); + getChanges(context, partition, offsetContext, hasStartLsnStoredInContext); } catch (Throwable e) { errorHandler.setProducerThrowable(e); @@ -236,13 +224,13 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit } private GetChangesResponse getChangeResponse(YugabyteDBOffsetContext offsetContext) throws Exception { - return null; } - private void getChanges2(ChangeEventSourceContext context, - YugabyteDBPartition partitionn, - YugabyteDBOffsetContext offsetContext) + private void getChanges(ChangeEventSourceContext context, + YugabyteDBPartition partitionn, + YugabyteDBOffsetContext offsetContext, + boolean previousOffsetPresent) throws Exception { LOGGER.debug("The offset is " + offsetContext.getOffset()); @@ -278,8 +266,13 @@ private void getChanges2(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); + + if (offsetContext.lsn(tabletId).equals(new OpId(0, 0, null, 0, 0))) { + offsetContext.getSourceInfo(tabletId) + .updateLastCommit(new OpId(-1, -1, "".getBytes(), -1, 0)); + } } - LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); + LOGGER.info("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); while (context.isRunning() && (offsetContext.getStreamingStoppingLsn() == null || (lastCompletelyProcessedLsn.compareTo(offsetContext.getStreamingStoppingLsn()) < 0))) { @@ -296,18 +289,19 @@ private void getChanges2(ChangeEventSourceContext context, OpId cp = offsetContext.lsn(tabletId); // GetChangesResponse response = getChangeResponse(offsetContext); - LOGGER.debug("Going to fetch for tablet " + tabletId + " from OpId " + cp + " " + + LOGGER.info("Going to fetch for tablet " + tabletId + " from OpId " + cp + " " + "table " + table.getName()); GetChangesResponse response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime()); - boolean receivedMessage = response.getResp().getCdcSdkRecordsCount() != 0; + boolean receivedMessage = false; // response.getResp().getCdcSdkRecordsCount() != 0; for (CdcService.CDCSDKProtoRecordPB record : response .getResp() .getCdcSdkProtoRecordsList()) { + LOGGER.info("SKSK the recrds are " + record); CdcService.RowMessage m = record.getRowMessage(); YbProtoReplicationMessage message = new YbProtoReplicationMessage( m, this.yugabyteDBTypeRegistry); @@ -403,6 +397,17 @@ else if (message.isDDLMessage()) { } } + // if (cp.equals(new OpId(-1, -1, "".getBytes(), -1, 0))) { + OpId finalOpid = new OpId( + response.getTerm(), + response.getIndex(), + response.getKey(), + response.getWriteId(), + response.getSnapshotTime()); + offsetContext.getSourceInfo(tabletId) + .updateLastCommit(finalOpid); + // } + probeConnectionIfNeeded(); if (receivedMessage) { diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java index 8e9e86b7d4f..f75feef0b03 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/OpId.java @@ -91,7 +91,7 @@ public boolean equals(Object o) { return false; OpId that = (OpId) o; return term == that.term && index == that.index && time == that.time - && write_id == that.write_id && Objects.equal(key, that.key); + && write_id == that.write_id && Arrays.equals(key, that.key); } @Override diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java index 65e4ff5bda9..aa941f49ffe 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/ReplicationMessage.java @@ -47,6 +47,7 @@ public enum Operation { BEGIN, COMMIT, DDL, + READ, NOOP } diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java index e2575a56440..1410e0dc1e0 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/connection/pgproto/YbProtoReplicationMessage.java @@ -28,7 +28,7 @@ /** * Replication message representing message sent by Postgres Decoderbufs * - * @author Jiri Pechanec + * @author Suranjan Kumar */ public class YbProtoReplicationMessage implements ReplicationMessage { @@ -52,6 +52,8 @@ public Operation getOperation() { return Operation.UPDATE; case DELETE: return Operation.DELETE; + case READ: + return Operation.READ; case BEGIN: return Operation.BEGIN; case COMMIT: diff --git a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorIT.java b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorIT.java index 4f8e680f928..4ce3dbdd0c4 100644 --- a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorIT.java +++ b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorIT.java @@ -2097,6 +2097,34 @@ private void assertFieldAbsent(SourceRecord record, String fieldName) { } } + // SURANJAN + @Test + public void testSsnapshotPerformance() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + Thread.sleep(1000); + Configuration.Builder configBuilder = TestHelper.defaultConfig() + // .with + .with(YugabyteDBConnectorConfig.HOSTNAME, "127.0.0.1") + .with(YugabyteDBConnectorConfig.PORT, 5433) + .with(YugabyteDBConnectorConfig.MASTER_ADDRESSES, "127.0.0.1:7100") + .with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) + .with(YugabyteDBConnectorConfig.DELETE_STREAM_ON_STOP, Boolean.TRUE) + .with(YugabyteDBConnectorConfig.AUTO_CREATE_STREAM, Boolean.TRUE) + .with(YugabyteDBConnectorConfig.TABLE_INCLUDE_LIST, "public.t1" /* + ",public.t2" */); + // .with(YugabyteDBConnectorConfig.STREAM_ID, "3ec5241cea9c44d9a891245c357f0533"); + start(YugabyteDBConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + final long recordsCount = 100000; + // final int batchSize = 10; + + // batchInsertRecords(recordsCount, batchSize); + CompletableFuture.runAsync(() -> consumeRecords(recordsCount)) + .exceptionally(throwable -> { + throw new RuntimeException(throwable); + }).get(); + } + // SURANJAN @Test public void testStreamingPerformance() throws Exception { @@ -2110,6 +2138,7 @@ public void testStreamingPerformance() throws Exception { .with(YugabyteDBConnectorConfig.MASTER_ADDRESSES, "127.0.0.1:7100") .with(YugabyteDBConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) .with(YugabyteDBConnectorConfig.DELETE_STREAM_ON_STOP, Boolean.TRUE) + .with(YugabyteDBConnectorConfig.AUTO_CREATE_STREAM, Boolean.TRUE) .with(YugabyteDBConnectorConfig.TABLE_INCLUDE_LIST, "public.t1" /* + ",public.t2" */); // .with(YugabyteDBConnectorConfig.STREAM_ID, "3ec5241cea9c44d9a891245c357f0533"); start(YugabyteDBConnector.class, configBuilder.build()); From d3b340ffaddb4740bd1411bc425e868a418c2836 Mon Sep 17 00:00:00 2001 From: Isha Amoncar Date: Mon, 28 Feb 2022 04:59:42 +0000 Subject: [PATCH 02/21] Fixed previous offset --- .../yugabytedb/YugabyteDBOffsetContext.java | 15 +++++++++------ .../YugabyteDBSnapshotChangeEventSource.java | 3 +-- .../YugabyteDBStreamingChangeEventSource.java | 3 +++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 851b02af861..40916aa9033 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -60,7 +60,7 @@ private YugabyteDBOffsetContext(YugabyteDBConnectorConfig connectorConfig, // sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin()); sourceInfo.updateLastCommit(lastCommitLsn); sourceInfoSchema = sourceInfo.schema(); - + this.lastSnapshotRecord = lastSnapshotRecord; if (this.lastSnapshotRecord) { postSnapshotCompletion(); @@ -79,6 +79,8 @@ public YugabyteDBOffsetContext(Set s, this.sourceInfoSchema = sourceInfo.schema(); for (YugabyteDBOffsetContext context : s) { if (context != null) { + this.lastCompletelyProcessedLsn = context.lastCompletelyProcessedLsn; + this.lastCommitLsn = context.lastCommitLsn; LOGGER.debug("Populating the tabletsourceinfo" + context.getTabletSourceInfo()); if (context.getTabletSourceInfo() != null) { this.tabletSourceInfo.putAll(context.getTabletSourceInfo()); @@ -273,7 +275,8 @@ public String toString() { + ", lastCommitLsn=" + lastCommitLsn + ", streamingStoppingLsn=" + streamingStoppingLsn + ", transactionContext=" + transactionContext - + ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]"; + + ", incrementalSnapshotContext=" + incrementalSnapshotContext + + ", tabletSourceInfo=" + tabletSourceInfo + "]"; } public OffsetState asOffsetState() { @@ -333,7 +336,7 @@ private String readOptionalString(Map offset, String key) { public YugabyteDBOffsetContext load(Map offset) { LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset); - + OpId opid1 = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); /* * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, @@ -355,9 +358,9 @@ public YugabyteDBOffsetContext load(Map offset) { */ return new YugabyteDBOffsetContext(connectorConfig, - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), - new OpId(0, 0, null, 0, 0), + opid1, + opid1, + opid1, "txId", Instant.MIN, false, false, TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 2dc68b1886d..151ec1bec18 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -112,7 +112,6 @@ public SnapshotResult doExecute( SnapshottingTask snapshottingTask) throws Exception { final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx = (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext; - // Connection connection = null; try { LOGGER.info("Snapshot step 1 - Preparing"); @@ -168,7 +167,7 @@ public SnapshotResult doExecute( // postSnapshot(); dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset); - return SnapshotResult.completed(ctx.offset); + return SnapshotResult.completed(previousOffset); } finally { // rollbackTransaction(connection); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 78f9bd402bc..d1d191624e9 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -278,6 +278,9 @@ private void getChanges2(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); + if(offsetContext.lsn(tabletId).equals(new OpId(0,0,null,0,0))){ + offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); + } } LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); From 2552b0ba381c52928fa9dced55ee3294817b5eb6 Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Mon, 28 Feb 2022 13:15:30 +0530 Subject: [PATCH 03/21] fix for snapshot --- .../YugabyteDBChangeRecordEmitter.java | 37 +++++++------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index e5e08705468..06d60fd2f56 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -5,23 +5,6 @@ */ package io.debezium.connector.yugabytedb; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.header.ConnectHeaders; -import org.postgresql.core.BaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.connector.yugabytedb.connection.ReplicationMessage; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.data.Envelope.Operation; @@ -29,16 +12,20 @@ import io.debezium.pipeline.spi.ChangeRecordEmitter; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnEditor; -import io.debezium.relational.RelationalChangeRecordEmitter; -import io.debezium.relational.Table; -import io.debezium.relational.TableEditor; -import io.debezium.relational.TableId; -import io.debezium.relational.TableSchema; +import io.debezium.relational.*; import io.debezium.schema.DataCollectionSchema; import io.debezium.util.Clock; import io.debezium.util.Strings; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.postgresql.core.BaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.*; +import java.util.stream.Collectors; /** * Emits change data based on a logical decoding event coming as protobuf or JSON message. @@ -151,6 +138,8 @@ protected Object[] getNewColumnValues() { case UPDATE: // todo vaibhav: add scenario for the case of multiple columns being updated return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); + case READ: + return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); default: return null; } From b983a325abede26195e0368c1a9a9e32f15c730a Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Mon, 28 Feb 2022 13:20:51 +0530 Subject: [PATCH 04/21] fix for snapshot --- .../connector/yugabytedb/YugabyteDBChangeRecordEmitter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index 06d60fd2f56..4a39cb5f531 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -117,6 +117,8 @@ protected Object[] getOldColumnValues() { return null; case UPDATE: return null; + case READ: + return null; // return columnValues(message.getOldTupleList(), tableId, true, // message.hasTypeMetadata(), true, true); default: From 07ead05b4f033f4fa7774d53145f790fafe389bd Mon Sep 17 00:00:00 2001 From: Isha Amoncar Date: Tue, 1 Mar 2022 09:07:24 +0000 Subject: [PATCH 05/21] Addressed review comments --- .../yugabytedb/YugabyteDBOffsetContext.java | 14 ++++++++++---- .../YugabyteDBStreamingChangeEventSource.java | 4 +--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 40916aa9033..ca1581a5f19 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -336,7 +336,13 @@ private String readOptionalString(Map offset, String key) { public YugabyteDBOffsetContext load(Map offset) { LOGGER.debug("The offset being loaded in YugabyteDBOffsetContext.. " + offset); - OpId opid1 = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + OpId lastCompletelyProcessedLsn; + if(offset != null){ + lastCompletelyProcessedLsn = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + } + else{ + lastCompletelyProcessedLsn = new OpId(0, 0, null, 0, 0); + } /* * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, @@ -358,9 +364,9 @@ public YugabyteDBOffsetContext load(Map offset) { */ return new YugabyteDBOffsetContext(connectorConfig, - opid1, - opid1, - opid1, + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, + lastCompletelyProcessedLsn, "txId", Instant.MIN, false, false, TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 2fb4c517fc1..41a4c8a7813 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -278,9 +278,7 @@ private void getChanges2(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); - if(offsetContext.lsn(tabletId).equals(new OpId(0,0,null,0,0))){ - offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); - } + offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); } LOGGER.debug("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); From 064314cce1e9d1fb6ae5f67ef87cf39f500c06f1 Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 2 Mar 2022 15:28:33 +0530 Subject: [PATCH 06/21] fix --- .../yugabytedb/YugabyteDBConnectorTask.java | 4 +- .../yugabytedb/YugabyteDBOffsetContext.java | 46 ++++++++++--------- .../YugabyteDBStreamingChangeEventSource.java | 24 ++++++++-- 3 files changed, 45 insertions(+), 29 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java index a7a1a443e85..33f99c4be96 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConnectorTask.java @@ -158,8 +158,8 @@ public ChangeEventSourceCoordinator previousOffsets = getPreviousOffsetss(new YugabyteDBPartition.Provider(connectorConfig), new YugabyteDBOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); - final Set previousOffset = new HashSet<>(previousOffsets.values()); - YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffset, connectorConfig); + + YugabyteDBOffsetContext context = new YugabyteDBOffsetContext(previousOffsets, connectorConfig); LoggingContext.PreviousContext previousContext = taskContext .configureLoggingContext(CONTEXT_NAME); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index a9349f43b22..057fc01ffcd 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -5,17 +5,6 @@ */ package io.debezium.connector.yugabytedb; -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.connector.SnapshotRecord; import io.debezium.connector.yugabytedb.connection.OpId; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -28,6 +17,15 @@ import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class YugabyteDBOffsetContext implements OffsetContext { public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -72,21 +70,21 @@ private YugabyteDBOffsetContext(YugabyteDBConnectorConfig connectorConfig, this.incrementalSnapshotContext = incrementalSnapshotContext; } - public YugabyteDBOffsetContext(Set s, + public YugabyteDBOffsetContext(Map previousOffsets, YugabyteDBConnectorConfig config) { this.tabletSourceInfo = new ConcurrentHashMap(); this.sourceInfo = new SourceInfo(config); this.sourceInfoSchema = sourceInfo.schema(); - for (YugabyteDBOffsetContext context : s) { - if (context != null) { - this.lastCompletelyProcessedLsn = context.lastCompletelyProcessedLsn; - this.lastCommitLsn = context.lastCommitLsn; - LOGGER.debug("Populating the tabletsourceinfo" + context.getTabletSourceInfo()); - if (context.getTabletSourceInfo() != null) { - this.tabletSourceInfo.putAll(context.getTabletSourceInfo()); - } - } + + for (Map.Entry context : previousOffsets.entrySet()) { + this.lastCompletelyProcessedLsn = context.getValue().lastCompletelyProcessedLsn; + this.lastCommitLsn = context.getValue().lastCommitLsn; + String tabletId = context.getKey().getSourcePartition().values().stream().findAny().get(); + initSourceInfo(tabletId, config); + this.updateWalPosition(tabletId, + this.lastCommitLsn, lastCompletelyProcessedLsn, null, null, null, null); } + LOGGER.debug("Populating the tabletsourceinfo with " + this.getTabletSourceInfo()); this.transactionContext = new TransactionContext(); this.incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>(); } @@ -243,6 +241,10 @@ OpId lastCompletelyProcessedLsn() { return lastCompletelyProcessedLsn; } + OpId lastCompletelyProcessedLsn(String tabletId) { + return lastCompletelyProcessedLsn; + } + OpId lastCommitLsn() { return lastCommitLsn; } @@ -341,7 +343,7 @@ public YugabyteDBOffsetContext load(Map offset) { lastCompletelyProcessedLsn = OpId.valueOf((String) offset.get(YugabyteDBOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); } else { - lastCompletelyProcessedLsn = new OpId(0, 0, null, 0, 0); + lastCompletelyProcessedLsn = new OpId(0, 0, "".getBytes(), 0, 0); } /* * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index ea3054b7ed1..43aa378c243 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -113,19 +113,18 @@ public YugabyteDBStreamingChangeEventSource(YugabyteDBConnectorConfig connectorC @Override public void execute(ChangeEventSourceContext context, YugabyteDBPartition partition, YugabyteDBOffsetContext offsetContext) { - if (!snapshotter.shouldStream()) { - LOGGER.info("Streaming is not enabled in correct configuration"); - return; - } - // replication slot could exist at the time of starting Debezium so // we will stream from the position in the slot // instead of the last position in the database boolean hasStartLsnStoredInContext = offsetContext != null && !offsetContext.getTabletSourceInfo().isEmpty(); if (!hasStartLsnStoredInContext) { + LOGGER.info("No start opid found in the context."); offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock); } + /*if (snapshotter.shouldSnapshot()) { + getSnapshotChanges(); + }*/ try { final WalPositionLocator walPosition; @@ -166,6 +165,10 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); // } // processMessages(context, partition, offsetContext, stream); + if (!snapshotter.shouldStream()) { + LOGGER.info("Streaming is not enabled in correct configuration"); + return; + } getChanges(context, partition, offsetContext, hasStartLsnStoredInContext); } catch (Throwable e) { @@ -227,6 +230,14 @@ private GetChangesResponse getChangeResponse(YugabyteDBOffsetContext offsetConte return null; } + private void getSnapshotChanges(ChangeEventSourceContext context, + YugabyteDBPartition partitionn, + YugabyteDBOffsetContext offsetContext, + boolean previousOffsetPresent) { + + + } + private void getChanges(ChangeEventSourceContext context, YugabyteDBPartition partitionn, YugabyteDBOffsetContext offsetContext, @@ -266,7 +277,9 @@ private void getChanges(ChangeEventSourceContext context, for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); + offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); + if (offsetContext.lsn(tabletId).equals(new OpId(0, 0, null, 0, 0))) { offsetContext.getSourceInfo(tabletId) .updateLastCommit(new OpId(-1, -1, "".getBytes(), -1, 0)); @@ -398,6 +411,7 @@ else if (message.isDDLMessage()) { } // if (cp.equals(new OpId(-1, -1, "".getBytes(), -1, 0))) { + OpId finalOpid = new OpId( response.getTerm(), response.getIndex(), From 52dc984b29f73dfc3c89c362acb3ffb3b16a91ea Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 2 Mar 2022 15:30:13 +0530 Subject: [PATCH 07/21] fix --- .../yugabytedb/YugabyteDBOffsetContext.java | 19 ++++++++++--------- .../YugabyteDBStreamingChangeEventSource.java | 15 ++++++++------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 057fc01ffcd..7649e56f469 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -5,6 +5,16 @@ */ package io.debezium.connector.yugabytedb; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.connector.SnapshotRecord; import io.debezium.connector.yugabytedb.connection.OpId; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -17,15 +27,6 @@ import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class YugabyteDBOffsetContext implements OffsetContext { public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 43aa378c243..cc6ac6aba1a 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -122,9 +122,11 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit LOGGER.info("No start opid found in the context."); offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock); } - /*if (snapshotter.shouldSnapshot()) { - getSnapshotChanges(); - }*/ + /* + * if (snapshotter.shouldSnapshot()) { + * getSnapshotChanges(); + * } + */ try { final WalPositionLocator walPosition; @@ -231,10 +233,9 @@ private GetChangesResponse getChangeResponse(YugabyteDBOffsetContext offsetConte } private void getSnapshotChanges(ChangeEventSourceContext context, - YugabyteDBPartition partitionn, - YugabyteDBOffsetContext offsetContext, - boolean previousOffsetPresent) { - + YugabyteDBPartition partitionn, + YugabyteDBOffsetContext offsetContext, + boolean previousOffsetPresent) { } From 9894a3258aa63b70785730fc798cc4040b3a771d Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 2 Mar 2022 16:00:05 +0530 Subject: [PATCH 08/21] fix --- .../yugabytedb/YugabyteDBOffsetContext.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 7649e56f469..5eea23c9ff1 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -5,16 +5,6 @@ */ package io.debezium.connector.yugabytedb; -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.connector.SnapshotRecord; import io.debezium.connector.yugabytedb.connection.OpId; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -27,6 +17,15 @@ import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class YugabyteDBOffsetContext implements OffsetContext { public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -78,12 +77,15 @@ public YugabyteDBOffsetContext(Map previou this.sourceInfoSchema = sourceInfo.schema(); for (Map.Entry context : previousOffsets.entrySet()) { - this.lastCompletelyProcessedLsn = context.getValue().lastCompletelyProcessedLsn; - this.lastCommitLsn = context.getValue().lastCommitLsn; - String tabletId = context.getKey().getSourcePartition().values().stream().findAny().get(); - initSourceInfo(tabletId, config); - this.updateWalPosition(tabletId, - this.lastCommitLsn, lastCompletelyProcessedLsn, null, null, null, null); + YugabyteDBOffsetContext c = context.getValue(); + if (c != null) { + this.lastCompletelyProcessedLsn = c.lastCompletelyProcessedLsn; + this.lastCommitLsn = c.lastCommitLsn; + String tabletId = context.getKey().getSourcePartition().values().stream().findAny().get(); + initSourceInfo(tabletId, config); + this.updateWalPosition(tabletId, + this.lastCommitLsn, lastCompletelyProcessedLsn, null, null, null, null); + } } LOGGER.debug("Populating the tabletsourceinfo with " + this.getTabletSourceInfo()); this.transactionContext = new TransactionContext(); From 47627662b0597b57d33b61fa6e439f7bf2f34b4b Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 2 Mar 2022 19:26:07 +0530 Subject: [PATCH 09/21] fix --- .../connector/yugabytedb/YBPartition.java | 29 +++++++---- .../yugabytedb/YugabyteDBOffsetContext.java | 49 +++++++++++++------ .../YugabyteDBStreamingChangeEventSource.java | 41 ++++++++++------ 3 files changed, 77 insertions(+), 42 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java index b2e8c7fffbc..49cc68c7e63 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBPartition.java @@ -1,25 +1,28 @@ package io.debezium.connector.yugabytedb; -import java.util.*; - -import org.yb.client.*; +import java.util.Map; +import java.util.Objects; import io.debezium.pipeline.spi.Partition; import io.debezium.util.Collect; public class YBPartition implements Partition { - private static final String TABLETS_PARTITION_KEY = "tabletids"; + private static final String TABLET_PARTITION_KEY = "tabletid"; - private final String listOfTablets; + private final String tabletId; - public YBPartition(String listOfTablets) { - this.listOfTablets = listOfTablets; + public YBPartition(String tabletId) { + this.tabletId = tabletId; } @Override public Map getSourcePartition() { - return Collect.hashMapOf(TABLETS_PARTITION_KEY, listOfTablets); + return Collect.hashMapOf(TABLET_PARTITION_KEY, tabletId); + } + + public String getTabletId() { + return this.tabletId; } @Override @@ -31,12 +34,18 @@ public boolean equals(Object obj) { return false; } final YBPartition other = (YBPartition) obj; - return Objects.equals(listOfTablets, other.listOfTablets); + return Objects.equals(tabletId, other.tabletId); } @Override public int hashCode() { - return listOfTablets.hashCode(); + return tabletId.hashCode(); } + @Override + public String toString() { + return "YBPartition{" + + "tabletId='" + tabletId + '\'' + + '}'; + } } diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index 5eea23c9ff1..f35dd34c4e4 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -5,6 +5,17 @@ */ package io.debezium.connector.yugabytedb; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.connector.SnapshotRecord; import io.debezium.connector.yugabytedb.connection.OpId; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -17,15 +28,6 @@ import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class YugabyteDBOffsetContext implements OffsetContext { public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -92,25 +94,34 @@ public YugabyteDBOffsetContext(Map previou this.incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>(); } + public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig, + YugabyteDBConnection jdbcConnection, + Clock clock, + Set partitions) { + return initialContext(connectorConfig, jdbcConnection, clock, new OpId(-1, -1, "".getBytes(), -1, 0), + new OpId(-1, -1, "".getBytes(), -1, 0), partitions); + } + public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, YugabyteDBConnection jdbcConnection, - Clock clock) { - return initialContext(connectorConfig, jdbcConnection, clock, null, - null); + Clock clock, + Set partitions) { + return initialContext(connectorConfig, jdbcConnection, clock, new OpId(0, 0, "".getBytes(), 0, 0), + new OpId(0, 0, "".getBytes(), 0, 0), partitions); } public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig, YugabyteDBConnection jdbcConnection, Clock clock, OpId lastCommitLsn, - OpId lastCompletelyProcessedLsn) { - + OpId lastCompletelyProcessedLsn, + Set partitions) { LOGGER.info("Creating initial offset context"); final OpId lsn = null; // OpId.valueOf(jdbcConnection.currentXLogLocation()); // TODO:Suranjan read the offset for each of the tablet final long txId = 0L;// new OpId(0,0,"".getBytes(), 0); LOGGER.info("Read checkpoint at '{}' ", lsn, txId); - return new YugabyteDBOffsetContext( + YugabyteDBOffsetContext context = new YugabyteDBOffsetContext( connectorConfig, lsn, lastCompletelyProcessedLsn, @@ -121,7 +132,13 @@ public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig c false, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>()); - + for (YBPartition p : partitions) { + if (context.getTabletSourceInfo().get(p.getTabletId()) == null) { + context.initSourceInfo(p.getTabletId(), connectorConfig); + context.updateWalPosition(p.getTabletId(), lastCommitLsn, lastCompletelyProcessedLsn, clock.currentTimeAsInstant(), String.valueOf(txId), null, null); + } + } + return context; } @Override diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index cc6ac6aba1a..81fc4bb4745 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -116,11 +116,20 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // replication slot could exist at the time of starting Debezium so // we will stream from the position in the slot // instead of the last position in the database + // Get all partitions + // Get + Set partitions = new YugabyteDBPartition.Provider(connectorConfig).getPartitions(); boolean hasStartLsnStoredInContext = offsetContext != null && !offsetContext.getTabletSourceInfo().isEmpty(); + LOGGER.info("SKSK The offset context is " + offsetContext + " partition is " + partition); if (!hasStartLsnStoredInContext) { LOGGER.info("No start opid found in the context."); - offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock); + if (snapshotter.shouldSnapshot()) { + offsetContext = YugabyteDBOffsetContext.initialContextForSnapshot(connectorConfig, connection, clock, partitions); + } + else { + offsetContext = YugabyteDBOffsetContext.initialContext(connectorConfig, connection, clock, partitions); + } } /* * if (snapshotter.shouldSnapshot()) { @@ -167,10 +176,10 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); // } // processMessages(context, partition, offsetContext, stream); - if (!snapshotter.shouldStream()) { - LOGGER.info("Streaming is not enabled in correct configuration"); - return; - } + // if (!snapshotter.shouldStream()) { + // LOGGER.info("Streaming is not enabled in correct configuration"); + // return; + // } getChanges(context, partition, offsetContext, hasStartLsnStoredInContext); } catch (Throwable e) { @@ -275,17 +284,17 @@ private void getChanges(ChangeEventSourceContext context, } int noMessageIterations = 0; - for (Pair entry : tabletPairList) { - final String tabletId = entry.getValue(); - offsetContext.initSourceInfo(tabletId, this.connectorConfig); - - offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); - - if (offsetContext.lsn(tabletId).equals(new OpId(0, 0, null, 0, 0))) { - offsetContext.getSourceInfo(tabletId) - .updateLastCommit(new OpId(-1, -1, "".getBytes(), -1, 0)); - } - } + // for (Pair entry : tabletPairList) { + // final String tabletId = entry.getValue(); + // offsetContext.initSourceInfo(tabletId, this.connectorConfig); + // + // offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); + // + // if (offsetContext.lsn(tabletId).equals(new OpId(0, 0, null, 0, 0))) { + // offsetContext.getSourceInfo(tabletId) + // .updateLastCommit(new OpId(-1, -1, "".getBytes(), -1, 0)); + // } + // } LOGGER.info("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); while (context.isRunning() && (offsetContext.getStreamingStoppingLsn() == null || From 67a81afce22400305e96d2f3251404202b81b43d Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 10 Mar 2022 19:01:24 +0530 Subject: [PATCH 10/21] changes to fix snapshot modes --- .../yugabytedb/YugabyteDBStreamingChangeEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 81fc4bb4745..22af4bbfc82 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -121,7 +121,7 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit Set partitions = new YugabyteDBPartition.Provider(connectorConfig).getPartitions(); boolean hasStartLsnStoredInContext = offsetContext != null && !offsetContext.getTabletSourceInfo().isEmpty(); - LOGGER.info("SKSK The offset context is " + offsetContext + " partition is " + partition); + // LOGGER.info("SKSK The offset context is " + offsetContext + " partition is " + partition); if (!hasStartLsnStoredInContext) { LOGGER.info("No start opid found in the context."); if (snapshotter.shouldSnapshot()) { From 233dc8453c4a796088eabebc484925bfd4144e6d Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Mar 2022 17:20:11 +0530 Subject: [PATCH 11/21] fixed NullPointerException on restart of connector --- debezium-connector-yugabytedb2/pom.xml | 4 ++-- .../YugabyteDBStreamingChangeEventSource.java | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/debezium-connector-yugabytedb2/pom.xml b/debezium-connector-yugabytedb2/pom.xml index a91fca6ee37..b02439c10db 100644 --- a/debezium-connector-yugabytedb2/pom.xml +++ b/debezium-connector-yugabytedb2/pom.xml @@ -41,12 +41,12 @@ org.yb yb-client - 0.8.15-SNAPSHOT + 0.8.16-SNAPSHOT org.yb yb-client - 0.8.15-SNAPSHOT + 0.8.16-SNAPSHOT test-jar test diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 22af4bbfc82..3ed529a8999 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -283,6 +283,12 @@ private void getChanges(ChangeEventSourceContext context, tableIdToTable.put(tId, table); } + Map schemaStreamed = new HashMap<>(); + // Initialize all the tabletIds with false + for (Pair entry : tabletPairList) { + schemaStreamed.put(entry.getValue(), Boolean.TRUE); + } + int noMessageIterations = 0; // for (Pair entry : tabletPairList) { // final String tabletId = entry.getValue(); @@ -317,7 +323,8 @@ private void getChanges(ChangeEventSourceContext context, GetChangesResponse response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime()); + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), schemaStreamed.get(tabletId)); + boolean receivedMessage = false; // response.getResp().getCdcSdkRecordsCount() != 0; @@ -378,6 +385,9 @@ else if (message.isDDLMessage()) { LOGGER.debug("Received DDL message {}", message.getSchema().toString() + " the table is " + message.getTable()); + // Set schema received for this tablet ID + schemaStreamed.put(tabletId, Boolean.FALSE); + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = YugabyteDBSchema.parseWithSchema(message.getTable(), pgSchemaNameInRecord); @@ -420,6 +430,7 @@ else if (message.isDDLMessage()) { } } + // if (cp.equals(new OpId(-1, -1, "".getBytes(), -1, 0))) { OpId finalOpid = new OpId( From c99649fb6c4117f9b51bc8c9c4ffbccd671ebf25 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Mar 2022 17:21:25 +0530 Subject: [PATCH 12/21] refactored a comment --- .../yugabytedb/YugabyteDBStreamingChangeEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 3ed529a8999..ef0fc7a9418 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -284,7 +284,7 @@ private void getChanges(ChangeEventSourceContext context, } Map schemaStreamed = new HashMap<>(); - // Initialize all the tabletIds with false + // Initialize all the tabletIds with true signifying we need schemas for all the tablets for (Pair entry : tabletPairList) { schemaStreamed.put(entry.getValue(), Boolean.TRUE); } From f6ce7f61b63f8a084efb2aa9bed79d307d31489e Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Mar 2022 19:06:04 +0530 Subject: [PATCH 13/21] fixed NullPointerException across connector restarts --- .../YugabyteDBStreamingChangeEventSource.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 6fe0572696e..57e5fe96d77 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -274,7 +274,12 @@ private void getChanges2(ChangeEventSourceContext context, tableIdToTable.put(tId, table); } - int noMessageIterations = 0; + // todo: rename schemaStreamed to something else + Map schemaStreamed = new HashMap<>(); + for (Pair entry : tabletPairList) { + schemaStreamed.put(entry.getValue(), Boolean.TRUE); + } + for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); offsetContext.initSourceInfo(tabletId, this.connectorConfig); @@ -301,7 +306,7 @@ private void getChanges2(ChangeEventSourceContext context, GetChangesResponse response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime()); + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), schemaStreamed.get(tabletId)); for (CdcService.CDCSDKProtoRecordPB record : response .getResp() @@ -359,6 +364,9 @@ else if (message.isDDLMessage()) { LOGGER.debug("Received DDL message {}", message.getSchema().toString() + " the table is " + message.getTable()); + // If a DDL message is received for a tablet, we do not need its schema again + schemaStreamed.put(tabletId, Boolean.FALSE); + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = YugabyteDBSchema.parseWithSchema(message.getTable(), pgSchemaNameInRecord); From b61ab2495ba2333945431b120e9e3cd86f14e3d8 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 22 Mar 2022 11:40:13 +0530 Subject: [PATCH 14/21] added initial_only mode of snapshot streaming --- .../YugabyteDBStreamingChangeEventSource.java | 104 +++++------------- 1 file changed, 30 insertions(+), 74 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index ef0fc7a9418..84e8d52dfab 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -156,30 +156,6 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit // replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition)); } - // this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); - - // if (walPosition.searchingEnabled()) { - // searchWalPosition(context, stream, walPosition); - // try { - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } - // } - // catch (Exception e) { - // LOGGER.info("Commit failed while preparing for reconnect", e); - // } - // walPosition.enableFiltering(); - // stream.stopKeepAlive(); - // replicationConnection.reconnect(); - // replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); - // stream = this.replicationStream.get(); - // stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); - // } - // processMessages(context, partition, offsetContext, stream); - // if (!snapshotter.shouldStream()) { - // LOGGER.info("Streaming is not enabled in correct configuration"); - // return; - // } getChanges(context, partition, offsetContext, hasStartLsnStoredInContext); } catch (Throwable e) { @@ -188,13 +164,7 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit finally { if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // Need to CDCSDK see what can be done. - // try { - // connection.commit(); - // } - // catch (SQLException throwables) { - // throwables.printStackTrace(); - // } + } if (asyncYBClient != null) { try { @@ -212,28 +182,6 @@ public void execute(ChangeEventSourceContext context, YugabyteDBPartition partit e.printStackTrace(); } } - // if (replicationConnection != null) { - // LOGGER.debug("stopping streaming..."); - // // stop the keep alive thread, this also shuts down the - // // executor pool - // ReplicationStream stream = replicationStream.get(); - // if (stream != null) { - // stream.stopKeepAlive(); - // } - // // TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the stream, but it's not reliable atm (see javadoc) - // // replicationStream.close(); - // // close the connection - this should also disconnect the current stream even if it's blocking - // try { - // if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { - // connection.commit(); - // } - // replicationConnection.close(); - // } - // catch (Exception e) { - // LOGGER.debug("Exception while closing the connection", e); - // } - // replicationStream.set(null); - // } } } @@ -248,6 +196,18 @@ private void getSnapshotChanges(ChangeEventSourceContext context, } + private boolean isSnapshotCompleteForAllTablets(Map snapshotDoneForTablet) { + for (Map.Entry entry : snapshotDoneForTablet.entrySet()) { + if (entry.getValue() == Boolean.FALSE) { + return false; + } + } + + // Returning true would mean that we have captured the snapshot for all the tablets and in case of initial_only snapshotter + // the snapshot can be stopped now. + return true; + } + private void getChanges(ChangeEventSourceContext context, YugabyteDBPartition partitionn, YugabyteDBOffsetContext offsetContext, @@ -284,23 +244,13 @@ private void getChanges(ChangeEventSourceContext context, } Map schemaStreamed = new HashMap<>(); + Map snapshotDoneForTablet = new HashMap<>(); // Initialize all the tabletIds with true signifying we need schemas for all the tablets for (Pair entry : tabletPairList) { schemaStreamed.put(entry.getValue(), Boolean.TRUE); + snapshotDoneForTablet.put(entry.getValue(), Boolean.FALSE); } - int noMessageIterations = 0; - // for (Pair entry : tabletPairList) { - // final String tabletId = entry.getValue(); - // offsetContext.initSourceInfo(tabletId, this.connectorConfig); - // - // offsetContext.getSourceInfo(tabletId).updateLastCommit(offsetContext.lastCompletelyProcessedLsn()); - // - // if (offsetContext.lsn(tabletId).equals(new OpId(0, 0, null, 0, 0))) { - // offsetContext.getSourceInfo(tabletId) - // .updateLastCommit(new OpId(-1, -1, "".getBytes(), -1, 0)); - // } - // } LOGGER.info("The init tabletSourceInfo is " + offsetContext.getTabletSourceInfo()); while (context.isRunning() && (offsetContext.getStreamingStoppingLsn() == null || @@ -310,6 +260,11 @@ private void getChanges(ChangeEventSourceContext context, final String tabletId = entry.getValue(); YBPartition part = new YBPartition(tabletId); + // Ignore this tablet if the snapshot is already complete for it. + if (!snapshotter.shouldStream() && snapshotDoneForTablet.get(tabletId)) { + continue; + } + // The following will specify the connector polling interval at which // yb-client will ask the database for changes Thread.sleep(connectorConfig.cdcPollIntervalms()); @@ -326,11 +281,7 @@ private void getChanges(ChangeEventSourceContext context, cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), schemaStreamed.get(tabletId)); - boolean receivedMessage = false; // response.getResp().getCdcSdkRecordsCount() != 0; - - for (CdcService.CDCSDKProtoRecordPB record : response - .getResp() - .getCdcSdkProtoRecordsList()) { + for (CdcService.CDCSDKProtoRecordPB record : response.getResp().getCdcSdkProtoRecordsList()) { LOGGER.info("SKSK the recrds are " + record); CdcService.RowMessage m = record.getRowMessage(); YbProtoReplicationMessage message = new YbProtoReplicationMessage( @@ -431,7 +382,14 @@ else if (message.isDDLMessage()) { } - // if (cp.equals(new OpId(-1, -1, "".getBytes(), -1, 0))) { + if (!snapshotter.shouldStream() && response.getWriteId() != -1) { + snapshotDoneForTablet.put(tabletId, Boolean.TRUE); + } + + // End the snapshot in case the snapshot is complete. + if (isSnapshotCompleteForAllTablets(snapshotDoneForTablet)) { + return; + } OpId finalOpid = new OpId( response.getTerm(), @@ -439,9 +397,7 @@ else if (message.isDDLMessage()) { response.getKey(), response.getWriteId(), response.getSnapshotTime()); - offsetContext.getSourceInfo(tabletId) - .updateLastCommit(finalOpid); - // } + offsetContext.getSourceInfo(tabletId).updateLastCommit(finalOpid); probeConnectionIfNeeded(); From b8d743c86c78cedb01f4ac0ead7c1bc8e1719584 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 22 Mar 2022 15:38:14 +0530 Subject: [PATCH 15/21] removed extra comments --- .../yugabytedb/YugabyteDBOffsetContext.java | 19 ------------------- .../YugabyteDBStreamingChangeEventSource.java | 4 ---- 2 files changed, 23 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index f35dd34c4e4..fb4fc151d55 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -365,25 +365,6 @@ public YugabyteDBOffsetContext load(Map offset) { else { lastCompletelyProcessedLsn = new OpId(0, 0, "".getBytes(), 0, 0); } - /* - * final OpId lsn = OpId.valueOf(readOptionalString(offset, SourceInfo.LSN_KEY)); - * final OpId lastCompletelyProcessedLsn = OpId.valueOf(readOptionalString(offset, - * LAST_COMPLETELY_PROCESSED_LSN_KEY)); - * final OpId lastCommitLsn = OpId.valueOf(readOptionalString(offset, - * LAST_COMPLETELY_PROCESSED_LSN_KEY)); - * final String txId = readOptionalString(offset, SourceInfo.TXID_KEY); - * - * final Instant useconds = Conversions.toInstantFromMicros((Long) offset - * .get(SourceInfo.TIMESTAMP_USEC_KEY)); - * final boolean snapshot = (boolean) ((Map) offset) - * .getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); - * final boolean lastSnapshotRecord = (boolean) ((Map) offset) - * .getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE); - * return new YugabyteDBOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, - * lastCommitLsn, txId, useconds, snapshot, lastSnapshotRecord, - * TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext - * .load(offset)); - */ return new YugabyteDBOffsetContext(connectorConfig, lastCompletelyProcessedLsn, diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 84e8d52dfab..3b3b9c526ec 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -507,10 +507,6 @@ public void commitOffset(Map offset) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); } - // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - // CDCSDK yugabyte does it automatically. - // but we may need an API - // replicationStream.flushLsn(lsn); } else { LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); From 8a2f232331d5d3267c1a757ecf051e9fcc6591bd Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 22 Mar 2022 16:27:43 +0530 Subject: [PATCH 16/21] fixed a bug in initial_only snapshot mode --- .../yugabytedb/YugabyteDBStreamingChangeEventSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 3b3b9c526ec..e598ac549e9 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -382,7 +382,8 @@ else if (message.isDDLMessage()) { } - if (!snapshotter.shouldStream() && response.getWriteId() != -1) { + if (!snapshotter.shouldStream() && response.getResp().getCdcSdkCheckpoint().getWriteId() != -1) { + LOGGER.debug("Marking snapshot complete for tablet: " + tabletId); snapshotDoneForTablet.put(tabletId, Boolean.TRUE); } From 4775b90f4bc82a30dcdbd11812dcfd32753b4631 Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 23 Mar 2022 12:20:58 +0530 Subject: [PATCH 17/21] add update record fix --- .../yugabytedb/YBTableSchemaBuilder.java | 4 + .../YugabyteDBChangeRecordEmitter.java | 50 ++++++++- .../yugabytedb/YugabyteDBSchema.java | 7 ++ .../YugabyteDBStreamingChangeEventSource.java | 7 +- .../transforms/YBExtractNewRecordState.java | 102 ++++++++++++++++++ .../yugabytedb/YugabyteDBDatatypesTest.java | 8 +- .../src/test/resources/log4j.properties | 12 +-- .../relational/TableSchemaBuilder.java | 80 ++++++++++---- 8 files changed, 237 insertions(+), 33 deletions(-) create mode 100644 debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java create mode 100644 debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java new file mode 100644 index 00000000000..5f259e82394 --- /dev/null +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YBTableSchemaBuilder.java @@ -0,0 +1,4 @@ +package io.debezium.connector.yugabytedb; + +public class YBTableSchemaBuilder { +} diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java index ebc0bf69c10..589cae6062f 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBChangeRecordEmitter.java @@ -148,7 +148,8 @@ protected Object[] getNewColumnValues() { return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); case UPDATE: // todo vaibhav: add scenario for the case of multiple columns being updated - return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); + //return columnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); + return updatedColumnValues(message.getNewTupleList(), tableId, true, message.hasTypeMetadata(), false, false); default: return null; } @@ -215,7 +216,52 @@ private Object[] columnValues(List columns, TableId t if (position != -1) { Object value = column.getValue(() -> (BaseConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); - values[position] = value; + //values[position] = value; + values[position] = new Object[]{ value, Boolean.TRUE}; + } + } + return values; + } + + private Object[] updatedColumnValues(List columns, TableId tableId, + boolean refreshSchemaIfChanged, boolean metadataInMessage, + boolean sourceOfToasted, boolean oldValues) + throws SQLException { + if (columns == null || columns.isEmpty()) { + return null; + } + final Table table = schema.tableFor(tableId); + if (table == null) { + schema.dumpTableId(); + } + Objects.requireNonNull(table); + + // based on the schema columns, create the values on the same position as the columns + List schemaColumns = table.columns(); + // based on the replication message without toasted columns for now + List columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)) + .collect(Collectors.toList()); + // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT + Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() + ? schemaColumns.size() + : columnsWithoutToasted.size()]; + + // initialize to unset + + final Set undeliveredToastableColumns = new HashSet<>(schema + .getToastableColumnsForTableId(table.id())); + for (ReplicationMessage.Column column : columns) { + // DBZ-298 Quoted column names will be sent like that in messages, + // but stored unquoted in the column names + final String columnName = Strings.unquoteIdentifierPart(column.getName()); + undeliveredToastableColumns.remove(columnName); + + int position = getPosition(columnName, table, values); + if (position != -1) { + Object value = column.getValue(() -> (BaseConnection) connection.connection(), + connectorConfig.includeUnknownDatatypes()); + + values[position] = new Object[]{ value, Boolean.TRUE}; } } return values; diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java index b4f64c11c86..7d45f4796d8 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSchema.java @@ -71,6 +71,13 @@ private static TableSchemaBuilder getTableSchemaBuilder(YugabyteDBConnectorConfi config.getSanitizeFieldNames()); } + private static TableSchemaBuilder getYBTableSchemaBuilder(YugabyteDBConnectorConfig config, + YugabyteDBValueConverter valueConverter) { + return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), + config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), + config.getSanitizeFieldNames()); + } + /** * Initializes the content for this schema by reading all the database information from the supplied connection. * diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 6fe0572696e..fc51a810705 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -283,15 +283,14 @@ private void getChanges2(ChangeEventSourceContext context, while (context.isRunning() && (offsetContext.getStreamingStoppingLsn() == null || (lastCompletelyProcessedLsn.compareTo(offsetContext.getStreamingStoppingLsn()) < 0))) { + // The following will specify the connector polling interval at which + // yb-client will ask the database for changes + Thread.sleep(connectorConfig.cdcPollIntervalms()); for (Pair entry : tabletPairList) { final String tabletId = entry.getValue(); YBPartition part = new YBPartition(tabletId); - // The following will specify the connector polling interval at which - // yb-client will ask the database for changes - Thread.sleep(connectorConfig.cdcPollIntervalms()); - YBTable table = tableIdToTable.get(entry.getKey()); OpId cp = offsetContext.lsn(tabletId); diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java new file mode 100644 index 00000000000..94103d570b1 --- /dev/null +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java @@ -0,0 +1,102 @@ +package io.debezium.connector.yugabytedb.transforms; + +import io.debezium.transforms.ExtractNewRecordState; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Schema.Type; + +import java.util.Map; + +public class YBExtractNewRecordState> extends ExtractNewRecordState { + private Cache schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(256)); + + @Override + public R apply(final R record) { + final R ret = super.apply(record); + if (ret == null || !(ret.value() instanceof Struct)) { + return ret; + } + + final Struct value = (Struct)ret.value(); + + Schema updatedSchema = schemaUpdateCache.get(value.schema()); + if (updatedSchema == null) { + updatedSchema = makeUpdatedSchema(value.schema()); + schemaUpdateCache.put(value.schema(), updatedSchema); + } + + final Struct updatedValue = new Struct(updatedSchema); + + for (Field field : value.schema().fields()) { + if (isSimplifiableField(field)) { + Struct fieldValue = (Struct) value.get(field); + updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); + } else { + updatedValue.put(field.name(), value.get(field)); + } + } + + return ret.newRecord(ret.topic(), ret.kafkaPartition(), ret.keySchema(), ret.key(), updatedSchema, updatedValue, ret.timestamp()); + } + + @Override + public void close() { + super.close(); + schemaUpdateCache = null; + } + + private boolean isSimplifiableField(Field field) { + if (field.schema().type() != Type.STRUCT) { + return false; + } + + if (field.schema().fields().size() != 1 + || field.schema().fields().get(0).name() != "value") { + return false; + } + + return true; + } + + private Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + + for (Field field : schema.fields()) { + if (isSimplifiableField(field)) { + builder.field(field.name(), field.schema().field("value").schema()); + } else { + builder.field(field.name(), field.schema()); + } + } + + return builder.build(); + } +} + +class SchemaUtil { + + public static SchemaBuilder copySchemaBasics(Schema source) { + return copySchemaBasics(source, new SchemaBuilder(source.type())); + } + + public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) { + builder.name(source.name()); + builder.version(source.version()); + builder.doc(source.doc()); + + final Map params = source.parameters(); + if (params != null) { + builder.parameters(params); + } + + return builder; + } + +} diff --git a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java index 5b0b09a8b74..0f54ef0ce80 100644 --- a/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java +++ b/debezium-connector-yugabytedb2/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java @@ -41,6 +41,7 @@ private CompletableFuture insertRecords(long numOfRowsToBeInserted) { for (int i = 0; i < numOfRowsToBeInserted; i++) { TestHelper.execute(String.format(formatInsertString, i)); } + }).exceptionally(throwable -> { throw new RuntimeException(throwable); }); @@ -70,9 +71,11 @@ protected Configuration.Builder getConfigBuilder(String fullTablenameWithSchema) } private void verifyPrimaryKeyOnly(long recordsCount) { + System.out.println("verifyPrimaryKeyOnly "); int totalConsumedRecords = 0; long start = System.currentTimeMillis(); List records = new ArrayList<>(); + recordsCount = 100; while (totalConsumedRecords < recordsCount) { int consumed = super.consumeAvailableRecords(record -> { System.out.println("The record being consumed is " + record); @@ -90,6 +93,7 @@ private void verifyPrimaryKeyOnly(long recordsCount) { // verify the records assertInsert(records.get(i), "id", i); } + } private void verifyValue(long recordsCount) { @@ -145,10 +149,12 @@ public void testRecordConsumption() throws Exception { Configuration.Builder configBuilder = getConfigBuilder("public.t1"); start(YugabyteDBConnector.class, configBuilder.build()); assertConnectorIsRunning(); - final long recordsCount = 2; + final long recordsCount = 1; + System.out.println("testRecordConsumption"); // insert rows in the table t1 with values insertRecords(recordsCount); + System.out.println("testRecordConsumption"); CompletableFuture.runAsync(() -> verifyPrimaryKeyOnly(recordsCount)) .exceptionally(throwable -> { diff --git a/debezium-connector-yugabytedb2/src/test/resources/log4j.properties b/debezium-connector-yugabytedb2/src/test/resources/log4j.properties index f90be99692e..c3487551fda 100644 --- a/debezium-connector-yugabytedb2/src/test/resources/log4j.properties +++ b/debezium-connector-yugabytedb2/src/test/resources/log4j.properties @@ -2,23 +2,23 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n -log4j.appender.stdout.threshold=DEBUG +#log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n +#log4j.appender.stdout.threshold=DEBUG # Root logger option -log4j.rootLogger=TRACE +log4j.rootLogger=DEBUG # Set up the default logging to be INFO level, then override specific units log4j.logger.io.confluent.connect.avro=WARN log4j.logger.io.confluent.kafka.serializers=WARN -log4j.logger.io.debezium=INFO +#log4j.logger.io.debezium=DEBUG log4j.logger.io.debezium.pipeline=DEBUG log4j.logger.io.debezium.connector.postgresql=DEBUG -log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings() +#log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings() log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG #log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=DEBUG #log4j.logger.io.debezium.connector.postgresql.connection.YugabyteDBReplicationConnection=DEBUG #log4j.logger.io.debezium.connector.postgresql.PostgresConnectorTask=DEBUG log4j.logger.org.reflections=ERROR -log4j.logger.org.yb.client = INFO +#log4j.logger.org.yb.client=INFO diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index a88a9de60f8..d7ea3d7efc3 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -5,21 +5,6 @@ */ package io.debezium.relational; -import java.sql.Types; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.DataException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.annotation.Immutable; import io.debezium.annotation.ThreadSafe; import io.debezium.data.Envelope; @@ -32,6 +17,20 @@ import io.debezium.schema.FieldNameSelector.FieldNamer; import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.Strings; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Types; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Builder that constructs {@link TableSchema} instances for {@link Table} definitions. @@ -188,9 +187,14 @@ protected StructGenerator createKeyGenerator(Schema schema, TableId columnSetNam // It is possible for some databases and values (MySQL and all-zero datetime) // to be reported as null by JDBC or streaming reader. // It thus makes sense to convert them to a sensible default replacement value. - value = converter.convert(value); + value = converter.convert(((Object[])value)[0]); + //value = converter.convert(value); try { - result.put(fields[i], value); + Struct cell = new Struct(fields[i].schema()); + cell.put("value", value); + cell.put("set", true); + //valueStruct.put(cdef.getColumnName(), cell); + result.put(fields[i], cell); } catch (DataException e) { Column col = columns.get(i); @@ -245,7 +249,18 @@ protected StructGenerator createValueGenerator(Schema schema, TableId tableId, L int numFields = recordIndexes.length; ValueConverter[] converters = convertersForColumns(schema, tableId, columnsThatShouldBeAdded, mappers); return (row) -> { +// columns +// .stream() +// .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) +// .forEach(column -> { +// ColumnMapper mapper = mappers == null ? null : mappers.mapperFor(tableId, column); +// addField(valSchemaBuilder, table, column, mapper); +// }); +// +// Schema valSchema = valSchemaBuilder.optional().build(); + Struct result = new Struct(schema); + for (int i = 0; i != numFields; ++i) { validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i); Object value = row[recordIndexes[i]]; @@ -261,8 +276,17 @@ protected StructGenerator createValueGenerator(Schema schema, TableId tableId, L if (converter != null) { try { - value = converter.convert(value); - result.put(fields[i], value); + if (value != null) { + value = converter.convert(((Object[])value)[0]); + Struct cell = new Struct(fields[i].schema()); + cell.put("value", value); + cell.put("set", true); + //valueStruct.put(cdef.getColumnName(), cell); + result.put(fields[i], cell); + } else { + result.put(fields[i], null); + } + //result.put(fields[i], value); } catch (DataException | IllegalArgumentException e) { Column col = columns.get(i); @@ -373,8 +397,9 @@ protected void addField(SchemaBuilder builder, Table table, Column column, Colum fieldBuilder .defaultValue(customConverterRegistry.getValueConverter(table.id(), column).orElse(ValueConverter.passthrough()).convert(column.defaultValue())); } + Schema optionalCellSchema = cellSchema(fieldNamer.fieldNameFor(column), fieldBuilder.build(), column.isOptional()); - builder.field(fieldNamer.fieldNameFor(column), fieldBuilder.build()); + builder.field(fieldNamer.fieldNameFor(column), optionalCellSchema); if (LOGGER.isDebugEnabled()) { LOGGER.debug("- field '{}' ({}{}) from column {}", column.name(), builder.isOptional() ? "OPTIONAL " : "", fieldBuilder.type(), @@ -399,4 +424,19 @@ protected void addField(SchemaBuilder builder, Table table, Column column, Colum protected ValueConverter createValueConverterFor(TableId tableId, Column column, Field fieldDefn) { return customConverterRegistry.getValueConverter(tableId, column).orElse(valueConverterProvider.converter(column, fieldDefn)); } + + static Schema cellSchema(String name, Schema valueSchema, boolean isOptional) { + if (valueSchema != null) { + SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(name) + .field("value", valueSchema) + .field("set", Schema.BOOLEAN_SCHEMA); + if (isOptional) { + schemaBuilder.optional(); + } + return schemaBuilder.build(); + } + else { + return null; + } + } } From b7852cf64fa59210a43e548c9cab8ffd644f4fad Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 23 Mar 2022 12:48:12 +0530 Subject: [PATCH 18/21] fix for record --- .../yugabytedb/transforms/YBExtractNewRecordState.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java index 94103d570b1..837f70bd5a1 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java @@ -57,8 +57,9 @@ private boolean isSimplifiableField(Field field) { return false; } - if (field.schema().fields().size() != 1 - || field.schema().fields().get(0).name() != "value") { + if (field.schema().fields().size() != 2 + || (field.schema().fields().get(0).name() != "value" + || field.schema().fields().get(0).name() != "set")) { return false; } From 071bd6b07facfdec9b3bece312982ab3587046ab Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 23 Mar 2022 14:59:55 +0530 Subject: [PATCH 19/21] fix for update wrapper --- .../YugabyteDBStreamingChangeEventSource.java | 2 +- .../transforms/YBExtractNewRecordState.java | 120 ++++++++++++++---- 2 files changed, 99 insertions(+), 23 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index fc51a810705..21d03fc966e 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -300,7 +300,7 @@ private void getChanges2(ChangeEventSourceContext context, GetChangesResponse response = this.syncClient.getChangesCDCSDK( table, streamId, tabletId, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime()); + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWrite_id(), cp.getTime(), true); for (CdcService.CDCSDKProtoRecordPB record : response .getResp() diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java index 837f70bd5a1..20a4a9be1ac 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java @@ -1,5 +1,6 @@ package io.debezium.connector.yugabytedb.transforms; +import io.debezium.connector.yugabytedb.YugabyteDBStreamingChangeEventSource; import io.debezium.transforms.ExtractNewRecordState; import org.apache.kafka.common.cache.Cache; @@ -11,39 +12,55 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yb.util.Pair; import java.util.Map; +import java.util.Objects; public class YBExtractNewRecordState> extends ExtractNewRecordState { private Cache schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(256)); + private static final Logger LOGGER = LoggerFactory.getLogger(YBExtractNewRecordState.class); @Override public R apply(final R record) { final R ret = super.apply(record); + LOGGER.info("VKVK beginning of apply after super function: " + record); if (ret == null || !(ret.value() instanceof Struct)) { return ret; } - final Struct value = (Struct)ret.value(); - - Schema updatedSchema = schemaUpdateCache.get(value.schema()); - if (updatedSchema == null) { - updatedSchema = makeUpdatedSchema(value.schema()); - schemaUpdateCache.put(value.schema(), updatedSchema); - } - - final Struct updatedValue = new Struct(updatedSchema); - - for (Field field : value.schema().fields()) { - if (isSimplifiableField(field)) { - Struct fieldValue = (Struct) value.get(field); - updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); - } else { - updatedValue.put(field.name(), value.get(field)); - } - } - - return ret.newRecord(ret.topic(), ret.kafkaPartition(), ret.keySchema(), ret.key(), updatedSchema, updatedValue, ret.timestamp()); + Pair p = getUpdatedValueAndSchema((Struct)ret.key()); + Schema updatedSchemaForKey = (Schema) p.getFirst(); + Struct updatedValueForKey = (Struct) p.getSecond(); + + Pair val = getUpdatedValueAndSchema((Struct) ret.value()); + Schema updatedSchemaForValue = (Schema) val.getFirst(); + Struct updatedValueForValue = (Struct) val.getSecond(); + +// final Struct value = (Struct)ret.value(); +// LOGGER.info("VKVK value of record: " + value); +// Schema updatedSchema = schemaUpdateCache.get(value.schema()); +// LOGGER.info("VKVK json representation before update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); +// if (updatedSchema == null) { +// LOGGER.info("VKVK calling makeUpdatedSchema"); +// updatedSchema = makeUpdatedSchema(value.schema()); +// schemaUpdateCache.put(value.schema(), updatedSchema); +// } +// LOGGER.info("VKVK json representation after update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); +// final Struct updatedValue = new Struct(updatedSchema); +// LOGGER.info("VKVK updatedValue: " + updatedValue); +// for (Field field : value.schema().fields()) { +// if (isSimplifiableField(field)) { +// Struct fieldValue = (Struct) value.get(field); +// updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); +// } else { +// updatedValue.put(field.name(), value.get(field)); +// } +// } +// LOGGER.info("VKVK updatedValue2: " + updatedValue); + return ret.newRecord(ret.topic(), ret.kafkaPartition(), updatedSchemaForKey, updatedValueForKey, updatedSchemaForValue, updatedValueForValue, ret.timestamp()); } @Override @@ -53,23 +70,30 @@ public void close() { } private boolean isSimplifiableField(Field field) { +// LOGGER.info("VKVK size: " + field.schema().fields().size()); if (field.schema().type() != Type.STRUCT) { +// LOGGER.info("VKVK field schema type: " + field.schema().type()); return false; } if (field.schema().fields().size() != 2 - || (field.schema().fields().get(0).name() != "value" - || field.schema().fields().get(0).name() != "set")) { + || (!Objects.equals(field.schema().fields().get(0).name(), "value") + || !Objects.equals(field.schema().fields().get(1).name(), "set"))) { +// LOGGER.info("VKVK fields get value name: " + field.schema().fields().get(0).name()); +// LOGGER.info("VKVK fields get set name: " + field.schema().fields().get(1).name()); return false; } + return true; } private Schema makeUpdatedSchema(Schema schema) { final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); +// LOGGER.info("VKVK inside makeUpdatedSchema, schema field size: " + schema.fields().size()); for (Field field : schema.fields()) { +// LOGGER.info("VKVK calling for name: " + field.name() + " and value: " + field.schema()); if (isSimplifiableField(field)) { builder.field(field.name(), field.schema().field("value").schema()); } else { @@ -79,6 +103,58 @@ private Schema makeUpdatedSchema(Schema schema) { return builder.build(); } + + private Schema makeUpdatedSchema(Schema schema, Struct value) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + +// LOGGER.info("VKVK inside makeUpdatedSchema, schema field size: " + schema.fields().size()); + for (Field field : schema.fields()) { +// LOGGER.info("VKVK calling for name: " + field.name() + " and value: " + field.schema()); + if (isSimplifiableField(field)) { + if (value.get(field.name()) != null) { + builder.field(field.name(), field.schema().field("value").schema()); + } +// builder.field(field.name(), field.schema().field("value").schema()); + } else { + builder.field(field.name(), field.schema()); + } + } + + return builder.build(); + } + + + private Pair getUpdatedValueAndSchema(Struct obj) { + final Struct value = obj; +// LOGGER.info("VKVK value of record: " + value); + Schema updatedSchema = null; //schemaUpdateCache.get(value.schema()); +// LOGGER.info("VKVK json representation before update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); + if (updatedSchema == null) { +// LOGGER.info("VKVK calling makeUpdatedSchema"); +// LOGGER.info("VKVK value schema as json: " + io.debezium.data.SchemaUtil.asString(value.schema())); + updatedSchema = makeUpdatedSchema(value.schema(), value); +// schemaUpdateCache.put(value.schema(), updatedSchema); + } +// LOGGER.info("VKVK json representation after update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); + final Struct updatedValue = new Struct(updatedSchema); +// LOGGER.info("VKVK updatedValue: " + updatedValue); + for (Field field : value.schema().fields()) { + if (isSimplifiableField(field)) { + Struct fieldValue = (Struct) value.get(field); + if (fieldValue != null) { + updatedValue.put(field.name(), fieldValue.get("value")); + } else { + // remove the field from the schema which has the null value + } +// updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); + } else { +// updatedValue.put(field.name(), value.get(field)); + } + } +// LOGGER.info("VKVK updatedValue2: " + updatedValue); + + return new org.yb.util.Pair(updatedSchema, updatedValue); + } } class SchemaUtil { From 5f4bb9fcff633133da2d76573ef24b23259c7ea8 Mon Sep 17 00:00:00 2001 From: Suranjan Kumar Date: Wed, 23 Mar 2022 17:22:45 +0530 Subject: [PATCH 20/21] rever core changes --- .../relational/TableSchemaBuilder.java | 54 +++---------------- 1 file changed, 6 insertions(+), 48 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 69284e9d2c0..53945a7f03b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -100,7 +100,7 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t final TableId tableId = table.id(); final String tableIdStr = tableSchemaName(tableId); final String schemaNamePrefix = schemaPrefix + tableIdStr; - LOGGER.info("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); + LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value")); SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key")); AtomicBoolean hasPrimaryKey = new AtomicBoolean(false); @@ -188,14 +188,9 @@ protected StructGenerator createKeyGenerator(Schema schema, TableId columnSetNam // It is possible for some databases and values (MySQL and all-zero datetime) // to be reported as null by JDBC or streaming reader. // It thus makes sense to convert them to a sensible default replacement value. - value = converter.convert(((Object[]) value)[0]); - // value = converter.convert(value); + value = converter.convert(value); try { - Struct cell = new Struct(fields[i].schema()); - cell.put("value", value); - cell.put("set", true); - // valueStruct.put(cdef.getColumnName(), cell); - result.put(fields[i], cell); + result.put(fields[i], value); } catch (DataException e) { Column col = columns.get(i); @@ -250,18 +245,7 @@ protected StructGenerator createValueGenerator(Schema schema, TableId tableId, L int numFields = recordIndexes.length; ValueConverter[] converters = convertersForColumns(schema, tableId, columnsThatShouldBeAdded, mappers); return (row) -> { - // columns - // .stream() - // .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) - // .forEach(column -> { - // ColumnMapper mapper = mappers == null ? null : mappers.mapperFor(tableId, column); - // addField(valSchemaBuilder, table, column, mapper); - // }); - // - // Schema valSchema = valSchemaBuilder.optional().build(); - Struct result = new Struct(schema); - for (int i = 0; i != numFields; ++i) { validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i); Object value = row[recordIndexes[i]]; @@ -277,18 +261,8 @@ protected StructGenerator createValueGenerator(Schema schema, TableId tableId, L if (converter != null) { try { - if (value != null) { - value = converter.convert(((Object[]) value)[0]); - Struct cell = new Struct(fields[i].schema()); - cell.put("value", value); - cell.put("set", true); - // valueStruct.put(cdef.getColumnName(), cell); - result.put(fields[i], cell); - } - else { - result.put(fields[i], null); - } - // result.put(fields[i], value); + value = converter.convert(value); + result.put(fields[i], value); } catch (DataException | IllegalArgumentException e) { Column col = columns.get(i); @@ -399,9 +373,8 @@ protected void addField(SchemaBuilder builder, Table table, Column column, Colum fieldBuilder .defaultValue(customConverterRegistry.getValueConverter(table.id(), column).orElse(ValueConverter.passthrough()).convert(column.defaultValue())); } - Schema optionalCellSchema = cellSchema(fieldNamer.fieldNameFor(column), fieldBuilder.build(), column.isOptional()); - builder.field(fieldNamer.fieldNameFor(column), optionalCellSchema); + builder.field(fieldNamer.fieldNameFor(column), fieldBuilder.build()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("- field '{}' ({}{}) from column {}", column.name(), builder.isOptional() ? "OPTIONAL " : "", fieldBuilder.type(), @@ -426,19 +399,4 @@ protected void addField(SchemaBuilder builder, Table table, Column column, Colum protected ValueConverter createValueConverterFor(TableId tableId, Column column, Field fieldDefn) { return customConverterRegistry.getValueConverter(tableId, column).orElse(valueConverterProvider.converter(column, fieldDefn)); } - - static Schema cellSchema(String name, Schema valueSchema, boolean isOptional) { - if (valueSchema != null) { - SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(name) - .field("value", valueSchema) - .field("set", Schema.BOOLEAN_SCHEMA); - if (isOptional) { - schemaBuilder.optional(); - } - return schemaBuilder.build(); - } - else { - return null; - } - } } From 2ef67b00c511d122071d3f29490f01baa95f5b59 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 23 Mar 2022 19:28:17 +0530 Subject: [PATCH 21/21] added fix for update wrapper --- .../transforms/YBExtractNewRecordState.java | 72 +++++-------------- 1 file changed, 18 insertions(+), 54 deletions(-) diff --git a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java index ed0b1516a7e..c380f5b7d10 100644 --- a/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java +++ b/debezium-connector-yugabytedb2/src/main/java/io/debezium/connector/yugabytedb/transforms/YBExtractNewRecordState.java @@ -25,8 +25,7 @@ public class YBExtractNewRecordState> extends Extract @Override public R apply(final R record) { final R ret = super.apply(record); - LOGGER.info("VKVK beginning of apply after super function: " + record); - if (ret == null || !(ret.value() instanceof Struct)) { + if (ret == null || (ret.value() != null && !(ret.value() instanceof Struct))) { return ret; } @@ -34,31 +33,14 @@ public R apply(final R record) { Schema updatedSchemaForKey = (Schema) p.getFirst(); Struct updatedValueForKey = (Struct) p.getSecond(); - Pair val = getUpdatedValueAndSchema((Struct) ret.value()); - Schema updatedSchemaForValue = (Schema) val.getFirst(); - Struct updatedValueForValue = (Struct) val.getSecond(); - - // final Struct value = (Struct)ret.value(); - // LOGGER.info("VKVK value of record: " + value); - // Schema updatedSchema = schemaUpdateCache.get(value.schema()); - // LOGGER.info("VKVK json representation before update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); - // if (updatedSchema == null) { - // LOGGER.info("VKVK calling makeUpdatedSchema"); - // updatedSchema = makeUpdatedSchema(value.schema()); - // schemaUpdateCache.put(value.schema(), updatedSchema); - // } - // LOGGER.info("VKVK json representation after update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); - // final Struct updatedValue = new Struct(updatedSchema); - // LOGGER.info("VKVK updatedValue: " + updatedValue); - // for (Field field : value.schema().fields()) { - // if (isSimplifiableField(field)) { - // Struct fieldValue = (Struct) value.get(field); - // updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); - // } else { - // updatedValue.put(field.name(), value.get(field)); - // } - // } - // LOGGER.info("VKVK updatedValue2: " + updatedValue); + Schema updatedSchemaForValue = null; + Struct updatedValueForValue = null; + if (ret.value() != null) { + Pair val = getUpdatedValueAndSchema((Struct) ret.value()); + updatedSchemaForValue = (Schema) val.getFirst(); + updatedValueForValue = (Struct) val.getSecond(); + } + return ret.newRecord(ret.topic(), ret.kafkaPartition(), updatedSchemaForKey, updatedValueForKey, updatedSchemaForValue, updatedValueForValue, ret.timestamp()); } @@ -69,29 +51,23 @@ public void close() { } private boolean isSimplifiableField(Field field) { - // LOGGER.info("VKVK size: " + field.schema().fields().size()); if (field.schema().type() != Type.STRUCT) { - // LOGGER.info("VKVK field schema type: " + field.schema().type()); return false; } if (field.schema().fields().size() != 2 || (!Objects.equals(field.schema().fields().get(0).name(), "value") || !Objects.equals(field.schema().fields().get(1).name(), "set"))) { - // LOGGER.info("VKVK fields get value name: " + field.schema().fields().get(0).name()); - // LOGGER.info("VKVK fields get set name: " + field.schema().fields().get(1).name()); - return false; + return false; } - return true; } + // todo: this function can be removed private Schema makeUpdatedSchema(Schema schema) { final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); - // LOGGER.info("VKVK inside makeUpdatedSchema, schema field size: " + schema.fields().size()); for (Field field : schema.fields()) { - // LOGGER.info("VKVK calling for name: " + field.name() + " and value: " + field.schema()); if (isSimplifiableField(field)) { builder.field(field.name(), field.schema().field("value").schema()); } @@ -106,14 +82,11 @@ private Schema makeUpdatedSchema(Schema schema) { private Schema makeUpdatedSchema(Schema schema, Struct value) { final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); - // LOGGER.info("VKVK inside makeUpdatedSchema, schema field size: " + schema.fields().size()); for (Field field : schema.fields()) { - // LOGGER.info("VKVK calling for name: " + field.name() + " and value: " + field.schema()); if (isSimplifiableField(field)) { if (value.get(field.name()) != null) { builder.field(field.name(), field.schema().field("value").schema()); } - // builder.field(field.name(), field.schema().field("value").schema()); } else { builder.field(field.name(), field.schema()); @@ -125,35 +98,26 @@ private Schema makeUpdatedSchema(Schema schema, Struct value) { private Pair getUpdatedValueAndSchema(Struct obj) { final Struct value = obj; - // LOGGER.info("VKVK value of record: " + value); - Schema updatedSchema = null; // schemaUpdateCache.get(value.schema()); - // LOGGER.info("VKVK json representation before update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); + Schema updatedSchema = null; if (updatedSchema == null) { - // LOGGER.info("VKVK calling makeUpdatedSchema"); - // LOGGER.info("VKVK value schema as json: " + io.debezium.data.SchemaUtil.asString(value.schema())); - updatedSchema = makeUpdatedSchema(value.schema(), value); - // schemaUpdateCache.put(value.schema(), updatedSchema); + updatedSchema = makeUpdatedSchema(value.schema(), value); } - // LOGGER.info("VKVK json representation after update: " + io.debezium.data.SchemaUtil.asString(updatedSchema)); + + LOGGER.debug("Updated schema as json: " + io.debezium.data.SchemaUtil.asString(value.schema())); + final Struct updatedValue = new Struct(updatedSchema); - // LOGGER.info("VKVK updatedValue: " + updatedValue); + for (Field field : value.schema().fields()) { if (isSimplifiableField(field)) { Struct fieldValue = (Struct) value.get(field); if (fieldValue != null) { updatedValue.put(field.name(), fieldValue.get("value")); } - else { - // remove the field from the schema which has the null value - } - // updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value")); } else { - // updatedValue.put(field.name(), value.get(field)); } } - // LOGGER.info("VKVK updatedValue2: " + updatedValue); - + return new org.yb.util.Pair(updatedSchema, updatedValue); } }