2424@ Fork (1 )
2525public class ZstdStreamingBenchmark {
2626
27+ @ State (Scope .Benchmark )
28+ public static class ShardsState {
29+ public List <List <TestChunks .Chunk >> shards ;
30+
31+ @ Setup
32+ public void setup () {
33+ shards = TestChunks .get ();
34+ }
35+ }
36+
2737 @ State (Scope .Benchmark )
2838 public static class ZstdDecompressorState {
2939 @ Param ({"jni" })
@@ -48,38 +58,32 @@ public void tearDown() {
4858 }
4959 }
5060
51- @ State (Scope .Benchmark )
52- public static class ZstdChunksState {
53- public List <TestChunks .Chunk > chunks ;
54-
55- @ Setup
56- public void setup () {
57- chunks = TestChunks .get (TestChunks .Compression .ZSTD );
58- }
59- }
60-
6161 @ Benchmark
62- public void zstd (ZstdDecompressorState decompressorState , ZstdChunksState chunksState , Blackhole blackhole ) throws IOException {
62+ public void zstd (ZstdDecompressorState decompressorState , ShardsState shardsState , Blackhole blackhole ) throws IOException {
6363 var context = decompressorState .context ;
64- context .reset ();
65- for (TestChunks .Chunk chunk : chunksState .chunks ) {
66- try (InputStream inputStream = context .createInputStream (chunk .getCompressed ())) {
67- blackhole .consume (DataObject .fromJson (inputStream ));
64+ for (List <TestChunks .Chunk > shard : shardsState .shards ) {
65+ context .reset ();
66+ for (TestChunks .Chunk chunk : shard ) {
67+ try (InputStream inputStream = context .createInputStream (chunk .zstdCompressed ())) {
68+ blackhole .consume (DataObject .fromJson (inputStream ));
69+ }
6870 }
6971 }
7072 }
7173
7274 @ Benchmark
73- public void zstdNoDeser (ZstdDecompressorState decompressorState , ZstdChunksState chunksState , Blackhole blackhole ) throws IOException {
75+ public void zstdNoDeser (ZstdDecompressorState decompressorState , ShardsState shardsState , Blackhole blackhole ) throws IOException {
7476 var context = decompressorState .context ;
75- context .reset ();
76- for (TestChunks .Chunk chunk : chunksState .chunks ) {
77- try (InputStream inputStream = context .createInputStream (chunk .getCompressed ())) {
78- while (true ) {
79- var read = inputStream .read (decompressorState .buf );
80- blackhole .consume (decompressorState .buf );
81- if (read <= 0 ) {
82- break ;
77+ for (List <TestChunks .Chunk > shard : shardsState .shards ) {
78+ context .reset ();
79+ for (TestChunks .Chunk chunk : shard ) {
80+ try (InputStream inputStream = context .createInputStream (chunk .zstdCompressed ())) {
81+ while (true ) {
82+ var read = inputStream .read (decompressorState .buf );
83+ blackhole .consume (decompressorState .buf );
84+ if (read <= 0 ) {
85+ break ;
86+ }
8387 }
8488 }
8589 }
@@ -101,49 +105,39 @@ public void setup() {
101105 }
102106 }
103107
104- @ State (Scope .Benchmark )
105- public static class ZlibChunksState {
106- public List <TestChunks .Chunk > chunks ;
107-
108- @ Setup
109- public void setup () {
110- chunks = TestChunks .get (TestChunks .Compression .ZLIB );
111- }
112- }
113-
114108 @ Benchmark
115- public void zlib (ZlibDecompressorState decompressorState , ZlibChunksState chunksState , Blackhole blackhole ) throws IOException {
109+ public void zlib (ZlibDecompressorState decompressorState , ShardsState shardsState , Blackhole blackhole ) throws IOException {
116110 var decompressor = decompressorState .decompressor ;
117- decompressor . reset ();
118- // Can't make a benchmark per-message (so we can see scaling based on message sizes
119- // as this uses a streaming decompressor, meaning this requires previous inputs
120- for ( TestChunks . Chunk chunk : chunksState . chunks ) {
121- try ( InputStream inputStream = decompressor . createInputStream ( chunk . getCompressed ())) {
122- blackhole . consume ( DataObject . fromJson ( inputStream ));
111+ for ( List < TestChunks . Chunk > shard : shardsState . shards ) {
112+ decompressor . reset ();
113+ for ( TestChunks . Chunk chunk : shard ) {
114+ try ( InputStream inputStream = decompressor . createInputStream ( chunk . zlibCompressed ()) ) {
115+ blackhole . consume ( DataObject . fromJson ( inputStream ));
116+ }
123117 }
124118 }
125119 }
126120
127121 @ Benchmark
128- public void zlibNoDeser (ZlibDecompressorState decompressorState , ZlibChunksState chunksState , Blackhole blackhole ) throws IOException {
122+ public void zlibNoDeser (ZlibDecompressorState decompressorState , ShardsState shardsState , Blackhole blackhole ) throws IOException {
129123 var decompressor = decompressorState .decompressor ;
130- decompressor .reset ();
131124 var bytes = decompressorState .buf ;
132- // Can't make a benchmark per-message (so we can see scaling based on message sizes
133- // as this uses a streaming decompressor, meaning this requires previous inputs
134- for (TestChunks .Chunk chunk : chunksState .chunks ) {
135- int currentlyDecompressedSize = 0 ;
136- int expectedDecompressedSize = chunk .getDecompressed ().length ;
137- try (InputStream inputStream = decompressor .createInputStream (chunk .getCompressed ())) {
138- // This is pretty stupid, #available() returns 1 even when there is no output to be read,
139- // we want to avoid handling EOFException as it may be slow and does not represent real world usage,
140- // checking `read < buf.length` is not viable since it can store data internally and returned in the next call.
141- // So, we instead decompress until we have the known decompressed data length.
142- do {
143- var read = inputStream .read (bytes );
144- currentlyDecompressedSize += read ;
145- blackhole .consume (bytes );
146- } while (currentlyDecompressedSize < expectedDecompressedSize );
125+ for (List <TestChunks .Chunk > shard : shardsState .shards ) {
126+ decompressor .reset ();
127+ for (TestChunks .Chunk chunk : shard ) {
128+ int currentlyDecompressedSize = 0 ;
129+ int expectedDecompressedSize = chunk .decompressed ().length ;
130+ try (InputStream inputStream = decompressor .createInputStream (chunk .zlibCompressed ())) {
131+ // This is pretty stupid, #available() returns 1 even when there is no output to be read,
132+ // we want to avoid handling EOFException as it may be slow and does not represent real world usage,
133+ // checking `read < buf.length` is not viable since it can store data internally and returned in the next call.
134+ // So, we instead decompress until we have the known decompressed data length.
135+ do {
136+ var read = inputStream .read (bytes );
137+ currentlyDecompressedSize += read ;
138+ blackhole .consume (bytes );
139+ } while (currentlyDecompressedSize < expectedDecompressedSize );
140+ }
147141 }
148142 }
149143 }
0 commit comments