@@ -0,0 +1,131 @@
package com.google.cloud.hadoop.fs.gcs;

import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpExecuteInterceptor;
import com.google.api.client.http.HttpRequest;
import com.google.common.annotations.VisibleForTesting;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* Interceptor to simulate data or checksum corruption for testing GCS integrity checks. Wraps an
* existing interceptor to preserve original request initialization.
*/
@VisibleForTesting
public class CorruptionSimulatorInterceptor implements HttpExecuteInterceptor {

public enum CorruptionType {
NONE,
DATA, // Corrupts the upload body
CHECKSUM // Corrupts the x-goog-hash header
}

private final HttpExecuteInterceptor existingInterceptor;
private final CorruptionType corruptionType;

public CorruptionSimulatorInterceptor(
HttpExecuteInterceptor existingInterceptor, CorruptionType corruptionType) {
this.existingInterceptor = existingInterceptor;
this.corruptionType = corruptionType;
}

@Override
public void intercept(HttpRequest request) throws IOException {
// Run the existing interceptor (e.g., auth, logging)
if (existingInterceptor != null) {
existingInterceptor.intercept(request);
}

// Apply corruption logic based on type
if (corruptionType == CorruptionType.CHECKSUM) {
corruptChecksum(request);
} else if (corruptionType == CorruptionType.DATA) {
corruptData(request);
}
}

private void corruptChecksum(HttpRequest request) {
// Only corrupt if the header exists
if (request.getHeaders().containsKey("x-goog-hash")) {
// Overwrite valid checksum with a bad one
request.getHeaders().set("x-goog-hash", "crc32c=AAAAAA==");
}
}

private void corruptData(HttpRequest request) {
String method = request.getRequestMethod();
boolean hasHeader = request.getHeaders().containsKey("x-goog-hash");

// Only corrupt PUT requests that have a checksum header (final chunks/objects)
if (hasHeader && "PUT".equals(method)) {
HttpContent originalContent = request.getContent();
if (originalContent != null) {
request.setContent(new CorruptedHttpContent(originalContent));
}
}
}

/** Wrapper for HttpContent that corrupts the first byte of the stream. */
private static class CorruptedHttpContent implements HttpContent {
private final HttpContent delegate;

public CorruptedHttpContent(HttpContent delegate) {
this.delegate = delegate;
}

@Override
public long getLength() throws IOException {
return delegate.getLength();
}

@Override
public String getType() {
return delegate.getType();
}

@Override
public boolean retrySupported() {
return delegate.retrySupported();
}

@Override
public void writeTo(OutputStream out) throws IOException {
delegate.writeTo(new CorruptingOutputStream(out));
}
}

/** FilterOutputStream that flips the lowest bit of the first byte written. */
private static class CorruptingOutputStream extends FilterOutputStream {
private boolean corrupted = false;

public CorruptingOutputStream(OutputStream out) {
super(out);
}

@Override
public void write(int b) throws IOException {
if (!corrupted) {
corrupted = true;
// Flip the lowest bit of the first byte to corrupt the data
out.write(b ^ 1);
} else {
out.write(b);
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!corrupted && len > 0) {
byte[] corruptedCopy = new byte[len];
System.arraycopy(b, off, corruptedCopy, 0, len);
// Flip the lowest bit of the first byte
corruptedCopy[0] = (byte) (corruptedCopy[0] ^ 1);
corrupted = true;
out.write(corruptedCopy, 0, len);
} else {
out.write(b, off, len);
}
}
}
}
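The interceptor is meant to be installed by wrapping whatever interceptor the request initializer has already set, so authentication and retry behavior are preserved. A minimal wiring sketch (baseInitializer stands in for an existing HttpRequestInitializer such as RetryHttpInitializer; the integration tests below show the full setup):

HttpRequestInitializer corruptingInitializer =
    request -> {
      // Run the normal initializer first (auth, retries, user agent).
      baseInitializer.initialize(request);

      // Wrap whatever interceptor it installed so its behavior is preserved.
      HttpExecuteInterceptor existing = request.getInterceptor();
      request.setInterceptor(
          new CorruptionSimulatorInterceptor(
              existing, CorruptionSimulatorInterceptor.CorruptionType.CHECKSUM));
    };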
@@ -16,19 +16,39 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.fs.gcs.CorruptionSimulatorInterceptor.CorruptionType;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpExecuteInterceptor;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.storage.Storage;
import com.google.auth.Credentials;
import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemIntegrationHelper;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.cloud.hadoop.util.RetryHttpInitializerOptions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryNotEmptyException;
import java.util.EnumSet;
import java.util.Random;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -238,6 +258,35 @@ public void testDeleteRecursive_shouldDeleteAllInPath() throws IOException, URIS
assertThrows(FileNotFoundException.class, () -> ghfs.getFileStatus(testDir.getParent()));
}

@Test
public void testWriteWithTrailingChecksum_Enabled_MultiChunk() throws Exception {
Configuration config = GoogleHadoopFileSystemIntegrationHelper.getTestConfig();
config.setInt("fs.gs.outputstream.upload.chunk.size", 1 * 1024 * 1024);

GoogleHadoopFileSystem ghfs = new GoogleHadoopFileSystem();
ghfs.initialize(initUri, config);

Path testPath = new Path(initUri.resolve("/testWriteWithTrailingChecksum_Enabled.bin"));

int fileSize = (int) (2.5 * 1024 * 1024);
byte[] expectedData = new byte[fileSize];
new Random().nextBytes(expectedData);
// write data
try (FSDataOutputStream out = ghfs.create(testPath)) {
out.write(expectedData);
}

byte[] actualData = new byte[fileSize];
// read the data
try (FSDataInputStream in = ghfs.open(testPath)) {
in.readFully(actualData);
}

assertThat(actualData).isEqualTo(expectedData);

ghfs.delete(testPath, false);
}

@Test
public void testDeleteNotRecursive_shouldBeAppliedToHierarchyOfDirectories()
throws IOException, URISyntaxException {
@@ -322,4 +371,134 @@ public void testRenameInternal_shouldMakeOldPathNotFound()
assertThrows(FileNotFoundException.class, () -> ghfs.listStatus(srcPath));
assertThat(ghfs.listStatus(dstPath)).hasLength(1);
}

@Test
public void testWriteWithTrailingChecksum_SimulatedChecksumCorruption() throws Exception {
Configuration config = GoogleHadoopFileSystemIntegrationHelper.getTestConfig();
GoogleHadoopFileSystem ghfs = new GoogleHadoopFileSystem();
ghfs.initialize(initUri, config);

String bucket = initUri.getAuthority();
String objectName = "testWithCorruption_" + System.currentTimeMillis() + ".bin";
StorageResourceId resourceId = new StorageResourceId(bucket, objectName);

Credentials credentials = GoogleCloudStorageTestHelper.getCredentials();

HttpRequestInitializer sabotageInitializer =
request -> {
new RetryHttpInitializer(
credentials,
RetryHttpInitializerOptions.builder()
.setDefaultUserAgent("GHFS-Integration-Test-Corruption")
.build())
.initialize(request);

// Inject the sabotage interceptor, preserving the interceptor installed by RetryHttpInitializer
final HttpExecuteInterceptor existingInterceptor = request.getInterceptor();
request.setInterceptor(
new CorruptionSimulatorInterceptor(existingInterceptor, CorruptionType.CHECKSUM));
};

Storage storage =
new Storage.Builder(
GoogleNetHttpTransport.newTrustedTransport(),
GsonFactory.getDefaultInstance(),
sabotageInitializer)
.setApplicationName("GHFS-Corruption-Test")
.build();

AsyncWriteChannelOptions writeOptions =
AsyncWriteChannelOptions.builder().setUploadChunkSize(1024 * 1024).build();

GoogleCloudStorageWriteChannel channel =
new GoogleCloudStorageWriteChannel(
storage,
new ClientRequestHelper<>(),
Executors.newCachedThreadPool(),
writeOptions,
resourceId,
CreateObjectOptions.DEFAULT_OVERWRITE,
ObjectWriteConditions.NONE);

channel.initialize();

byte[] data = new byte[2 * 1024 * 1024];
new Random().nextBytes(data);
channel.write(ByteBuffer.wrap(data));

IOException e = assertThrows(IOException.class, () -> channel.close());

assertThat(e.getCause().getMessage()).contains("400");
assertThat(e.getCause().getMessage()).contains("CRC32C");

try {
ghfs.getGcsFs().getGcs().deleteObjects(java.util.Collections.singletonList(resourceId));
} catch (Exception ignored) {
}
}

@Test
public void testWriteWithTrailingChecksum_SimulatedDataCorruption() throws Exception {
Configuration config = GoogleHadoopFileSystemIntegrationHelper.getTestConfig();
GoogleHadoopFileSystem ghfs = new GoogleHadoopFileSystem();
ghfs.initialize(initUri, config);

String bucket = initUri.getAuthority();
String objectName = "corruption_test_" + System.currentTimeMillis() + ".bin";
StorageResourceId resourceId = new StorageResourceId(bucket, objectName);

Credentials credentials = GoogleCloudStorageTestHelper.getCredentials();

HttpRequestInitializer sabotageInitializer =
request -> {
new RetryHttpInitializer(
credentials,
RetryHttpInitializerOptions.builder()
.setDefaultUserAgent("GHFS-Corruption-Test")
.build())
.initialize(request);

final HttpExecuteInterceptor existingInterceptor = request.getInterceptor();
// Inject Sabotage Interceptor
request.setInterceptor(
new CorruptionSimulatorInterceptor(existingInterceptor, CorruptionType.DATA));
};

Storage storage =
new Storage.Builder(
GoogleNetHttpTransport.newTrustedTransport(),
GsonFactory.getDefaultInstance(),
sabotageInitializer)
.setApplicationName("GHFS-Corruption-Test")
.build();

AsyncWriteChannelOptions writeOptions =
AsyncWriteChannelOptions.builder().setUploadChunkSize(1024 * 1024).build();

GoogleCloudStorageWriteChannel channel =
new GoogleCloudStorageWriteChannel(
storage,
new ClientRequestHelper<>(),
Executors.newCachedThreadPool(),
writeOptions,
resourceId,
CreateObjectOptions.DEFAULT_OVERWRITE,
ObjectWriteConditions.NONE);

channel.initialize();

byte[] data = new byte[2 * 1024 * 1024];
new Random().nextBytes(data);
channel.write(ByteBuffer.wrap(data));

IOException e = assertThrows(IOException.class, () -> channel.close());
assertThat(e.getCause().getMessage()).contains("400");
assertThat(e.getCause().getMessage()).contains("CRC32C");

try {
ghfs.getGcsFs().getGcs().deleteObjects(java.util.Collections.singletonList(resourceId));
} catch (Exception ignored) {
}
}
}
@@ -23,6 +23,7 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ChecksumContext;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.hadoop.util.LoggingMediaHttpUploaderProgressListener;
@@ -193,6 +194,7 @@ public StorageObject call() throws Exception {
// Try-with-resource will close this end of the pipe so that
// the writer at the other end will not hang indefinitely.
try (InputStream ignore = pipeSource) {
ChecksumContext.CURRENT_HASHER.set(cumulativeCrc32cHasher);
return uploadObject.execute();
} catch (IOException e) {
GoogleCloudStorageEventBus.postOnException();
@@ -204,6 +206,8 @@
"Received IOException during '%s' upload, but successfully converted to response: '%s'.",
resourceId, response);
return response;
} finally {
ChecksumContext.CURRENT_HASHER.remove();
}
}
}
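ChecksumContext itself does not appear in this diff; the set()/remove() pair around uploadObject.execute() implies a thread-local holder that hands the cumulative CRC32C hasher to the code appending the trailing checksum, and the remove() in the finally block keeps the hasher from leaking across pooled threads. A sketch of such a holder, assuming CURRENT_HASHER is a ThreadLocal over a Guava Hasher (both assumptions, not the class shipped in this change):

package com.google.cloud.hadoop.util;

import com.google.common.hash.Hasher;

// Hypothetical sketch only; the real ChecksumContext may differ.
public final class ChecksumContext {
  // Hasher for the upload currently executing on this thread, or null if none.
  public static final ThreadLocal<Hasher> CURRENT_HASHER = new ThreadLocal<>();

  private ChecksumContext() {}
}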