Skip to content

Commit 8a1af55

Browse files
committed
Fix dict state reuse across chunk compression and decompression
Reset decompression dict state per chunk so that reused dctx instances do not carry stale dictionaries into plain chunks.

For lazy dict-enabled VL chunks, load the dictionary from the backing frame/chunk instead of reusing prior context state. This fixes selective block reads for fresh contexts, and for LZ4/LZ4HC as well as ZSTD.

Keep the dict-compression fallback local to the current chunk by avoiding persistent mutation of context->use_dict. This preserves dictionary compression for later chunks when a small regular or VL chunk falls back to plain compression.

Add regression tests covering:
- reused dctx after dict then plain chunks
- reused cctx after per-chunk dict fallback
- lazy dict-enabled VL block reads with fresh dctx
1 parent 0a42e98 commit 8a1af55

File tree

4 files changed

+513
-22
lines changed

4 files changed

+513
-22
lines changed

blosc/blosc2.c

Lines changed: 157 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2475,6 +2475,137 @@ static int initialize_context_compression(
24752475
return 1;
24762476
}
24772477

2478+
/* Drop the dictionary buffer referenced by `context`.
 *
 * The buffer is freed only when the context is flagged as its owner; a
 * borrowed buffer is simply forgotten.  In both cases the context ends up
 * with no buffer, no ownership and a dictionary size of zero.
 */
static void release_context_dict_buffer(blosc2_context* context) {
  /* free(NULL) is a no-op, so ownership is the only thing worth checking. */
  if (context->dict_buffer_owned) {
    free(context->dict_buffer);
  }

  context->dict_size = 0;
  context->dict_buffer_owned = false;
  context->dict_buffer = NULL;
}
2486+
2487+
2488+
/* Reset every piece of decompression-side dictionary state held by `context`:
 * the digested ZSTD dictionary (freed when ZSTD support is compiled in), the
 * dictionary buffer/size/ownership, and the use_dict flag.
 */
static void clear_context_decompression_dict(blosc2_context* context) {
#if defined(HAVE_ZSTD)
  /* Only a build with ZSTD can have created a digested dictionary to free. */
  if (context->dict_ddict != NULL) {
    ZSTD_freeDDict(context->dict_ddict);
  }
#endif
  context->dict_ddict = NULL;

  release_context_dict_buffer(context);
  context->use_dict = 0;
}
2500+
2501+
2502+
/**
 * Read `nbytes` bytes belonging to a lazy chunk from its backing storage.
 *
 * The chunk number and the chunk offset are taken from the trailer that sits
 * in context->src right after the extended header and the per-block int32
 * table.  For a sparse frame the data is read from
 * "<urlpath>/<chunk number as 8 hex digits>.chunk" at position `offset`;
 * otherwise it is read from the frame file at
 * file_offset + chunk_offset + offset.
 *
 * @param context    Context whose super-chunk must be backed by a frame.
 * @param offset     Non-negative byte offset relative to the start of the chunk.
 * @param buffer     Destination; must have room for `nbytes` bytes.
 * @param nbytes     Non-negative number of bytes to read.
 * @param open_error Message traced when the backing file cannot be opened.
 * @param read_error Message traced when the read does not return `nbytes` bytes.
 *
 * @return 0 on success, a negative BLOSC2_ERROR_* code otherwise.
 */
