Skip to content

Commit 73218d1

Browse files
committed
Support compression
1 parent c86ef00 commit 73218d1

File tree

21 files changed

+1895
-321
lines changed

21 files changed

+1895
-321
lines changed

enterprise/lifecycle/transition_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,10 @@ func (m *mockFileClient) PutObject(ctx context.Context, address string, objectID
752752
return nil, nil
753753
}
754754

755+
// PutObjectWithCompression is a no-op stub satisfying the file client
// interface; the mock does not exercise compression behavior.
func (m *mockFileClient) PutObjectWithCompression(ctx context.Context, address string, objectID string, data io.Reader, totalSize uint64, compression string) (*client.PutObjectResult, error) {
	return nil, nil
}
758+
755759
// GetObject is a no-op stub satisfying the file client interface.
func (m *mockFileClient) GetObject(ctx context.Context, address string, objectID string, writer client.ObjectWriter) (string, error) {
	return "", nil
}

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/go-ldap/ldap/v3 v3.4.12
2121
github.com/go-sql-driver/mysql v1.9.3
2222
github.com/golang-jwt/jwt/v5 v5.3.0
23+
github.com/golang/snappy v1.0.0
2324
github.com/google/btree v1.1.3
2425
github.com/google/go-cmp v0.7.0
2526
github.com/google/uuid v1.6.0
@@ -29,10 +30,12 @@ require (
2930
github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148
3031
github.com/hashicorp/vault/api v1.22.0
3132
github.com/jackc/pgx/v5 v5.8.0
33+
github.com/klauspost/compress v1.18.2
3234
github.com/klauspost/reedsolomon v1.12.6
3335
github.com/lib/pq v1.10.9
3436
github.com/minio/crc64nvme v1.1.1
3537
github.com/minio/sha256-simd v1.0.1
38+
github.com/pierrec/lz4/v4 v4.1.22
3639
github.com/prometheus/client_golang v1.23.2
3740
github.com/prometheus/client_model v0.6.2
3841
github.com/redis/go-redis/v9 v9.17.2
@@ -87,7 +90,6 @@ require (
8790
github.com/go-faster/errors v0.7.1 // indirect
8891
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
8992
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
90-
github.com/golang/snappy v1.0.0 // indirect
9193
github.com/hashicorp/errwrap v1.1.0 // indirect
9294
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
9395
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
@@ -112,7 +114,6 @@ require (
112114
github.com/jcmturner/gofork v1.7.6 // indirect
113115
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
114116
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
115-
github.com/klauspost/compress v1.18.2 // indirect
116117
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
117118
github.com/mattn/go-colorable v0.1.14 // indirect
118119
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -121,7 +122,6 @@ require (
121122
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
122123
github.com/paulmach/orb v0.12.0 // indirect
123124
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
124-
github.com/pierrec/lz4/v4 v4.1.22 // indirect
125125
github.com/pmezard/go-difflib v1.0.0 // indirect
126126
github.com/prometheus/common v0.67.4 // indirect
127127
github.com/prometheus/procfs v0.19.2 // indirect
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
//go:build integration
2+
3+
// Copyright 2025 ZapFS Authors
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package file
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"io"
12+
"testing"
13+
"time"
14+
15+
"github.com/LeeDigitalWorks/zapfs/integration/testutil"
16+
"github.com/LeeDigitalWorks/zapfs/proto/file_pb"
17+
18+
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
// =============================================================================
23+
// Compression Tests
24+
// =============================================================================
25+
26+
func TestPutGetObject_LZ4Compression(t *testing.T) {
27+
t.Parallel()
28+
29+
client := newFileClient(t, fileServer1Addr)
30+
objectID := testutil.UniqueID("test-lz4")
31+
32+
// Create compressible data (repeated patterns compress well)
33+
data := bytes.Repeat([]byte("compression test data for lz4 algorithm "), 10000)
34+
35+
// Put with LZ4 compression
36+
resp := client.PutObject(objectID, data, testutil.WithCompression("lz4"))
37+
assert.Equal(t, uint64(len(data)), resp.Size, "size should match original")
38+
assert.Equal(t, testutil.ComputeETag(data), resp.Etag, "etag should match")
39+
40+
// Verify compression metadata in chunks
41+
require.NotEmpty(t, resp.Chunks, "should have at least one chunk")
42+
for _, chunk := range resp.Chunks {
43+
assert.Equal(t, "lz4", chunk.Compression, "chunk should be lz4 compressed")
44+
assert.Greater(t, chunk.OriginalSize, uint64(0), "should have original size")
45+
assert.Less(t, chunk.Size, chunk.OriginalSize, "compressed should be smaller for compressible data")
46+
}
47+
48+
// Get and verify data is correctly decompressed
49+
retrieved := client.GetObject(objectID)
50+
assert.Equal(t, data, retrieved, "retrieved data should match original")
51+
52+
// Cleanup
53+
client.DeleteObject(objectID)
54+
}
55+
56+
func TestPutGetObject_ZSTDCompression(t *testing.T) {
57+
t.Parallel()
58+
59+
client := newFileClient(t, fileServer1Addr)
60+
objectID := testutil.UniqueID("test-zstd")
61+
62+
// Create compressible data
63+
data := bytes.Repeat([]byte("zstd compression works great for storage "), 10000)
64+
65+
resp := client.PutObject(objectID, data, testutil.WithCompression("zstd"))
66+
assert.Equal(t, uint64(len(data)), resp.Size)
67+
68+
require.NotEmpty(t, resp.Chunks)
69+
for _, chunk := range resp.Chunks {
70+
assert.Equal(t, "zstd", chunk.Compression)
71+
assert.Less(t, chunk.Size, chunk.OriginalSize)
72+
}
73+
74+
retrieved := client.GetObject(objectID)
75+
assert.Equal(t, data, retrieved)
76+
77+
client.DeleteObject(objectID)
78+
}
79+
80+
func TestPutGetObject_SnappyCompression(t *testing.T) {
81+
t.Parallel()
82+
83+
client := newFileClient(t, fileServer1Addr)
84+
objectID := testutil.UniqueID("test-snappy")
85+
86+
data := bytes.Repeat([]byte("snappy is fast for real-time compression "), 10000)
87+
88+
resp := client.PutObject(objectID, data, testutil.WithCompression("snappy"))
89+
assert.Equal(t, uint64(len(data)), resp.Size)
90+
91+
require.NotEmpty(t, resp.Chunks)
92+
for _, chunk := range resp.Chunks {
93+
assert.Equal(t, "snappy", chunk.Compression)
94+
assert.Less(t, chunk.Size, chunk.OriginalSize)
95+
}
96+
97+
retrieved := client.GetObject(objectID)
98+
assert.Equal(t, data, retrieved)
99+
100+
client.DeleteObject(objectID)
101+
}
102+
103+
func TestPutGetObject_NoCompression(t *testing.T) {
104+
t.Parallel()
105+
106+
client := newFileClient(t, fileServer1Addr)
107+
objectID := testutil.UniqueID("test-nocomp")
108+
data := testutil.GenerateTestData(t, 10*1024) // 10KB
109+
110+
// Put without compression
111+
resp := client.PutObject(objectID, data)
112+
assert.Equal(t, uint64(len(data)), resp.Size)
113+
114+
require.NotEmpty(t, resp.Chunks)
115+
for _, chunk := range resp.Chunks {
116+
// No compression or "none"
117+
assert.True(t, chunk.Compression == "" || chunk.Compression == "none",
118+
"expected no compression, got: %s", chunk.Compression)
119+
}
120+
121+
retrieved := client.GetObject(objectID)
122+
assert.Equal(t, data, retrieved)
123+
124+
client.DeleteObject(objectID)
125+
}
126+
127+
func TestPutGetObject_CompressionSkippedForIncompressible(t *testing.T) {
128+
t.Parallel()
129+
130+
client := newFileClient(t, fileServer1Addr)
131+
objectID := testutil.UniqueID("test-incompressible")
132+
133+
// Random data doesn't compress well
134+
data := testutil.GenerateTestData(t, 10*1024) // 10KB random data
135+
136+
resp := client.PutObject(objectID, data, testutil.WithCompression("lz4"))
137+
assert.Equal(t, uint64(len(data)), resp.Size)
138+
139+
// Data should still be readable regardless of whether compression was applied
140+
retrieved := client.GetObject(objectID)
141+
assert.Equal(t, data, retrieved)
142+
143+
client.DeleteObject(objectID)
144+
}
145+
146+
func TestPutGetObject_CompressionWithRange(t *testing.T) {
147+
t.Parallel()
148+
149+
client := newFileClient(t, fileServer1Addr)
150+
objectID := testutil.UniqueID("test-comp-range")
151+
152+
// Create compressible data
153+
data := bytes.Repeat([]byte("0123456789"), 10000) // 100KB
154+
155+
resp := client.PutObject(objectID, data, testutil.WithCompression("lz4"))
156+
assert.Equal(t, uint64(len(data)), resp.Size)
157+
158+
// Verify range reads work correctly with compression
159+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
160+
defer cancel()
161+
162+
offset := uint64(10000)
163+
length := uint64(5000)
164+
165+
stream, err := client.FileServiceClient.GetObjectRange(ctx, &file_pb.GetObjectRangeRequest{
166+
ObjectId: objectID,
167+
Offset: offset,
168+
Length: length,
169+
})
170+
require.NoError(t, err)
171+
172+
var rangeData []byte
173+
for {
174+
resp, err := stream.Recv()
175+
if err == io.EOF {
176+
break
177+
}
178+
require.NoError(t, err)
179+
rangeData = append(rangeData, resp.Chunk...)
180+
}
181+
182+
expectedRange := data[offset : offset+length]
183+
assert.Equal(t, expectedRange, rangeData, "range data should match")
184+
185+
client.DeleteObject(objectID)
186+
}
187+
188+
func TestPutGetObject_LargeCompressed(t *testing.T) {
189+
testutil.SkipIfShort(t)
190+
t.Parallel()
191+
192+
client := newFileClient(t, fileServer1Addr)
193+
objectID := testutil.UniqueID("test-large-comp")
194+
195+
// Create large compressible data (10MB)
196+
data := bytes.Repeat([]byte("large object compression test with repeated patterns "), 200000)
197+
198+
start := time.Now()
199+
resp := client.PutObject(objectID, data, testutil.WithCompression("zstd"))
200+
putDuration := time.Since(start)
201+
assert.Equal(t, uint64(len(data)), resp.Size)
202+
203+
// Calculate compression ratio
204+
var totalCompressed, totalOriginal uint64
205+
for _, chunk := range resp.Chunks {
206+
totalCompressed += chunk.Size
207+
totalOriginal += chunk.OriginalSize
208+
}
209+
ratio := float64(totalOriginal) / float64(totalCompressed)
210+
t.Logf("Put %d bytes in %v, compression ratio: %.2fx (%d -> %d bytes)",
211+
len(data), putDuration, ratio, totalOriginal, totalCompressed)
212+
213+
start = time.Now()
214+
retrieved := client.GetObject(objectID)
215+
getDuration := time.Since(start)
216+
assert.Equal(t, data, retrieved)
217+
t.Logf("Get %d bytes in %v", len(data), getDuration)
218+
219+
client.DeleteObject(objectID)
220+
}
221+
222+
func TestCompression_Deduplication(t *testing.T) {
223+
t.Parallel()
224+
225+
client := newFileClient(t, fileServer1Addr)
226+
227+
// Create same compressible data for two objects
228+
data := bytes.Repeat([]byte("deduplicated compressed content "), 5000)
229+
230+
objectID1 := testutil.UniqueID("test-dedup-comp-1")
231+
objectID2 := testutil.UniqueID("test-dedup-comp-2")
232+
233+
// Put both objects with same compression
234+
resp1 := client.PutObject(objectID1, data, testutil.WithCompression("lz4"))
235+
resp2 := client.PutObject(objectID2, data, testutil.WithCompression("lz4"))
236+
237+
// Both should have same ETag (content hash)
238+
assert.Equal(t, resp1.Etag, resp2.Etag, "ETags should match for identical content")
239+
240+
// Both should reference the same chunk IDs (deduplication)
241+
require.NotEmpty(t, resp1.Chunks)
242+
require.NotEmpty(t, resp2.Chunks)
243+
assert.Equal(t, len(resp1.Chunks), len(resp2.Chunks), "chunk count should match")
244+
245+
for i := range resp1.Chunks {
246+
assert.Equal(t, resp1.Chunks[i].ChunkId, resp2.Chunks[i].ChunkId,
247+
"chunk %d should be deduplicated", i)
248+
}
249+
250+
// Verify both can be read back
251+
retrieved1 := client.GetObject(objectID1)
252+
retrieved2 := client.GetObject(objectID2)
253+
assert.Equal(t, data, retrieved1)
254+
assert.Equal(t, data, retrieved2)
255+
256+
client.DeleteObject(objectID1)
257+
client.DeleteObject(objectID2)
258+
}
259+
260+
func TestCompression_LocalChunkInfo(t *testing.T) {
261+
t.Parallel()
262+
263+
client := newFileClient(t, fileServer1Addr)
264+
objectID := testutil.UniqueID("test-chunk-info")
265+
266+
data := bytes.Repeat([]byte("chunk info test with compression "), 5000)
267+
268+
resp := client.PutObject(objectID, data, testutil.WithCompression("zstd"))
269+
require.NotEmpty(t, resp.Chunks)
270+
271+
// Get local chunk info and verify compression metadata
272+
chunkID := resp.Chunks[0].ChunkId
273+
chunkInfo := client.GetLocalChunkOrFail(chunkID)
274+
275+
assert.Equal(t, chunkID, chunkInfo.ChunkId)
276+
assert.Equal(t, "zstd", chunkInfo.Compression)
277+
assert.Greater(t, chunkInfo.OriginalSize, uint64(0))
278+
assert.Less(t, chunkInfo.Size, chunkInfo.OriginalSize)
279+
280+
client.DeleteObject(objectID)
281+
}

integration/testutil/file_client.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ func NewFileClient(t *testing.T, addr string) *FileClient {
3636

3737
// PutConfig configures a PutObject request
type PutConfig struct {
	// BackendID optionally pins the object to a specific storage backend.
	BackendID string
	// UseEC enables erasure coding for this object.
	UseEC bool
	// ECScheme carries the data/parity shard layout when UseEC is set.
	ECScheme *common_pb.ECScheme
	Compression string // Compression algorithm: "lz4", "zstd", "snappy", or "" for none
}
4344

4445
// PutOption modifies PutConfig
@@ -55,6 +56,13 @@ func WithErasureCoding(dataShards, parityShards int32) PutOption {
5556
}
5657
}
5758

59+
// WithCompression enables compression with the specified algorithm
60+
func WithCompression(algo string) PutOption {
61+
return func(c *PutConfig) {
62+
c.Compression = algo
63+
}
64+
}
65+
5866
// PutObject uploads data to the file server
5967
func (fc *FileClient) PutObject(objectID string, data []byte, opts ...PutOption) *file_pb.PutObjectResponse {
6068
fc.t.Helper()
@@ -79,6 +87,7 @@ func (fc *FileClient) PutObject(objectID string, data []byte, opts ...PutOption)
7987
TotalSize: uint64(len(data)),
8088
UseErasureCoding: config.UseEC,
8189
EcScheme: config.ECScheme,
90+
Compression: config.Compression,
8291
},
8392
},
8493
}

0 commit comments

Comments
 (0)