diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java
new file mode 100644
index 000000000..97ef97670
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java
@@ -0,0 +1,55 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.findify.s3mock.S3Mock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.junit.After;
+import org.junit.Before;
+
+public class BaseBzipAmazonS3Test {
+
+    public static final String S3_TEST_BUCKET = "testbucket";
+    protected S3Mock s3Mock;
+    protected AmazonS3 client;
+    protected String endpointConfiguration;
+    protected Map<String, String> unmodifiableCommonsProperties;
+
+    @Before
+    public void setUp() throws Exception {
+        final Random generator = new Random();
+        final int s3Port = generator.nextInt(10000) + 10000;
+        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
+        s3Mock.start();
+
+        endpointConfiguration = "http://localhost:" + s3Port;
+        unmodifiableCommonsProperties = new HashMap<>();
+        unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
+        unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
+        unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
+        unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2");
+        unmodifiableCommonsProperties.put("compression.method", "bzip");
+        unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties);
+
+        client = AmazonS3ClientUtils.createS3Client(
+                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
+                endpointConfiguration
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        client.shutdown();
+        s3Mock.shutdown();
+    }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java
new file mode 100644
index 000000000..0de96b685
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java
@@ -0,0 +1,56 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.findify.s3mock.S3Mock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.junit.After;
+import org.junit.Before;
+
+public class BaseGzipAmazonS3Test {
+
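+    // NOTE: identical to BaseBzipAmazonS3Test except that "compression.method" is set to "gzip" below.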
+    public static final String S3_TEST_BUCKET = "testbucket";
+    protected S3Mock s3Mock;
+    protected AmazonS3 client;
+    protected String endpointConfiguration;
+    protected Map<String, String> unmodifiableCommonsProperties;
+
+    @Before
+    public void setUp() throws Exception {
+        final Random generator = new Random();
+        final int s3Port = generator.nextInt(10000) + 10000;
+        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
+        s3Mock.start();
+
+        endpointConfiguration = "http://localhost:" + s3Port;
+        unmodifiableCommonsProperties = new HashMap<>();
+        unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
+        unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
+        unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
+        unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2");
+        unmodifiableCommonsProperties.put("compression.method", "gzip");
+        unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties);
+
+        client = AmazonS3ClientUtils.createS3Client(
+                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
+                endpointConfiguration
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        client.shutdown();
+        s3Mock.shutdown();
+    }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java
new file mode 100644
index 000000000..c7d5ce314
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java
@@ -0,0 +1,55 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.findify.s3mock.S3Mock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.junit.After;
+import org.junit.Before;
+
+public class BaseZipAmazonS3Test {
+
+    public static final String S3_TEST_BUCKET = "testbucket";
+    protected S3Mock s3Mock;
+    protected AmazonS3 client;
+    protected String endpointConfiguration;
+    protected Map<String, String> unmodifiableCommonsProperties;
+
+    @Before
+    public void setUp() throws Exception {
+        final Random generator = new Random();
+        final int s3Port = generator.nextInt(10000) + 10000;
+        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
+        s3Mock.start();
+
+        endpointConfiguration = "http://localhost:" + s3Port;
+        unmodifiableCommonsProperties = new HashMap<>();
+        unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
+        unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
+        unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
+        unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2");
+        unmodifiableCommonsProperties.put("compression.method", "zip");
+        unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties);
+
+        client = AmazonS3ClientUtils.createS3Client(
+                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
+                endpointConfiguration
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        client.shutdown();
+        s3Mock.shutdown();
+    }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java
new file mode 100644
index 000000000..c190bc6f2
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java
@@ -0,0 +1,90 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
+import io.streamthoughts.kafka.connect.filepulse.fs.BaseBzipAmazonS3Test;
+import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AmazonS3RowBzipFileInputReaderTest extends BaseBzipAmazonS3Test {
+
+    private static final String LF = "\n";
+
+    private static final int NLINES = 10;
+
+    @Rule
+    public TemporaryFolder testFolder = new TemporaryFolder();
+
+    private File objectFile;
+
+    private AmazonS3RowFileInputReader reader;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        objectFile = testFolder.newFile();
+        System.out.println(objectFile.toPath());
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new BZip2CompressorOutputStream(new FileOutputStream(objectFile.toPath().toString())), StandardCharsets.UTF_8));
+        generateLines(writer);
+
+        reader = new AmazonS3RowFileInputReader();
+        reader.setStorage(new AmazonS3Storage(client));
+        reader.configure(unmodifiableCommonsProperties);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        reader.close();
+    }
+
+    @Test
+    public void should_read_all_bzip_lines() {
+        client.createBucket(S3_TEST_BUCKET);
+        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);
+
+        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
+                .build();
+
+        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
+        List<FileRecord<TypedStruct>> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
+            results.addAll(next.collect());
+        }
+        Assert.assertEquals(10, results.size());
+    }
+
+    private void generateLines(final BufferedWriter writer) throws IOException {
+
+        for (int i = 0; i < NLINES; i++) {
+            String line = "00000000-" + i;
+            writer.write(line);
+            if (i + 1 < NLINES) {
+                writer.write(LF);
+            }
+        }
+        writer.flush();
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java
new file mode 100644
index 000000000..90581610b
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java
@@ -0,0 +1,91 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
+import io.streamthoughts.kafka.connect.filepulse.fs.BaseGzipAmazonS3Test;
+import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AmazonS3RowGzipFileInputReaderTest extends BaseGzipAmazonS3Test {
+
+    private static final String LF = "\n";
+
+    private static final int NLINES = 10;
+
+    @Rule
+    public TemporaryFolder testFolder = new TemporaryFolder();
+
+    private File objectFile;
+
+    private AmazonS3RowFileInputReader reader;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        objectFile = testFolder.newFile();
+        System.out.println(objectFile.toPath());
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(objectFile.toPath().toString())), StandardCharsets.UTF_8));
+        generateLines(writer);
+
+        reader = new AmazonS3RowFileInputReader();
+        reader.setStorage(new AmazonS3Storage(client));
+        reader.configure(unmodifiableCommonsProperties);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        reader.close();
+    }
+
+    @Test
+    public void should_read_all_gzip_lines() {
+        client.createBucket(S3_TEST_BUCKET);
+        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);
+
+        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
+                .build();
+
+        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
+        List<FileRecord<TypedStruct>> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
+            results.addAll(next.collect());
+        }
+        Assert.assertEquals(10, results.size());
+    }
+
+    private void generateLines(final BufferedWriter writer) throws IOException {
+
+        for (int i = 0; i < NLINES; i++) {
+            String line = "00000000-" + i;
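+            // no separator after the last record, so the reader should yield exactly NLINES lines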
+            writer.write(line);
+            if (i + 1 < NLINES) {
+                writer.write(LF);
+            }
+        }
+        writer.flush();
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java
new file mode 100644
index 000000000..bb7700116
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java
@@ -0,0 +1,100 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
+import io.streamthoughts.kafka.connect.filepulse.fs.BaseZipAmazonS3Test;
+import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.attribute.FileTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AmazonS3RowZipFileInputReaderTest extends BaseZipAmazonS3Test {
+
+    private static final String LF = "\n";
+
+    private static final int NLINES = 10;
+
+    @Rule
+    public TemporaryFolder testFolder = new TemporaryFolder();
+
+    private File objectFile;
+
+    private AmazonS3RowFileInputReader reader;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        objectFile = testFolder.newFile();
+        System.out.println(objectFile.toPath());
+        ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(objectFile.toPath().toString()));
+        ZipEntry entry = new ZipEntry(objectFile.toPath().toFile().getName());
+        entry.setCreationTime(FileTime.fromMillis(objectFile.toPath().toFile().lastModified()));
+        entry.setComment("created by jimbo");
+        zos.putNextEntry(entry);
+
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(zos, StandardCharsets.UTF_8));
+        generateLines(writer);
+        zos.closeEntry();
+        writer.close();
+
+        reader = new AmazonS3RowFileInputReader();
+        reader.setStorage(new AmazonS3Storage(client));
+        reader.configure(unmodifiableCommonsProperties);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        reader.close();
+    }
+
+    @Test
+    public void should_read_all_zip_lines() {
+        client.createBucket(S3_TEST_BUCKET);
+        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);
+
+        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
+                .build();
+
+        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
+        List<FileRecord<TypedStruct>> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
+            results.addAll(next.collect());
+        }
+        Assert.assertEquals(10, results.size());
+    }
+
+    private void generateLines(final BufferedWriter writer) throws IOException {
+
+        for (int i = 0; i < NLINES; i++) {
+            String line = "00000000-" + i;
+            writer.write(line);
+            if (i + 1 < NLINES) {
+                writer.write(LF);
+            }
+        }
+        writer.flush();
+
+    }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java
index 647237b5f..1460b603d 100644
--- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java
+++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java
@@ -21,6 +21,10 @@ public class RowFileInputIteratorConfig extends AbstractConfig {
     public static final String FILE_ENCODING_DOC = "The text file encoding to use (default = UTF_8)";
     public static final String FILE_ENCODING_DEFAULT = StandardCharsets.UTF_8.displayName();
 
+    public static final String COMPRESSION_METHOD_CONFIG = "compression.method";
+    public static final String COMPRESSION_METHOD_DOC = "The compression method used to read input files: gzip, zip, or bzip (default: none)";
+    public static final String COMPRESSION_METHOD_DEFAULT = "";
+
     public static final String BUFFER_INIT_BYTES_SIZE_CONFIG = "buffer.initial.bytes.size";
     public static final String BUFFER_INIT_BYTES_SIZE_DOC = "The initial buffer size used to read input files";
     public static final int BUFFER_INIT_BYTES_SIZE_DEFAULT = 4096;
@@ -48,6 +52,8 @@ public RowFileInputIteratorConfig(final Map<String, ?> originals) {
         super(configDef(), originals);
     }
 
+    public String compressionMethod() { return getString(COMPRESSION_METHOD_CONFIG); }
+
     public int bufferInitialBytesSize() {
         return getInt(BUFFER_INIT_BYTES_SIZE_CONFIG);
     }
@@ -86,6 +92,10 @@ private static ConfigDef configDef() {
 
                 ConfigDef.Importance.LOW, READER_WAIT_MAX_MS_DOC)
             .define(READER_FIELD_FOOTER_CONFIG, ConfigDef.Type.INT, READER_FIELD_FOOTER_DEFAULT,
-                ConfigDef.Importance.HIGH, READER_FIELD_FOOTER_DOC);
+                ConfigDef.Importance.HIGH, READER_FIELD_FOOTER_DOC)
+
+            .define(COMPRESSION_METHOD_CONFIG, ConfigDef.Type.STRING, COMPRESSION_METHOD_DEFAULT,
+                ConfigDef.Importance.HIGH, COMPRESSION_METHOD_DOC);
+
     }
 }
diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java
index 091f2177c..236f48169 100644
--- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java
+++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java
@@ -15,6 +15,10 @@
 import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
 import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
 import java.net.URI;
+import java.util.Objects;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 
 public class RowFileInputIteratorFactory implements FileInputIteratorFactory {
 
@@ -56,13 +60,46 @@ public FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI) {
                 .withIteratorManager(iteratorManager)
                 .withReaderSupplier(() -> {
                     try {
-                        var br = new NonBlockingBufferReader(
-                                storage.getInputStream(objectURI),
-                                configs.bufferInitialBytesSize(),
-                                configs.charset()
-                        );
-                        br.disableAutoFlush();
-                        return br;
+                        if (Objects.equals(configs.compressionMethod(), "gzip")) {
+                            var br = new NonBlockingBufferReader(
+                                    new GZIPInputStream(storage.getInputStream(objectURI)),
+                                    configs.bufferInitialBytesSize(),
+                                    configs.charset()
+                            );
+                            br.disableAutoFlush();
+                            return br;
+                        }
+                        else if (Objects.equals(configs.compressionMethod(), "bzip")) {
+                            var br = new NonBlockingBufferReader(
+                                    new BZip2CompressorInputStream(storage.getInputStream(objectURI)),
+                                    configs.bufferInitialBytesSize(),
+                                    configs.charset()
+                            );
+
+                            br.disableAutoFlush();
+                            return br;
+                        }
+                        else if (Objects.equals(configs.compressionMethod(), "zip")) {
+                            ZipInputStream zis = new ZipInputStream(storage.getInputStream(objectURI));
+                            zis.getNextEntry();
+                            var br = new NonBlockingBufferReader(
+                                    zis,
+                                    configs.bufferInitialBytesSize(),
+                                    configs.charset()
+                            );
+                            br.disableAutoFlush();
+                            return br;
+                        }
+                        else {
+                            var br = new NonBlockingBufferReader(
+                                    storage.getInputStream(objectURI),
+                                    configs.bufferInitialBytesSize(),
+                                    configs.charset()
+                            );
+
+                            br.disableAutoFlush();
+                            return br;
+                        }
                     } catch (Exception e) {
                         throw new ReaderException("Failed to get InputStream for object: " + objectMetadata, e);
                     }
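
Note: the dispatch above keys off RowFileInputIteratorConfig#compressionMethod(), so enabling
decompression is purely a configuration matter. A minimal sketch of wiring up a reader, mirroring
what the tests in this diff do (all identifiers here appear in the diff itself; anything beyond
that would be an assumption):

    // configure an AmazonS3RowFileInputReader to transparently decompress gzip objects
    Map<String, String> props = new HashMap<>();
    props.put("compression.method", "gzip"); // supported: "gzip", "bzip", "zip"; default "" reads plain text
    AmazonS3RowFileInputReader reader = new AmazonS3RowFileInputReader();
    reader.setStorage(new AmazonS3Storage(client)); // 'client' is a configured AmazonS3 instance
    reader.configure(props);

An empty or unset "compression.method" falls through to the final else branch, which reads the
object stream uncompressed, preserving the previous behavior.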