Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.log;

import java.io.IOException;

/**
 * Exception thrown when a replication log file has a missing, truncated, or otherwise invalid
 * header (bad magic, unsupported version, or a short read).
 * <p>
 * Extends {@link IOException} so callers that already handle I/O failures while opening a log
 * file can catch it without a separate handler, while still allowing targeted catches.
 */
public class InvalidLogHeaderException extends IOException {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the header problem
     */
    public InvalidLogHeaderException(String message) {
        super(message);
    }

    /**
     * Constructor that carries the underlying cause, so call sites can chain the original
     * exception (e.g. an {@code EOFException} from a short read) directly instead of using
     * the {@code (IOException) new InvalidLogHeaderException(...).initCause(e)} idiom.
     *
     * @param message description of the header problem
     * @param cause the underlying exception that triggered this one
     */
    public InvalidLogHeaderException(String message, Throwable cause) {
        super(message, cause);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.log;

import java.io.IOException;

/**
 * Exception thrown when a replication log file has a missing, truncated, or otherwise invalid
 * trailer (e.g. the file is shorter than the fixed trailer size, or trailer validation fails).
 * <p>
 * Extends {@link IOException} so callers that already handle I/O failures while reading a log
 * file can catch it without a separate handler, while still allowing targeted catches.
 */
public class InvalidLogTrailerException extends IOException {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the trailer problem
     */
    public InvalidLogTrailerException(String message) {
        super(message);
    }

    /**
     * Constructor that carries the underlying cause, mirroring
     * {@code InvalidLogHeaderException}, so call sites can chain the original exception
     * directly instead of the cast-and-{@code initCause} idiom.
     *
     * @param message description of the trailer problem
     * @param cause the underlying exception that triggered this one
     */
    public InvalidLogTrailerException(String message, Throwable cause) {
        super(message, cause);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -418,21 +419,29 @@ interface Decoder {

/**
* Utility for determining if a file is a valid replication log file.
* @param fs The FileSystem
* @param path Path to the potential replication log file
* @param conf The Configuration
* @param fs The FileSystem
* @param path Path to the potential replication log file
* @param validateTrailer Whether to validate the trailer
* @return true if the file is a valid replication log file, false otherwise
* @throws IOException if an I/O problem was encountered
*/
static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException {
static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path,
final boolean validateTrailer) throws IOException {
long length = fs.getFileStatus(path).getLen();
try (FSDataInputStream in = fs.open(path)) {
if (LogFileTrailer.isValidTrailer(in, length)) {
return true;
} else {
// Not a valid trailer, do we need to do something (set a flag)?
// Fall back to checking the header.
return LogFileHeader.isValidHeader(in);
// Check if the file is too short to be a valid log file.
if (length < LogFileHeader.HEADERSIZE) {
return false;
}
try (LogFileFormatReader reader = new LogFileFormatReader()) {
LogFileReaderContext context = new LogFileReaderContext(conf).setFilePath(path)
.setFileSize(length).setValidateTrailer(validateTrailer);
reader.init(context, (SeekableDataInput) in);
} catch (InvalidLogHeaderException | InvalidLogTrailerException e) {
return false;
}
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class LogFileFormatReader implements Closeable {
private ByteBuffer currentBlockBuffer;
private long currentBlockDataBytes;
private long currentBlockConsumedBytes;
private boolean trailerValidated;
private CRC64 crc = new CRC64();

public LogFileFormatReader() {
Expand All @@ -61,13 +60,13 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I
this.currentBlockConsumedBytes = 0;
try {
readAndValidateTrailer();
trailerValidated = true;
} catch (IOException e) {
// If we are validating the trailer, we cannot proceed without it.
if (context.isValidateTrailer()) {
throw e;
}
// Log warning, trailer might be missing or corrupt, proceed without it
LOG.warn(
"Failed to read or validate Log trailer for path: "
+ (context != null ? context.getFilePath() : "unknown") + ". Proceeding without trailer.",
e);
LOG.warn("Failed to validate Log trailer for " + context.getFilePath() + ", proceeding", e);
trailer = null; // Ensure trailer is null if reading/validation failed
}
this.decoder = null;
Expand All @@ -78,8 +77,7 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I

private void readAndValidateTrailer() throws IOException {
if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
throw new IOException("File size " + context.getFileSize()
+ " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE);
throw new InvalidLogTrailerException("Short file");
}
LogFileTrailer ourTrailer = new LogFileTrailer();
// Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file.
Expand Down Expand Up @@ -337,7 +335,7 @@ private long getEndOfDataOffset() throws IOException {

// Validates read counts against trailer counts if trailer was successfully read
private void validateReadCounts() {
if (!trailerValidated || trailer == null) {
if (trailer == null) {
return;
}
if (trailer.getBlockCount() != context.getBlocksRead()) {
Expand Down Expand Up @@ -367,8 +365,7 @@ public String toString() {
return "LogFileFormatReader [context=" + context + ", decoder=" + decoder + ", input=" + input
+ ", header=" + header + ", trailer=" + trailer + ", currentPosition=" + currentPosition
+ ", currentBlockBuffer=" + currentBlockBuffer + ", currentBlockUncompressedSize="
+ currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes
+ ", trailerValidated=" + trailerValidated + "]";
+ currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]";
}

LogFile.Header getHeader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class LogFileFormatWriter implements Closeable {
private SyncableDataOutput output;
private ByteArrayOutputStream currentBlockBytes;
private DataOutputStream blockDataStream;
private boolean headerWritten = false;
private boolean trailerWritten = false;
private long recordCount = 0;
private long blockCount = 0;
private long blocksStartOffset = -1;
Expand All @@ -59,29 +57,21 @@ public void init(LogFileWriterContext context, SyncableDataOutput output) throws
this.currentBlockBytes = new ByteArrayOutputStream();
this.blockDataStream = new DataOutputStream(currentBlockBytes);
this.encoder = context.getCodec().getEncoder(blockDataStream);
// Write header immediately when file is created
writeFileHeader();
}

private void writeFileHeader() throws IOException {
if (!headerWritten) {
LogFileHeader header = new LogFileHeader();
header.write(output);
blocksStartOffset = output.getPos(); // First block starts after header
headerWritten = true;
}
LogFileHeader header = new LogFileHeader();
header.write(output);
blocksStartOffset = output.getPos(); // First block starts after header
}

public long getBlocksStartOffset() {
return blocksStartOffset;
}

public void append(LogFile.Record record) throws IOException {
if (!headerWritten) {
// Lazily write file header
writeFileHeader();
}
if (trailerWritten) {
throw new IOException("Cannot append record after trailer has been written");
}
if (blockDataStream == null) {
startBlock(); // Start the block if needed
}
Expand Down Expand Up @@ -185,15 +175,10 @@ public long getPosition() throws IOException {

@Override
public void close() throws IOException {
// We use the fact we have already written the trailer as the boolean "closed" condition.
if (trailerWritten) {
if (output == null) {
return;
}
try {
// We might be closing an empty file, handle this case correctly.
if (!headerWritten) {
writeFileHeader();
}
// Close any outstanding block.
closeBlock();
// After we write the trailer we consider the file closed.
Expand All @@ -206,6 +191,7 @@ public void close() throws IOException {
} catch (IOException e) {
LOG.error("Exception while closing LogFormatWriter", e);
}
output = null;
}
}
}
Expand All @@ -215,7 +201,6 @@ private void writeTrailer() throws IOException {
new LogFileTrailer().setRecordCount(recordCount).setBlockCount(blockCount)
.setBlocksStartOffset(blocksStartOffset).setTrailerStartOffset(output.getPos());
trailer.write(output);
trailerWritten = true;
try {
output.sync();
} catch (IOException e) {
Expand All @@ -227,8 +212,7 @@ private void writeTrailer() throws IOException {
@Override
public String toString() {
return "LogFileFormatWriter [writerContext=" + context + ", currentBlockUncompressedBytes="
+ currentBlockBytes + ", headerWritten=" + headerWritten + ", trailerWritten="
+ trailerWritten + ", recordCount=" + recordCount + ", blockCount=" + blockCount
+ currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" + blockCount
+ ", blocksStartOffset=" + blocksStartOffset + "]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;

public class LogFileHeader implements LogFile.Header {
Expand All @@ -35,7 +33,7 @@ public class LogFileHeader implements LogFile.Header {
/** Current minor version of the replication log format */
static final int VERSION_MINOR = 0;

static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;

private int majorVersion = VERSION_MAJOR;
private int minorVersion = VERSION_MINOR;
Expand Down Expand Up @@ -69,18 +67,27 @@ public LogFile.Header setMinorVersion(int minorVersion) {
@Override
public void readFields(DataInput in) throws IOException {
byte[] magic = new byte[MAGIC.length];
in.readFully(magic);
try {
in.readFully(magic);
} catch (EOFException e) {
throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e);
}
if (!Arrays.equals(MAGIC, magic)) {
throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic)
throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic)
+ ", expected " + Bytes.toStringBinary(MAGIC));
}
majorVersion = in.readByte();
minorVersion = in.readByte();
try {
majorVersion = in.readByte();
minorVersion = in.readByte();
} catch (EOFException e) {
throw (IOException) new InvalidLogHeaderException("Short version").initCause(e);
}
// Basic version check for now. We assume semver conventions where only higher major
// versions may be incompatible.
if (majorVersion > VERSION_MAJOR) {
throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor="
+ minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
throw new InvalidLogHeaderException(
"Unsupported version. Got major=" + majorVersion + " minor=" + minorVersion
+ ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR);
}
}

Expand All @@ -96,32 +103,6 @@ public int getSerializedLength() {
return HEADERSIZE;
}

public static boolean isValidHeader(final FileSystem fs, final Path path) throws IOException {
if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
return false;
}
try (FSDataInputStream in = fs.open(path)) {
return isValidHeader(in);
}
}

public static boolean isValidHeader(FSDataInputStream in) throws IOException {
in.seek(0);
byte[] magic = new byte[MAGIC.length];
in.readFully(magic);
if (!Arrays.equals(MAGIC, magic)) {
return false;
}
int majorVersion = in.readByte();
in.readByte(); // minorVersion, for now we don't use it
// Basic version check for now. We assume semver conventions where only higher major
// versions may be incompatible.
if (majorVersion > VERSION_MAJOR) {
return false;
}
return true;
}

@Override
public String toString() {
return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void close() throws IOException {
throw e;
} finally {
closed = true;
LOG.debug("Closed LogFileReader for path {}", context.getFilePath());
LOG.debug("Closed LogFileReader for path {}",
context != null ? context.getFilePath() : "null");
}
}

Expand Down
Loading