Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@
<informix.db.name>testdb</informix.db.name>
<informix.image>informix-developer-database</informix.image>
<informix.image.registry>icr.io/informix</informix.image.registry>
<informix.image.tag>14.10.FC9W1DE</informix.image.tag>
<informix.version>14</informix.version>
<informix.image.tag>15.0</informix.image.tag>
<informix.version>15</informix.version>
<informix.init.timeout>300000</informix.init.timeout> <!-- five minutes max -->
<informix.run.wait.log>SCHAPI: Started \d dbWorker threads</informix.run.wait.log>
<informix.init.script.name>informix_post_init.sh</informix.init.script.name>
<informix.output.directory>config</informix.output.directory>
<informix.size>medium</informix.size>
<informix.init.script.name/>
<informix.sch.init.script.name/>
<informix.pre.init.script.name>informix_pre_init.sh</informix.pre.init.script.name>
<informix.post.init.script.name>informix_post_init.sh</informix.post.init.script.name>
<!-- Default values for the database -->
<informix.db.date>MDY4/</informix.db.date>
<informix.db.century>R</informix.db.century>
Expand Down Expand Up @@ -159,16 +162,11 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
</dependency>
<!-- Informix ChangeStream client -->
<dependency>
<groupId>com.ibm.informix</groupId>
<artifactId>ifx-changestream-client</artifactId>
<scope>provided</scope>
</dependency>
<!-- Informix JDBC driver -->
<dependency>
<groupId>com.ibm.informix</groupId>
<artifactId>jdbc</artifactId>
<version>15.0.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -295,14 +293,13 @@
<run>
<env>
<LICENSE>accept</LICENSE>
<SIZE>large</SIZE>
<TYPE>oltp</TYPE>
<USEOSTIME>1</USEOSTIME>
<SIZE>${informix.size}</SIZE>
<DB_SBSPACE>1</DB_SBSPACE>
<DBDATE>${informix.db.date}</DBDATE>
<DBCENTURY>${informix.db.century}</DBCENTURY>
<DBMONEY>${informix.db.money}</DBMONEY>
<RUN_FILE_POST_INIT>informix_post_init.sh</RUN_FILE_POST_INIT>
<RUN_FILE_PRE_INIT>${informix.pre.init.script.name}</RUN_FILE_PRE_INIT>
<RUN_FILE_POST_INIT>${informix.post.init.script.name}</RUN_FILE_POST_INIT>
</env>
<ports>
<port>${informix.port}:9088</port>
Expand All @@ -320,23 +317,45 @@
</run>
<build>
<from>${informix.image.registry}/${informix.image}:${informix.image.tag}</from>
<runCmds>
<run>sed -Ei 's/^(USEOSTIME)\s+\S/\1 1/' /opt/ibm/informix/etc/onconfig.std</run>
</runCmds>
<assembly>
<inline>
<includeBaseDirectory>false</includeBaseDirectory>
<files>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/${informix.init.script.name}</source>
<outputDirectory>${informix.output.directory}</outputDirectory>
<outputDirectory>scripts</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/testdb.sql</source>
<outputDirectory>informix/etc</outputDirectory>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/${informix.sch.init.script.name}</source>
<outputDirectory>data</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/${informix.pre.init.script.name}</source>
<outputDirectory>config</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/${informix.post.init.script.name}</source>
<outputDirectory>config</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/${informix.version}/informix_config.custom</source>
<outputDirectory>data</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
<file>
<source>${project.basedir}/src/test/docker/informix-cdc-docker/testdb.sql</source>
<outputDirectory>data</outputDirectory>
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</file>
</files>
</inline>
Expand Down Expand Up @@ -526,10 +545,13 @@
<informix.version>12</informix.version>
<informix.image>ibmcom-informix-developer-database</informix.image>
<informix.image.tag>12.10.FC12W1DE</informix.image.tag>
<informix.run.wait.log>Logical Log \d Complete</informix.run.wait.log>
<informix.run.wait.log>Checkpoint Completed: duration was \d seconds</informix.run.wait.log>
<informix.image.registry>quay.io/debezium</informix.image.registry>
<informix.size>custom</informix.size>
<informix.init.script.name>informix_init.sh</informix.init.script.name>
<informix.output.directory>scripts</informix.output.directory>
<informix.sch.init.script.name>sch_init_informix.custom.sql</informix.sch.init.script.name>
<informix.pre.init.script.name/>
<informix.post.init.script.name/>
<informix.db.date>MDY2.</informix.db.date>
<informix.db.century>C</informix.db.century>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
*/
package io.debezium.connector.informix;

