Skip to content

Commit a6ab4ea

Browse files
nathan-smit-1Naros
authored andcommitted
debezium/dbz#1553 Allow mining session lower bound to advance when time threshold reached
Signed-off-by: nathan-smit-1 <nathansm@pepkorit.com>
1 parent 58bdca4 commit a6ab4ea

File tree

6 files changed

+241
-3
lines changed

6 files changed

+241
-3
lines changed

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
511511
.withDescription(
512512
"The maximum number of milliseconds that a LogMiner session lives for before being restarted. Defaults to 0 (indefinite until a log switch occurs)");
513513

514+
public static final Field LOG_MINING_WINDOW_MAX_MS = Field.create("log.mining.window.max.ms")
515+
.withDisplayName("Maximum number of milliseconds that the mining window can span")
516+
.withType(Type.LONG)
517+
.withWidth(Width.SHORT)
518+
.withImportance(Importance.LOW)
519+
.withDefault(TimeUnit.MINUTES.toMillis(0))
520+
.withValidation(Field::isNonNegativeInteger)
521+
.withDescription("The maximum number of milliseconds that the mining window can span. " +
522+
"If a transaction remains open for longer than this duration, the mining window start SCN will be advanced " +
523+
"to minimize the window size, preventing it from growing indefinitely. Defaults to 0 (disabled).");
524+
514525
public static final Field LOG_MINING_RESTART_CONNECTION = Field.create("log.mining.restart.connection")
515526
.withDisplayName("Restarts Oracle database connection when reaching maximum session time or database log switch")
516527
.withType(Type.BOOLEAN)
@@ -852,6 +863,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
852863
LOG_MINING_LOG_BACKOFF_INITIAL_DELAY_MS,
853864
LOG_MINING_LOG_BACKOFF_MAX_DELAY_MS,
854865
LOG_MINING_SESSION_MAX_MS,
866+
LOG_MINING_WINDOW_MAX_MS,
855867
LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
856868
LOG_MINING_READ_ONLY,
857869
LOG_MINING_FLUSH_TABLE_NAME,
@@ -944,6 +956,7 @@ public static ConfigDef configDef() {
944956
private final Duration logMiningInitialDelay;
945957
private final Duration logMiningMaxDelay;
946958
private final Duration logMiningMaximumSession;
959+
private final Duration logMiningWindowMaxMs;
947960
private final TransactionSnapshotBoundaryMode logMiningTransactionSnapshotBoundaryMode;
948961
private final Boolean logMiningReadOnly;
949962
private final String logMiningFlushTableName;
@@ -1043,6 +1056,18 @@ public OracleConnectorConfig(Configuration config) {
10431056
this.logMiningClientIdExcludes = Strings.setOfTrimmed(config.getString(LOG_MINING_CLIENTID_EXCLUDE_LIST), String::new);
10441057
this.logMiningPathToDictionary = config.getString(LOG_MINING_PATH_DICTIONARY);
10451058
this.logMiningUseCteQuery = config.getBoolean(LOG_MINING_USE_CTE_QUERY);
1059+
1060+
// Initialize logMiningWindowMaxMs, but disable if CTE is enabled as they are incompatible
1061+
final Duration configuredWindowMaxMs = Duration.ofMillis(config.getLong(LOG_MINING_WINDOW_MAX_MS));
1062+
if (this.logMiningUseCteQuery && !configuredWindowMaxMs.isZero()) {
1063+
LOGGER.warn("The log.mining.window.max.ms feature is not compatible with log.mining.use.cte.query. " +
1064+
"The log.mining.window.max.ms feature will be disabled.");
1065+
this.logMiningWindowMaxMs = Duration.ZERO;
1066+
}
1067+
else {
1068+
this.logMiningWindowMaxMs = configuredWindowMaxMs;
1069+
}
1070+
10461071
this.readonlyHostname = config.getString(LOG_MINING_READONLY_HOSTNAME);
10471072
this.logMiningRedoThreadScnAdjustment = config.getInteger(LOG_MINING_REDO_THREAD_SCN_ADJUSTMENT);
10481073
this.logMiningHashAreaSize = config.getLong(LOG_MINING_HASH_AREA_SIZE);
@@ -2001,6 +2026,13 @@ public Optional<Duration> getLogMiningMaximumSession() {
20012026
return logMiningMaximumSession.toMillis() == 0L ? Optional.empty() : Optional.of(logMiningMaximumSession);
20022027
}
20032028

2029+
/**
2030+
* @return the maximum duration for the mining window
2031+
*/
2032+
public Duration getLogMiningWindowMaxMs() {
2033+
return logMiningWindowMaxMs;
2034+
}
2035+
20042036
/**
20052037
* @return how in-progress transactions are the snapshot boundary are to be handled.
20062038
*/

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ public class OracleOffsetContext extends CommonOffsetContext<SourceInfo> {
2727

2828
public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx";
2929
public static final String SNAPSHOT_SCN_KEY = "snapshot_scn";
30+
public static final String WINDOW_ADVANCE_ENABLED_KEY = "window_advance_enabled";
3031

3132
private final Schema sourceInfoSchema;
3233

34+
private boolean windowAdvanceEnabled;
35+
3336
private final TransactionContext transactionContext;
3437
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
3538

@@ -52,12 +55,13 @@ private OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Long
5255
Scn snapshotScn, Map<String, Scn> snapshotPendingTransactions, SnapshotType snapshot,
5356
boolean snapshotCompleted, TransactionContext transactionContext,
5457
IncrementalSnapshotContext<TableId> incrementalSnapshotContext,
55-
String transactionId, Long transactionSequence) {
58+
String transactionId, Long transactionSequence, boolean windowAdvanceEnabled) {
5659
super(new SourceInfo(connectorConfig), snapshotCompleted);
5760
sourceInfo.setScn(scn);
5861
sourceInfo.setScnIndex(scnIndex);
5962
sourceInfo.setTransactionId(transactionId);
6063
sourceInfo.setTransactionSequence(transactionSequence);
64+
this.windowAdvanceEnabled = windowAdvanceEnabled;
6165
// It is safe to set this value to the supplied SCN, specifically for snapshots.
6266
// During streaming this value will be updated by the current event handler.
6367
sourceInfo.setEventScn(scn);
@@ -98,6 +102,7 @@ public static class Builder {
98102
private String transactionId;
99103
private Long transactionSequence;
100104
private CommitScn commitScn = CommitScn.empty();
105+
private boolean windowAdvanceEnabled;
101106

102107
public Builder logicalName(OracleConnectorConfig connectorConfig) {
103108
this.connectorConfig = connectorConfig;
@@ -164,10 +169,15 @@ public Builder commitScn(CommitScn commitScn) {
164169
return this;
165170
}
166171

172+
public Builder windowAdvanceEnabled(boolean windowAdvanceEnabled) {
173+
this.windowAdvanceEnabled = windowAdvanceEnabled;
174+
return this;
175+
}
176+
167177
public OracleOffsetContext build() {
168178
return new OracleOffsetContext(connectorConfig, scn, scnIndex, commitScn, lcrPosition, snapshotScn,
169179
snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext,
170-
incrementalSnapshotContext, transactionId, transactionSequence);
180+
incrementalSnapshotContext, transactionId, transactionSequence, windowAdvanceEnabled);
171181
}
172182
}
173183

@@ -218,6 +228,10 @@ public static Builder create() {
218228
}
219229
}
220230

231+
if (windowAdvanceEnabled) {
232+
result.put(WINDOW_ADVANCE_ENABLED_KEY, true);
233+
}
234+
221235
return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));
222236
}
223237

