Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 297 additions & 0 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
#include "H5PBprivate.h" /* Page Buffer */
#include "H5SLprivate.h" /* Skip Lists */
#include "H5VMprivate.h" /* Vector and array functions */
#include "H5Zprivate.h" /* Data filters */
#include "H5FDprivate.h" /* File drivers */
#include <omp.h> /* OpenMP */

/****************/
/* Local Macros */
Expand Down Expand Up @@ -409,6 +412,12 @@ H5FL_BLK_DEFINE_STATIC(chunk);
/* Declare extern free list to manage the H5S_sel_iter_t struct */
H5FL_EXTERN(H5S_sel_iter_t);

/* Declare extern free list to manage sequences of size_t */
H5FL_SEQ_EXTERN(size_t);

/* Declare extern free list to manage sequences of hsize_t */
H5FL_SEQ_EXTERN(hsize_t);

/*-------------------------------------------------------------------------
* Function: H5D__chunk_direct_write
*
Expand Down Expand Up @@ -3121,6 +3130,16 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)

/* Iterate through nodes in chunk skip list */
chunk_node = H5D_CHUNK_GET_FIRST_NODE(dset_info);

/* Variables for parallel hyperslab optimization */
bool hslab_selection = true;
size_t chunk_info_index = 0;
size_t num_chunks = H5D_CHUNK_GET_NODE_COUNT(dset_info);
H5D_chunk_info_light_t *chunk_nodes_info = NULL;

if (num_chunks == 0)
goto done;

while (chunk_node) {
H5D_piece_info_t *chunk_info; /* Chunk information */
H5D_chunk_ud_t udata; /* Chunk index pass-through */
Expand All @@ -3140,6 +3159,35 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
/* Check for non-existent chunk & skip it if appropriate */
if (H5_addr_defined(udata.chunk_block.offset) || UINT_MAX != udata.idx_hint ||
!skip_missing_chunks) {

/* Check if we can use parallel hyperslab optimization:
* - Hyperslab selection
* - No type conversion needed
*/
if (H5S_GET_SELECT_TYPE(chunk_info->fspace) == H5S_SEL_HYPERSLABS &&
(dset_info->type_info.is_conv_noop == true &&
dset_info->type_info.is_xform_noop == true)) {
/* Collect chunk info for parallel processing */
if (chunk_nodes_info == NULL) {
chunk_nodes_info =
(H5D_chunk_info_light_t *)malloc(num_chunks * sizeof(H5D_chunk_info_light_t));
if (chunk_nodes_info == NULL)
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
"memory allocation failed for chunk info");
}
chunk_nodes_info[chunk_info_index].chunk_info = chunk_info;
chunk_nodes_info[chunk_info_index].offset = udata.chunk_block.offset;
chunk_nodes_info[chunk_info_index].length = udata.chunk_block.length;
chunk_nodes_info[chunk_info_index].filter_mask = udata.filter_mask;
chunk_info_index++;

chunk_node = H5D_CHUNK_GET_NEXT_NODE(dset_info, chunk_node);
continue;
}

/* Not suitable for parallel optimization - use serial path */
hslab_selection = false;

H5D_io_info_t *chk_io_info; /* Pointer to I/O info object for this chunk */
void *chunk = NULL; /* Pointer to locked chunk buffer */

Expand Down Expand Up @@ -3198,6 +3246,255 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
/* Advance to next chunk in list */
chunk_node = H5D_CHUNK_GET_NEXT_NODE(dset_info, chunk_node);
} /* end while */