import static com.informix.stream.api.IfmxStreamRecordType.AFTER_UPDATE;
import static com.informix.stream.api.IfmxStreamRecordType.BEFORE_UPDATE;
import static com.informix.stream.api.IfmxStreamRecordType.COMMIT;
import static com.informix.stream.api.IfmxStreamRecordType.DELETE;
import static com.informix.stream.api.IfmxStreamRecordType.INSERT;
import static com.informix.stream.api.IfmxStreamRecordType.ROLLBACK;
import static com.informix.stream.api.IfmxStreamRecordType.TRUNCATE;
import static com.informix.jdbc.stream.api.StreamRecordType.AFTER_UPDATE;
import static com.informix.jdbc.stream.api.StreamRecordType.BEFORE_UPDATE;
import static com.informix.jdbc.stream.api.StreamRecordType.COMMIT;
import static com.informix.jdbc.stream.api.StreamRecordType.DELETE;
import static com.informix.jdbc.stream.api.StreamRecordType.INSERT;
import static com.informix.jdbc.stream.api.StreamRecordType.ROLLBACK;
import static com.informix.jdbc.stream.api.StreamRecordType.TRUNCATE;

import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -29,12 +29,12 @@
import org.slf4j.LoggerFactory;

import com.informix.jdbc.IfmxTableDescriptor;
import com.informix.stream.api.IfmxStreamRecord;
import com.informix.stream.api.IfmxStreamRecordType;
import com.informix.stream.api.IfxTransactionEngine;
import com.informix.stream.cdc.IfxCDCEngine;
import com.informix.stream.cdc.records.IfxCDCBeginTransactionRecord;
import com.informix.stream.impl.IfxStreamException;
import com.informix.jdbc.stream.api.StreamRecord;
import com.informix.jdbc.stream.api.StreamRecordType;
import com.informix.jdbc.stream.api.TransactionEngine;
import com.informix.jdbc.stream.cdc.CDCEngine;
import com.informix.jdbc.stream.cdc.records.CDCBeginTransactionRecord;
import com.informix.jdbc.stream.impl.StreamException;

