diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index e0ac692e60afd..ba158602e22a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -429,9 +429,6 @@ synchronized void journal(RequestInfo reqInfo, LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId + " ; journal id: " + journalId); } - if (cache != null) { - cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion); - } // If the edit has already been marked as committed, we know // it has been fsynced on a quorum of other nodes, and we are @@ -439,6 +436,7 @@ synchronized void journal(RequestInfo reqInfo, boolean isLagging = lastTxnId <= committedTxnId.get(); boolean shouldFsync = !isLagging; + JournalFaultInjector.get().writeEdits(); curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); StopWatch sw = new StopWatch(); @@ -469,6 +467,12 @@ synchronized void journal(RequestInfo reqInfo, updateHighestWrittenTxId(lastTxnId); nextTxId = lastTxnId + 1; + + // Only cache the edits if we successfully persisted them + if (cache != null) { + cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion); + } + lastJournalTimestamp = Time.now(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java index a3aa532fb7ffd..16d26513abce2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java @@ -38,4 +38,5 @@ public static JournalFaultInjector get() { public void beforePersistPaxosData() throws IOException {} public void afterPersistPaxosData() throws IOException {} + public void writeEdits() throws IOException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java index bf3b0896bf578..511848a970d47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -518,6 +519,29 @@ public void testReadFromCache() throws Exception { assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion); } + @Test + public void testOnlyReadPersistedEditsFromCache() throws Exception { + JournalFaultInjector faultInjector = Mockito.mock(JournalFaultInjector.class); + JournalFaultInjector.instance = faultInjector; + + journal.newEpoch(FAKE_NSINFO, 1); + journal.startLogSegment(makeRI(1), 1, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5)); + assertJournaledEditsTxnCountAndContents(1, 10, 5, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + + Mockito.doThrow(new IOException("Injected")).when(faultInjector) + .writeEdits(); + assertThrows(IOException.class, () -> + journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5))); + Mockito.reset(faultInjector); + + // We should not see the edits that failed to persist to storage + assertJournaledEditsTxnCountAndContents(1, 10, 5, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + } + private void assertJournaledEditsTxnCountAndContents(int startTxn, int requestedMaxTxns, int expectedEndTxn, int layoutVersion) throws Exception {