Skip to content

Commit 682bf7e

Browse files
xin-zhang2steveburnett
authored andcommitted
Refactor compressionEnabled to compressionCodec in PagesSerdeFactory
Co-authored-by: Steve Burnett <[email protected]>
1 parent 162d632 commit 682bf7e

File tree

39 files changed

+333
-195
lines changed

39 files changed

+333
-195
lines changed

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,14 @@ Limit for memory used for unspilling a single aggregation operator instance.
388388

389389
The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_operator_unspill_memory_limit\`\``.
390390

391-
``experimental.spill-compression-enabled``
392-
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
391+
``experimental.spill-compression-codec``
392+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
393393

394-
* **Type:** ``boolean``
395-
* **Default value:** ``false``
394+
* **Type:** ``string``
395+
* **Allowed value:** ``NONE``, ``LZ4``
396+
* **Default value:** ``NONE``
396397

397-
Enables data compression for pages spilled to disk.
398+
The data compression codec to be used for pages spilled to disk.
398399

399400
``experimental.spill-encryption-enabled``
400401
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

presto-docs/src/main/sphinx/admin/spill.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ there is no need to use RAID for spill.
9393
Spill Compression
9494
-----------------
9595

96-
When spill compression is enabled (``spill-compression-enabled`` property in
97-
:ref:`tuning-spilling`), spilled pages will be compressed using the same
98-
implementation as exchange compression when they are sufficiently compressible.
99-
Enabling this feature can reduce the amount of disk IO at the cost
96+
When :ref:`admin/properties:\`\`experimental.spill-compression-codec\`\`` is
97+
configured, spilled pages are compressed using
98+
the configured codec implementation when they are sufficiently compressible.
99+
This feature can reduce the amount of disk IO at the cost
100100
of extra CPU load to compress and decompress spilled pages.
101101

102102
Spill Encryption

presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestJdbcConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ public void testSession()
302302
try (Connection connection = createConnection("sessionProperties=query_max_run_time:2d;max_failed_task_percentage:0.6")) {
303303
assertThat(listSession(connection))
304304
.contains("join_distribution_type|AUTOMATIC|AUTOMATIC")
305-
.contains("exchange_compression|false|false")
305+
.contains("exchange_compression_codec|NONE|NONE")
306306
.contains("query_max_run_time|2d|100.00d")
307307
.contains("max_failed_task_percentage|0.6|0.3");
308308

@@ -312,15 +312,15 @@ public void testSession()
312312

313313
assertThat(listSession(connection))
314314
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
315-
.contains("exchange_compression|false|false");
315+
.contains("exchange_compression_codec|NONE|NONE");
316316

317317
try (Statement statement = connection.createStatement()) {
318-
statement.execute("SET SESSION exchange_compression = true");
318+
statement.execute("SET SESSION exchange_compression_codec = 'LZ4'");
319319
}
320320

321321
assertThat(listSession(connection))
322322
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
323-
.contains("exchange_compression|true|false");
323+
.contains("exchange_compression_codec|LZ4|NONE");
324324
}
325325
}
326326

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto;
15+
16+
public enum CompressionCodec {
17+
LZ4, NONE
18+
}

presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public final class SystemSessionProperties
160160
public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout";
161161
public static final String QUERY_ANALYZER_TIMEOUT = "query_analyzer_timeout";
162162
public static final String RUNTIME_OPTIMIZER_ENABLED = "runtime_optimizer_enabled";
163-
public static final String EXCHANGE_COMPRESSION = "exchange_compression";
163+
public static final String EXCHANGE_COMPRESSION_CODEC = "exchange_compression_codec";
164164
public static final String EXCHANGE_CHECKSUM = "exchange_checksum";
165165
public static final String LEGACY_TIMESTAMP = "legacy_timestamp";
166166
public static final String ENABLE_INTERMEDIATE_AGGREGATIONS = "enable_intermediate_aggregations";
@@ -839,11 +839,15 @@ public SystemSessionProperties(
839839
"Experimental: enable runtime optimizer",
840840
featuresConfig.isRuntimeOptimizerEnabled(),
841841
false),
842-
booleanProperty(
843-
EXCHANGE_COMPRESSION,
844-
"Enable compression in exchanges",
845-
featuresConfig.isExchangeCompressionEnabled(),
846-
false),
842+
new PropertyMetadata<>(
843+
EXCHANGE_COMPRESSION_CODEC,
844+
"Exchange compression codec",
845+
VARCHAR,
846+
CompressionCodec.class,
847+
featuresConfig.getExchangeCompressionCodec(),
848+
false,
849+
value -> CompressionCodec.valueOf(((String) value).toUpperCase()),
850+
CompressionCodec::name),
847851
booleanProperty(
848852
EXCHANGE_CHECKSUM,
849853
"Enable checksum in exchanges",
@@ -2285,9 +2289,9 @@ public static Duration getQueryAnalyzerTimeout(Session session)
22852289
return session.getSystemProperty(QUERY_ANALYZER_TIMEOUT, Duration.class);
22862290
}
22872291