import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
Expand All @@ -45,18 +45,18 @@
* @author Lars M Johansson
*
*/
public class InformixCdcTransactionEngine implements IfxTransactionEngine {
public class InformixCdcTransactionEngine implements TransactionEngine {

private static final Logger LOGGER = LoggerFactory.getLogger(InformixCdcTransactionEngine.class);
private static final String PROCESSING_RECORD = "Processing {} record";
private static final String MISSING_TRANSACTION_START_FOR_RECORD = "Missing transaction start for record: {}";
protected final Builder builder;
protected final IfxCDCEngine engine;
protected final CDCEngine engine;
protected final ChangeEventSourceContext context;
protected EnumSet<IfmxStreamRecordType> operationFilters;
protected EnumSet<IfmxStreamRecordType> transactionFilters;
protected final Map<Integer, TransactionHolder> transactionMap;
protected boolean returnEmptyTransactions;
protected EnumSet<StreamRecordType> operationFilters;
protected EnumSet<StreamRecordType> transactionFilters;
protected final Map<Integer, TransactionHolder> transactionMap;
protected Map<String, TableId> tableIdByLabelId;

public static Builder builder(DataSource ds) {
Expand All @@ -67,41 +67,37 @@ protected InformixCdcTransactionEngine(Builder builder) {
this.builder = builder;
this.engine = builder.engine;
this.context = builder.context;
this.returnEmptyTransactions = builder.returnEmptyTransactions;
this.operationFilters = EnumSet.of(INSERT, DELETE, BEFORE_UPDATE, AFTER_UPDATE, TRUNCATE);
this.transactionFilters = EnumSet.of(COMMIT, ROLLBACK);
this.transactionMap = new ConcurrentSkipListMap<>();
this.returnEmptyTransactions = false;
}

@Override
public IfmxStreamRecord getRecord() throws SQLException, IfxStreamException {
IfmxStreamRecord streamRecord;
public StreamRecord getRecord() throws SQLException, StreamException {
StreamRecord streamRecord;
while (context.isRunning() && (streamRecord = engine.getRecord()) != null) {

TransactionHolder holder = transactionMap.get(streamRecord.getTransactionId());
if (holder != null) {
LOGGER.debug("Processing [{}] record for transaction id: {}", streamRecord.getType(), streamRecord.getTransactionId());
}
switch (streamRecord.getType()) {
case BEGIN:
case BEGIN -> {
holder = new TransactionHolder();
holder.beginRecord = (IfxCDCBeginTransactionRecord) streamRecord;
holder.beginRecord = (CDCBeginTransactionRecord) streamRecord;
transactionMap.put(streamRecord.getTransactionId(), holder);
LOGGER.debug("Watching transaction id: {}", streamRecord.getTransactionId());
break;
case INSERT:
case DELETE:
case BEFORE_UPDATE:
case AFTER_UPDATE:
case TRUNCATE:
}
case INSERT, DELETE, BEFORE_UPDATE, AFTER_UPDATE, TRUNCATE -> {
if (holder == null) {
LOGGER.warn(MISSING_TRANSACTION_START_FOR_RECORD, streamRecord);
break;
}
LOGGER.debug(PROCESSING_RECORD, streamRecord.getType());
holder.records.add(streamRecord);
break;
case DISCARD:
}
case DISCARD -> {
if (holder == null) {
LOGGER.warn(MISSING_TRANSACTION_START_FOR_RECORD, streamRecord);
break;
Expand All @@ -112,27 +108,23 @@ public IfmxStreamRecord getRecord() throws SQLException, IfxStreamException {
if (holder.records.removeIf(r -> r.getSequenceId() >= sequenceId)) {
LOGGER.debug("Discarding records with sequence >={}", sequenceId);
}
break;
case COMMIT:
case ROLLBACK:
}
case COMMIT, ROLLBACK -> {
if (holder == null) {
LOGGER.warn(MISSING_TRANSACTION_START_FOR_RECORD, streamRecord);
break;
}
LOGGER.debug(PROCESSING_RECORD, streamRecord.getType());
holder.closingRecord = streamRecord;
break;
case METADATA:
case TIMEOUT:
case ERROR:
}
case METADATA, TIMEOUT, ERROR -> {
if (holder == null) {
return streamRecord;
}
LOGGER.debug(PROCESSING_RECORD, streamRecord.getType());
holder.records.add(streamRecord);
break;
default:
LOGGER.warn("Unknown operation for record: {}", streamRecord);
}
default -> LOGGER.warn("Unknown operation for record: {}", streamRecord);
}
if (holder != null && holder.closingRecord != null) {
transactionMap.remove(streamRecord.getTransactionId());
Expand All @@ -146,22 +138,22 @@ public IfmxStreamRecord getRecord() throws SQLException, IfxStreamException {
}

@Override
public InformixStreamTransactionRecord getTransaction() throws SQLException, IfxStreamException {
IfmxStreamRecord streamRecord;
public InformixStreamTransactionRecord getTransaction() throws SQLException, StreamException {
StreamRecord streamRecord;
while ((streamRecord = getRecord()) != null && !(streamRecord instanceof InformixStreamTransactionRecord)) {
LOGGER.debug("Discard non-transaction record: {}", streamRecord);
}
return (InformixStreamTransactionRecord) streamRecord;
}

@Override
public InformixCdcTransactionEngine setOperationFilters(IfmxStreamRecordType... recordTypes) {
public InformixCdcTransactionEngine setOperationFilters(StreamRecordType... recordTypes) {
operationFilters = EnumSet.copyOf(Set.of(recordTypes));
return this;
}

@Override
public InformixCdcTransactionEngine setTransactionFilters(IfmxStreamRecordType... recordTypes) {
public InformixCdcTransactionEngine setTransactionFilters(StreamRecordType... recordTypes) {
transactionFilters = EnumSet.copyOf(Set.of(recordTypes));
return this;
}
Expand All @@ -173,7 +165,7 @@ public InformixCdcTransactionEngine returnEmptyTransactions(boolean returnEmptyT
}

@Override
public void init() throws SQLException, IfxStreamException {
public void init() throws SQLException, StreamException {
engine.init();

/*
Expand All @@ -186,7 +178,7 @@ public void init() throws SQLException, IfxStreamException {
}

@Override
public void close() throws IfxStreamException {
public void close() throws StreamException {
engine.close();
}

Expand All @@ -199,9 +191,9 @@ public Map<String, TableId> getTableIdByLabelId() {
}

protected static class TransactionHolder {
final List<IfmxStreamRecord> records = new ArrayList<>();
IfxCDCBeginTransactionRecord beginRecord;
IfmxStreamRecord closingRecord;
final List<StreamRecord> records = new ArrayList<>();
CDCBeginTransactionRecord beginRecord;
StreamRecord closingRecord;
}

public Builder getBuilder() {
Expand All @@ -210,12 +202,13 @@ public Builder getBuilder() {

public static class Builder {

private final IfxCDCEngine.Builder builder;
private IfxCDCEngine engine;
private final CDCEngine.Builder builder;
private CDCEngine engine;
private ChangeEventSourceContext context;
private boolean returnEmptyTransactions = false;

protected Builder(DataSource ds) {
builder = IfxCDCEngine.builder(ds);
builder = CDCEngine.builder(ds);
}

public DataSource getDataSource() {
Expand Down Expand Up @@ -264,12 +257,12 @@ public Builder watchTable(IfmxTableDescriptor desc, String... columns) {
return this;
}

public Builder watchTable(IfxCDCEngine.IfmxWatchedTable table) {
public Builder watchTable(CDCEngine.IfmxWatchedTable table) {
builder.watchTable(table);
return this;
}

public List<IfxCDCEngine.IfmxWatchedTable> getWatchedTables() {
public List<CDCEngine.IfmxWatchedTable> getWatchedTables() {
return builder.getWatchedTables();
}

Expand All @@ -278,6 +271,11 @@ public Builder stopLoggingOnClose(boolean stopOnClose) {
return this;
}

public Builder returnEmptyTransactions(boolean returnEmptyTransactions) {
this.returnEmptyTransactions = returnEmptyTransactions;
return this;
}

public InformixCdcTransactionEngine build() throws SQLException {
engine = builder.build();
return new InformixCdcTransactionEngine(this);
Expand Down
Loading
Loading