diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index f2a0e85c032..5db92b64ef4 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -58,6 +58,9 @@
 #include "H5PBprivate.h" /* Page Buffer */
 #include "H5SLprivate.h" /* Skip Lists */
 #include "H5VMprivate.h" /* Vector and array functions */
+#include "H5Zprivate.h"  /* Data filters */
+#include "H5FDprivate.h" /* File drivers */
+#include <omp.h>         /* OpenMP (BUG FIX: header name was missing) */
 
 /****************/
 /* Local Macros */
@@ -409,6 +412,12 @@ H5FL_BLK_DEFINE_STATIC(chunk);
 /* Declare extern free list to manage the H5S_sel_iter_t struct */
 H5FL_EXTERN(H5S_sel_iter_t);
 
+/* Declare extern free list to manage sequences of size_t */
+H5FL_SEQ_EXTERN(size_t);
+
+/* Declare extern free list to manage sequences of hsize_t */
+H5FL_SEQ_EXTERN(hsize_t);
+
 /*-------------------------------------------------------------------------
  * Function:    H5D__chunk_direct_write
  *
@@ -3121,6 +3130,16 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
 
         /* Iterate through nodes in chunk skip list */
         chunk_node = H5D_CHUNK_GET_FIRST_NODE(dset_info);
+
+        /* State for the parallel hyperslab read optimization.  Qualifying
+         * chunks are collected into 'chunk_nodes_info' and read in a second,
+         * OpenMP-parallel pass after the serial loop; non-qualifying chunks
+         * fall through to the original serial path below.  (BUG FIX: the
+         * former 'if (num_chunks == 0) goto done;' was removed -- the while
+         * loop simply does not execute, and jumping risks skipping cleanup.)
+         */
+        size_t                  chunk_info_index = 0;    /* # chunks collected for parallel I/O */
+        size_t                  num_chunks       = H5D_CHUNK_GET_NODE_COUNT(dset_info);
+        H5D_chunk_info_light_t *chunk_nodes_info = NULL; /* Lazily-allocated collection array */
+
         while (chunk_node) {
             H5D_piece_info_t *chunk_info; /* Chunk information */
             H5D_chunk_ud_t    udata;      /* Chunk index pass-through */
@@ -3140,6 +3159,35 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
             /* Check for non-existent chunk & skip it if appropriate */
             if (H5_addr_defined(udata.chunk_block.offset) || UINT_MAX != udata.idx_hint ||
                 !skip_missing_chunks) {
+
+                /* Check whether this chunk can use the parallel hyperslab
+                 * optimization:
+                 *  - chunk exists on disk (BUG FIX: a missing chunk reached
+                 *    via '!skip_missing_chunks' must take the serial
+                 *    fill-value path, not be read from an undefined address)
+                 *  - hyperslab selection
+                 *  - no datatype conversion and no data transform
+                 */
+                if (H5_addr_defined(udata.chunk_block.offset) &&
+                    H5S_GET_SELECT_TYPE(chunk_info->fspace) == H5S_SEL_HYPERSLABS &&
+                    dset_info->type_info.is_conv_noop && dset_info->type_info.is_xform_noop) {
+                    /* Lazily allocate the collection array (sized for the
+                     * worst case of every chunk qualifying) */
+                    if (chunk_nodes_info == NULL)
+                        if (NULL == (chunk_nodes_info = (H5D_chunk_info_light_t *)H5MM_malloc(
+                                         num_chunks * sizeof(H5D_chunk_info_light_t))))
+                            HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
+                                        "memory allocation failed for chunk info");
+
+                    chunk_nodes_info[chunk_info_index].chunk_info  = chunk_info;
+                    chunk_nodes_info[chunk_info_index].offset      = udata.chunk_block.offset;
+                    chunk_nodes_info[chunk_info_index].length      = udata.chunk_block.length;
+                    chunk_nodes_info[chunk_info_index].filter_mask = udata.filter_mask;
+                    chunk_info_index++;
+
+                    chunk_node = H5D_CHUNK_GET_NEXT_NODE(dset_info, chunk_node);
+                    continue;
+                }
+
+                /* Not suitable for the parallel optimization - use the serial path */
                 H5D_io_info_t *chk_io_info;  /* Pointer to I/O info object for this chunk */
                 void          *chunk = NULL; /* Pointer to locked chunk buffer */
@@ -3198,6 +3246,255 @@ H5D__chunk_read(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info)
             /* Advance to next chunk in list */
             chunk_node = H5D_CHUNK_GET_NEXT_NODE(dset_info, chunk_node);
         } /* end while */
