-
-
Notifications
You must be signed in to change notification settings - Fork 22
Open
Description
Hi! Using ArrayCache.setDefaultCache(BasicArrayCache.getInstance());
in concurrent cases, the exception occurred with the stacktrace:
org.tukaani.xz.CorruptedInputException: Compressed data is corrupt
at org.tukaani.xz.lz.LZDecoder.repeat(LZDecoder.java:83)
at org.tukaani.xz.lzma.LZMADecoder.decode(LZMADecoder.java:61)
at org.tukaani.xz.LZMAInputStream.read(LZMAInputStream.java:708)
at java.base/java.io.InputStream.read(InputStream.java:220)The above exception can be reproduced with the following code:
import org.tukaani.xz.ArrayCache;
import org.tukaani.xz.BasicArrayCache;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.LZMAInputStream;
import org.tukaani.xz.LZMAOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Test LZMA BasicArrayCache caching behavior
*
* From BasicArrayCache source code:
* private static final int CACHEABLE_SIZE_MIN = 32 << 10; // 32KB = 32768 bytes
*
* Arrays smaller than 32KB will NOT be cached
*/
public class LzmaArrayCacheTest {
// BasicArrayCache minimum cacheable size threshold
private static final int CACHEABLE_SIZE_MIN = 32 << 10; // 32768 bytes = 32KB
private static final AtomicLong successCount = new AtomicLong(0);
private static final AtomicLong errorCount = new AtomicLong(0);
public static void main(String[] args) throws Exception {
System.out.println("=== LZMA BasicArrayCache Threshold Test ===\n");
System.out.println("CACHEABLE_SIZE_MIN = 32 << 10 = " + CACHEABLE_SIZE_MIN + " bytes (" + (CACHEABLE_SIZE_MIN / 1024) + " KB)\n");
// Enable BasicArrayCache
ArrayCache.setDefaultCache(BasicArrayCache.getInstance());
System.out.println("BasicArrayCache enabled\n");
// Test different data sizes
testWithSize("Below threshold (16KB)", 16 * 1024);
testWithSize("At threshold (32KB)", 32 * 1024);
testWithSize("Above threshold (64KB)", 64 * 1024);
testWithSize("Well above threshold (128KB)", 128 * 1024);
System.out.println("\n=== High Concurrency Test ===\n");
concurrentTest();
}
/**
* Test compression/decompression with specified data size
*/
private static void testWithSize(String description, int dataSize) throws Exception {
System.out.println("--- " + description + " ---");
System.out.println("Data size: " + dataSize + " bytes");
// Generate random data
byte[] originalData = generateRandomData(dataSize);
System.out.println("Original data size: " + originalData.length + " bytes");
// Compress
byte[] compressed = compress(originalData);
System.out.println("Compressed size: " + compressed.length + " bytes");
System.out.println("Compression ratio: " + String.format("%.2f%%", (double) compressed.length / originalData.length * 100));
// Decompress and verify
byte[] decompressed = decompress(compressed);
boolean match = java.util.Arrays.equals(originalData, decompressed);
System.out.println("Decompression verification: " + (match ? "PASSED" : "FAILED"));
// Analyze caching behavior
if (dataSize < CACHEABLE_SIZE_MIN) {
System.out.println("Cache status: NOT CACHED (below " + CACHEABLE_SIZE_MIN + " bytes threshold)");
} else {
System.out.println("Cache status: CACHED (>= " + CACHEABLE_SIZE_MIN + " bytes threshold)");
}
System.out.println();
}
/**
* High concurrency test
*/
private static void concurrentTest() throws Exception {
// Generate larger test data (ensure it will be cached)
int dataSize = 64 * 1024; // 64KB
byte[] originalData = generateRandomData(dataSize);
byte[] compressedData = compress(originalData);
System.out.println("Test data: " + dataSize + " bytes original");
System.out.println("Compressed: " + compressedData.length + " bytes");
int threadCount = Runtime.getRuntime().availableProcessors();
int iterations = 10000;
System.out.println("Thread count: " + threadCount);
System.out.println("Iterations: " + iterations);
System.out.println();
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage beforeHeap = memoryMXBean.getHeapMemoryUsage();
System.out.println("Heap memory before: " + formatBytes(beforeHeap.getUsed()));
long startTime = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final byte[] data = compressedData;
executorService.execute(() -> {
try {
byte[] decompressed = decompress(data);
if (decompressed.length == dataSize) {
successCount.incrementAndGet();
}
} catch (Exception e) {
errorCount.incrementAndGet();
System.err.println("Decompression error: " + e.getMessage());
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(5, TimeUnit.MINUTES);
long endTime = System.currentTimeMillis();
MemoryUsage afterHeap = memoryMXBean.getHeapMemoryUsage();
System.out.println("\n=== Test Results ===");
System.out.println("Completion status: " + (terminated ? "Completed" : "Timeout"));
System.out.println("Success count: " + successCount.get());
System.out.println("Error count: " + errorCount.get());
System.out.println("Duration: " + (endTime - startTime) + " ms");
System.out.println("Heap memory after: " + formatBytes(afterHeap.getUsed()));
System.out.println("Memory growth: " + formatBytes(afterHeap.getUsed() - beforeHeap.getUsed()));
// Force GC and check memory again
System.gc();
Thread.sleep(1000);
MemoryUsage afterGcHeap = memoryMXBean.getHeapMemoryUsage();
System.out.println("Heap memory after GC: " + formatBytes(afterGcHeap.getUsed()));
}
/**
* Generate random byte array
*/
private static byte[] generateRandomData(int size) {
Random random = new Random();
byte[] data = new byte[size];
random.nextBytes(data);
return data;
}
/**
* LZMA compression
*/
private static byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (LZMAOutputStream lzmaOut = new LZMAOutputStream(baos, new LZMA2Options(), data.length)) {
lzmaOut.write(data);
}
return baos.toByteArray();
}
/**
* LZMA decompression
*/
private static byte[] decompress(byte[] compressedData) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(compressedData);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (LZMAInputStream lzmaIn = new LZMAInputStream(bais)) {
byte[] buffer = new byte[8192];
int len;
while ((len = lzmaIn.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
}
return baos.toByteArray();
}
/**
* Format bytes to human readable string
*/
private static String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return String.format("%.2f KB", bytes / 1024.0);
if (bytes < 1024 * 1024 * 1024) return String.format("%.2f MB", bytes / (1024.0 * 1024));
return String.format("%.2f GB", bytes / (1024.0 * 1024 * 1024));
}
}Metadata
Metadata
Assignees
Labels
No labels