Skip to content

Commit f2da62b

Browse files
committed
debezium/dbz#1279 Fix archive log only mode logic
Signed-off-by: Chris Cranford <chris@hibernate.org>
1 parent 8d247d6 commit f2da62b

File tree

2 files changed

+16
-17
lines changed

2 files changed

+16
-17
lines changed

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/AbstractLogMinerStreamingChangeEventSource.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ protected boolean waitForRangeAvailabilityInArchiveLogs(Scn startScn, Scn endScn
835835
}
836836
}
837837
else if (isNoDataProcessedInBatchAndAtEndOfArchiveLogs()) {
838-
if (endScn.compareTo(getMaximumArchiveLogsScn()) == 0) {
838+
if (endScn.compareTo(getMaximumArchiveLogsScn(startScn)) == 0) {
839839
// Prior iteration mined up to the last entry in the archive logs and no data was returned.
840840
return isArchiveLogOnlyModeAndScnIsNotAvailable(endScn.add(Scn.ONE));
841841
}
@@ -861,7 +861,7 @@ else if (isNoDataProcessedInBatchAndAtEndOfArchiveLogs()) {
861861
* @throws SQLException if a database exception is thrown
862862
*/
863863
protected Scn calculateUpperBounds(Scn lowerBoundsScn, Scn previousUpperBounds, Scn currentScn) throws SQLException {
864-
final Scn maximumScn = getConfig().isArchiveLogOnlyMode() ? getMaximumArchiveLogsScn() : currentScn;
864+
final Scn maximumScn = getConfig().isArchiveLogOnlyMode() ? getMaximumArchiveLogsScn(lowerBoundsScn) : currentScn;
865865

866866
final Scn maximumBatchScn = lowerBoundsScn.add(Scn.valueOf(metrics.getBatchSize()));
867867
final Scn defaultBatchSizeScn = Scn.valueOf(connectorConfig.getLogMiningBatchSizeDefault());
@@ -1072,11 +1072,9 @@ protected Scn getCurrentScn() throws SQLException {
10721072
*
10731073
* @return the maximum SCN, never {@code null}
10741074
*/
1075-
protected Scn getMaximumArchiveLogsScn() {
1076-
final List<LogFile> archiveLogs = (currentLogFiles == null)
1077-
? Collections.emptyList()
1078-
: currentLogFiles.stream().filter(LogFile::isArchive).toList();
1079-
1075+
protected Scn getMaximumArchiveLogsScn(Scn startScn) throws SQLException {
1076+
// It is safe to query these in real-time
1077+
final List<LogFile> archiveLogs = logCollector.getLogs(startScn).stream().filter(LogFile::isArchive).toList();
10801078
if (archiveLogs.isEmpty()) {
10811079
throw new DebeziumException("Cannot get maximum archive log SCN as no archive logs are present.");
10821080
}
@@ -1167,11 +1165,6 @@ protected void collectLogs(Scn lowerBoundsScn, Scn upperBoundsScn) throws SQLExc
11671165
}
11681166

11691167
sessionLogFiles = sessionLogFilesToAdd;
1170-
1171-
currentRedoLogSequences = currentLogFiles.stream()
1172-
.filter(LogFile::isCurrent)
1173-
.map(LogFile::getSequence)
1174-
.toList();
11751168
}
11761169

11771170
metrics.setRedoLogStatuses(jdbcConnection.queryAndMap(
@@ -1205,6 +1198,12 @@ protected void applyLogsToSession(boolean postMiningSessionEnded) throws SQLExce
12051198
sessionContext.addLogFile(logFile.getFileName());
12061199
}
12071200

1201+
// These need to be updated when we prepare the session so that log switch check works
1202+
currentRedoLogSequences = currentLogFiles.stream()
1203+
.filter(LogFile::isCurrent)
1204+
.map(LogFile::getSequence)
1205+
.toList();
1206+
12081207
metrics.setMinedLogFileNames(sessionLogFiles.stream()
12091208
.map(LogFile::getFileName)
12101209
.collect(Collectors.toSet()));

debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/unbuffered/UnbufferedLogMinerStreamingChangeEventSource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,21 @@ protected void executeLogMiningStreaming() throws Exception {
116116
// except once per iteration.
117117
databaseOffset = getMetrics().getDatabaseOffset();
118118

119-
minLogScn = computeResumeScnAndUpdateOffsets(minLogScn, minCommitScn);
120-
getMetrics().setOffsetScn(minLogScn);
121-
122119
Scn currentScn = getCurrentScn();
123120
getMetrics().setCurrentScn(currentScn);
124121

122+
collectLogs(minLogScn, getCurrentScn());
123+
124+
minLogScn = computeResumeScnAndUpdateOffsets(minLogScn, minCommitScn);
125+
getMetrics().setOffsetScn(minLogScn);
126+
125127
upperBoundsScn = calculateUpperBounds(minLogScn, upperBoundsScn, currentScn);
126128
if (upperBoundsScn.isNull()) {
127129
LOGGER.debug("Delaying mining transaction logs by one iteration");
128130
pauseBetweenMiningSessions();
129131
continue;
130132
}
131133

132-
collectLogs(minLogScn, getCurrentScn());
133-
134134
if (firstBatch) {
135135
applyLogsToSession(false);
136136
firstBatch = false;

0 commit comments

Comments
 (0)