2288-
public static boolean isExchangeCompressionEnabled(Session session)
2292+
public static CompressionCodec getExchangeCompressionCodec(Session session)
22892293
{
2290-
return session.getSystemProperty(EXCHANGE_COMPRESSION, Boolean.class);
2294+
return session.getSystemProperty(EXCHANGE_COMPRESSION_CODEC, CompressionCodec.class);
22912295
}
22922296

22932297
public static boolean isExchangeChecksumEnabled(Session session)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import com.facebook.presto.spi.page.PageCompressor;
17+
import io.airlift.compress.Compressor;
18+
19+
import java.nio.ByteBuffer;
20+
21+
import static java.util.Objects.requireNonNull;
22+
23+
public class AirliftCompressorAdapter
24+
implements PageCompressor
25+
{
26+
private final Compressor compressor;
27+
28+
public AirliftCompressorAdapter(Compressor compressor)
29+
{
30+
this.compressor = requireNonNull(compressor, "compressor is null");
31+
}
32+
33+
@Override
34+
public int maxCompressedLength(int uncompressedSize)
35+
{
36+
return compressor.maxCompressedLength(uncompressedSize);
37+
}
38+
39+
@Override
40+
public int compress(
41+
byte[] input,
42+
int inputOffset,
43+
int inputLength,
44+
byte[] output,
45+
int outputOffset,
46+
int maxOutputLength)
47+
{
48+
return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
49+
}
50+
51+
@Override
52+
public void compress(ByteBuffer input, ByteBuffer output)
53+
{
54+
compressor.compress(input, output);
55+
}
56+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.buffer;
15+
16+
import com.facebook.presto.spi.page.PageDecompressor;
17+
import io.airlift.compress.Decompressor;
18+
19+
import java.nio.ByteBuffer;
20+
21+
import static java.util.Objects.requireNonNull;
22+
23+
public class AirliftDecompressorAdapter
24+
implements PageDecompressor
25+
{
26+
private final Decompressor decompressor;
27+
28+
public AirliftDecompressorAdapter(Decompressor decompressor)
29+
{
30+
this.decompressor = requireNonNull(decompressor, "decompressor is null");
31+
}
32+
33+
@Override
34+
public int decompress(
35+
byte[] input,
36+
int inputOffset,
37+
int inputLength,
38+
byte[] output,
39+
int outputOffset,
40+
int maxOutputLength)
41+
{
42+
return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
43+
}
44+
45+
@Override
46+
public void decompress(ByteBuffer input, ByteBuffer output)
47+
{
48+
decompressor.decompress(input, output);
49+
}
50+
}

presto-main-base/src/main/java/com/facebook/presto/execution/buffer/PagesSerdeFactory.java

Lines changed: 26 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,36 +13,34 @@
1313
*/
1414
package com.facebook.presto.execution.buffer;
1515

16+
import com.facebook.presto.CompressionCodec;
1617
import com.facebook.presto.common.block.BlockEncodingSerde;
1718
import com.facebook.presto.spi.page.PageCompressor;
1819
import com.facebook.presto.spi.page.PageDecompressor;
1920
import com.facebook.presto.spi.page.PagesSerde;
2021
import com.facebook.presto.spi.spiller.SpillCipher;
21-
import io.airlift.compress.Compressor;
22-
import io.airlift.compress.Decompressor;
2322
import io.airlift.compress.lz4.Lz4Compressor;
2423
import io.airlift.compress.lz4.Lz4Decompressor;
2524

26-
import java.nio.ByteBuffer;
2725
import java.util.Optional;
2826

2927
import static java.util.Objects.requireNonNull;
3028

3129
public class PagesSerdeFactory
3230
{
3331
private final BlockEncodingSerde blockEncodingSerde;
34-
private final boolean compressionEnabled;
32+
private final CompressionCodec compressionCodec;
3533
private final boolean checksumEnabled;
3634

37-
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, boolean compressionEnabled)
35+
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec)
3836
{
39-
this(blockEncodingSerde, compressionEnabled, false);
37+
this(blockEncodingSerde, compressionCodec, false);
4038
}
4139

42-
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, boolean compressionEnabled, boolean checksumEnabled)
40+
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec, boolean checksumEnabled)
4341
{
4442
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
45-
this.compressionEnabled = compressionEnabled;
43+
this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
4644
this.checksumEnabled = checksumEnabled;
4745
}
4846