/* If we collected chunks for parallel processing, do it now */
if (hslab_selection && chunk_info_index > 0) {
H5F_shared_t *f_sh = H5F_SHARED(dset_info->dset->oloc.file);
H5FD_t *file = NULL;
H5F_shared_get_file_driver(f_sh, &file);

hid_t dxpl_idx = H5CX_get_dxpl();
size_t elsize = dset_info->type_info.src_type_size;
void *dstbuf = dset_info->buf.vp;

/* Retrieve filter pipeline */
H5O_pline_t *pline = &(dset_info->dset->shared->dcpl_cache.pline);

H5Z_EDC_t err_detect;
H5Z_cb_t filter_cb;
if (H5CX_get_err_detect(&err_detect) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
if (H5CX_get_filter_cb(&filter_cb) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get filter callback function");

size_t num_threads = 8;
size_t itersize = (chunk_info_index + num_threads - 1) / num_threads;

/* Per-thread variables for parallel processing */
H5S_sel_iter_t *mem_iter[num_threads];
bool mem_iter_init[num_threads];
H5S_sel_iter_t *file_iter[num_threads];
bool file_iter_init[num_threads];
hsize_t *mem_off[num_threads];
hsize_t *file_off[num_threads];
size_t *mem_len[num_threads];
size_t *file_len[num_threads];
size_t curr_mem_seq[num_threads];
size_t curr_file_seq[num_threads];
size_t mem_nseq[num_threads];
size_t file_nseq[num_threads];
size_t dxpl_vec_size[num_threads];
size_t vec_size[num_threads];
ssize_t tmp_file_len[num_threads];
size_t nelmts[num_threads];

/* Initialize arrays */
for (size_t t = 0; t < num_threads; t++) {
mem_iter[t] = NULL;
mem_iter_init[t] = false;
file_iter[t] = NULL;
file_iter_init[t] = false;
mem_off[t] = NULL;
file_off[t] = NULL;
mem_len[t] = NULL;
file_len[t] = NULL;
curr_mem_seq[t] = 0;
curr_file_seq[t] = 0;
mem_nseq[t] = 0;
file_nseq[t] = 0;
dxpl_vec_size[t] = 0;
vec_size[t] = 0;
tmp_file_len[t] = 0;
nelmts[t] = 0;
}

/* Allocate per-thread resources */
for (size_t t = 0; t < num_threads; t++) {
if (H5CX_get_vec_size(&dxpl_vec_size[t]) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't retrieve I/O vector size");

/* Allocate the vector I/O arrays */
if (dxpl_vec_size[t] > H5D_IO_VECTOR_SIZE)
vec_size[t] = dxpl_vec_size[t];
else
vec_size[t] = H5D_IO_VECTOR_SIZE;

if (NULL == (mem_len[t] = H5FL_SEQ_MALLOC(size_t, vec_size[t])))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O length vector array");
if (NULL == (mem_off[t] = H5FL_SEQ_MALLOC(hsize_t, vec_size[t])))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O offset vector array");
if (NULL == (file_len[t] = H5FL_SEQ_MALLOC(size_t, vec_size[t])))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O length vector array");
if (NULL == (file_off[t] = H5FL_SEQ_MALLOC(hsize_t, vec_size[t])))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O offset vector array");

/* Allocate the iterators */
if (NULL == (mem_iter[t] = H5FL_MALLOC(H5S_sel_iter_t)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate memory iterator");
if (NULL == (file_iter[t] = H5FL_MALLOC(H5S_sel_iter_t)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate file iterator");
}

bool error_tracker[num_threads];
for (size_t t = 0; t < num_threads; t++) {
error_tracker[t] = false;
}

#pragma omp parallel num_threads(num_threads) default(shared)
{
size_t tid = omp_get_thread_num();
size_t start = tid * itersize;
size_t end = start + itersize;
if (end > chunk_info_index)
end = chunk_info_index;

for (size_t i = start; i < end; ++i) {
if (error_tracker[tid])
continue;

H5D_piece_info_t *p_chunk_info = chunk_nodes_info[i].chunk_info;
haddr_t offset = chunk_nodes_info[i].offset;
size_t length = chunk_nodes_info[i].length;
void *iobuf = H5MM_malloc(length);

/* Read chunk from file */
(file->cls->read)(file, H5FD_MEM_DRAW, dxpl_idx, offset + file->base_addr, length, iobuf);

/* Process through filter pipeline if filters are present */
if (pline && pline->nused) {
size_t buf_size = length;

H5Z_pipeline(pline,
H5Z_FLAG_REVERSE,
&chunk_nodes_info[i].filter_mask,
err_detect,
filter_cb,
/* IN: compressed chunk size */
/* OUT: uncompressed chunk size */
&length,
/* IN: compressed size (data size)
* OUT: allocated buffer size (may be reallocated) */
/*
* Implementation detail for zstd filter:
* `iobuf` is swapped out for a new buffer which holds the decompressed data.
*/
&buf_size,
&iobuf);
}

size_t file_nelem;
size_t mem_nelem;

nelmts[tid] = p_chunk_info->piece_points;

/* Initialize file iterator */
if (H5S_select_iter_init(file_iter[tid],
p_chunk_info->fspace,
elsize,
H5S_SEL_ITER_GET_SEQ_LIST_SORTED) < 0) {
error_tracker[tid] = true;
continue;
}
file_iter_init[tid] = true; /* File selection iteration info has been initialized */

/* Initialize memory iterator */
if (H5S_select_iter_init(mem_iter[tid],
p_chunk_info->mspace,
elsize,
0) < 0) {
error_tracker[tid] = true;
continue;
}
mem_iter_init[tid] = true; /* Memory selection iteration info has been initialized */

curr_file_seq[tid] = curr_mem_seq[tid] = 0;
mem_nseq[tid] = file_nseq[tid] = 0;

while (nelmts[tid] > 0) {
if (curr_file_seq[tid] >= file_nseq[tid]) {
if (H5S_SELECT_ITER_GET_SEQ_LIST(file_iter[tid],
vec_size[tid],
nelmts[tid],
&file_nseq[tid],
&file_nelem,
file_off[tid],
file_len[tid]) < 0) {
error_tracker[tid] = true;
break;
}

curr_file_seq[tid] = 0;
}

if (curr_mem_seq[tid] >= mem_nseq[tid]) {
if (H5S_SELECT_ITER_GET_SEQ_LIST(mem_iter[tid],
vec_size[tid],
nelmts[tid],
&mem_nseq[tid],
&mem_nelem,
mem_off[tid],
mem_len[tid]) < 0) {
error_tracker[tid] = true;
break;
}

curr_mem_seq[tid] = 0;
}

/* Writes to dstbuf are on disjoint locations, so concurrent writes are safe */
if ((tmp_file_len[tid] = H5VM_memcpyvv(dstbuf,
mem_nseq[tid],
&curr_mem_seq[tid],
mem_len[tid],
mem_off[tid],
iobuf,
file_nseq[tid],
&curr_file_seq[tid],
file_len[tid],
file_off[tid])) < 0) {
error_tracker[tid] = true;
break;
}

/* Decrement number of elements left to process */
assert(((size_t)tmp_file_len[tid] % elsize) == 0);
nelmts[tid] -= ((size_t)tmp_file_len[tid] / elsize);

}

if (iobuf)
iobuf = H5MM_xfree(iobuf);

}
} /* end parallel */

/* Release per-thread resources */
for (size_t t = 0; t < num_threads; t++) {
/* Release selection iterators */
if (file_iter_init[t] && H5S_SELECT_ITER_RELEASE(file_iter[t]) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTRELEASE, FAIL, "unable to release selection iterator");
if (file_iter[t])
file_iter[t] = H5FL_FREE(H5S_sel_iter_t, file_iter[t]);
if (mem_iter_init[t] && H5S_SELECT_ITER_RELEASE(mem_iter[t]) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTRELEASE, FAIL, "unable to release selection iterator");
if (mem_iter[t])
mem_iter[t] = H5FL_FREE(H5S_sel_iter_t, mem_iter[t]);

/* Release vector arrays */
if (file_len[t])
file_len[t] = H5FL_SEQ_FREE(size_t, file_len[t]);
if (file_off[t])
file_off[t] = H5FL_SEQ_FREE(hsize_t, file_off[t]);
if (mem_len[t])
mem_len[t] = H5FL_SEQ_FREE(size_t, mem_len[t]);
if (mem_off[t])
mem_off[t] = H5FL_SEQ_FREE(hsize_t, mem_off[t]);
}
} /* end if hslab_selection */

/* Free chunk info array */
if (chunk_nodes_info)
free(chunk_nodes_info);
} /* end else */

done:
Expand Down
8 changes: 8 additions & 0 deletions src/H5Dpkg.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,14 @@ typedef struct H5D_chunk_ud_t {
hsize_t chunk_idx; /* Chunk index for EA, FA indexing */
} H5D_chunk_ud_t;

/* Custom chunk info structure for optimized parallel I/O operations */
typedef struct H5D_chunk_info_light_t {
H5D_piece_info_t *chunk_info;
haddr_t offset;
hsize_t length;
unsigned filter_mask;
} H5D_chunk_info_light_t;

/* Typedef for "generic" chunk callbacks */
typedef int (*H5D_chunk_cb_func_t)(const H5D_chunk_rec_t *chunk_rec, void *udata);

Expand Down
Loading