static int read_lazy_chunk_bytes(blosc2_context* context, int32_t offset, uint8_t* buffer, int32_t nbytes,
                                 const char* open_error, const char* read_error) {
  if (buffer == NULL || offset < 0 || nbytes < 0) {
    BLOSC_TRACE_ERROR("Invalid buffer, offset or length for a lazy chunk read.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }
  if (context->schunk == NULL || context->schunk->frame == NULL) {
    BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk with a frame.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
  blosc2_io_cb* io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
  if (io_cb == NULL) {
    BLOSC_TRACE_ERROR("Error getting the input/output API");
    return BLOSC2_ERROR_PLUGIN_IO;
  }

  int32_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH +
                           context->nblocks * (int32_t)sizeof(int32_t);
  // The trailer has no alignment guarantee inside src, so copy the fields out
  // instead of dereferencing cast pointers.
  int32_t nchunk_lazy;
  int64_t chunk_offset;
  memcpy(&nchunk_lazy, context->src + trailer_offset, sizeof(nchunk_lazy));
  memcpy(&chunk_offset, context->src + trailer_offset + (int32_t)sizeof(int32_t),
         sizeof(chunk_offset));

  void* fp = NULL;
  int64_t io_pos;
  if (frame->sframe) {
    // "<urlpath>" + "/" + 8 hex digits + ".chunk" + NUL
    size_t chunkpath_len = strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1;
    char* chunkpath = malloc(chunkpath_len);
    BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
    snprintf(chunkpath, chunkpath_len, "%s/%08X.chunk", frame->urlpath, (unsigned int)nchunk_lazy);
    fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
    free(chunkpath);
    io_pos = offset;
  }
  else {
    fp = io_cb->open(frame->urlpath, "rb", context->schunk->storage->io->params);
    io_pos = frame->file_offset + chunk_offset + offset;
  }
  if (fp == NULL) {
    BLOSC_TRACE_ERROR("%s", open_error);
    return BLOSC2_ERROR_FILE_OPEN;
  }

  uint8_t* read_buffer = buffer;
  int64_t rbytes = io_cb->read((void**)&read_buffer, 1, nbytes, io_pos, fp);
  io_cb->close(fp);

  int rc = 0;
  if (rbytes != nbytes) {
    BLOSC_TRACE_ERROR("%s", read_error);
    rc = BLOSC2_ERROR_FILE_READ;
  }

  if (read_buffer != buffer) {
    // The I/O backend handed back its own buffer.  Copy from it only after a
    // complete read, so a short or failed read never copies `nbytes` from it.
    if (rc == 0) {
      memcpy(buffer, read_buffer, (size_t)nbytes);
    }
    // NOTE(review): freeing assumes a swapped pointer is heap memory handed
    // over to the caller -- confirm this holds for every I/O plugin
    // (e.g. memory-mapped backends).
    free(read_buffer);
  }

  return rc;
}
2555+
2556+
2557+
/**
 * Load the dictionary of a lazy, dict-enabled chunk from its backing storage.
 *
 * A lazy chunk buffer does not carry the dictionary, so it is read via
 * read_lazy_chunk_bytes(): first a 4-byte size field located at `bstarts_end`
 * (decoded with sw32_()), then the dictionary bytes that follow it.
 *
 * All work is staged in locals and committed to `context` only once every
 * step has succeeded.  On failure the context's dictionary fields are left
 * exactly as they were on entry, so use_dict is never left set without a
 * matching dictionary.
 *
 * @param context     Context receiving dict_buffer (owned), dict_size, use_dict
 *                    and, for the ZSTD format when built with ZSTD, dict_ddict.
 * @param header      Chunk header; its cbytes bounds the dictionary region.
 * @param bstarts_end Offset, relative to the start of the chunk, of the
 *                    dictionary size field.
 *
 * @return 0 on success, a negative BLOSC2_ERROR_* code otherwise.
 */
static int load_lazy_chunk_dict(blosc2_context* context, blosc_header* header, int32_t bstarts_end) {
  const int32_t size_field_len = (int32_t)sizeof(int32_t);
  const int32_t dict_offset = bstarts_end;
  if (header->cbytes < dict_offset + size_field_len) {
    BLOSC_TRACE_ERROR("Lazy chunk dictionary header exceeds chunk length.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  uint8_t dict_size_buf[sizeof(int32_t)];
  int rc = read_lazy_chunk_bytes(context, dict_offset, dict_size_buf, (int32_t)sizeof(dict_size_buf),
                                 "Cannot open frame file for lazy chunk dictionary read.",
                                 "Cannot read lazy chunk dictionary size from disk.");
  if (rc < 0) {
    return rc;
  }

  // Validate the size in a local before it is allowed anywhere near the context.
  const int32_t dict_size = sw32_(dict_size_buf);
  if (dict_size <= 0 || dict_size > BLOSC2_MAXDICTSIZE) {
    BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
    return BLOSC2_ERROR_CODEC_DICT;
  }
  if (header->cbytes < dict_offset + size_field_len + dict_size) {
    BLOSC_TRACE_ERROR("Lazy chunk dictionary exceeds chunk length.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  uint8_t* dict_buffer = malloc((size_t)dict_size);
  BLOSC_ERROR_NULL(dict_buffer, BLOSC2_ERROR_MEMORY_ALLOC);
  rc = read_lazy_chunk_bytes(context, dict_offset + size_field_len,
                             dict_buffer, dict_size,
                             "Cannot open frame file for lazy chunk dictionary read.",
                             "Cannot read lazy chunk dictionary from disk.");
  if (rc < 0) {
    free(dict_buffer);
    return rc;
  }

#if defined(HAVE_ZSTD)
  // Only ZSTD needs a digested dictionary object; for the other codecs the
  // raw buffer and its size are all that is stored.
  if (context->compcode == BLOSC_ZSTD_FORMAT) {
    context->dict_ddict = ZSTD_createDDict(dict_buffer, (size_t)dict_size);
    if (context->dict_ddict == NULL) {
      free(dict_buffer);
      BLOSC_TRACE_ERROR("Cannot create ZSTD dictionary for lazy chunk.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
  }
#endif

  // Everything succeeded: drop whatever buffer the context still references
  // (freeing it if owned) and publish the new dictionary.
  release_context_dict_buffer(context);
  context->dict_buffer = dict_buffer;
  context->dict_buffer_owned = true;
  context->dict_size = dict_size;
  context->use_dict = 1;

  return 0;
}
2608+
24782609

24792610
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
24802611
int32_t srcsize, void* dest, int32_t destsize) {
@@ -2507,6 +2638,7 @@ static int initialize_context_decompression(blosc2_context* context, blosc_heade
25072638
if (rc < 0) {
25082639
return rc;
25092640
}
2641+
clear_context_decompression_dict(context);
25102642
vlblocks = (context->blosc2_flags2 & BLOSC2_VL_BLOCKS) != 0;
25112643
bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
25122644
(context->blosc2_flags & 0x08u));
@@ -2577,23 +2709,25 @@ static int initialize_context_decompression(blosc2_context* context, blosc_heade
25772709
srcsize -= context->dict_size;
25782710
// dict_buffer points directly into the source chunk — no copy needed
25792711
context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
2712+
context->dict_buffer_owned = false;
25802713
#if defined(HAVE_ZSTD)
25812714
// context->compcode during decompression holds the format code (flags >> 5),
25822715
// so compare against BLOSC_ZSTD_FORMAT (not BLOSC_ZSTD).
25832716
if (context->compcode == BLOSC_ZSTD_FORMAT) {
2584-
if (context->dict_ddict != NULL) {
2585-
// Free the existing dictionary (probably from another chunk)
2586-
ZSTD_freeDDict(context->dict_ddict);
2587-
}
25882717
context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
2718+
if (context->dict_ddict == NULL) {
2719+
BLOSC_TRACE_ERROR("Cannot create ZSTD dictionary for chunk.");
2720+
return BLOSC2_ERROR_CODEC_DICT;
2721+
}
25892722
}
25902723
#endif // HAVE_ZSTD
25912724
// For LZ4/LZ4HC: dict_buffer and dict_size are sufficient; no digested object needed.
25922725
}
25932726
else if ((context->blosc2_flags & BLOSC2_USEDICT) && is_lazy) {
2594-
// Lazy chunks don't carry the dictionary in their buffer (only header+bstarts+trailer).
2595-
// Reuse dict_ddict from a previous full-chunk initialization of this context if available.
2596-
context->use_dict = (context->dict_ddict != NULL) ? 1 : 0;
2727+
rc = load_lazy_chunk_dict(context, header, bstarts_end);
2728+
if (rc < 0) {
2729+
return rc;
2730+
}
25972731
}
25982732

25992733
if (vlblocks && !context->special_type && !memcpyed) {
@@ -2857,6 +2991,16 @@ static int blosc_compress_context(blosc2_context* context) {
28572991
}
28582992

28592993

2994+
/* Compress the current chunk with dictionary support switched off.
 *
 * The BLOSC2_USEDICT bit is cleared in the flags byte of the destination
 * chunk and context->use_dict is zeroed only for the duration of the
 * blosc_compress_context() call; the previous use_dict value is put back
 * before returning.
 *
 * Returns whatever blosc_compress_context() returns.
 */
static int blosc_compress_context_without_dict(blosc2_context* context) {
  const int use_dict_on_entry = context->use_dict;

  context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
  context->use_dict = 0;

  const int result = blosc_compress_context(context);

  /* Restore the caller's setting. */
  context->use_dict = use_dict_on_entry;
  return result;
}
3002+
3003+
28603004
/* The public secure routine for compression with context. */
28613005
int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
28623006
void* dest, int32_t destsize) {
@@ -2928,11 +3072,9 @@ int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsiz
29283072
if (dict_maxsize < BLOSC2_MINUSEFULDICT || sample_size == 0) {
29293073
BLOSC_TRACE_WARNING("Data too small for dict training (dict_maxsize=%d, sample_size=%zu)."
29303074
" Falling back to plain compression.", dict_maxsize, sample_size);
2931-
context->use_dict = 0;
2932-
context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
29333075
context->bstarts = (int32_t*)(context->dest + context->header_overhead);
29343076
context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
2935-
cbytes = blosc_compress_context(context);
3077+
cbytes = blosc_compress_context_without_dict(context);
29363078
}
29373079
else if (is_lz4) {
29383080
// LZ4/LZ4HC: use raw sample data directly as the dictionary (no training step).
@@ -3011,11 +3153,9 @@ int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsiz
30113153
" Falling back to plain compression.",
30123154
ZDICT_getErrorName(dict_actual_size));
30133155
free(dict_buffer);
3014-
context->use_dict = 0;
3015-
context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
30163156
context->bstarts = (int32_t*)(context->dest + context->header_overhead);
30173157
context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
3018-
cbytes = blosc_compress_context(context);
3158+
cbytes = blosc_compress_context_without_dict(context);
30193159
}
30203160
else {
30213161
assert(dict_actual_size > 0);
@@ -3245,12 +3385,10 @@ int blosc2_vlcompress_ctx(blosc2_context* context, const void* const* srcs, cons
32453385
if (dict_maxsize < BLOSC2_MINUSEFULDICT || nblocks < 8 || vl_sample_size == 0) {
32463386
// Data is too small or too few VL blocks to build a useful dictionary;
32473387
// fall back to plain compression without a dict.
3248-
context->use_dict = 0;
3249-
context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
32503388
context->bstarts = (int32_t*)(context->dest + context->header_overhead);
32513389
context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * nblocks;
32523390
context->vlblock_sources = (const uint8_t**)srcs;
3253-
cbytes = blosc_compress_context(context);
3391+
cbytes = blosc_compress_context_without_dict(context);
32543392
context->vlblock_sources = NULL;
32553393
}
32563394
else if (is_lz4) {
@@ -3321,12 +3459,10 @@ int blosc2_vlcompress_ctx(blosc2_context* context, const void* const* srcs, cons
33213459
BLOSC_TRACE_WARNING("ZDICT_trainFromBuffer() failed ('%s'); falling back to plain compression.",
33223460
ZDICT_getErrorName(dict_actual_size));
33233461
free(dict_buffer);
3324-
context->use_dict = 0;
3325-
context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
33263462
context->bstarts = (int32_t*)(context->dest + context->header_overhead);
33273463
context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * nblocks;
33283464
context->vlblock_sources = (const uint8_t**)srcs;
3329-
cbytes = blosc_compress_context(context);
3465+
cbytes = blosc_compress_context_without_dict(context);
33303466
context->vlblock_sources = NULL;
33313467
}
33323468
else {
@@ -3367,12 +3503,10 @@ int blosc2_vlcompress_ctx(blosc2_context* context, const void* const* srcs, cons
33673503
}
33683504
size_t vl_sample_size_lz4 = (nblocks > 0) ? ((size_t)srcsize / (size_t)nblocks / 16) : 0;
33693505
if (!is_lz4 || dict_maxsize < BLOSC2_MINUSEFULDICT || nblocks < 8 || vl_sample_size_lz4 == 0) {
3370-
context->use_dict = 0;
3371-
context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] &= ~(uint8_t)BLOSC2_USEDICT;
33723506
context->bstarts = (int32_t*)(context->dest + context->header_overhead);
33733507
context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * nblocks;
33743508
context->vlblock_sources = (const uint8_t**)srcs;
3375-
cbytes = blosc_compress_context(context);
3509+
cbytes = blosc_compress_context_without_dict(context);
33763510
context->vlblock_sources = NULL;
33773511
}
33783512
else {
@@ -5164,6 +5298,7 @@ void blosc2_free_ctx(blosc2_context* context) {
51645298
if (context->serial_context != NULL) {
51655299
free_thread_context(context->serial_context);
51665300
}
5301+
release_context_dict_buffer(context);
51675302
if (context->dict_cdict != NULL) {
51685303
if (context->compcode == BLOSC_LZ4) {
51695304
LZ4_freeStream((LZ4_stream_t*)context->dict_cdict);

blosc/context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ struct blosc2_context_s {
109109
int dref_not_init; /* data ref in delta not initialized */
110110
blosc2_pthread_mutex_t delta_mutex;
111111
blosc2_pthread_cond_t delta_cv;
112+
bool dict_buffer_owned; /* Whether dict_buffer must be freed by the context */
112113
// Add new fields here to avoid breaking the ABI.
113114
};
114115

0 commit comments

Comments
 (0)