@@ -294,6 +308,18 @@ public void setSnapshotPendingTransactions(Map<String, Scn> snapshotPendingTrans
294308
this.snapshotPendingTransactions = snapshotPendingTransactions;
295309
}
296310

311+
public boolean isWindowAdvanceEnabled() {
312+
return windowAdvanceEnabled;
313+
}
314+
315+
/**
316+
* Marks the window advance feature as having been enabled.
317+
* Once set to true, this cannot be unset (until offsets are cleared).
318+
*/
319+
public void setWindowAdvanceEnabled() {
320+
this.windowAdvanceEnabled = true;
321+
}
322+
297323
public void setTransactionId(String transactionId) {
298324
sourceInfo.setTransactionId(transactionId);
299325
}
@@ -493,6 +519,17 @@ public static Long loadTransactionSequence(Map<String, ?> offset) {
493519
return readOffsetValue(offset, SourceInfo.TXSEQ_KEY, Long.class);
494520
}
495521

522+
/**
523+
* Helper method to read whether window advance has been enabled from the offset map.
524+
*
525+
* @param offset the offset map
526+
* @return true if window advance has ever been enabled, false otherwise
527+
*/
528+
public static boolean loadWindowAdvanceEnabled(Map<String, ?> offset) {
529+
Boolean value = readOffsetValue(offset, WINDOW_ADVANCE_ENABLED_KEY, Boolean.class);
530+
return value != null && value;
531+
}
532+
496533
private static <T> T readOffsetValue(Map<String, ?> offsets, String key, Class<T> valueType) {
497534
final Object value = offsets.get(key);
498535
return valueType.isInstance(value) ? valueType.cast(value) : null;

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/buffered/BufferedLogMinerOracleOffsetContextLoader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public OracleOffsetContext load(Map<String, ?> offset) {
4040
.transactionId(OracleOffsetContext.loadTransactionId(offset))
4141
.transactionSequence(OracleOffsetContext.loadTransactionSequence(offset))
4242
.incrementalSnapshotContext(SignalBasedIncrementalSnapshotContext.load(offset))
43+
.windowAdvanceEnabled(OracleOffsetContext.loadWindowAdvanceEnabled(offset))
4344
.build();
4445
}
4546

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/buffered/BufferedLogMinerStreamingChangeEventSource.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class BufferedLogMinerStreamingChangeEventSource extends AbstractLogMiner
7474

7575
private static final Logger LOGGER = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class);
7676
private static final Logger ABANDONED_DETAILS_LOGGER = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class.getName() + ".AbandonedDetails");
77+
private static final Logger WINDOW_ADVANCED = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class.getName() + ".WindowAdvanced");
78+
7779
private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
7880

