Skip to content

Commit dfc7611

Browse files
author
Himanshu Gwalani
committed
HOENIX-7669 Enhance Header and Trailer validations to gracefully handle unclosed files
1 parent 1e1b9e5 commit dfc7611

File tree

12 files changed

+252
-125
lines changed

12 files changed

+252
-125
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.phoenix.replication.log;
19+
20+
import java.io.IOException;
21+
22+
/** Exception thrown when a log file has an invalid header. */
23+
public class InvalidLogHeaderException extends IOException {
24+
private static final long serialVersionUID = 1L;
25+
26+
public InvalidLogHeaderException(String message) {
27+
super(message);
28+
}
29+
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.phoenix.replication.log;
19+
20+
import java.io.IOException;
21+
22+
/** Exception thrown when a log file has an invalid trailer. */
23+
public class InvalidLogTrailerException extends IOException {
24+
private static final long serialVersionUID = 1L;
25+
26+
public InvalidLogTrailerException(String message) {
27+
super(message);
28+
}
29+
30+
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.DataOutput;
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
25+
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.fs.FSDataInputStream;
2627
import org.apache.hadoop.fs.FileSystem;
2728
import org.apache.hadoop.fs.Path;
@@ -418,21 +419,29 @@ interface Decoder {
418419

419420
/**
420421
* Utility for determining if a file is a valid replication log file.
421-
* @param fs The FileSystem
422-
* @param path Path to the potential replication log file
422+
* @param conf The Configuration
423+
* @param fs The FileSystem
424+
* @param path Path to the potential replication log file
425+
* @param validateTrailer Whether to validate the trailer
423426
* @return true if the file is a valid replication log file, false otherwise
424427
* @throws IOException if an I/O problem was encountered
425428
*/
426-
static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException {
429+
static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path,
430+
final boolean validateTrailer) throws IOException {
427431
long length = fs.getFileStatus(path).getLen();
428432
try (FSDataInputStream in = fs.open(path)) {
429-
if (LogFileTrailer.isValidTrailer(in, length)) {
430-
return true;
431-
} else {
432-
// Not a valid trailer, do we need to do something (set a flag)?
433-
// Fall back to checking the header.
434-
return LogFileHeader.isValidHeader(in);
433+
// Check if the file is too short to be a valid log file.
434+
if (length < LogFileHeader.HEADERSIZE) {
435+
return false;
435436
}
437+
try (LogFileFormatReader reader = new LogFileFormatReader()) {
438+
LogFileReaderContext context = new LogFileReaderContext(conf).setFilePath(path)
439+
.setFileSize(length).setValidateTrailer(validateTrailer);
440+
reader.init(context, (SeekableDataInput) in);
441+
} catch (InvalidLogHeaderException | InvalidLogTrailerException e) {
442+
return false;
443+
}
444+
return true;
436445
}
437446
}
438447

phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public class LogFileFormatReader implements Closeable {
4747
private ByteBuffer currentBlockBuffer;
4848
private long currentBlockDataBytes;
4949
private long currentBlockConsumedBytes;
50-
private boolean trailerValidated;
5150
private CRC64 crc = new CRC64();
5251

5352
public LogFileFormatReader() {
@@ -61,13 +60,13 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I
6160
this.currentBlockConsumedBytes = 0;
6261
try {
6362
readAndValidateTrailer();
64-
trailerValidated = true;
6563
} catch (IOException e) {
64+
// If we are validating the trailer, we cannot proceed without it.
65+
if (context.isValidateTrailer()) {
66+
throw e;
67+
}
6668
// Log warning, trailer might be missing or corrupt, proceed without it
67-
LOG.warn(
68-
"Failed to read or validate Log trailer for path: "
69-
+ (context != null ? context.getFilePath() : "unknown") + ". Proceeding without trailer.",
70-
e);
69+
LOG.warn("Failed to validate Log trailer for " + context.getFilePath() + ", proceeding", e);
7170
trailer = null; // Ensure trailer is null if reading/validation failed
7271
}
7372
this.decoder = null;
@@ -78,8 +77,7 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I
7877

7978
private void readAndValidateTrailer() throws IOException {
8079
if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
81-
throw new IOException("File size " + context.getFileSize()
82-
+ " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE);
80+
throw new InvalidLogTrailerException("Short file");
8381
}
8482
LogFileTrailer ourTrailer = new LogFileTrailer();
8583
// Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file.
@@ -337,7 +335,7 @@ private long getEndOfDataOffset() throws IOException {
337335

338336
// Validates read counts against trailer counts if trailer was successfully read
339337
private void validateReadCounts() {
340-
if (!trailerValidated || trailer == null) {
338+
if (trailer == null) {
341339
return;
342340
}
343341
if (trailer.getBlockCount() != context.getBlocksRead()) {
@@ -367,8 +365,7 @@ public String toString() {
367365
return "LogFileFormatReader [context=" + context + ", decoder=" + decoder + ", input=" + input
368366
+ ", header=" + header + ", trailer=" + trailer + ", currentPosition=" + currentPosition
369367
+ ", currentBlockBuffer=" + currentBlockBuffer + ", currentBlockUncompressedSize="
370-
+ currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes
371-
+ ", trailerValidated=" + trailerValidated + "]";
368+
+ currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]";
372369
}
373370

374371
LogFile.Header getHeader() {

phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ public class LogFileFormatWriter implements Closeable {
4040
private SyncableDataOutput output;
4141
private ByteArrayOutputStream currentBlockBytes;
4242
private DataOutputStream blockDataStream;
43-
private boolean headerWritten = false;
44-
private boolean trailerWritten = false;
4543
private long recordCount = 0;
4644
private long blockCount = 0;
4745
private long blocksStartOffset = -1;
@@ -59,29 +57,21 @@ public void init(LogFileWriterContext context, SyncableDataOutput output) throws
5957
this.currentBlockBytes = new ByteArrayOutputStream();
6058
this.blockDataStream = new DataOutputStream(currentBlockBytes);
6159
this.encoder = context.getCodec().getEncoder(blockDataStream);
60+
// Write header immediately when file is created
61+
writeFileHeader();
6262
}
6363

6464
private void writeFileHeader() throws IOException {
65-
if (!headerWritten) {
66-
LogFileHeader header = new LogFileHeader();
67-
header.write(output);
68-
blocksStartOffset = output.getPos(); // First block starts after header
69-
headerWritten = true;
70-
}
65+
LogFileHeader header = new LogFileHeader();
66+
header.write(output);
67+
blocksStartOffset = output.getPos(); // First block starts after header
7168
}
7269

7370
public long getBlocksStartOffset() {
7471
return blocksStartOffset;
7572
}
7673

7774
public void append(LogFile.Record record) throws IOException {
78-
if (!headerWritten) {
79-
// Lazily write file header
80-
writeFileHeader();
81-
}
82-
if (trailerWritten) {
83-
throw new IOException("Cannot append record after trailer has been written");
84-
}
8575
if (blockDataStream == null) {
8676
startBlock(); // Start the block if needed
8777
}
@@ -185,15 +175,10 @@ public long getPosition() throws IOException {
185175

186176
@Override
187177
public void close() throws IOException {
188-
// We use the fact we have already written the trailer as the boolean "closed" condition.
189-
if (trailerWritten) {
178+
if (output == null) {
190179
return;
191180
}
192181
try {
193-
// We might be closing an empty file, handle this case correctly.
194-
if (!headerWritten) {
195-
writeFileHeader();
196-
}
197182
// Close any outstanding block.
198183
closeBlock();
199184
// After we write the trailer we consider the file closed.
@@ -206,6 +191,7 @@ public void close() throws IOException {
206191
} catch (IOException e) {
207192
LOG.error("Exception while closing LogFormatWriter", e);
208193
}
194+
output = null;
209195
}
210196
}
211197
}
@@ -215,7 +201,6 @@ private void writeTrailer() throws IOException {
215201
new LogFileTrailer().setRecordCount(recordCount).setBlockCount(blockCount)
216202
.setBlocksStartOffset(blocksStartOffset).setTrailerStartOffset(output.getPos());
217203
trailer.write(output);
218-
trailerWritten = true;
219204
try {
220205
output.sync();
221206
} catch (IOException e) {
@@ -227,8 +212,7 @@ private void writeTrailer() throws IOException {
227212
@Override
228213
public String toString() {
229214
return "LogFileFormatWriter [writerContext=" + context + ", currentBlockUncompressedBytes="
230-
+ currentBlockBytes + ", headerWritten=" + headerWritten + ", trailerWritten="
231-
+ trailerWritten + ", recordCount=" + recordCount + ", blockCount=" + blockCount
215+
+ currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" + blockCount
232216
+ ", blocksStartOffset=" + blocksStartOffset + "]";
233217
}
234218

phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919

2020
import java.io.DataInput;
2121
import java.io.DataOutput;
22+
import java.io.EOFException;
2223
import java.io.IOException;
2324
import java.util.Arrays;
24-
import org.apache.hadoop.fs.FSDataInputStream;
25-
import org.apache.hadoop.fs.FileSystem;
26-
import org.apache.hadoop.fs.Path;
2725
import org.apache.hadoop.hbase.util.Bytes;
2826

2927
public class LogFileHeader implements LogFile.Header {
@@ -35,7 +33,7 @@ public class LogFileHeader implements LogFile.Header {
3533
/** Current minor version of the replication log format */
3634
static final int VERSION_MINOR = 0;
3735

38-
static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
36+
static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;
3937

4038
private int majorVersion = VERSION_MAJOR;
4139
private int minorVersion = VERSION_MINOR;
@@ -69,18 +67,27 @@ public LogFile.Header setMinorVersion(int minorVersion) {
6967
@Override
7068
public void readFields(DataInput in) throws IOException {
7169
byte[] magic = new byte[MAGIC.length];
72-
in.readFully(magic);
70+
try {
71+
in.readFully(magic);
72+
} catch (EOFException e) {
73+
throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e);
74+
}
7375
if (!Arrays.equals(MAGIC, magic)) {
74-
throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
76+
throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic)
7577
+ ", expected " + Bytes.toStringBinary(MAGIC));
7678
}
77-
majorVersion = in.readByte();
78-
minorVersion = in.readByte();
79+
try {
80+
majorVersion = in.readByte();
81+
minorVersion = in.readByte();
82+
} catch (EOFException e) {
83+
throw (IOException) new InvalidLogHeaderException("Short version").initCause(e);
84+
}
7985
// Basic version check for now. We assume semver conventions where only higher major
8086
// versions may be incompatible.
8187
if (majorVersion > VERSION_MAJOR) {
82-
throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor="
83-
+ minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
88+
throw new InvalidLogHeaderException(
89+
"Unsupported version. Got major=" + majorVersion + " minor=" + minorVersion
90+
+ ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
8491
}
8592
}
8693

@@ -96,32 +103,6 @@ public int getSerializedLength() {
96103
return HEADERSIZE;
97104
}
98105

99-
public static boolean isValidHeader(final FileSystem fs, final Path path) throws IOException {
100-
if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
101-
return false;
102-
}
103-
try (FSDataInputStream in = fs.open(path)) {
104-
return isValidHeader(in);
105-
}
106-
}
107-
108-
public static boolean isValidHeader(FSDataInputStream in) throws IOException {
109-
in.seek(0);
110-
byte[] magic = new byte[MAGIC.length];
111-
in.readFully(magic);
112-
if (!Arrays.equals(MAGIC, magic)) {
113-
return false;
114-
}
115-
int majorVersion = in.readByte();
116-
in.readByte(); // minorVersion, for now we don't use it
117-
// Basic version check for now. We assume semver conventions where only higher major
118-
// versions may be incompatible.
119-
if (majorVersion > VERSION_MAJOR) {
120-
return false;
121-
}
122-
return true;
123-
}
124-
125106
@Override
126107
public String toString() {
127108
return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion + "]";

phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public void close() throws IOException {
134134
throw e;
135135
} finally {
136136
closed = true;
137-
LOG.debug("Closed LogFileReader for path {}", context.getFilePath());
137+
LOG.debug("Closed LogFileReader for path {}",
138+
context != null ? context.getFilePath() : "null");
138139
}
139140
}
140141

0 commit comments

Comments
 (0)