+
+        /* If we collected chunks for parallel processing, read them now.
+         * (BUG FIX: previously a single non-qualifying chunk cleared an
+         * 'hslab_selection' flag and disabled this pass entirely, silently
+         * dropping the reads of every chunk already collected.  Collected
+         * chunks are now always processed; the serial loop above already
+         * handled the rest.)
+         */
+        if (chunk_info_index > 0) {
+            H5F_shared_t *f_sh = H5F_SHARED(dset_info->dset->oloc.file);
+            H5FD_t       *file = NULL;
+
+            H5F_shared_get_file_driver(f_sh, &file);
+
+            hid_t  dxpl_idx = H5CX_get_dxpl();                 /* DXPL for the raw VFD reads */
+            size_t elsize   = dset_info->type_info.src_type_size;
+            void  *dstbuf   = dset_info->buf.vp;               /* Application read buffer */
+
+            /* Retrieve filter pipeline */
+            H5O_pline_t *pline = &(dset_info->dset->shared->dcpl_cache.pline);
+
+            H5Z_EDC_t err_detect; /* Error detection info */
+            H5Z_cb_t  filter_cb;  /* I/O filter callback function */
+            if (H5CX_get_err_detect(&err_detect) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info");
+            if (H5CX_get_filter_cb(&filter_cb) < 0)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get filter callback function");
+
+            size_t num_threads = 8; /* TODO: make configurable (e.g. omp_get_max_threads()) */
+            size_t itersize    = (chunk_info_index + num_threads - 1) / num_threads;
+
+            /* Per-thread state, indexed by OpenMP thread id.  Each thread only
+             * touches its own slot, so no synchronization is needed. */
+            H5S_sel_iter_t *mem_iter[num_threads];
+            bool            mem_iter_init[num_threads];
+            H5S_sel_iter_t *file_iter[num_threads];
+            bool            file_iter_init[num_threads];
+            hsize_t        *mem_off[num_threads];
+            hsize_t        *file_off[num_threads];
+            size_t         *mem_len[num_threads];
+            size_t         *file_len[num_threads];
+            size_t          curr_mem_seq[num_threads];
+            size_t          curr_file_seq[num_threads];
+            size_t          mem_nseq[num_threads];
+            size_t          file_nseq[num_threads];
+            size_t          dxpl_vec_size[num_threads];
+            size_t          vec_size[num_threads];
+            ssize_t         tmp_file_len[num_threads];
+            size_t          nelmts[num_threads];
+            bool            error_tracker[num_threads]; /* Per-thread failure flags */
+
+            /* Initialize arrays */
+            for (size_t t = 0; t < num_threads; t++) {
+                mem_iter[t]       = NULL;
+                mem_iter_init[t]  = false;
+                file_iter[t]      = NULL;
+                file_iter_init[t] = false;
+                mem_off[t]        = NULL;
+                file_off[t]       = NULL;
+                mem_len[t]        = NULL;
+                file_len[t]       = NULL;
+                curr_mem_seq[t]   = 0;
+                curr_file_seq[t]  = 0;
+                mem_nseq[t]       = 0;
+                file_nseq[t]      = 0;
+                dxpl_vec_size[t]  = 0;
+                vec_size[t]       = 0;
+                tmp_file_len[t]   = 0;
+                nelmts[t]         = 0;
+                error_tracker[t]  = false;
+            }
+
+            /* Allocate per-thread resources (serially: H5FL free lists are not
+             * guaranteed thread-safe, so all H5FL traffic stays outside the
+             * parallel region) */
+            for (size_t t = 0; t < num_threads; t++) {
+                if (H5CX_get_vec_size(&dxpl_vec_size[t]) < 0)
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't retrieve I/O vector size");
+
+                /* Allocate the vector I/O arrays */
+                if (dxpl_vec_size[t] > H5D_IO_VECTOR_SIZE)
+                    vec_size[t] = dxpl_vec_size[t];
+                else
+                    vec_size[t] = H5D_IO_VECTOR_SIZE;
+
+                if (NULL == (mem_len[t] = H5FL_SEQ_MALLOC(size_t, vec_size[t])))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O length vector array");
+                if (NULL == (mem_off[t] = H5FL_SEQ_MALLOC(hsize_t, vec_size[t])))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O offset vector array");
+                if (NULL == (file_len[t] = H5FL_SEQ_MALLOC(size_t, vec_size[t])))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O length vector array");
+                if (NULL == (file_off[t] = H5FL_SEQ_MALLOC(hsize_t, vec_size[t])))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate I/O offset vector array");
+
+                /* Allocate the iterators */
+                if (NULL == (mem_iter[t] = H5FL_MALLOC(H5S_sel_iter_t)))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate memory iterator");
+                if (NULL == (file_iter[t] = H5FL_MALLOC(H5S_sel_iter_t)))
+                    HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "can't allocate file iterator");
+            }
+
+#pragma omp parallel num_threads(num_threads) default(shared)
+            {
+                size_t tid   = (size_t)omp_get_thread_num();
+                size_t start = tid * itersize;
+                size_t end   = start + itersize;
+
+                if (end > chunk_info_index)
+                    end = chunk_info_index;
+
+                for (size_t i = start; i < end; ++i) {
+                    if (error_tracker[tid])
+                        continue;
+
+                    H5D_piece_info_t *p_chunk_info = chunk_nodes_info[i].chunk_info;
+                    haddr_t           offset       = chunk_nodes_info[i].offset;
+                    size_t            length       = (size_t)chunk_nodes_info[i].length;
+                    void             *iobuf        = H5MM_malloc(length);
+
+                    /* BUG FIX: allocation failure was previously passed to the
+                     * VFD read unchecked */
+                    if (NULL == iobuf) {
+                        error_tracker[tid] = true;
+                        continue;
+                    }
+
+                    /* Read the raw chunk from the file.
+                     * NOTE(review): the VFD 'read' callback is invoked directly
+                     * and concurrently from several threads; this assumes the
+                     * active driver's read is thread-safe (true for sec2-style
+                     * pread-based drivers) -- confirm for other drivers.
+                     * BUG FIX: the return value was previously ignored. */
+                    if ((file->cls->read)(file, H5FD_MEM_DRAW, dxpl_idx, offset + file->base_addr,
+                                          length, iobuf) < 0)
+                        error_tracker[tid] = true;
+
+                    /* Process through filter pipeline if filters are present.
+                     * BUG FIX: the return value was previously ignored. */
+                    if (!error_tracker[tid] && pline && pline->nused) {
+                        size_t buf_size = length;
+
+                        /* On success 'length' becomes the uncompressed size and
+                         * 'iobuf' may have been reallocated by a filter (e.g.
+                         * the zstd filter swaps in a new buffer holding the
+                         * decompressed data). */
+                        if (H5Z_pipeline(pline, H5Z_FLAG_REVERSE, &chunk_nodes_info[i].filter_mask,
+                                         err_detect, filter_cb, &length, &buf_size, &iobuf) < 0)
+                            error_tracker[tid] = true;
+                    }
+
+                    size_t file_nelem;
+                    size_t mem_nelem;
+
+                    nelmts[tid] = p_chunk_info->piece_points;
+
+                    /* Initialize the file-space iterator */
+                    if (!error_tracker[tid]) {
+                        if (H5S_select_iter_init(file_iter[tid], p_chunk_info->fspace, elsize,
+                                                 H5S_SEL_ITER_GET_SEQ_LIST_SORTED) < 0)
+                            error_tracker[tid] = true;
+                        else
+                            file_iter_init[tid] = true;
+                    }
+
+                    /* Initialize the memory-space iterator */
+                    if (!error_tracker[tid]) {
+                        if (H5S_select_iter_init(mem_iter[tid], p_chunk_info->mspace, elsize, 0) < 0)
+                            error_tracker[tid] = true;
+                        else
+                            mem_iter_init[tid] = true;
+                    }
+
+                    if (!error_tracker[tid]) {
+                        curr_file_seq[tid] = curr_mem_seq[tid] = 0;
+                        mem_nseq[tid]      = file_nseq[tid]    = 0;
+
+                        /* Scatter the decompressed chunk into the read buffer,
+                         * one sequence batch at a time */
+                        while (nelmts[tid] > 0) {
+                            if (curr_file_seq[tid] >= file_nseq[tid]) {
+                                if (H5S_SELECT_ITER_GET_SEQ_LIST(file_iter[tid], vec_size[tid],
+                                                                 nelmts[tid], &file_nseq[tid],
+                                                                 &file_nelem, file_off[tid],
+                                                                 file_len[tid]) < 0) {
+                                    error_tracker[tid] = true;
+                                    break;
+                                }
+                                curr_file_seq[tid] = 0;
+                            }
+
+                            if (curr_mem_seq[tid] >= mem_nseq[tid]) {
+                                if (H5S_SELECT_ITER_GET_SEQ_LIST(mem_iter[tid], vec_size[tid],
+                                                                 nelmts[tid], &mem_nseq[tid],
+                                                                 &mem_nelem, mem_off[tid],
+                                                                 mem_len[tid]) < 0) {
+                                    error_tracker[tid] = true;
+                                    break;
+                                }
+                                curr_mem_seq[tid] = 0;
+                            }
+
+                            /* Writes to dstbuf land on disjoint regions (each
+                             * chunk owns a distinct part of the selection), so
+                             * concurrent writes are safe */
+                            if ((tmp_file_len[tid] = H5VM_memcpyvv(dstbuf, mem_nseq[tid],
+                                                                   &curr_mem_seq[tid], mem_len[tid],
+                                                                   mem_off[tid], iobuf, file_nseq[tid],
+                                                                   &curr_file_seq[tid], file_len[tid],
+                                                                   file_off[tid])) < 0) {
+                                error_tracker[tid] = true;
+                                break;
+                            }
+
+                            /* Decrement number of elements left to process */
+                            assert(((size_t)tmp_file_len[tid] % elsize) == 0);
+                            nelmts[tid] -= ((size_t)tmp_file_len[tid] / elsize);
+                        }
+                    }
+
+                    /* Per-chunk cleanup (BUG FIX: the iterators were previously
+                     * re-initialized for every chunk but released only once per
+                     * thread, and 'iobuf' leaked on every error path that used
+                     * 'continue') */
+                    if (file_iter_init[tid]) {
+                        if (H5S_SELECT_ITER_RELEASE(file_iter[tid]) < 0)
+                            error_tracker[tid] = true;
+                        file_iter_init[tid] = false;
+                    }
+                    if (mem_iter_init[tid]) {
+                        if (H5S_SELECT_ITER_RELEASE(mem_iter[tid]) < 0)
+                            error_tracker[tid] = true;
+                        mem_iter_init[tid] = false;
+                    }
+                    iobuf = H5MM_xfree(iobuf);
+                }
+            } /* end parallel */
+
+            /* BUG FIX: per-thread failures were recorded but never checked, so
+             * a failed read returned SUCCEED with a partially-filled buffer */
+            bool any_thread_failed = false;
+            bool release_failed    = false;
+            for (size_t t = 0; t < num_threads; t++)
+                if (error_tracker[t])
+                    any_thread_failed = true;
+
+            /* Release per-thread resources.  (BUG FIX: previously HGOTO_ERROR
+             * fired from inside this loop, leaking the resources of every
+             * later thread; failures are now recorded and reported after all
+             * resources have been released.) */
+            for (size_t t = 0; t < num_threads; t++) {
+                /* Release selection iterators */
+                if (file_iter_init[t] && H5S_SELECT_ITER_RELEASE(file_iter[t]) < 0)
+                    release_failed = true;
+                if (file_iter[t])
+                    file_iter[t] = H5FL_FREE(H5S_sel_iter_t, file_iter[t]);
+                if (mem_iter_init[t] && H5S_SELECT_ITER_RELEASE(mem_iter[t]) < 0)
+                    release_failed = true;
+                if (mem_iter[t])
+                    mem_iter[t] = H5FL_FREE(H5S_sel_iter_t, mem_iter[t]);
+
+                /* Release vector arrays */
+                if (file_len[t])
+                    file_len[t] = H5FL_SEQ_FREE(size_t, file_len[t]);
+                if (file_off[t])
+                    file_off[t] = H5FL_SEQ_FREE(hsize_t, file_off[t]);
+                if (mem_len[t])
+                    mem_len[t] = H5FL_SEQ_FREE(size_t, mem_len[t]);
+                if (mem_off[t])
+                    mem_off[t] = H5FL_SEQ_FREE(hsize_t, mem_off[t]);
+            }
+
+            /* Free the collection array before any error return from this
+             * section (H5MM_xfree(NULL) is a safe no-op) */
+            chunk_nodes_info = H5MM_xfree(chunk_nodes_info);
+
+            if (any_thread_failed)
+                HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "parallel chunk read failed");
+            if (release_failed)
+                HGOTO_ERROR(H5E_DATASET, H5E_CANTRELEASE, FAIL, "unable to release selection iterator");
+        } /* end if chunk_info_index > 0 */
+
+        /* Free chunk info array (no-op if never allocated or already freed).
+         * NOTE(review): the HGOTO_ERROR paths earlier in this section (filter
+         * callback retrieval, per-thread allocation loop) still leak
+         * 'chunk_nodes_info'; the complete fix is to free it in this
+         * function's 'done:' block -- TODO (not visible in this diff). */
+        chunk_nodes_info = H5MM_xfree(chunk_nodes_info);
     } /* end else */
 
 done:
diff --git a/src/H5Dpkg.h b/src/H5Dpkg.h
index e28199cfe48..cfcef1d69d9 100644
--- a/src/H5Dpkg.h
+++ b/src/H5Dpkg.h
@@ -433,6 +433,14 @@ typedef struct H5D_chunk_ud_t {
     hsize_t chunk_idx; /* Chunk index for EA, FA indexing */
 } H5D_chunk_ud_t;
 
+/* Minimal per-chunk information collected for the optimized
+ * (OpenMP-parallel) chunk read path in H5D__chunk_read() */
+typedef struct H5D_chunk_info_light_t {
+    H5D_piece_info_t *chunk_info;  /* Selection/piece info for this chunk */
+    haddr_t           offset;      /* Chunk address in the file */
+    hsize_t           length;      /* On-disk (possibly filtered) chunk size */
+    unsigned          filter_mask; /* Excluded-filters mask for H5Z_pipeline */
+} H5D_chunk_info_light_t;
+
 /* Typedef for "generic" chunk callbacks */
 typedef int (*H5D_chunk_cb_func_t)(const H5D_chunk_rec_t *chunk_rec, void *udata);