Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# On your terminal, run the following to build the image:
# mvn clean package -Dquick

FROM debezium/connect:2.5.2.Final

WORKDIR $KAFKA_CONNECT_PLUGINS_DIR
RUN rm -f debezium-connector-postgres/debezium-connector-postgres-*.jar
RUN rm -rf debezium-connector-db2
RUN rm -rf debezium-connector-informix
RUN rm -rf debezium-connector-mongodb
RUN rm -rf debezium-connector-jdbc
RUN rm -rf debezium-connector-mysql
RUN rm -rf debezium-connector-oracle
RUN rm -rf debezium-connector-spanner
RUN rm -rf debezium-connector-sqlserver
RUN rm -rf debezium-connector-vitess
WORKDIR /

# Copy the Debezium Connector for Postgres adapted for YugabyteDB
COPY debezium-connector-postgres/target/debezium-connector-postgres-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres/

# Set the TLS version to be used by Kafka processes
#ENV KAFKA_OPTS="-Djdk.tls.client.protocols=TLSv1.2 -javaagent:/kafka/etc/jmx_prometheus_javaagent-0.17.2.jar=8080:/kafka/etc/jmx-exporter/metrics.yml"

# Add the required jar files to be packaged with the base connector
RUN cd $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres && curl -sLo kafka-connect-jdbc-10.6.5-CUSTOM.7.jar https://github.com/yugabyte/kafka-connect-jdbc/releases/download/10.6.5-CUSTOM.7/kafka-connect-jdbc-10.6.5-CUSTOM.7.jar
RUN cd $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres && curl -sLo jdbc-yugabytedb-42.3.5-yb-1.jar https://repo1.maven.org/maven2/com/yugabyte/jdbc-yugabytedb/42.3.5-yb-1/jdbc-yugabytedb-42.3.5-yb-1.jar
RUN cd $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres && curl -sLo transforms-for-apache-kafka-connect-1.5.0.zip https://github.com/Aiven-Open/transforms-for-apache-kafka-connect/releases/download/v1.5.0/transforms-for-apache-kafka-connect-1.5.0.zip
RUN cd $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres && unzip transforms-for-apache-kafka-connect-1.5.0.zip

# Add Jmx agent and metrics pattern file to expose the metrics info
#RUN mkdir /kafka/etc && cd /kafka/etc && curl -so jmx_prometheus_javaagent-0.17.2.jar https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.17.2/jmx_prometheus_javaagent-0.17.2.jar

#COPY metrics.yml /kafka/etc/jmx-exporter/

ENV CLASSPATH=$KAFKA_HOME
ENV JMXHOST=localhost
ENV JMXPORT=1976

# properties file having instructions to roll over log files in case the size exceeds a given limit
#COPY log4j.properties $KAFKA_HOME/config/log4j.properties

19 changes: 19 additions & 0 deletions debezium-connector-postgres/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws S
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
LOGGER.info("Opening transaction with statement {}", transactionStatement);
jdbcConnection.executeWithoutCommitting(transactionStatement);

if (slotCreatedInfo != null && !isOnDemand) {
// Setting transaction snapshot separately otherwise we will get an exception with the error message:
// ERROR: cannot export/import a snapshot in Batch Execution.
jdbcConnection.executeWithoutCommitting("SET TRANSACTION SNAPSHOT '" + slotCreatedInfo.snapshotName() + "';");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,14 +510,18 @@ public synchronized void close() {
* @throws SQLException if anything fails.
*/
public Long currentTransactionId() throws SQLException {
AtomicLong txId = new AtomicLong(0);
query("select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid", rs -> {
if (rs.next()) {
txId.compareAndSet(0, rs.getLong(1));
}
});
long value = txId.get();
return value > 0 ? value : null;
// YB changes: Returning a dummy default value here as transaction ID is not being used
// anywhere to make any difference.
return 2L;

// AtomicLong txId = new AtomicLong(0);
// query("select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid", rs -> {
// if (rs.next()) {
// txId.compareAndSet(0, rs.getLong(1));
// }
// });
// long value = txId.get();
// return value > 0 ? value : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,11 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException

try (Statement stmt = pgConnection().createStatement()) {
String createCommand = String.format(
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s",
slotName,
tempPart,
plugin.getPostgresPluginName());
plugin.getPostgresPluginName(),
"EXPORT_SNAPSHOT");
LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand);
// when we are in Postgres 9.4+, we can parse the slot creation info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newS
* the same snapshot from the existing exported transaction as for the initial snapshot.
*/
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());

// YB Change: We can either reset the string or comment out the above logic.
// By resetting it to an empty string, we are effectively disabling the setting of snapshot
// in the same statement as that of setting the isolation level.
snapSet = "";

return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo, isOnDemand);
Expand Down
Loading