Skip to content

Commit edd846a

Browse files
committed
Implement GzipCompressor, enable all gzip tests
1 parent 913f534 commit edd846a

File tree

7 files changed

+205
-111
lines changed

7 files changed

+205
-111
lines changed

runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/io/GzipByteReadChannelTest.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55
package aws.smithy.kotlin.runtime.io
66

7-
import aws.smithy.kotlin.runtime.IgnoreNative
87
import aws.smithy.kotlin.runtime.compression.decompressGzipBytes
98
import aws.smithy.kotlin.runtime.hashing.crc32
109
import kotlinx.coroutines.test.runTest
@@ -13,7 +12,6 @@ import kotlin.test.assertContentEquals
1312
import kotlin.test.assertEquals
1413

1514
class GzipByteReadChannelTest {
16-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
1715
@Test
1816
fun testReadAll() = runTest {
1917
val payload = "Hello World"
@@ -33,7 +31,6 @@ class GzipByteReadChannelTest {
3331
assertEquals(bytesHash, decompressedBytes.crc32())
3432
}
3533

36-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
3734
@Test
3835
fun testReadToBuffer() = runTest {
3936
val payload = "Hello World".repeat(1600)
@@ -52,7 +49,6 @@ class GzipByteReadChannelTest {
5249
assertEquals(bytesHash, decompressedBytes.crc32())
5350
}
5451

55-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
5652
@Test
5753
fun testReadRemaining() = runTest {
5854
val payload = "Hello World".repeat(1600)
@@ -72,7 +68,6 @@ class GzipByteReadChannelTest {
7268
assertEquals(bytesHash, decompressedBytes.crc32())
7369
}
7470

75-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
7671
@Test
7772
fun testRead() = runTest {
7873
val payload = "Hello World"
@@ -93,7 +88,6 @@ class GzipByteReadChannelTest {
9388
assertEquals(bytesHash, decompressedBytes.crc32())
9489
}
9590

96-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
9791
@Test
9892
fun testReadLargeBody() = runTest {
9993
val payload = "Hello World".repeat(1600)
@@ -114,7 +108,6 @@ class GzipByteReadChannelTest {
114108
assertEquals(bytesHash, decompressedBytes.crc32())
115109
}
116110

117-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
118111
@Test
119112
fun testReadLargeLimit() = runTest {
120113
val payload = "Hello World"
@@ -135,7 +128,6 @@ class GzipByteReadChannelTest {
135128
assertEquals(bytesHash, decompressedBytes.crc32())
136129
}
137130

138-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
139131
@Test
140132
fun testReadLargeBodyLargeLimit() = runTest {
141133
val payload = "Hello World".repeat(1600)
@@ -156,7 +148,6 @@ class GzipByteReadChannelTest {
156148
assertEquals(bytesHash, decompressedBytes.crc32())
157149
}
158150

159-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
160151
@Test
161152
fun testIsClosedForRead() = runTest {
162153
val payload = "Hello World"
@@ -179,7 +170,6 @@ class GzipByteReadChannelTest {
179170
assertEquals(bytesHash, decompressedBytes.crc32())
180171
}
181172

182-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
183173
@Test
184174
fun testIsClosedForReadLargeBody() = runTest {
185175
val payload = "Hello World".repeat(1600)
@@ -202,7 +192,6 @@ class GzipByteReadChannelTest {
202192
assertEquals(bytesHash, decompressedBytes.crc32())
203193
}
204194

205-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
206195
@Test
207196
fun testIsClosedForReadLargeLimit() = runTest {
208197
val payload = "Hello World"
@@ -225,7 +214,6 @@ class GzipByteReadChannelTest {
225214
assertEquals(bytesHash, decompressedBytes.crc32())
226215
}
227216

228-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
229217
@Test
230218
fun testIsClosedForReadLargeBodyLargeLimit() = runTest {
231219
val payload = "Hello World".repeat(1600)

runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/io/GzipSdkSourceTest.kt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55
package aws.smithy.kotlin.runtime.io
66

7-
import aws.smithy.kotlin.runtime.IgnoreNative
87
import aws.smithy.kotlin.runtime.compression.decompressGzipBytes
98
import aws.smithy.kotlin.runtime.hashing.crc32
109
import kotlinx.coroutines.test.runTest
@@ -13,7 +12,6 @@ import kotlin.test.assertContentEquals
1312
import kotlin.test.assertEquals
1413

1514
class GzipSdkSourceTest {
16-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
1715
@Test
1816
fun testReadToByteArray() = runTest {
1917
val payload = "Hello World"
@@ -31,7 +29,6 @@ class GzipSdkSourceTest {
3129
assertEquals(bytesHash, decompressedBytes.crc32())
3230
}
3331

34-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
3532
@Test
3633
fun testRead() = runTest {
3734
val payload = "Hello World"
@@ -52,7 +49,6 @@ class GzipSdkSourceTest {
5249
assertEquals(bytesHash, decompressedBytes.crc32())
5350
}
5451

55-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
5652
@Test
5753
fun testReadLargeBody() = runTest {
5854
val payload = "Hello World".repeat(1600)
@@ -74,7 +70,6 @@ class GzipSdkSourceTest {
7470
assertEquals(bytesHash, decompressedBytes.crc32())
7571
}
7672

77-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
7873
@Test
7974
fun testReadLargeLimit() = runTest {
8075
val payload = "Hello World"
@@ -95,7 +90,6 @@ class GzipSdkSourceTest {
9590
assertEquals(bytesHash, decompressedBytes.crc32())
9691
}
9792

98-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
9993
@Test
10094
fun testReadLargeBodyLargeLimit() = runTest {
10195
val payload = "Hello World".repeat(1600)
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.smithy.kotlin.runtime.compression
6+
7+
import kotlinx.atomicfu.atomic
8+
import kotlinx.cinterop.*
9+
import platform.zlib.*
10+
11+
private const val DEFAULT_WINDOW_BITS = 15
12+
private const val GZIP_ENCODING = 16 // Add this to windowBits for gzip encoding
13+
private const val COMPRESSION_LEVEL = Z_BEST_COMPRESSION
14+
15+
/**
16+
* Streaming-style gzip compressor, implemented using zlib bindings
17+
*/
18+
internal class GzipCompressor {
19+
companion object {
20+
internal const val BUFFER_SIZE = 16384
21+
}
22+
23+
private val buffer = ByteArray(BUFFER_SIZE)
24+
private val stream = nativeHeap.alloc<z_stream>()
25+
private val outputBuffer = ArrayList<Byte>()
26+
private val _isClosed = atomic(false)
27+
28+
internal val isClosed
29+
get() = _isClosed.value
30+
31+
internal val availableForRead: Int
32+
get() = outputBuffer.size
33+
34+
init {
35+
// Initialize deflate with gzip encoding
36+
val initResult = deflateInit2_(
37+
stream.ptr,
38+
COMPRESSION_LEVEL,
39+
Z_DEFLATED,
40+
DEFAULT_WINDOW_BITS + GZIP_ENCODING, // Add 16 for gzip encoding
41+
8, // Default memory level
42+
Z_DEFAULT_STRATEGY,
43+
ZLIB_VERSION,
44+
sizeOf<z_stream>().toInt(),
45+
)
46+
47+
if (initResult != Z_OK) {
48+
throw RuntimeException("Failed to initialize zlib deflate with error code $initResult: ${zError(initResult)!!.toKString()}")
49+
}
50+
}
51+
52+
/**
53+
* Update the compressor with [input] bytes
54+
*/
55+
fun update(input: ByteArray) = memScoped {
56+
val inputPin = input.pin()
57+
58+
stream.next_in = inputPin.addressOf(0).reinterpret()
59+
stream.avail_in = input.size.toUInt()
60+
61+
while (stream.avail_in > 0u) {
62+
val outputPin = buffer.pin()
63+
stream.next_out = outputPin.addressOf(0).reinterpret()
64+
stream.avail_out = BUFFER_SIZE.toUInt()
65+
66+
val deflateResult = deflate(stream.ptr, Z_NO_FLUSH)
67+
if (deflateResult != Z_OK) {
68+
throw RuntimeException("Deflate failed: $deflateResult")
69+
}
70+
71+
val bytesWritten = BUFFER_SIZE - stream.avail_out.toInt()
72+
outputBuffer.addAll(buffer.take(bytesWritten))
73+
74+
outputPin.unpin()
75+
}
76+
77+
inputPin.unpin()
78+
}
79+
80+
/**
81+
* Consume [count] gzip-compressed bytes.
82+
*/
83+
fun consume(count: Int): ByteArray {
84+
if (count < 0) {
85+
throw IllegalArgumentException("Requested bytes must be at least 0, got $count")
86+
}
87+
if (count > availableForRead) {
88+
throw IllegalArgumentException("Requested more bytes than available, $count > ${outputBuffer.size}")
89+
}
90+
91+
val result = outputBuffer.take(count).toByteArray()
92+
repeat(count) { outputBuffer.removeAt(0) }
93+
return result
94+
}
95+
96+
/**
97+
* Close the compressor, clean up all resources, and return the terminal sequence of bytes
98+
* that represent the end of the gzip compression.
99+
*/
100+
fun close(): ByteArray {
101+
if (isClosed) { return byteArrayOf() }
102+
103+
memScoped {
104+
var finished = false
105+
106+
while (!finished) {
107+
val outputPin = buffer.pin()
108+
stream.next_out = outputPin.addressOf(0).reinterpret()
109+
stream.avail_out = BUFFER_SIZE.toUInt()
110+
111+
val deflateResult = deflate(stream.ptr, Z_FINISH)
112+
if (deflateResult != Z_STREAM_END && deflateResult != Z_OK) {
113+
throw RuntimeException("Deflate failed during finish: $deflateResult")
114+
}
115+
116+
val bytesWritten = BUFFER_SIZE - stream.avail_out.toInt()
117+
outputBuffer.addAll(buffer.take(bytesWritten))
118+
119+
finished = deflateResult == Z_STREAM_END
120+
outputPin.unpin()
121+
}
122+
123+
deflateEnd(stream.ptr)
124+
nativeHeap.free(stream.ptr)
125+
_isClosed.value = true
126+
127+
return outputBuffer.toByteArray()
128+
}
129+
}
130+
}

runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/compression/GzipNative.kt

Lines changed: 3 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,6 @@ import aws.smithy.kotlin.runtime.io.GzipByteReadChannel
99
import aws.smithy.kotlin.runtime.io.GzipSdkSource
1010
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
1111
import aws.smithy.kotlin.runtime.io.SdkSource
12-
import kotlinx.cinterop.addressOf
13-
import kotlinx.cinterop.alloc
14-
import kotlinx.cinterop.memScoped
15-
import kotlinx.cinterop.ptr
16-
import kotlinx.cinterop.refTo
17-
import kotlinx.cinterop.reinterpret
18-
import kotlinx.cinterop.sizeOf
19-
import kotlinx.cinterop.usePinned
20-
import platform.zlib.*
2112

2213
/**
2314
* The gzip compression algorithm.
@@ -47,7 +38,9 @@ public actual class Gzip : CompressionAlgorithm {
4738
if (sourceBytes.isEmpty()) {
4839
stream
4940
} else {
50-
val compressed = gzipCompressBytes(sourceBytes)
41+
val compressed = GzipCompressor().apply {
42+
update(sourceBytes)
43+
}.close()
5144

5245
object : ByteStream.Buffer() {
5346
override fun bytes(): ByteArray = compressed
@@ -56,49 +49,3 @@ public actual class Gzip : CompressionAlgorithm {
5649
}
5750
}
5851
}
59-
60-
internal fun gzipCompressBytes(bytes: ByteArray): ByteArray {
61-
val compressedBuffer = UByteArray(bytes.size + 128)
62-
memScoped {
63-
val zStream = alloc<z_stream>().apply {
64-
zalloc = null
65-
zfree = null
66-
opaque = null
67-
}
68-
69-
// Initialize the deflate context with gzip encoding
70-
val initResult = deflateInit2_(
71-
strm = zStream.ptr,
72-
level = Z_BEST_COMPRESSION,
73-
method = Z_DEFLATED,
74-
windowBits = 31, // 15 (max window bits) + 16 (GZIP offset)
75-
memLevel = 9,
76-
strategy = Z_DEFAULT_STRATEGY,
77-
version = ZLIB_VERSION,
78-
stream_size = sizeOf<z_stream>().toInt(),
79-
)
80-
if (initResult != Z_OK) {
81-
throw IllegalStateException("Failed to initialize zlib with error code $initResult")
82-
}
83-
84-
try {
85-
zStream.next_in = bytes.refTo(0).getPointer(memScope).reinterpret()
86-
zStream.avail_in = bytes.size.toUInt()
87-
88-
compressedBuffer.usePinned { pinnedBuffer ->
89-
zStream.next_out = pinnedBuffer.addressOf(0)
90-
zStream.avail_out = compressedBuffer.size.toUInt()
91-
92-
val deflateResult = deflate(zStream.ptr, Z_FINISH)
93-
if (deflateResult != Z_STREAM_END) {
94-
throw IllegalStateException("Compression failed with error code $deflateResult")
95-
}
96-
}
97-
98-
val compressedSize = compressedBuffer.size.toUInt() - zStream.avail_out
99-
return compressedBuffer.copyOf(compressedSize.toInt()).toByteArray()
100-
} finally {
101-
deflateEnd(zStream.ptr)
102-
}
103-
}
104-
}

0 commit comments

Comments
 (0)