GoogleCloudStorageClientReadChannel.java
@@ -25,7 +25,6 @@
 import static java.lang.Math.min;
 import static java.lang.Math.toIntExact;

-import com.google.cloud.ReadChannel;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
 import com.google.cloud.hadoop.util.ErrorTypeExtractor;
 import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
@@ -60,6 +59,7 @@ class GoogleCloudStorageClientReadChannel implements SeekableByteChannel {
   private final GoogleCloudStorageReadOptions readOptions;
   private final GoogleCloudStorageOptions storageOptions;
   private final Storage storage;
+  private final BlobSourceOption[] storageReadOptions;
   // The size of this object generation, in bytes.
   private long objectSize;
   private final ErrorTypeExtractor errorExtractor;
@@ -88,6 +88,7 @@ public GoogleCloudStorageClientReadChannel(
     this.storageOptions = storageOptions;
     this.contentReadChannel = new ContentReadChannel(readOptions, resourceId);
     initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize());
+    this.storageReadOptions = generateReadOptions(resourceId);
   }

   protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) throws IOException {
@@ -531,44 +532,31 @@ private void performPendingSeeks() {
     }

     private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException {
-      ReadChannel readChannel = storage.reader(blobId, generateReadOptions(blobId));
-      try {
-        readChannel.seek(seek);
-        readChannel.limit(limit);
-        // bypass the storage-client caching layer hence eliminates the need to maintain a copy of
-        // chunk
-        readChannel.setChunkSize(0);
-        return readChannel;
-      } catch (Exception e) {
-        GoogleCloudStorageEventBus.postOnException();
-        throw new IOException(
-            String.format(
-                "Unable to update the boundaries/Range of contentChannel %s. cause=%s",
-                resourceId, e),
-            e);
-      }
+      return RangeValidatingReadableByteChannel.of(storage, blobId, seek, limit,
+          storageReadOptions);
     }

-    private BlobSourceOption[] generateReadOptions(BlobId blobId) {
-      List<BlobSourceOption> blobReadOptions = new ArrayList<>();
-      // To get decoded content
-      blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));
-
-      if (blobId.getGeneration() != null) {
-        blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGeneration()));
-      }
-      if (storageOptions.getEncryptionKey() != null) {
-        blobReadOptions.add(
-            BlobSourceOption.decryptionKey(storageOptions.getEncryptionKey().value()));
-      }
-      return blobReadOptions.toArray(new BlobSourceOption[blobReadOptions.size()]);
-    }
-
     private boolean isFooterRead() {
       return objectSize - currentPosition <= readOptions.getMinRangeRequestSize();
     }
   }

+  private BlobSourceOption[] generateReadOptions(StorageResourceId blobId) {
+    List<BlobSourceOption> blobReadOptions = new ArrayList<>();
+    // enable transparent gzip-decompression
+    blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));
+
+    if (blobId.getGenerationId() > StorageResourceId.UNKNOWN_GENERATION_ID) {
+      blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGenerationId()));
+    }
Review comment on lines +545 to +552 (medium):

The parameter blobId is of type StorageResourceId, which could be confusing since there is also a com.google.cloud.storage.BlobId class used in this file. To improve clarity and avoid potential confusion, consider renaming it to resourceId to match its type and the variable name used at the call site in the constructor.

Suggested change:

-  private BlobSourceOption[] generateReadOptions(StorageResourceId blobId) {
-    List<BlobSourceOption> blobReadOptions = new ArrayList<>();
-    // enable transparent gzip-decompression
-    blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));
-    if (blobId.getGenerationId() > StorageResourceId.UNKNOWN_GENERATION_ID) {
-      blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGenerationId()));
-    }
+  private BlobSourceOption[] generateReadOptions(StorageResourceId resourceId) {
+    List<BlobSourceOption> blobReadOptions = new ArrayList<>();
+    // enable transparent gzip-decompression
+    blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));
+    if (resourceId.getGenerationId() > StorageResourceId.UNKNOWN_GENERATION_ID) {
+      blobReadOptions.add(BlobSourceOption.generationMatch(resourceId.getGenerationId()));
+    }

+    if (storageOptions.getEncryptionKey() != null) {
+      blobReadOptions.add(
+          BlobSourceOption.decryptionKey(storageOptions.getEncryptionKey().value()));
+    }
+    return blobReadOptions.toArray(new BlobSourceOption[0]);
+  }

   @VisibleForTesting
   boolean randomAccessStatus() {
     return contentReadChannel.fileAccessManager.shouldAdaptToRandomAccess();
RangeValidatingReadableByteChannel.java (new file)
@@ -0,0 +1,126 @@
package com.google.cloud.hadoop.gcsio;

import com.google.cloud.ReadChannel;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Locale;

final class RangeValidatingReadableByteChannel implements ReadableByteChannel {

  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  private final String resourceId;
  private final long beginOffset;
  private final long endOffset;
  private final ReadableByteChannel delegate;

  private long position;

  /**
   * @param endOffset expected to be <= the length of the object; this class does not clamp the
   *     end offset to the object size
   */
  private RangeValidatingReadableByteChannel(
      String resourceId, long beginOffset, long endOffset, ReadableByteChannel delegate) {
    this.resourceId = resourceId;
    this.beginOffset = beginOffset;
    this.endOffset = endOffset;
    this.position = beginOffset;
    this.delegate = delegate;
  }

  @Override
  public int read(ByteBuffer dst) throws IOException {
    int expectedMaxRead;
    long expectedChannelRemaining = endOffset - position;
    if (expectedChannelRemaining < dst.remaining()) {
      expectedMaxRead = Math.toIntExact(expectedChannelRemaining);
    } else {
      expectedMaxRead = dst.remaining();
    }
Review comment on lines +43 to +47 (medium):

The calculation for expectedMaxRead can be simplified and made more robust by ensuring it's never negative. Using Math.max(0, ...) will handle cases where position > endOffset cleanly, and Math.min can combine the logic into a single line.

    expectedMaxRead = (int) Math.min(dst.remaining(), Math.max(0, expectedChannelRemaining));

Author reply:

dst.remaining() will always return a value >= 0, so this is already taken care of, Gemini. Also, blindly casting a long to an int can overflow, which is why Math.toIntExact is used instead.
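A minimal illustration of the overflow point the author is making (not from the PR; values chosen for demonstration):

    // A plain narrowing cast silently truncates the high bits, while
    // Math.toIntExact fails fast with an ArithmeticException on overflow.
    long tooBig = 1L << 32;                 // 4294967296
    int truncated = (int) tooBig;           // == 0, silently wrong
    int checked = Math.toIntExact(tooBig);  // throws ArithmeticException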

    int read = delegate.read(dst);
    if (read > -1) {
      position += read;
      if (read > expectedMaxRead) {
Review comment (Collaborator):

In the logs attached to the bug, the buffer had remaining space but the channel end was still crossed. In that case expectedMaxRead will always be greater than or equal to read, so this branch is unlikely to log what is happening in the bug. I think we should log the overshoot when position > endOffset to find out what is happening in the actual bug.
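A sketch of the check this comment is asking for (hypothetical, not part of the PR as posted; it reuses only fields the class already declares):

    // Hypothetical variant of the reviewer's suggestion: warn whenever the
    // absolute position has crossed the channel's end offset, independent of
    // how much space remained in the destination buffer.
    if (position > endOffset) {
      logger.atWarning().log(
          "over-shoot of object %s detected: position %s exceeds endOffset %s by %s bytes",
          resourceId, position, endOffset, position - endOffset);
    }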

        // over-read
        logger.atWarning().log(
Review comment (Collaborator):

I think we should close the RangeValidatingReadableByteChannel in GoogleCloudStorageClientReadChannel.java when an overshoot happens, to avoid query failures.
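One possible shape of that handling inside read() (a sketch of the reviewer's idea, not the PR's code; it uses only the fields this class already has):

    // Hypothetical handling: once an over-read is observed, close the
    // delegate and surface an IOException so the caller discards this
    // channel and opens a fresh, correctly bounded one on the next read.
    if (position > endOffset) {
      delegate.close();
      throw new IOException(
          String.format(
              "over-read of object %s: position %d exceeds endOffset %d",
              resourceId, position, endOffset));
    }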

"over-read of object %s detected. "
+ "Channel opened with {beginOffset: %s, endOffset: %s}, "
+ "over-read {position: %s, expectedMaxRead: %s, read: %s} (read %s additional bytes)",
resourceId,
beginOffset,
endOffset,
position,
expectedMaxRead,
read,
read - expectedMaxRead);
}
    } else {
      if (position < endOffset) {
        // in reality, if this were to ever happen it should be handled as a retryable error,
        // where the next read would begin at the current offset/position.
        // under-read
        throw new IOException(
            String.format(
                Locale.US,
                "under-read of object %s detected. "
                    + "Channel opened with {beginOffset: %s, endOffset: %s}, "
                    + "EOF detected at position: %s (missing %s bytes)",
                resourceId,
                beginOffset,
                endOffset,
                position,
                expectedChannelRemaining));
      }
    }

    return read;
  }

  @Override
  public boolean isOpen() {
    return delegate.isOpen();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }

  @VisibleForTesting
  static RangeValidatingReadableByteChannel of(
      String resourceId, long beginOffset, long endOffset, ReadableByteChannel delegate) {
    return new RangeValidatingReadableByteChannel(resourceId, beginOffset, endOffset, delegate);
  }

  static RangeValidatingReadableByteChannel of(
      Storage storage,
      BlobId id,
      long beginOffset,
      long endOffset,
      BlobSourceOption... storageReadOptions)
      throws IOException {
    ReadChannel readChannel = storage.reader(id, storageReadOptions);
    try {
      readChannel.seek(beginOffset);
      readChannel.limit(endOffset);
      // disable client-level chunk buffering; this also makes the channel semi-non-blocking
      readChannel.setChunkSize(0);
      return of(id.toGsUtilUri(), beginOffset, endOffset, readChannel);
    } catch (Exception e) {
      GoogleCloudStorageEventBus.postOnException();
      throw new IOException(
          String.format(
              "Unable to update the boundaries/Range of contentChannel %s. cause=%s",
              id.toGsUtilUri(), e),
          e);
    }
  }
}
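For orientation, a minimal sketch of exercising the @VisibleForTesting factory with an in-memory delegate (hypothetical test code; it must live in the same package since the class is package-private, and the gs:// name is made up). The delegate claims to cover [0, 16) but only holds 8 bytes, so the wrapper's under-read path fires:

    package com.google.cloud.hadoop.gcsio;

    import java.io.ByteArrayInputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    class RangeValidationDemo {
      public static void main(String[] args) throws Exception {
        // 8 bytes of content behind a channel that is supposed to span 16.
        ReadableByteChannel shortDelegate =
            Channels.newChannel(new ByteArrayInputStream(new byte[8]));
        ReadableByteChannel validated =
            RangeValidatingReadableByteChannel.of("gs://bucket/object", 0, 16, shortDelegate);

        ByteBuffer buf = ByteBuffer.allocate(32);
        validated.read(buf); // returns 8: within the expected bound, no warning
        validated.read(buf); // EOF at position 8 < endOffset 16 -> IOException (under-read)
      }
    }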