Skip to content

Commit 1a0bfc9

Browse files
authored
Support snappy compression (#1406)
1 parent b12eccc commit 1a0bfc9

File tree

8 files changed

+151
-0
lines changed

8 files changed

+151
-0
lines changed

pulsar/consumer_partition.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,6 +2242,8 @@ func (pc *partitionConsumer) initializeCompressionProvider(
22422242
return compression.NewLz4Provider(), nil
22432243
case pb.CompressionType_ZSTD:
22442244
return compression.NewZStdProvider(compression.Default), nil
2245+
case pb.CompressionType_SNAPPY:
2246+
return compression.NewSnappyProvider(), nil
22452247
}
22462248

22472249
return nil, fmt.Errorf("unsupported compression type: %v", compressionType)

pulsar/consumer_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,98 @@ func TestConsumerCompression(t *testing.T) {
12931293
}
12941294
}
12951295

1296+
func TestConsumerMultiCompressions(t *testing.T) {
1297+
type testProvider struct {
1298+
name string
1299+
compressionType CompressionType
1300+
}
1301+
1302+
providers := []testProvider{
1303+
{"zlib", ZLib},
1304+
{"lz4", LZ4},
1305+
{"zstd", ZSTD},
1306+
{"snappy", SNAPPY},
1307+
}
1308+
1309+
for _, provider := range providers {
1310+
p := provider
1311+
t.Run(p.name, func(t *testing.T) {
1312+
client, err := NewClient(ClientOptions{
1313+
URL: lookupURL,
1314+
})
1315+
1316+
assert.Nil(t, err)
1317+
defer client.Close()
1318+
1319+
batchTopic, nonBatchTopic := newTopicName(), newTopicName()
1320+
ctx := context.Background()
1321+
1322+
// enable batching
1323+
batchProducer, err := client.CreateProducer(ProducerOptions{
1324+
Topic: batchTopic,
1325+
CompressionType: p.compressionType,
1326+
DisableBatching: false,
1327+
})
1328+
assert.Nil(t, err)
1329+
defer batchProducer.Close()
1330+
1331+
batchConsumer, err := client.Subscribe(ConsumerOptions{
1332+
Topic: batchTopic,
1333+
SubscriptionName: "sub-1",
1334+
})
1335+
assert.Nil(t, err)
1336+
defer batchConsumer.Close()
1337+
1338+
const N = 100
1339+
for i := 0; i < N; i++ {
1340+
batchProducer.SendAsync(ctx, &ProducerMessage{
1341+
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
1342+
}, func(_ MessageID, _ *ProducerMessage, err error) {
1343+
assert.Nil(t, err)
1344+
})
1345+
}
1346+
1347+
for i := 0; i < N; i++ {
1348+
msg, err := batchConsumer.Receive(ctx)
1349+
assert.Nil(t, err)
1350+
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
1351+
batchConsumer.Ack(msg)
1352+
}
1353+
1354+
// disable batching
1355+
nonBatchProducer, err := client.CreateProducer(ProducerOptions{
1356+
Topic: nonBatchTopic,
1357+
CompressionType: p.compressionType,
1358+
DisableBatching: true,
1359+
})
1360+
assert.Nil(t, err)
1361+
defer nonBatchProducer.Close()
1362+
1363+
nonBatchConsumer, err := client.Subscribe(ConsumerOptions{
1364+
Topic: nonBatchTopic,
1365+
SubscriptionName: "sub-1",
1366+
})
1367+
assert.Nil(t, err)
1368+
defer nonBatchConsumer.Close()
1369+
1370+
for i := 0; i < N; i++ {
1371+
if _, err := nonBatchProducer.Send(ctx, &ProducerMessage{
1372+
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
1373+
}); err != nil {
1374+
t.Fatal(err)
1375+
}
1376+
}
1377+
1378+
for i := 0; i < N; i++ {
1379+
msg, err := nonBatchConsumer.Receive(ctx)
1380+
assert.Nil(t, err)
1381+
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
1382+
nonBatchConsumer.Ack(msg)
1383+
}
1384+
})
1385+
}
1386+
}
1387+
12961388
func TestConsumerCompressionWithBatches(t *testing.T) {
12971389
client, err := NewClient(ClientOptions{
12981390
URL: lookupURL,

pulsar/internal/batch_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ func GetCompressionProvider(
313313
return compression.NewZLibProvider()
314314
case pb.CompressionType_ZSTD:
315315
return compression.NewZStdProvider(level)
316+
case pb.CompressionType_SNAPPY:
317+
return compression.NewSnappyProvider()
316318
default:
317319
panic("unsupported compression type")
318320
}

pulsar/internal/compression/compression_bench_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ var benchmarkProviders = []testProvider{
7272
{"zstd-cgo-level-fastest", newCGoZStdProvider(Faster), nil},
7373
{"zstd-cgo-level-default", newCGoZStdProvider(Default), nil},
7474
{"zstd-cgo-level-best", newCGoZStdProvider(Better), nil},
75+
{"snappy", NewSnappyProvider(), nil},
7576
}
7677

7778
func BenchmarkCompression(b *testing.B) {

pulsar/internal/compression/compression_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var providers = []testProvider{
3636
{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
3737
{"zstd", NewZStdProvider(Default),
3838
[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
39+
{"snappy", NewSnappyProvider(), []byte{0x05, 0x10, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
3940
}
4041

4142
func TestCompression(t *testing.T) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package compression
19+
20+
import (
21+
"github.com/klauspost/compress/snappy"
22+
)
23+
24+
type snappyProvider struct {
25+
}
26+
27+
// NewSnappyProvider returns a Provider interface
28+
func NewSnappyProvider() Provider {
29+
return &snappyProvider{}
30+
}
31+
32+
func (p *snappyProvider) CompressMaxSize(srcSize int) int {
33+
return snappy.MaxEncodedLen(srcSize)
34+
}
35+
36+
func (p *snappyProvider) Compress(dst, src []byte) []byte {
37+
return snappy.Encode(dst, src)
38+
}
39+
40+
func (p *snappyProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
41+
return snappy.Decode(dst, src)
42+
}
43+
44+
func (p *snappyProvider) Close() error {
45+
return nil
46+
}
47+
48+
func (p *snappyProvider) Clone() Provider {
49+
return &snappyProvider{}
50+
}

pulsar/producer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
LZ4
4141
ZLib
4242
ZSTD
43+
SNAPPY
4344
)
4445

4546
type CompressionLevel int
@@ -120,6 +121,7 @@ type ProducerOptions struct {
120121
// - LZ4
121122
// - ZLIB
122123
// - ZSTD
124+
// - SNAPPY
123125
//
124126
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
125127
// release in order to be able to receive messages compressed with ZSTD.

pulsar/producer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func TestProducerCompression(t *testing.T) {
245245
{"zlib", ZLib},
246246
{"lz4", LZ4},
247247
{"zstd", ZSTD},
248+
{"snappy", SNAPPY},
248249
}
249250

250251
for _, provider := range providers {

0 commit comments

Comments
 (0)