Skip to content

Commit c875296

Browse files
xin-zhang2yingsu00
authored andcommitted
Add various compressionCodec in PagesSerdeFactory
1 parent 682bf7e commit c875296

File tree

11 files changed

+335
-21
lines changed

11 files changed

+335
-21
lines changed

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`aggrega
392392
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
393393

394394
* **Type:** ``string``
395-
* **Allowed value:** ``NONE``, ``LZ4``
395+
* **Allowed value:** ``SNAPPY``, ``NONE``, ``GZIP``, ``LZ4``, ``LZO``, ``ZLIB``, ``ZSTD``
396396
* **Default value:** ``NONE``
397397

398398
The data compression codec to be used for pages spilled to disk.

presto-main-base/src/main/java/com/facebook/presto/CompressionCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414
package com.facebook.presto;
1515

1616
public enum CompressionCodec {
17-
LZ4, NONE
17+
GZIP, LZ4, LZO, SNAPPY, ZLIB, ZSTD, NONE
1818
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import io.airlift.compress.Compressor;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
21+
import java.nio.Buffer;
22+
import java.nio.ByteBuffer;
23+
import java.util.zip.GZIPOutputStream;
24+
25+
public class GzipCompressor
26+
implements Compressor
27+
{
28+
private static final int EXTRA_COMPRESSION_SPACE = 16;
29+
30+
@Override
31+
public int maxCompressedLength(int uncompressedSize)
32+
{
33+
// From Mark Adler's post http://stackoverflow.com/questions/1207877/java-size-of-compression-output-bytearray
34+
return uncompressedSize + ((uncompressedSize + 7) >> 3) + ((uncompressedSize + 63) >> 6) + 5 + EXTRA_COMPRESSION_SPACE;
35+
}
36+
37+
@Override
38+
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
39+
{
40+
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
41+
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
42+
gzipOutputStream.write(input, inputOffset, inputLength);
43+
gzipOutputStream.finish();
44+
byte[] compressed = byteArrayOutputStream.toByteArray();
45+
if (compressed.length > maxOutputLength) {
46+
throw new IllegalArgumentException("maxCompressedLength formula is incorrect, because gzip produced more data");
47+
}
48+
System.arraycopy(compressed, 0, output, outputOffset, compressed.length);
49+
return compressed.length;
50+
}
51+
catch (IOException e) {
52+
throw new UncheckedIOException(e);
53+
}
54+
}
55+
56+
@Override
57+
public void compress(ByteBuffer input, ByteBuffer output)
58+
{
59+
if (input.isDirect() || output.isDirect() || !input.hasArray() || !output.hasArray()) {
60+
throw new IllegalArgumentException("Non-direct byte buffer backed by byte array required");
61+
}
62+
int inputOffset = input.arrayOffset() + input.position();
63+
int outputOffset = output.arrayOffset() + output.position();
64+
65+
int written = compress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
66+
((Buffer) output).position(output.position() + written);
67+
}
68+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import io.airlift.compress.Decompressor;
17+
import io.airlift.compress.MalformedInputException;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.IOException;
21+
import java.io.UncheckedIOException;
22+
import java.nio.Buffer;
23+
import java.nio.ByteBuffer;
24+
import java.util.zip.GZIPInputStream;
25+
26+
public class GzipDecompressor
27+
implements Decompressor
28+
{
29+
@Override
30+
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) throws MalformedInputException
31+
{
32+
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input, inputOffset, inputLength);
33+
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream)) {
34+
int totalRead = 0;
35+
int bytesRead;
36+
while (totalRead < maxOutputLength) {
37+
bytesRead = gzipInputStream.read(output, outputOffset + totalRead, maxOutputLength - totalRead);
38+
if (bytesRead == -1) {
39+
break;
40+
}
41+
totalRead += bytesRead;
42+
}
43+
if (totalRead >= maxOutputLength && gzipInputStream.read() != -1) {
44+
throw new IllegalArgumentException("maxOutputLength is incorrect, there is more data to be decompressed");
45+
}
46+
return totalRead;
47+
}
48+
catch (IOException e) {
49+
throw new UncheckedIOException(e);
50+
}
51+
}
52+
53+
@Override
54+
public void decompress(ByteBuffer input, ByteBuffer output) throws MalformedInputException
55+
{
56+
int inputOffset = input.arrayOffset() + input.position();
57+
int outputOffset = output.arrayOffset() + output.position();
58+
int written = decompress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
59+
((Buffer) output).position(output.position() + written);
60+
}
61+
}

presto-main-base/src/main/java/com/facebook/presto/execution/buffer/PagesSerdeFactory.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,15 @@
2121
import com.facebook.presto.spi.spiller.SpillCipher;
2222
import io.airlift.compress.lz4.Lz4Compressor;
2323
import io.airlift.compress.lz4.Lz4Decompressor;
24+
import io.airlift.compress.lzo.LzoCompressor;
25+
import io.airlift.compress.lzo.LzoDecompressor;
26+
import io.airlift.compress.snappy.SnappyCompressor;
27+
import io.airlift.compress.snappy.SnappyDecompressor;
28+
import io.airlift.compress.zstd.ZstdCompressor;
29+
import io.airlift.compress.zstd.ZstdDecompressor;
2430

2531
import java.util.Optional;
32+
import java.util.OptionalInt;
2633

2734
import static java.util.Objects.requireNonNull;
2835

@@ -62,8 +69,18 @@ private PagesSerde createPagesSerdeInternal(Optional<SpillCipher> spillCipher)
6269
private Optional<PageCompressor> getPageCompressor()
6370
{
6471
switch (compressionCodec) {
72+
case GZIP:
73+
return Optional.of(new AirliftCompressorAdapter(new GzipCompressor()));
6574
case LZ4:
6675
return Optional.of(new AirliftCompressorAdapter(new Lz4Compressor()));
76+
case LZO:
77+
return Optional.of(new AirliftCompressorAdapter(new LzoCompressor()));
78+
case SNAPPY:
79+
return Optional.of(new AirliftCompressorAdapter(new SnappyCompressor()));
80+
case ZLIB:
81+
return Optional.of(new AirliftCompressorAdapter(new ZlibCompressor(OptionalInt.empty())));
82+
case ZSTD:
83+
return Optional.of(new AirliftCompressorAdapter(new ZstdCompressor()));
6784
case NONE:
6885
default:
6986
return Optional.empty();
@@ -73,8 +90,18 @@ private Optional<PageCompressor> getPageCompressor()
7390
private Optional<PageDecompressor> getPageDecompressor()
7491
{
7592
switch (compressionCodec) {
93+
case GZIP:
94+
return Optional.of(new AirliftDecompressorAdapter(new GzipDecompressor()));
7695
case LZ4:
7796
return Optional.of(new AirliftDecompressorAdapter(new Lz4Decompressor()));
97+
case LZO:
98+
return Optional.of(new AirliftDecompressorAdapter(new LzoDecompressor()));
99+
case SNAPPY:
100+
return Optional.of(new AirliftDecompressorAdapter(new SnappyDecompressor()));
101+
case ZLIB:
102+
return Optional.of(new AirliftDecompressorAdapter(new ZlibDecompressor()));
103+
case ZSTD:
104+
return Optional.of(new AirliftDecompressorAdapter(new ZstdDecompressor()));
78105
case NONE:
79106
default:
80107
return Optional.empty();
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import io.airlift.compress.Compressor;
17+
18+
import java.nio.Buffer;
19+
import java.nio.ByteBuffer;
20+
import java.util.OptionalInt;
21+
import java.util.zip.Deflater;
22+
23+
import static java.util.Objects.requireNonNull;
24+
import static java.util.zip.Deflater.FULL_FLUSH;
25+
26+
public class ZlibCompressor
27+
implements Compressor
28+
{
29+
private static final int EXTRA_COMPRESSION_SPACE = 16;
30+
private static final int DEFAULT_COMPRESSION_LEVEL = 4;
31+
32+
private final int compressionLevel;
33+
34+
public ZlibCompressor(OptionalInt compressionLevel)
35+
{
36+
requireNonNull(compressionLevel, "compressionLevel is null");
37+
this.compressionLevel = compressionLevel.orElse(DEFAULT_COMPRESSION_LEVEL);
38+
}
39+
40+
@Override
41+
public int maxCompressedLength(int uncompressedSize)
42+
{
43+
// From Mark Adler's post http://stackoverflow.com/questions/1207877/java-size-of-compression-output-bytearray
44+
return uncompressedSize + ((uncompressedSize + 7) >> 3) + ((uncompressedSize + 63) >> 6) + 5 + EXTRA_COMPRESSION_SPACE;
45+
}
46+
47+
@Override
48+
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
49+
{
50+
int maxCompressedLength = maxCompressedLength(inputLength);
51+
if (maxOutputLength < maxCompressedLength) {
52+
throw new IllegalArgumentException("Output buffer must be at least " + maxCompressedLength + " bytes");
53+
}
54+
55+
Deflater deflater = new Deflater(compressionLevel, false);
56+
try {
57+
deflater.setInput(input, inputOffset, inputLength);
58+
deflater.finish();
59+
60+
int compressedDataLength = deflater.deflate(output, outputOffset, maxOutputLength, FULL_FLUSH);
61+
if (!deflater.finished()) {
62+
throw new IllegalArgumentException("maxCompressedLength formula is incorrect, because deflate produced more data");
63+
}
64+
return compressedDataLength;
65+
}
66+
finally {
67+
deflater.end();
68+
}
69+
}
70+
71+
@Override
72+
public void compress(ByteBuffer input, ByteBuffer output)
73+
{
74+
if (input.isDirect() || output.isDirect() || !input.hasArray() || !output.hasArray()) {
75+
throw new IllegalArgumentException("Non-direct byte buffer backed by byte array required");
76+
}
77+
int inputOffset = input.arrayOffset() + input.position();
78+
int outputOffset = output.arrayOffset() + output.position();
79+
80+
int written = compress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
81+
((Buffer) output).position(output.position() + written);
82+
}
83+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import io.airlift.compress.Decompressor;
17+
import io.airlift.compress.MalformedInputException;
18+
19+
import java.nio.Buffer;
20+
import java.nio.ByteBuffer;
21+
import java.util.zip.DataFormatException;
22+
import java.util.zip.Inflater;
23+
24+
public class ZlibDecompressor
25+
implements Decompressor
26+
{
27+
@Override
28+
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
29+
throws MalformedInputException
30+
{
31+
Inflater inflater = new Inflater(false);
32+
inflater.setInput(input, inputOffset, inputLength);
33+
int uncompressedLength = 0;
34+
try {
35+
uncompressedLength = inflater.inflate(output, outputOffset, maxOutputLength);
36+
if (!inflater.finished()) {
37+
throw new IllegalArgumentException("maxOutputLength is incorrect, there is more data to be decompressed");
38+
}
39+
}
40+
catch (DataFormatException e) {
41+
throw new MalformedInputException(inputOffset, e.getMessage());
42+
}
43+
finally {
44+
inflater.end();
45+
}
46+
return uncompressedLength;
47+
}
48+
49+
@Override
50+
public void decompress(ByteBuffer input, ByteBuffer output)
51+
throws MalformedInputException
52+
{
53+
if (input.isDirect() || output.isDirect() || !input.hasArray() || !output.hasArray()) {
54+
throw new IllegalArgumentException("Non-direct byte buffer backed by byte array required");
55+
}
56+
int inputOffset = input.arrayOffset() + input.position();
57+
int outputOffset = output.arrayOffset() + output.position();
58+
59+
int written = decompress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
60+
((Buffer) output).position(output.position() + written);
61+
}
62+
}

presto-main-base/src/test/java/com/facebook/presto/execution/buffer/TestPagesSerde.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ public class TestPagesSerde
4545
public Object[][] createTestCompressionCodec()
4646
{
4747
return new Object[][] {
48+
{CompressionCodec.GZIP},
4849
{CompressionCodec.LZ4},
50+
{CompressionCodec.LZO},
51+
{CompressionCodec.SNAPPY},
52+
{CompressionCodec.ZLIB},
53+
{CompressionCodec.ZSTD},
4954
{CompressionCodec.NONE}
5055
};
5156
}

presto-main-base/src/test/java/com/facebook/presto/operator/repartition/BenchmarkPartitionedOutputOperator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,16 @@ private CompressionCodec getCompressionCodec(String compressionCodec)
290290
switch (compressionCodec) {
291291
case "NONE":
292292
return CompressionCodec.NONE;
293+
case "SNAPPY":
294+
return CompressionCodec.SNAPPY;
293295
case "LZ4":
294296
return CompressionCodec.LZ4;
297+
case "ZLIB":
298+
return CompressionCodec.ZLIB;
299+
case "GZIP":
300+
return CompressionCodec.GZIP;
301+
case "ZSTD":
302+
return CompressionCodec.ZSTD;
295303
default:
296304
throw new UnsupportedOperationException("Unsupported compression codec: " + compressionCodec);
297305
}

0 commit comments

Comments
 (0)