Skip to content

Commit 51de1cd

Browse files
Allow to create LZ4BlockInputStream with LZ4SafeDecompressor
- added builder for LZ4BlockInputStream which accepts both fast and safe decompressor (prefers fast one)
1 parent 358ee50 commit 51de1cd

File tree

2 files changed

+309
-84
lines changed

2 files changed

+309
-84
lines changed

src/java/net/jpountz/lz4/LZ4BlockInputStream.java

Lines changed: 169 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
*/
4444
public class LZ4BlockInputStream extends FilterInputStream {
4545

46-
private final LZ4FastDecompressor decompressor;
46+
private final LZ4FastDecompressor fastDecompressor;
47+
private final LZ4SafeDecompressor safeDecompressor;
4748
private final Checksum checksum;
4849
private final boolean stopOnEmptyBlock;
4950
private byte[] buffer;
@@ -56,52 +57,45 @@ public class LZ4BlockInputStream extends FilterInputStream {
5657
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
5758
*
5859
* @param in the {@link InputStream} to poll
59-
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
60+
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
6061
* use
6162
* @param checksum the {@link Checksum} instance to use, must be
6263
* equivalent to the instance which has been used to
6364
* write the stream
6465
* @param stopOnEmptyBlock whether read is stopped on an empty block
6566
*/
66-
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, boolean stopOnEmptyBlock) {
67-
super(in);
68-
this.decompressor = decompressor;
69-
this.checksum = checksum;
70-
this.stopOnEmptyBlock = stopOnEmptyBlock;
71-
this.buffer = new byte[0];
72-
this.compressedBuffer = new byte[HEADER_LENGTH];
73-
o = originalLen = 0;
74-
finished = false;
67+
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum, boolean stopOnEmptyBlock) {
68+
this(in, fastDecompressor, null, checksum, stopOnEmptyBlock);
7569
}
7670

7771
/**
7872
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
7973
*
8074
* @param in the {@link InputStream} to poll
81-
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
75+
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
8276
* use
8377
* @param checksum the {@link Checksum} instance to use, must be
8478
* equivalent to the instance which has been used to
8579
* write the stream
8680
*
8781
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean)
8882
*/
89-
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
90-
this(in, decompressor, checksum, true);
83+
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum) {
84+
this(in, fastDecompressor, checksum, true);
9185
}
9286

9387
/**
9488
* Creates a new LZ4 input stream to read from the specified underlying InputStream, using {@link XXHash32} for checksuming.
9589
*
9690
* @param in the {@link InputStream} to poll
97-
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
91+
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
9892
* use
9993
*
10094
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean)
10195
* @see StreamingXXHash32#asChecksum()
10296
*/
103-
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
104-
this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true);
97+
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor) {
98+
this(in, fastDecompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true);
10599
}
106100

