Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFactory<YBPartition, YugabyteDBOffsetContext> {

private final YugabyteDBConnectorConfig configuration;
private final YugabyteDBConnection jdbcConnection;
private final ErrorHandler errorHandler;
private final YugabyteDBEventDispatcher<TableId> dispatcher;
private final Clock clock;
Expand All @@ -43,7 +42,6 @@ public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFact

public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuration,
Snapshotter snapshotter,
YugabyteDBConnection jdbcConnection,
ErrorHandler errorHandler,
YugabyteDBEventDispatcher<TableId> dispatcher,
Clock clock, YugabyteDBSchema schema,
Expand All @@ -53,7 +51,6 @@ public YugabyteDBChangeEventSourceFactory(YugabyteDBConnectorConfig configuratio
SlotState startingSlotInfo,
ChangeEventQueue<DataChangeEvent> queue) {
this.configuration = configuration;
this.jdbcConnection = jdbcConnection;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
Expand All @@ -73,7 +70,6 @@ public SnapshotChangeEventSource<YBPartition, YugabyteDBOffsetContext> getSnapsh
configuration,
taskContext,
snapshotter,
jdbcConnection,
schema,
dispatcher,
clock,
Expand All @@ -85,7 +81,6 @@ public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStrea
return new YugabyteDBStreamingChangeEventSource(
configuration,
snapshotter,
jdbcConnection,
dispatcher,
errorHandler,
clock,
Expand All @@ -94,19 +89,4 @@ public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStrea
replicationConnection,
queue);
}

@Override
public Optional<IncrementalSnapshotChangeEventSource<YBPartition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(YugabyteDBOffsetContext offsetContext,
SnapshotProgressListener snapshotProgressListener,
DataChangeEventListener dataChangeEventListener) {
final SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<YBPartition, TableId>(
configuration,
jdbcConnection,
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener);
return Optional.of(incrementalSnapshotChangeEventSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class YugabyteDBChangeRecordEmitter extends RelationalChangeRecordEmitter
private final ReplicationMessage message;
private final YugabyteDBSchema schema;
private final YugabyteDBConnectorConfig connectorConfig;
private final YugabyteDBConnection connection;
private final TableId tableId;

private boolean shouldSendBeforeImage = false;
Expand All @@ -52,15 +51,14 @@ public class YugabyteDBChangeRecordEmitter extends RelationalChangeRecordEmitter

public YugabyteDBChangeRecordEmitter(YBPartition partition, OffsetContext offset, Clock clock,
YugabyteDBConnectorConfig connectorConfig,
YugabyteDBSchema schema, YugabyteDBConnection connection,
YugabyteDBSchema schema,
TableId tableId, ReplicationMessage message, String pgSchemaName,
String tabletId, boolean shouldSendBeforeImage) {
super(partition, offset, clock);

this.schema = schema;
this.message = message;
this.connectorConfig = connectorConfig;
this.connection = connection;

this.pgSchemaName = pgSchemaName;

Expand Down Expand Up @@ -184,17 +182,13 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
? schemaColumns.size()
: columnsWithoutToasted.size()];

final Set<String> undeliveredToastableColumns = new HashSet<>(schema
.getToastableColumnsForTableId(table.id()));
for (ReplicationMessage.Column column : columns) {
// DBZ-298 Quoted column names will be sent like that in messages,
// but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName());
undeliveredToastableColumns.remove(columnName);
int position = getPosition(columnName, table, values);
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());
Object value = column.getValue(connectorConfig.includeUnknownDatatypes());
// values[position] = value;
values[position] = new Object[]{ value, Boolean.TRUE };
}
Expand Down Expand Up @@ -227,18 +221,14 @@ private Object[] updatedColumnValues(List<ReplicationMessage.Column> columns, Ta

// initialize to unset

final Set<String> undeliveredToastableColumns = new HashSet<>(schema
.getToastableColumnsForTableId(table.id()));
for (ReplicationMessage.Column column : columns) {
// DBZ-298 Quoted column names will be sent like that in messages,
// but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName());
undeliveredToastableColumns.remove(columnName);

int position = getPosition(columnName, table, values);
if (position != -1) {
Object value = column.getValue(() -> (BaseConnection) connection.connection(),
connectorConfig.includeUnknownDatatypes());
Object value = column.getValue(connectorConfig.includeUnknownDatatypes());

values[position] = new Object[]{ value, Boolean.TRUE };
}
Expand Down Expand Up @@ -284,8 +274,7 @@ private void refreshTableFromDatabase(TableId tableId) {
try {
// Using another implementation of refresh() to take into picture the schema information too.
LOGGER.debug("Refreshing schema for the table {}", tableId);
schema.refresh(connection, tableId,
connectorConfig.skipRefreshSchemaOnMissingToastableData(),
schema.refresh(tableId, connectorConfig.skipRefreshSchemaOnMissingToastableData(),
schema.getSchemaPBForTablet(tableId, tabletId), tabletId);
}
catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,25 @@ public void start(Map<String, String> props) {

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (props == null) {
LOGGER.error("Configuring a maximum of {} tasks with no connector configuration" +
" available", maxTasks);
return Collections.emptyList();
List<Map<String, String>> taskConfigs = new ArrayList<>();
try {
if (props == null) {
LOGGER.error("Configuring a maximum of {} tasks with no connector configuration" +
" available", maxTasks);
return Collections.emptyList();
}

connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
taskConfigs = taskConfigsImpl(maxTasks);
} finally {
closeYBClient();
connection.close();
}

connection = new YugabyteDBConnection(yugabyteDBConnectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL);
return taskConfigs;
}

private List<Map<String, String>> taskConfigsImpl(int maxTasks) {
final Charset databaseCharset = connection.getDatabaseCharset();
String charSetName = databaseCharset.name();

Expand Down Expand Up @@ -120,7 +132,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
Map<String, ConfigValue> results = validateAllFields(config);

validateTServerConnection(results, config);

String streamIdValue = this.yugabyteDBConnectorConfig.streamId();
LOGGER.debug("The streamid in config is" + this.yugabyteDBConnectorConfig.streamId());

Expand Down Expand Up @@ -169,7 +181,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
}

LOGGER.debug("Configuring {} YugabyteDB connector task(s)", taskConfigs.size());
closeYBClient();

return taskConfigs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public class YugabyteDBConnectorTask

private volatile YugabyteDBTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile YugabyteDBConnection jdbcConnection;
private volatile YugabyteDBConnection heartbeatConnection;
private volatile YugabyteDBSchema schema;

@Override
Expand Down Expand Up @@ -118,10 +116,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
databaseCharset,
typeRegistry);

// Global JDBC connection used both for snapshotting and streaming.
// Must be able to resolve datatypes.
jdbcConnection = new YugabyteDBConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder, YugabyteDBConnection.CONNECTION_GENERAL);

// CDCSDK We can just build the type registry on the co-ordinator and then send the
// map of Postgres Type and Oid to the Task using Config
final YugabyteDBTypeRegistry yugabyteDBTypeRegistry =
Expand Down Expand Up @@ -163,24 +157,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
final YugabyteDBEventMetadataProvider metadataProvider = new YugabyteDBEventMetadataProvider();

Configuration configuration = connectorConfig.getConfig();
HeartbeatFactory heartbeatFactory = new HeartbeatFactory<>(
connectorConfig,
topicSelector,
schemaNameAdjuster,
() -> new YugabyteDBConnection(connectorConfig.getJdbcConfig(), YugabyteDBConnection.CONNECTION_GENERAL),
exception -> {
String sqlErrorId = exception.getSQLState();
switch (sqlErrorId) {
case "57P01":
// Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
case "57P03":
// Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
default:
break;
}
});

final YugabyteDBEventDispatcher<TableId> dispatcher = new YugabyteDBEventDispatcher<>(
connectorConfig,
Expand All @@ -191,9 +167,7 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
DataChangeEvent::new,
YugabyteDBChangeRecordEmitter::updateSchema,
metadataProvider,
heartbeatFactory,
schemaNameAdjuster,
jdbcConnection);
schemaNameAdjuster);

YugabyteDBChangeEventSourceCoordinator coordinator = new YugabyteDBChangeEventSourceCoordinator(
previousOffsets,
Expand All @@ -203,7 +177,6 @@ public ChangeEventSourceCoordinator<YBPartition, YugabyteDBOffsetContext> start(
new YugabyteDBChangeEventSourceFactory(
connectorConfig,
snapshotter,
jdbcConnection,
errorHandler,
dispatcher,
clock,
Expand Down Expand Up @@ -335,14 +308,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {

@Override
protected void doStop() {
if (jdbcConnection != null) {
jdbcConnection.close();
}

if (heartbeatConnection != null) {
heartbeatConnection.close();
}

if (schema != null) {
schema.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,12 @@ public class YugabyteDBEventDispatcher<T extends DataCollectionId> extends Event
private final LogicalDecodingMessageMonitor logicalDecodingMessageMonitor;
private final LogicalDecodingMessageFilter messageFilter;

public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
new HeartbeatFactory<>(connectorConfig, topicSelector, schemaNameAdjuster), schemaNameAdjuster, null);
}

public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
heartbeatFactory, schemaNameAdjuster, null);
}

public YugabyteDBEventDispatcher(YugabyteDBConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<YBPartition, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster,
JdbcConnection jdbcConnection) {
EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
super(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider,
heartbeatFactory, schemaNameAdjuster);
new HeartbeatFactory<>(connectorConfig, topicSelector, schemaNameAdjuster), schemaNameAdjuster);
this.queue = queue;
this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage);
this.messageFilter = connectorConfig.getMessageFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,20 @@ public YugabyteDBOffsetContext(Offsets<YBPartition, YugabyteDBOffsetContext> pre
}

public static YugabyteDBOffsetContext initialContextForSnapshot(YugabyteDBConnectorConfig connectorConfig,
YugabyteDBConnection jdbcConnection,
Clock clock,
Set<YBPartition> partitions) {
return initialContext(connectorConfig, jdbcConnection, clock, new OpId(-1, -1, "".getBytes(), -1, 0),
return initialContext(connectorConfig, clock, new OpId(-1, -1, "".getBytes(), -1, 0),
new OpId(-1, -1, "".getBytes(), -1, 0), partitions);
}

public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig,
YugabyteDBConnection jdbcConnection,
Clock clock,
Set<YBPartition> partitions) {
return initialContext(connectorConfig, jdbcConnection, clock, new OpId(0, 0, "".getBytes(), 0, 0),
return initialContext(connectorConfig, clock, new OpId(0, 0, "".getBytes(), 0, 0),
new OpId(0, 0, "".getBytes(), 0, 0), partitions);
}

public static YugabyteDBOffsetContext initialContext(YugabyteDBConnectorConfig connectorConfig,
YugabyteDBConnection jdbcConnection,
Clock clock,
OpId lastCommitLsn,
OpId lastCompletelyProcessedLsn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,11 @@ private int resolveJdbcType(int nativeType) {
* @param tabletId the tablet UUId to refresh the schema for
* @throws SQLException if JDBC connection fails
*/
protected void refresh(YugabyteDBConnection connection, TableId tableId,
protected void refresh(TableId tableId,
boolean refreshToastableColumns, CdcService.CDCSDKSchemaPB schemaPB,
String tabletId) throws SQLException {
readSchemaWithTablet(null /* dummy object */, null, tableId.schema(), tableId::equals,
null, true, schemaPB, tableId, tabletId);

if (refreshToastableColumns) {
// and refresh toastable columns info
refreshToastableColumnsMap(connection, tableId);
}
}

protected boolean isFilteredOut(TableId id) {
Expand Down
Loading