Skip to content

Commit 0a42e98

Browse files
committed
VL-block chunks support lazy loading; the trailer format is identical as original
1 parent e0ff896 commit 0a42e98

File tree

4 files changed

+273
-40
lines changed

4 files changed

+273
-40
lines changed

README_CHUNK_FORMAT.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ Trailer
292292
This is an optional section, mainly for lazy chunks use. A lazy chunk is similar to a regular one, except that
293293
only the meta-information has been loaded. The actual data from blocks is 'lazily' only loaded on demand.
294294
This allows for improved selectivity, and hence less input bandwidth demands, during partial chunk reads
295-
(e.g. `blosc1_getitem`) from data that is on disk.
295+
(e.g. `blosc1_getitem`) from data that is on disk. Both regular chunks and variable-length-block
296+
(VL-blocks) chunks support lazy loading; the trailer format is identical for both.
296297

297298
It is arranged like this::
298299

blosc/blosc2.c

Lines changed: 129 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1724,10 +1724,6 @@ static int blosc_d(
17241724
// Chunks with special values cannot be lazy
17251725
bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
17261726
(context->blosc2_flags & 0x08u) && !context->special_type);
1727-
if (is_lazy && vlblocks) {
1728-
BLOSC_TRACE_ERROR("VL-block lazy chunks are not supported yet.");
1729-
return BLOSC2_ERROR_INVALID_PARAM;
1730-
}
17311727
if (is_lazy) {
17321728
// The chunk is on disk, so just lazily load the block
17331729
if (context->schunk == NULL) {
@@ -2512,6 +2508,8 @@ static int initialize_context_decompression(blosc2_context* context, blosc_heade
25122508
return rc;
25132509
}
25142510
vlblocks = (context->blosc2_flags2 & BLOSC2_VL_BLOCKS) != 0;
2511+
bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
2512+
(context->blosc2_flags & 0x08u));
25152513

25162514
/* Check that we have enough space to decompress */
25172515
if (context->sourcesize > (int32_t)context->destsize) {
@@ -2558,7 +2556,7 @@ static int initialize_context_decompression(blosc2_context* context, blosc_heade
25582556
srcsize -= bstarts_end;
25592557

25602558
/* Read optional dictionary if flag set */
2561-
if (context->blosc2_flags & BLOSC2_USEDICT) {
2559+
if ((context->blosc2_flags & BLOSC2_USEDICT) && !is_lazy) {
25622560
context->use_dict = 1;
25632561
// The dictionary section is after the bstarts block: [int32 size | raw bytes]
25642562
if (srcsize < (signed)sizeof(int32_t)) {
@@ -2592,52 +2590,77 @@ static int initialize_context_decompression(blosc2_context* context, blosc_heade
25922590
#endif // HAVE_ZSTD
25932591
// For LZ4/LZ4HC: dict_buffer and dict_size are sufficient; no digested object needed.
25942592
}
2593+
else if ((context->blosc2_flags & BLOSC2_USEDICT) && is_lazy) {
2594+
// Lazy chunks don't carry the dictionary in their buffer (only header+bstarts+trailer).
2595+
// Reuse dict_ddict from a previous full-chunk initialization of this context if available.
2596+
context->use_dict = (context->dict_ddict != NULL) ? 1 : 0;
2597+
}
25952598

25962599
if (vlblocks && !context->special_type && !memcpyed) {
2597-
int32_t max_blocksize = 0;
2598-
int32_t total_nbytes = 0;
2599-
int32_t prev_bstart = 0;
2600-
bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
2601-
(context->blosc2_flags & 0x08u));
2602-
if (is_lazy) {
2603-
BLOSC_TRACE_ERROR("VL-block lazy chunks are not supported yet.");
2604-
return BLOSC2_ERROR_INVALID_PARAM;
2605-
}
2606-
26072600
context->blocknbytes = malloc((size_t)context->nblocks * sizeof(int32_t));
26082601
BLOSC_ERROR_NULL(context->blocknbytes, BLOSC2_ERROR_MEMORY_ALLOC);
26092602
context->blockoffsets = malloc((size_t)context->nblocks * sizeof(int32_t));
26102603
BLOSC_ERROR_NULL(context->blockoffsets, BLOSC2_ERROR_MEMORY_ALLOC);
26112604
context->blockcbytes = malloc((size_t)context->nblocks * sizeof(int32_t));
26122605
BLOSC_ERROR_NULL(context->blockcbytes, BLOSC2_ERROR_MEMORY_ALLOC);
26132606

2614-
for (int32_t i = 0; i < context->nblocks; ++i) {
2615-
int32_t bstart = sw32_(context->bstarts + i);
2616-
int32_t next_bstart = (i + 1 < context->nblocks) ? sw32_(context->bstarts + i + 1) : header->cbytes;
2617-
if (bstart < bstarts_end || bstart <= prev_bstart || next_bstart <= bstart ||
2618-
next_bstart > header->cbytes || bstart > context->srcsize - (int32_t)sizeof(int32_t)) {
2619-
BLOSC_TRACE_ERROR("Invalid VL-block offsets in chunk.");
2620-
return BLOSC2_ERROR_INVALID_HEADER;
2607+
if (is_lazy) {
2608+
// Lazy VL: block data is on disk, so blocknbytes is unknown at this point.
2609+
// Populate blockcbytes from bstarts differences; blocksize gets max(blockcbytes)
2610+
// as a safe upper bound so tmp buffers are large enough for the lazy block read.
2611+
int32_t max_csize = 0;
2612+
for (int32_t i = 0; i < context->nblocks; ++i) {
2613+
int32_t bstart = sw32_(context->bstarts + i);
2614+
int32_t next_bstart = (i + 1 < context->nblocks) ?
2615+
sw32_(context->bstarts + i + 1) : header->cbytes;
2616+
if (bstart < bstarts_end || next_bstart <= bstart ||
2617+
next_bstart > header->cbytes) {
2618+
BLOSC_TRACE_ERROR("Invalid VL-block offsets in lazy chunk.");
2619+
return BLOSC2_ERROR_INVALID_HEADER;
2620+
}
2621+
context->blocknbytes[i] = 0; // unknown until block is read from disk
2622+
context->blockoffsets[i] = 0; // unknown
2623+
context->blockcbytes[i] = next_bstart - bstart;
2624+
if (context->blockcbytes[i] > max_csize) {
2625+
max_csize = context->blockcbytes[i];
2626+
}
26212627
}
2622-
context->blockoffsets[i] = total_nbytes;
2623-
context->blocknbytes[i] = sw32_(context->src + bstart);
2624-
context->blockcbytes[i] = next_bstart - bstart;
2625-
if (context->blocknbytes[i] <= 0) {
2626-
BLOSC_TRACE_ERROR("Invalid VL-block uncompressed size in chunk.");
2627-
return BLOSC2_ERROR_INVALID_HEADER;
2628+
context->blocksize = max_csize;
2629+
context->leftover = 0;
2630+
}
2631+
else {
2632+
int32_t max_blocksize = 0;
2633+
int32_t total_nbytes = 0;
2634+
int32_t prev_bstart = 0;
2635+
for (int32_t i = 0; i < context->nblocks; ++i) {
2636+
int32_t bstart = sw32_(context->bstarts + i);
2637+
int32_t next_bstart = (i + 1 < context->nblocks) ?
2638+
sw32_(context->bstarts + i + 1) : header->cbytes;
2639+
if (bstart < bstarts_end || bstart <= prev_bstart || next_bstart <= bstart ||
2640+
next_bstart > header->cbytes || bstart > context->srcsize - (int32_t)sizeof(int32_t)) {
2641+
BLOSC_TRACE_ERROR("Invalid VL-block offsets in chunk.");
2642+
return BLOSC2_ERROR_INVALID_HEADER;
2643+
}
2644+
context->blockoffsets[i] = total_nbytes;
2645+
context->blocknbytes[i] = sw32_(context->src + bstart);
2646+
context->blockcbytes[i] = next_bstart - bstart;
2647+
if (context->blocknbytes[i] <= 0) {
2648+
BLOSC_TRACE_ERROR("Invalid VL-block uncompressed size in chunk.");
2649+
return BLOSC2_ERROR_INVALID_HEADER;
2650+
}
2651+
total_nbytes += context->blocknbytes[i];
2652+
if (context->blocknbytes[i] > max_blocksize) {
2653+
max_blocksize = context->blocknbytes[i];
2654+
}
2655+
prev_bstart = bstart;
26282656
}
2629-
total_nbytes += context->blocknbytes[i];
2630-
if (context->blocknbytes[i] > max_blocksize) {
2631-
max_blocksize = context->blocknbytes[i];
2657+
if (total_nbytes != context->sourcesize) {
2658+
BLOSC_TRACE_ERROR("VL-block sizes do not add up to chunk nbytes.");
2659+
return BLOSC2_ERROR_INVALID_HEADER;
26322660
}
2633-
prev_bstart = bstart;
2634-
}
2635-
if (total_nbytes != context->sourcesize) {
2636-
BLOSC_TRACE_ERROR("VL-block sizes do not add up to chunk nbytes.");
2637-
return BLOSC2_ERROR_INVALID_HEADER;
2661+
context->blocksize = max_blocksize;
2662+
context->leftover = 0;
26382663
}
2639-
context->blocksize = max_blocksize;
2640-
context->leftover = 0;
26412664
}
26422665

26432666
return 0;
@@ -3755,6 +3778,74 @@ static int decompress_single_vlblock(blosc2_context* context, int32_t nblock,
37553778
return BLOSC2_ERROR_INVALID_PARAM;
37563779
}
37573780

3781+
// For lazy VL chunks blocknbytes[nblock] == 0 because the uncompressed size is
3782+
// stored as the first 4 bytes of the block span on disk, not in the in-memory
3783+
// header. Peek at the file to resolve it before we can allocate the output buffer.
3784+
if (context->blocknbytes[nblock] == 0) {
3785+
if (context->schunk == NULL || context->schunk->frame == NULL) {
3786+
BLOSC_TRACE_ERROR("Lazy VL block needs an associated super-chunk with a frame.");
3787+
return BLOSC2_ERROR_INVALID_PARAM;
3788+
}
3789+
blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
3790+
blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
3791+
if (io_cb == NULL) {
3792+
BLOSC_TRACE_ERROR("Error getting the input/output API");
3793+
return BLOSC2_ERROR_PLUGIN_IO;
3794+
}
3795+
3796+
// Lazy chunk trailer: [nchunk int32 | chunk_offset int64 | block_csizes int32*N]
3797+
int32_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH +
3798+
context->nblocks * (int32_t)sizeof(int32_t);
3799+
int32_t nchunk_lazy = *(const int32_t*)(context->src + trailer_offset);
3800+
int64_t chunk_offset = *(const int64_t*)(context->src + trailer_offset +
3801+
(int32_t)sizeof(int32_t));
3802+
int32_t bstart = sw32_(context->bstarts + nblock);
3803+
3804+
void* fp = NULL;
3805+
int64_t io_pos;
3806+
if (frame->sframe) {
3807+
char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
3808+
BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
3809+
sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk_lazy);
3810+
fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
3811+
free(chunkpath);
3812+
io_pos = bstart;
3813+
}
3814+
else {
3815+
fp = io_cb->open(frame->urlpath, "rb", context->schunk->storage->io->params);
3816+
io_pos = frame->file_offset + chunk_offset + bstart;
3817+
}
3818+
if (fp == NULL) {
3819+
BLOSC_TRACE_ERROR("Cannot open frame file for lazy VL block size peek.");
3820+
return BLOSC2_ERROR_FILE_OPEN;
3821+
}
3822+
3823+
// Read only the 4-byte uncompressed-size prefix of the block span.
3824+
uint8_t nbuf[sizeof(int32_t)];
3825+
uint8_t* nbufp = nbuf;
3826+
int64_t rbytes = io_cb->read((void**)&nbufp, 1, sizeof(int32_t), io_pos, fp);
3827+
io_cb->close(fp);
3828+
if (nbufp != nbuf) {
3829+
// io_cb allocated new memory; copy the result and free.
3830+
memcpy(nbuf, nbufp, sizeof(int32_t));
3831+
free(nbufp);
3832+
}
3833+
if (rbytes != (int64_t)sizeof(int32_t)) {
3834+
BLOSC_TRACE_ERROR("Cannot read VL-block uncompressed-size prefix from disk.");
3835+
return BLOSC2_ERROR_FILE_READ;
3836+
}
3837+
int32_t neblock = sw32_(nbuf);
3838+
if (neblock <= 0) {
3839+
BLOSC_TRACE_ERROR("Invalid VL-block uncompressed size read from disk.");
3840+
return BLOSC2_ERROR_INVALID_HEADER;
3841+
}
3842+
context->blocknbytes[nblock] = neblock;
3843+
// Keep blocksize as an upper bound for tmp buffer allocation.
3844+
if (neblock > context->blocksize) {
3845+
context->blocksize = neblock;
3846+
}
3847+
}
3848+
37583849
int32_t bsize = context->blocknbytes[nblock];
37593850
uint8_t* buf = malloc((size_t)bsize);
37603851
if (buf == NULL) {

include/blosc2.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1570,8 +1570,12 @@ BLOSC_EXPORT int blosc2_vlchunk_get_nblocks(const void* src, int32_t srcsize,
15701570
* in the chunk are untouched.
15711571
*
15721572
* @param context A decompression context (#blosc2_context created with
1573-
* #blosc2_create_dctx).
1573+
* #blosc2_create_dctx). If @p src is a lazy chunk (obtained via
1574+
* #blosc2_schunk_get_lazychunk), the caller must set @c context->schunk
1575+
* to the owning super-chunk (which must have an associated frame) before
1576+
* calling this function; the frame is used to read block data from disk.
15741577
* @param src The buffer of compressed data. Must carry the #BLOSC2_VL_BLOCKS flag.
1578+
* May be a fully in-memory chunk or a lazy chunk proxy.
15751579
* @param srcsize The length of the compressed data buffer.
15761580
* @param nblock Zero-based index of the block to decompress.
15771581
* @param dest On success, points to a newly allocated buffer containing the

tests/test_vlblocks.c

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,140 @@ static char *test_vldecompress_block_ctx_dict(void) {
679679
#endif /* HAVE_ZSTD */
680680

681681

682+
/* ---- Lazy VL chunk tests ------------------------------------------------- */
683+
684+
/* Helper: build a 2-chunk file-backed schunk with 3 VL blocks per chunk.
685+
* Returns the schunk (caller must blosc2_schunk_free it), or NULL on error. */
686+
static blosc2_schunk* make_lazy_vl_schunk(const char* urlpath, bool contiguous) {
687+
blosc2_remove_urlpath(urlpath);
688+
689+
blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
690+
cparams.typesize = 1;
691+
blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
692+
blosc2_storage storage = {
693+
.contiguous = contiguous,
694+
.urlpath = (char *)urlpath,
695+
.cparams = &cparams,
696+
.dparams = &dparams,
697+
};
698+
699+
blosc2_schunk* schunk = blosc2_schunk_new(&storage);
700+
if (schunk == NULL) return NULL;
701+
702+
blosc2_context* cctx = blosc2_create_cctx(cparams);
703+
if (cctx == NULL) { blosc2_schunk_free(schunk); return NULL; }
704+
705+
int32_t destsize = total_nbytes() + BLOSC2_MAX_OVERHEAD + 64;
706+
uint8_t* chunk = malloc((size_t)destsize);
707+
if (chunk == NULL) { blosc2_free_ctx(cctx); blosc2_schunk_free(schunk); return NULL; }
708+
709+
int32_t cbytes = blosc2_vlcompress_ctx(cctx, (const void * const *)srcs, srcsizes, 3,
710+
chunk, destsize);
711+
if (cbytes <= 0 || blosc2_schunk_append_chunk(schunk, chunk, true) < 0) {
712+
free(chunk); blosc2_free_ctx(cctx); blosc2_schunk_free(schunk); return NULL;
713+
}
714+
/* Append a second chunk (same data) so nchunks > 1. */
715+
cbytes = blosc2_vlcompress_ctx(cctx, (const void * const *)srcs, srcsizes, 3,
716+
chunk, destsize);
717+
if (cbytes <= 0 || blosc2_schunk_append_chunk(schunk, chunk, true) < 0) {
718+
free(chunk); blosc2_free_ctx(cctx); blosc2_schunk_free(schunk); return NULL;
719+
}
720+
free(chunk);
721+
blosc2_free_ctx(cctx);
722+
return schunk;
723+
}
724+
725+
static char *test_lazy_vlchunk_get_nblocks(void) {
726+
const char* urlpath = "test_lazy_vlnblocks.b2frame";
727+
blosc2_schunk* schunk = make_lazy_vl_schunk(urlpath, true);
728+
mu_assert("ERROR: cannot create lazy VL schunk", schunk != NULL);
729+
730+
for (int64_t nchunk = 0; nchunk < schunk->nchunks; ++nchunk) {
731+
uint8_t* lazy_chunk = NULL;
732+
bool needs_free = false;
733+
int cbytes = blosc2_schunk_get_lazychunk(schunk, nchunk, &lazy_chunk, &needs_free);
734+
mu_assert("ERROR: blosc2_schunk_get_lazychunk failed", cbytes > 0);
735+
736+
int32_t nblocks = -1;
737+
int rc = blosc2_vlchunk_get_nblocks(lazy_chunk, cbytes, &nblocks);
738+
mu_assert("ERROR: blosc2_vlchunk_get_nblocks failed on lazy chunk", rc == 0);
739+
mu_assert("ERROR: wrong nblocks from lazy chunk", nblocks == 3);
740+
741+
if (needs_free) free(lazy_chunk);
742+
}
743+
744+
blosc2_schunk_free(schunk);
745+
blosc2_remove_urlpath(urlpath);
746+
return EXIT_SUCCESS;
747+
}
748+
749+
750+
static char *test_lazy_vldecompress_block_ctx(void) {
751+
const char* urlpath = "test_lazy_vldecomp.b2frame";
752+
blosc2_schunk* schunk = make_lazy_vl_schunk(urlpath, true);
753+
mu_assert("ERROR: cannot create lazy VL schunk", schunk != NULL);
754+
755+
/* Use the schunk's own dctx, which already has schunk->dctx->schunk set. */
756+
blosc2_context* dctx = schunk->dctx;
757+
758+
for (int64_t nchunk = 0; nchunk < schunk->nchunks; ++nchunk) {
759+
uint8_t* lazy_chunk = NULL;
760+
bool needs_free = false;
761+
int cbytes = blosc2_schunk_get_lazychunk(schunk, nchunk, &lazy_chunk, &needs_free);
762+
mu_assert("ERROR: blosc2_schunk_get_lazychunk failed", cbytes > 0);
763+
764+
for (int i = 0; i < 3; ++i) {
765+
uint8_t* blk = NULL;
766+
int32_t blksize = -1;
767+
int rc = blosc2_vldecompress_block_ctx(dctx, lazy_chunk, cbytes, i, &blk, &blksize);
768+
mu_assert("ERROR: lazy blosc2_vldecompress_block_ctx failed", rc == srcsizes[i]);
769+
mu_assert("ERROR: lazy returned size mismatch", blksize == srcsizes[i]);
770+
mu_assert("ERROR: lazy content mismatch",
771+
memcmp(blk, srcs[i], (size_t)srcsizes[i]) == 0);
772+
free(blk);
773+
}
774+
if (needs_free) free(lazy_chunk);
775+
}
776+
777+
blosc2_schunk_free(schunk);
778+
blosc2_remove_urlpath(urlpath);
779+
return EXIT_SUCCESS;
780+
}
781+
782+
783+
static char *test_lazy_vldecompress_block_ctx_sframe(void) {
784+
const char* urlpath = "test_lazy_vldecomp_s.b2frame";
785+
blosc2_schunk* schunk = make_lazy_vl_schunk(urlpath, false);
786+
mu_assert("ERROR: cannot create lazy VL sframe schunk", schunk != NULL);
787+
788+
/* Use the schunk's own dctx. */
789+
blosc2_context* dctx = schunk->dctx;
790+
791+
for (int64_t nchunk = 0; nchunk < schunk->nchunks; ++nchunk) {
792+
uint8_t* lazy_chunk = NULL;
793+
bool needs_free = false;
794+
int cbytes = blosc2_schunk_get_lazychunk(schunk, nchunk, &lazy_chunk, &needs_free);
795+
mu_assert("ERROR: sframe blosc2_schunk_get_lazychunk failed", cbytes > 0);
796+
797+
for (int i = 0; i < 3; ++i) {
798+
uint8_t* blk = NULL;
799+
int32_t blksize = -1;
800+
int rc = blosc2_vldecompress_block_ctx(dctx, lazy_chunk, cbytes, i, &blk, &blksize);
801+
mu_assert("ERROR: sframe lazy blosc2_vldecompress_block_ctx failed", rc == srcsizes[i]);
802+
mu_assert("ERROR: sframe lazy returned size mismatch", blksize == srcsizes[i]);
803+
mu_assert("ERROR: sframe lazy content mismatch",
804+
memcmp(blk, srcs[i], (size_t)srcsizes[i]) == 0);
805+
free(blk);
806+
}
807+
if (needs_free) free(lazy_chunk);
808+
}
809+
810+
blosc2_schunk_free(schunk);
811+
blosc2_remove_urlpath(urlpath);
812+
return EXIT_SUCCESS;
813+
}
814+
815+
682816
static char *all_tests(void) {
683817
for (int i = 0; i < (int)ARRAY_SIZE(backends); ++i) {
684818
tdata = backends[i];
@@ -690,6 +824,9 @@ static char *all_tests(void) {
690824
mu_run_test(test_schunk_get_vlblock);
691825
}
692826
mu_run_test(test_vlblocks_mt_roundtrip);
827+
mu_run_test(test_lazy_vlchunk_get_nblocks);
828+
mu_run_test(test_lazy_vldecompress_block_ctx);
829+
mu_run_test(test_lazy_vldecompress_block_ctx_sframe);
693830
#ifdef HAVE_ZSTD
694831
mu_run_test(test_vlblocks_dict_roundtrip);
695832
mu_run_test(test_vldecompress_block_ctx_dict);

0 commit comments

Comments
 (0)