@@ -58,60 +56,28 @@ public PagesSerde createPagesSerdeForSpill(Optional<SpillCipher> spillCipher)
5856

5957
private PagesSerde createPagesSerdeInternal(Optional<SpillCipher> spillCipher)
6058
{
61-
if (compressionEnabled) {
62-
return new PagesSerde(
63-
blockEncodingSerde,
64-
Optional.of(new PageCompressor()
65-
{
66-
Compressor compressor = new Lz4Compressor();
67-
@Override
68-
public int maxCompressedLength(int uncompressedSize)
69-
{
70-
return compressor.maxCompressedLength(uncompressedSize);
71-
}
72-
73-
@Override
74-
public int compress(
75-
byte[] input,
76-
int inputOffset,
77-
int inputLength,
78-
byte[] output,
79-
int outputOffset,
80-
int maxOutputLength)
81-
{
82-
return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
83-
}
84-
85-
@Override
86-
public void compress(ByteBuffer input, ByteBuffer output)
87-
{
88-
compressor.compress(input, output);
89-
}
90-
}),
91-
Optional.of(new PageDecompressor()
92-
{
93-
Decompressor decompressor = new Lz4Decompressor();
94-
@Override
95-
public int decompress(
96-
byte[] input,
97-
int inputOffset,
98-
int inputLength,
99-
byte[] output,
100-
int outputOffset,
101-
int maxOutputLength)
102-
{
103-
return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
104-
}
59+
return new PagesSerde(blockEncodingSerde, getPageCompressor(), getPageDecompressor(), spillCipher, checksumEnabled);
60+
}
10561

106-
@Override
107-
public void decompress(ByteBuffer input, ByteBuffer output)
108-
{
109-
decompressor.decompress(input, output);
110-
}
111-
}),
112-
spillCipher, checksumEnabled);
62+
private Optional<PageCompressor> getPageCompressor()
63+
{
64+
switch (compressionCodec) {
65+
case LZ4:
66+
return Optional.of(new AirliftCompressorAdapter(new Lz4Compressor()));
67+
case NONE:
68+
default:
69+
return Optional.empty();
11370
}
71+
}
11472

115-
return new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), spillCipher, checksumEnabled);
73+
private Optional<PageDecompressor> getPageDecompressor()
74+
{
75+
switch (compressionCodec) {
76+
case LZ4:
77+
return Optional.of(new AirliftDecompressorAdapter(new Lz4Decompressor()));
78+
case NONE:
79+
default:
80+
return Optional.empty();
81+
}
11682
}
11783
}

presto-main-base/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheConfig.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.configuration.Config;
1717
import com.facebook.airlift.configuration.ConfigDescription;
18+
import com.facebook.presto.CompressionCodec;
1819
import io.airlift.units.DataSize;
1920
import io.airlift.units.Duration;
2021
import io.airlift.units.MinDataSize;
@@ -32,7 +33,7 @@ public class FileFragmentResultCacheConfig
3233
{
3334
private boolean cachingEnabled;
3435
private URI baseDirectory;
35-
private boolean blockEncodingCompressionEnabled;
36+
private CompressionCodec blockEncodingCompressionCodec = CompressionCodec.NONE;
3637

3738
private int maxCachedEntries = 10_000;
3839
private Duration cacheTtl = new Duration(2, DAYS);
@@ -68,16 +69,16 @@ public FileFragmentResultCacheConfig setBaseDirectory(URI baseDirectory)
6869
return this;
6970
}
7071

71-
public boolean isBlockEncodingCompressionEnabled()
72+
public CompressionCodec getBlockEncodingCompressionCodec()
7273
{
73-
return blockEncodingCompressionEnabled;
74+
return blockEncodingCompressionCodec;
7475
}
7576

76-
@Config("fragment-result-cache.block-encoding-compression-enabled")
77-
@ConfigDescription("Enable compression for block encoding")
78-
public FileFragmentResultCacheConfig setBlockEncodingCompressionEnabled(boolean blockEncodingCompressionEnabled)
77+
@Config("fragment-result-cache.block-encoding-compression-codec")
78+
@ConfigDescription("Compression codec for block encoding")
79+
public FileFragmentResultCacheConfig setBlockEncodingCompressionCodec(CompressionCodec blockEncodingCompressionCodec)
7980
{
80-
this.blockEncodingCompressionEnabled = blockEncodingCompressionEnabled;
81+
this.blockEncodingCompressionCodec = blockEncodingCompressionCodec;
8182
return this;
8283
}
8384

0 commit comments

Comments
 (0)