7981
private final String queryString;
@@ -82,6 +84,7 @@ public class BufferedLogMinerStreamingChangeEventSource extends AbstractLogMiner
8284

8385
private Instant lastProcessedScnChangeTime = null;
8486
private Scn lastProcessedScn = Scn.NULL;
87+
private Scn lastLoggedWindowAdvanceScn = Scn.NULL;
8588

8689
public BufferedLogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
8790
OracleConnection jdbcConnection,
@@ -778,7 +781,13 @@ private ProcessResult calculateNewStartScn(Scn startScn, Scn endScn, Scn maxComm
778781

779782
if (!minCacheScn.isNull()) {
780783
// Cache have values
781-
final Scn miningSessionStartScn = minCacheScn.subtract(Scn.ONE);
784+
// By default, the mining window starts at the oldest transaction in the cache.
785+
// If window max is configured, it may be adjusted to not pin on long-running transactions.
786+
final Scn miningSessionStartScn = applyWindowMaxAdjustment(
787+
minCacheScn.subtract(Scn.ONE),
788+
endScn,
789+
minCacheScn,
790+
minCacheScnChangeTime);
782791

783792
getOffsetContext().setScn(miningSessionStartScn);
784793
getEventDispatcher().dispatchHeartbeatEvent(getPartition(), getOffsetContext());
@@ -801,6 +810,82 @@ private ProcessResult calculateNewStartScn(Scn startScn, Scn endScn, Scn maxComm
801810
}
802811
}
803812

813+
/**
814+
* Adjusts the mining session start SCN based on the window max duration threshold.
815+
* <p>
816+
* When {@code log.mining.window.max.ms} is configured, this method prevents long-running
817+
* transactions from pinning the mining window to an old position. If the oldest
818+
* transaction in the cache exceeds the window threshold, this method finds the oldest
819+
* transaction that falls within the acceptable window, or advances to the end SCN if all
820+
* transactions are too old.
821+
* <p>
822+
* Long-running transactions will still be captured when they eventually commit, but they
823+
* won't force the connector to re-mine an ever-growing window of redo logs.
824+
*
825+
* @param defaultStartScn the default start SCN (oldest transaction minus one)
826+
* @param endScn the current end SCN of the mining window
827+
* @param minCacheScn the SCN of the oldest transaction in the cache
828+
* @param minCacheScnChangeTime the change time of the oldest transaction in the cache
829+
* @return the adjusted start SCN, or {@code defaultStartScn} if no adjustment is needed
830+
*/
831+
private Scn applyWindowMaxAdjustment(Scn defaultStartScn, Scn endScn,
832+
Scn minCacheScn, Instant minCacheScnChangeTime) {
833+
final Duration windowMaxMs = getConfig().getLogMiningWindowMaxMs();
834+
if (windowMaxMs.toMillis() <= 0 || lastProcessedScnChangeTime == null) {
835+
return defaultStartScn;
836+
}
837+
838+
// Mark in offsets that the window advance feature has been enabled
839+
if (!getOffsetContext().isWindowAdvanceEnabled()) {
840+
getOffsetContext().setWindowAdvanceEnabled();
841+
}
842+
843+
final Instant thresholdTime = lastProcessedScnChangeTime.minus(windowMaxMs);
844+
845+
// Check if the oldest transaction exceeds the window threshold
846+
if (minCacheScnChangeTime == null || minCacheScnChangeTime.compareTo(thresholdTime) >= 0) {
847+
// Oldest transaction is within the window, no adjustment needed
848+
return defaultStartScn;
849+
}
850+
851+
// The oldest transaction exceeds the threshold, find a suitable start SCN
852+
// by looking for the oldest transaction that falls within the window
853+
final Optional<LogMinerTransactionCache.ScnDetails> activeScnDetails = getTransactionCache()
854+
.streamTransactionsAndReturn(stream -> stream
855+
.filter(t -> t.getChangeTime().compareTo(thresholdTime) >= 0)
856+
.map(t -> new LogMinerTransactionCache.ScnDetails(t.getStartScn(), t.getChangeTime()))
857+
.min(Comparator.comparing(LogMinerTransactionCache.ScnDetails::scn)));
858+
859+
Scn adjustedStartScn;
860+
if (activeScnDetails.isPresent()) {
861+
// Found a transaction within the window, use its start SCN
862+
adjustedStartScn = activeScnDetails.get().scn().subtract(Scn.ONE);
863+
}
864+
else {
865+
// All transactions are older than the window max duration
866+
// Advance to the end SCN (like we do when the cache is empty)
867+
adjustedStartScn = endScn.subtract(Scn.ONE);
868+
}
869+
870+
// Safety check: never advance past the end SCN
871+
final Scn maxAllowedScn = endScn.subtract(Scn.ONE);
872+
if (adjustedStartScn.compareTo(maxAllowedScn) > 0) {
873+
adjustedStartScn = maxAllowedScn;
874+
}
875+
876+
// Log a warning, but only once per oldest transaction SCN to avoid flooding logs
877+
if (!minCacheScn.equals(lastLoggedWindowAdvanceScn)) {
878+
WINDOW_ADVANCED.warn("Mining window lower bound advanced past transaction at SCN {} to SCN {} " +
879+
"due to log.mining.window.max.ms threshold ({}ms). " +
880+
"Long-running transactions older than the threshold will continue to be captured " +
881+
"but won't pin the mining window.",
882+
minCacheScn, adjustedStartScn, windowMaxMs.toMillis());
883+
lastLoggedWindowAdvanceScn = minCacheScn;
884+
}
885+
886+
return adjustedStartScn;
887+
}
888+
804889
/**
805890
* Calculates the smallest system change number currently in the transaction cache, if any exist.
806891
*

debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/buffered/AbstractBufferedLogMinerStreamingChangeEventSourceIT.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static org.assertj.core.api.Assertions.assertThat;
99

10+
import java.time.Duration;
1011
import java.util.List;
1112
import java.util.concurrent.TimeUnit;
1213

@@ -232,4 +233,58 @@ public void shouldLogAdditionalDetailsForAbandonedTransaction() throws Exception
232233
TestHelper.dropTable(connection, "dbz8044");
233234
}
234235
}
236+
237+
@Test
238+
@FixFor("DBZ-1553")
239+
public void shouldAdvanceMiningWindowForLongRunningTransaction() throws Exception {
240+
TestHelper.dropTable(connection, "dbz1553");
241+
try {
242+
connection.execute("CREATE TABLE dbz1553 (id numeric(9,0) primary key, data varchar2(50))");
243+
TestHelper.streamTable(connection, "dbz1553");
244+
245+
// Configure the connector with a 30 second window max
246+
Configuration config = getBufferImplementationConfig()
247+
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ1553")
248+
.with(OracleConnectorConfig.LOG_MINING_WINDOW_MAX_MS, "30000")
249+
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
250+
.build();
251+
252+
final LogInterceptor logInterceptor = new LogInterceptor(BufferedLogMinerStreamingChangeEventSource.class);
253+
start(OracleConnector.class, config);
254+
assertConnectorIsRunning();
255+
256+
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
257+
258+
// Start a long-running transaction that will not be committed
259+
connection.executeWithoutCommitting("INSERT INTO dbz1553 (id,data) values (1, 'long-running')");
260+
261+
// Wait for the window threshold to be exceeded and the mining window to be advanced.
262+
// The log message should appear once the mining window lower bound is moved past the
263+
// long-running transaction.
264+
Awaitility.await()
265+
.atMost(Duration.ofMinutes(2))
266+
.pollInterval(Duration.ofSeconds(5))
267+
.until(() -> logInterceptor.containsWarnMessage("Mining window lower bound advanced"));
268+
269+
// Verify the warning message indicates the window was advanced due to the threshold
270+
assertThat(logInterceptor.containsWarnMessage("due to log.mining.window.max.ms threshold")).isTrue();
271+
272+
// Now commit the long-running transaction
273+
connection.commit();
274+
275+
// Consume the record to verify the transaction was fully captured
276+
SourceRecords records = consumeRecordsByTopic(1);
277+
assertThat(records.allRecordsInOrder()).hasSize(1);
278+
279+
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ1553");
280+
assertThat(tableRecords).hasSize(1);
281+
282+
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
283+
assertThat(after.get("ID")).isEqualTo(1);
284+
assertThat(after.get("DATA")).isEqualTo("long-running");
285+
}
286+
finally {
287+
TestHelper.dropTable(connection, "dbz1553");
288+
}
289+
}
235290
}

0 commit comments

Comments
 (0)