
S3 upload with BlockingOutputStream leads to data corruption on writes using a shared byte array #4272

@jonathanswenson

Description


Describe the bug

Using BlockingOutputStreamAsyncRequestBody (via AsyncRequestBody.forBlockingOutputStream(...)) and reusing the same byte array across subsequent writes to its output stream leads to data corruption when uploading a stream to S3 with the async Java SDK.

At a high level, the write pattern is as follows (full code snippet below):

BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);
// ... start an S3 upload that consumes `body`

Random random = new Random(3470);
OutputStream outputStream = body.outputStream();
// single buffer used for all writes
byte[] buffer = new byte[1024];
int bytesToWrite = 32;
for (int i = 0; i < 10; i++) {
  // refill this buffer between writes.
  random.nextBytes(buffer);
  outputStream.write(buffer, 0, bytesToWrite);
}

Expected Behavior

I expect that reusing a byte array between writes to an OutputStream does not corrupt the data.
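For comparison, a conventional blocking OutputStream such as java.io.ByteArrayOutputStream has already copied the bytes by the time write() returns, which is why reusing the array is normally safe. The JDK-only sketch below (the class name ReuseContractDemo is hypothetical) replays the write pattern above against one and compares the result with the same random bytes generated into fresh arrays.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Random;

public class ReuseContractDemo {
  // Writes 10 chunks of 32 bytes through one reused buffer, mirroring the pattern above.
  static byte[] writeWithReusedBuffer(ByteArrayOutputStream out, long seed) {
    Random random = new Random(seed);
    byte[] buffer = new byte[1024];
    for (int i = 0; i < 10; i++) {
      random.nextBytes(buffer);   // overwrite the shared buffer
      out.write(buffer, 0, 32);   // ByteArrayOutputStream copies the bytes before returning
    }
    return out.toByteArray();
  }

  // Same random sequence, but each chunk gets its own array, so reuse cannot matter.
  static byte[] expectedBytes(long seed) {
    Random random = new Random(seed);
    byte[] expected = new byte[320];
    for (int i = 0; i < 10; i++) {
      byte[] chunk = new byte[1024];
      random.nextBytes(chunk);
      System.arraycopy(chunk, 0, expected, i * 32, 32);
    }
    return expected;
  }

  public static void main(String[] args) {
    byte[] actual = writeWithReusedBuffer(new ByteArrayOutputStream(), 3470L);
    System.out.println(Arrays.equals(actual, expectedBytes(3470L))); // prints "true"
  }
}
```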

Current Behavior

The data written to the output stream does not match the data that ends up in S3.

Reproduction Steps

Gradle dependencies:

    implementation(platform("software.amazon.awssdk:bom:2.20.118"))
    implementation("software.amazon.awssdk:s3-transfer-manager")
    implementation("software.amazon.awssdk:s3")
    implementation("software.amazon.awssdk:sso")

    // https://mvnrepository.com/artifact/commons-codec/commons-codec
    implementation("commons-codec:commons-codec:1.16.0")

Test class:
import org.apache.commons.codec.binary.Hex;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BlockingOutputStreamTest {
  // not necessary for repro, just how I connect to AWS.
  String profileName = "<some profile name>";
  String bucket = "<some bucket name>";
  String algorithm = "SHA-256";

  @Test
  public void reUsedBufferTest() throws NoSuchAlgorithmException {
    S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
      .credentialsProvider(ProfileCredentialsProvider.create(profileName))
      .region(Region.US_WEST_2)
      .build();

    try (S3TransferManager transferManager = S3TransferManager.builder()
      .s3Client(s3AsyncClient)
      .build()) {

      // not 100% required, but it IS a race, and doesn't always happen on the first try.
      // this reproduction (10 chunks of 32 bytes) on my machine seems to always trigger it the first time.
      for (int iteration = 0; iteration < 30; iteration++) {
        String key = UUID.randomUUID().toString();
        System.out.println("key is: "+ key);

        BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);

        UploadRequest uploadRequest = UploadRequest.builder()
          .putObjectRequest(PutObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            // attempting to manually set the digest values here causes the requests to fail.
            .build())
          .requestBody(body)
          .build();

        Upload uploadResponse = transferManager.upload(uploadRequest);
        MessageDigest uploadMD = MessageDigest.getInstance(algorithm);
        // with a seed of 3470, we expect a crc32 digest of:
        // or sha256: "0e317c890dedbf007e6b4c25bbf347563f645d83e29b8eed85a1b293ea0d31ba"
        // or md5: 7052d097616126ae82c211a9834220f3

        Random random = new Random(3470);

        try (OutputStream outputStream = body.outputStream()) {
          byte[] buffer = new byte[1024];
          for (int i = 0; i < 10; i++) {
            // want new random bytes every time.
            random.nextBytes(buffer);
            int bytesToWrite = 32;

            uploadMD.update(buffer, 0, bytesToWrite);
            // My best guess is that BlockingOutputStreamAsyncRequestBody does not copy the bytes.
            // Instead, I believe it keeps a reference to this same buffer for each write,
            // while every iteration overwrites that buffer. That creates a race to consume
            // the bytes before we get around to overwriting them.
            outputStream.write(buffer, 0, bytesToWrite);

            // if you uncomment this line, it seems to work every time.
            // Thread.sleep(10);
          }
        } catch (Exception e) {
          // this typically throws if the upload fails due to bad credentials. It also takes some time (10s) to time out.
          // Don't rethrow; let the lines below run, since one of them will also throw and show
          // why we failed to connect.
          e.printStackTrace();
        }

        // wait for the upload to finish.
        uploadResponse.completionFuture().join();

        String uploadDigest = Hex.encodeHexString(uploadMD.digest());
        System.out.println("ulDigest: " + uploadDigest);

        DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest = DownloadRequest.builder()
          .getObjectRequest(
            GetObjectRequest.builder()
              .bucket(bucket)
              .key(key)
              .build()
          ).responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
          .build();

        CompletedDownload<ResponseInputStream<GetObjectResponse>> dl = transferManager.download(downloadRequest)
          .completionFuture()
          .join();

        MessageDigest responseDigest = MessageDigest.getInstance(algorithm);
        try (InputStream inStream = dl.result()) {
          byte[] allBytes = inStream.readAllBytes();
          responseDigest.update(allBytes);
        } catch (Exception e) {
          e.printStackTrace();
        }

        String downloadDigest = Hex.encodeHexString(responseDigest.digest());
        System.out.println("dlDigest: " + downloadDigest);

        assertEquals(uploadDigest, downloadDigest);
      }
    }
  }
}

Possible Solution

I believe this is happening because the bytes passed to the output stream are wrapped, but not copied.

In https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/OutputStreamPublisher.java#L71 the byte array is wrapped in a ByteBuffer for compatibility with the async NIO / Publisher APIs. The wrapped buffer shares the caller's array rather than copying it, while callers of the OutputStream API expect blocking behavior and reuse the array as soon as write returns. As a result, the wrapped data can be mutated before it is successfully passed to the CRT library.
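To illustrate the suspected mechanism without S3 or the CRT, here is a deliberately simplified, hypothetical model (WrapVsCopyModel and QueueingOutputStream are made-up names, not the SDK's OutputStreamPublisher). The stream queues ByteBuffer views for a consumer that has not run yet, either via ByteBuffer.wrap(b, off, len) like the line linked above, or via a copy taken before write returns.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class WrapVsCopyModel {
  // Hypothetical stand-in for a publisher whose subscriber consumes buffers later.
  static class QueueingOutputStream extends OutputStream {
    final Queue<ByteBuffer> pending = new ArrayDeque<>();
    final boolean copy;

    QueueingOutputStream(boolean copy) {
      this.copy = copy;
    }

    @Override
    public void write(int b) {
      write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) {
      // wrap() shares the caller's array; copyOfRange() snapshots it before returning.
      pending.add(copy ? ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len))
                       : ByteBuffer.wrap(b, off, len));
    }

    // Simulates the consumer finally draining everything that was queued.
    byte[] drain() {
      int total = pending.stream().mapToInt(ByteBuffer::remaining).sum();
      ByteBuffer all = ByteBuffer.allocate(total);
      while (!pending.isEmpty()) {
        all.put(pending.poll());
      }
      return all.array();
    }
  }

  static byte[] run(boolean copy) {
    QueueingOutputStream out = new QueueingOutputStream(copy);
    byte[] buffer = new byte[4];
    for (byte i = 1; i <= 3; i++) {
      Arrays.fill(buffer, i);   // reuse the same array for every write
      out.write(buffer, 0, 4);
    }
    return out.drain();
  }

  public static void main(String[] args) {
    // wrapped: every chunk shows the last write, [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
    System.out.println(Arrays.toString(run(false)));
    // copied: [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
    System.out.println(Arrays.toString(run(true)));
  }
}
```

The model makes the race deterministic by never draining until the end; in the real upload the consumer runs concurrently, which is why the corruption is intermittent and a Thread.sleep can hide it.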

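import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.junit.jupiter.api.Test;

class WrapTest {
  @Test
  public void wrapTest() {
    byte[] buffer = new byte[10];
    // not strictly necessary: a new byte array is already zero-filled
    Arrays.fill(buffer, (byte) 0);

    // wrap() creates a view backed by `buffer`; it does not copy it
    ByteBuffer wrapped = ByteBuffer.wrap(buffer, 0, 10);

    assertEquals(0, wrapped.get(0));
    // mutate the original buffer.
    buffer[0] = (byte) 1;

    // the wrapped view sees the mutation, so a consumer reading it later gets the new byte
    assertEquals(1, wrapped.get(0));
  }
}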

What likely needs to happen is that the data is copied before the write call returns.
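Until write() itself copies, one possible caller-side workaround (an unverified assumption, not something the SDK documents) is to hand the SDK a fresh array on every write, so reusing your own buffer can no longer affect bytes still queued downstream. CopyingOutputStream and RetainingStream are hypothetical names; in the reproduction, CopyingOutputStream would wrap body.outputStream(). The demo checks it against a stand-in downstream that, like the suspected bug, keeps views of the arrays it receives instead of copying them.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyingOutputStream extends FilterOutputStream {
  public CopyingOutputStream(OutputStream delegate) {
    super(delegate);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Snapshot the caller's bytes so the caller may overwrite b as soon as we return.
    out.write(Arrays.copyOfRange(b, off, off + len), 0, len);
  }

  // Stand-in downstream that keeps views of the arrays it receives (no copy).
  static class RetainingStream extends OutputStream {
    final List<ByteBuffer> views = new ArrayList<>();

    @Override
    public void write(int b) {
      write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) {
      views.add(ByteBuffer.wrap(b, off, len));
    }

    byte[] contents() {
      ByteBuffer all = ByteBuffer.allocate(views.stream().mapToInt(ByteBuffer::remaining).sum());
      views.forEach(v -> all.put(v.duplicate()));
      return all.array();
    }
  }

  // Writes three 4-byte chunks through one reused buffer, then reads what the sink retained.
  static byte[] writeReusing(OutputStream target, RetainingStream sink) throws IOException {
    byte[] buffer = new byte[4];
    for (byte i = 1; i <= 3; i++) {
      Arrays.fill(buffer, i);
      target.write(buffer, 0, 4);
    }
    return sink.contents();
  }

  public static void main(String[] args) throws IOException {
    RetainingStream direct = new RetainingStream();
    // [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
    System.out.println(Arrays.toString(writeReusing(direct, direct)));
    RetainingStream sink = new RetainingStream();
    // [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
    System.out.println(Arrays.toString(writeReusing(new CopyingOutputStream(sink), sink)));
  }
}
```

The copy costs one allocation per write, which is the same trade-off a fix inside the SDK would likely make.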

Additional Information/Context

I originally filed awslabs/aws-crt-java#658 against the aws-crt-java library.

However, I then found a similar but slightly different problem depending on the client: with the CRT library, the corrupted upload is reported as successful, while the standard (non-CRT) SDK throws an error:

Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 1 failure: Data read has a different checksum than expected. Was 0x7052d097616126ae82c211a9834220f3, but expected 0xb0812d3843229373f0a691bd624025ee. This commonly means that the data was corrupted between the client and service. Note: Despite this error, the upload still completed and was persisted in S3.

AWS Java SDK version used

2.20.118

JDK version used

openjdk version "17.0.2" 2022-01-18 LTS OpenJDK Runtime Environment Zulu17.32+13-CA (build 17.0.2+8-LTS) OpenJDK 64-Bit Server VM Zulu17.32+13-CA (build 17.0.2+8-LTS, mixed mode, sharing)

Operating System and version

Mac OSX 13.4.1 (M1)


Labels: bug (This issue is a bug.), p2 (This is a standard priority issue)