107101
/**
@@ -130,6 +124,48 @@ public LZ4BlockInputStream(InputStream in) {
130124
this(in, LZ4Factory.fastestInstance().fastDecompressor());
131125
}
132126

127+
/**
128+
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
129+
*
130+
* @param in the {@link InputStream} to poll
131+
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
132+
* use
133+
* @param safeDecompressor the {@link LZ4SafeDecompressor} instance to
134+
* use (if both fastDecompressor and safeDecompressor are
135+
* specified then the fastDecompressor gets used)
136+
* @param checksum the {@link Checksum} instance to use, must be
137+
* equivalent to the instance which has been used to
138+
* write the stream
139+
* @param stopOnEmptyBlock whether read is stopped on an empty block
140+
*/
141+
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, LZ4SafeDecompressor safeDecompressor,
142+
Checksum checksum, boolean stopOnEmptyBlock) {
143+
super(in);
144+
145+
this.fastDecompressor = fastDecompressor;
146+
this.safeDecompressor = safeDecompressor;
147+
this.checksum = checksum;
148+
this.stopOnEmptyBlock = stopOnEmptyBlock;
149+
this.buffer = new byte[0];
150+
this.compressedBuffer = new byte[HEADER_LENGTH];
151+
o = originalLen = 0;
152+
finished = false;
153+
}
154+
155+
/**
156+
* Creates a new LZ4 input stream builder. The following are defaults:
157+
* <ul>
158+
* <ol> decompressor - {@code LZ4Factory.fastestInstance().fastDecompressor()} </ol>
159+
* <ol> checksum - {@link XXHash32} </ol>
160+
* <ol> stopOnEmptyBlock - {@code true} </ol>
161+
* </ul>
162+
* @param in the {@link InputStream} to poll
163+
* @return new instance of {@link LZ4BlockInputStreamBuilder} to be used to configure and build new LZ4 input stream
164+
*/
165+
public static LZ4BlockInputStreamBuilder newBuilder(InputStream in) {
166+
return new LZ4BlockInputStreamBuilder(in);
167+
}
168+
133169
@Override
134170
public int available() throws IOException {
135171
return originalLen - o;
@@ -213,11 +249,11 @@ private void refill() throws IOException {
213249
final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
214250
assert HEADER_LENGTH == MAGIC_LENGTH + 13;
215251
if (originalLen > 1 << compressionLevel
216-
|| originalLen < 0
217-
|| compressedLen < 0
218-
|| (originalLen == 0 && compressedLen != 0)
219-
|| (originalLen != 0 && compressedLen == 0)
220-
|| (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
252+
|| originalLen < 0
253+
|| compressedLen < 0
254+
|| (originalLen == 0 && compressedLen != 0)
255+
|| (originalLen != 0 && compressedLen == 0)
256+
|| (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
221257
throw new IOException("Stream is corrupted");
222258
}
223259
if (originalLen == 0 && compressedLen == 0) {
@@ -235,25 +271,32 @@ private void refill() throws IOException {
235271
buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
236272
}
237273
switch (compressionMethod) {
238-
case COMPRESSION_METHOD_RAW:
239-
readFully(buffer, originalLen);
240-
break;
241-
case COMPRESSION_METHOD_LZ4:
242-
if (compressedBuffer.length < compressedLen) {
243-
compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
244-
}
245-
readFully(compressedBuffer, compressedLen);
246-
try {
247-
final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
248-
if (compressedLen != compressedLen2) {
249-
throw new IOException("Stream is corrupted");
274+
case COMPRESSION_METHOD_RAW:
275+
readFully(buffer, originalLen);
276+
break;
277+
case COMPRESSION_METHOD_LZ4:
278+
if (compressedBuffer.length < compressedLen) {
279+
compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
250280
}
251-
} catch (LZ4Exception e) {
252-
throw new IOException("Stream is corrupted", e);
253-
}
254-
break;
255-
default:
256-
throw new AssertionError();
281+
readFully(compressedBuffer, compressedLen);
282+
try {
283+
if (fastDecompressor == null) {
284+
final int decompressedLen = safeDecompressor.decompress(compressedBuffer, 0, compressedLen, buffer, 0, originalLen);
285+
if (decompressedLen != originalLen) {
286+
throw new IOException("Stream is corrupted");
287+
}
288+
} else {
289+
final int compressedLen2 = fastDecompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
290+
if (compressedLen != compressedLen2) {
291+
throw new IOException("Stream is corrupted");
292+
}
293+
}
294+
} catch (LZ4Exception e) {
295+
throw new IOException("Stream is corrupted", e);
296+
}
297+
break;
298+
default:
299+
throw new AssertionError();
257300
}
258301
checksum.reset();
259302
checksum.update(buffer, 0, originalLen);
@@ -304,7 +347,92 @@ public void reset() throws IOException {
304347
@Override
305348
public String toString() {
306349
return getClass().getSimpleName() + "(in=" + in
307-
+ ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
350+
+ ", decompressor=" + (fastDecompressor != null ? fastDecompressor : safeDecompressor)
351+
+ ", checksum=" + checksum + ")";
308352
}
309353

354+
public static final class LZ4BlockInputStreamBuilder {
355+
private final InputStream in;
356+
private boolean stopOnEmptyBlock = true;
357+
private LZ4FastDecompressor fastDecompressor;
358+
private LZ4SafeDecompressor safeDecompressor;
359+
private Checksum checksum;
360+
361+
/**
362+
* Creates a new LZ4 block input stream builder to read from the specified underlying InputStream.
363+
*
364+
* @param in the {@link InputStream} to poll
365+
*/
366+
public LZ4BlockInputStreamBuilder(InputStream in) {
367+
this.in = in;
368+
}
369+
370+
/**
371+
* Registers value of stopOnEmptyBlock to be used by the builder
372+
*
373+
* @param stopOnEmptyBlock whether read is stopped on an empty block
374+
*/
375+
public LZ4BlockInputStreamBuilder withStopOnEmptyBlock(boolean stopOnEmptyBlock) {
376+
this.stopOnEmptyBlock = stopOnEmptyBlock;
377+
return this;
378+
}
379+
380+
/**
381+
* Registers {@link LZ4FastDecompressor} to be used by the builder as a decompressor. Overrides one set by
382+
* {@link #withDecompressor(LZ4SafeDecompressor)}
383+
*
384+
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to use
385+
*/
386+
public LZ4BlockInputStreamBuilder withDecompressor(LZ4FastDecompressor fastDecompressor) {
387+
this.fastDecompressor = fastDecompressor;
388+
this.safeDecompressor = null;
389+
return this;
390+
}
391+
392+
/**
393+
* Registers {@link LZ4SafeDecompressor} to be used by the builder as a decompressor. Overrides one set by
394+
* {@link #withDecompressor(LZ4FastDecompressor)}
395+
*
396+
* @param safeDecompressor the {@link LZ4SafeDecompressor} instance to use.
397+
*/
398+
public LZ4BlockInputStreamBuilder withDecompressor(LZ4SafeDecompressor safeDecompressor) {
399+
this.safeDecompressor = safeDecompressor;
400+
this.fastDecompressor = null;
401+
return this;
402+
}
403+
404+
/**
405+
* Registers {@link Checksum} to be used by the builder
406+
*
407+
* @param checksum the {@link Checksum} instance to use, must be
408+
* equivalent to the instance which has been used to
409+
* write the stream
410+
*/
411+
public LZ4BlockInputStreamBuilder withChecksum(Checksum checksum) {
412+
this.checksum = checksum;
413+
return this;
414+
}
415+
416+
/**
417+
* Creates a new LZ4 input stream to read from the specified underlying InputStream with specified parameters
418+
*
419+
* @see #withChecksum(Checksum)
420+
* @see #withDecompressor(LZ4FastDecompressor)
421+
* @see #withDecompressor(LZ4SafeDecompressor)
422+
* @see #withStopOnEmptyBlock(boolean)
423+
*/
424+
public LZ4BlockInputStream build() {
425+
Checksum checksum = this.checksum;
426+
LZ4FastDecompressor fastDecompressor = this.fastDecompressor;
427+
LZ4SafeDecompressor safeDecompressor = this.safeDecompressor;
428+
429+
if (checksum == null) {
430+
checksum = XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum();
431+
}
432+
if (fastDecompressor == null && safeDecompressor == null) {
433+
fastDecompressor = LZ4Factory.fastestInstance().fastDecompressor();
434+
}
435+
return new LZ4BlockInputStream(in, fastDecompressor, safeDecompressor, checksum, stopOnEmptyBlock);
436+
}
437+
}
310438
}

0 commit comments

Comments
 (0)