diff --git a/CMakeLists.txt b/CMakeLists.txt index 2b68e7a..1ed493b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,7 @@ endif() include(${PROJECT_SOURCE_DIR}/cmake/neuroh5_utils.cmake) -set(NEUROH5_VERSION 0.1.17) +set(NEUROH5_VERSION 0.1.18) cmake_policy(SET CMP0074 NEW) # enables use of HDF5_ROOT variable diff --git a/setup.py b/setup.py index 461ae27..ae97f7b 100644 --- a/setup.py +++ b/setup.py @@ -141,7 +141,7 @@ def build_extensions(self): name="NeuroH5", package_dir={"": "python"}, packages=["neuroh5"], - version="0.1.17", + version="0.1.18", maintainer="Ivan Raikov", maintainer_email="ivan.g.raikov@gmail.com", description="A parallel HDF5-based library for storage and processing of large-scale graphs and neural cell model attributes.", diff --git a/src/graph/edge_attributes.cc b/src/graph/edge_attributes.cc index 55bce5a..5901f88 100644 --- a/src/graph/edge_attributes.cc +++ b/src/graph/edge_attributes.cc @@ -15,6 +15,7 @@ #include "path_names.hh" #include "serialize_data.hh" #include "read_template.hh" +#include "mpi_debug.hh" #include "throw_assert.hh" #include @@ -379,6 +380,10 @@ namespace neuroh5 "read_edge_attributes: error in H5Pset_dxpl_mpio"); } #endif + mpi::MPI_DEBUG(comm, "read_edge_attributes: reading attributes for ", + src_pop_name, " -> ", dst_pop_name, + " namespace ", name_space, " attribute ", attr_name, + " edge_base: ", edge_base, " edge count: ", edge_count); string dset_path = hdf5::edge_attribute_path(src_pop_name, dst_pop_name, name_space, attr_name); ierr = hdf5::exists_dataset (file, dset_path.c_str()); diff --git a/src/graph/read_projection.cc b/src/graph/read_projection.cc index a60d5f4..e6ca427 100644 --- a/src/graph/read_projection.cc +++ b/src/graph/read_projection.cc @@ -123,6 +123,11 @@ namespace neuroh5 "read_projection: error in append_edge_map"); local_num_nodes = prj_edge_map.size(); + mpi::MPI_DEBUG(comm, "read_projection: local_num_nodes = ", + local_num_nodes, + " local_prj_num_edges = ", 
local_prj_num_edges, + " edge_count = ", edge_count); + // ensure that all edges in the projection have been read and // appended to edge_list throw_assert(local_prj_num_edges == edge_count, diff --git a/src/graph/scatter_read_projection.cc b/src/graph/scatter_read_projection.cc index a856406..7fea9f3 100644 --- a/src/graph/scatter_read_projection.cc +++ b/src/graph/scatter_read_projection.cc @@ -124,11 +124,12 @@ namespace neuroh5 hsize_t local_read_blocks; mpi::MPI_DEBUG(io_comm, "scatter_read_projection: reading projection ", src_pop_name, " -> ", dst_pop_name); - throw_assert_nomsg(hdf5::read_projection_datasets(io_comm, file_name, src_pop_name, dst_pop_name, - block_base, edge_base, - dst_blk_ptr, dst_idx, dst_ptr, src_idx, - total_num_edges, total_read_blocks, local_read_blocks, - offset, numitems * size) >= 0); + throw_assert(hdf5::read_projection_datasets(io_comm, file_name, src_pop_name, dst_pop_name, + block_base, edge_base, + dst_blk_ptr, dst_idx, dst_ptr, src_idx, + total_num_edges, total_read_blocks, local_read_blocks, + offset, numitems * size) >= 0, + "error in read_projection_datasets"); mpi::MPI_DEBUG(io_comm, "scatter_read_projection: validating projection ", src_pop_name, " -> ", dst_pop_name); // validate the edges @@ -153,9 +154,10 @@ namespace neuroh5 // append to the edge map - throw_assert_nomsg(data::append_rank_edge_map(rank, size, dst_start, src_start, dst_blk_ptr, dst_idx, dst_ptr, src_idx, - attr_namespaces, edge_attr_map, node_rank_map, num_edges, prj_rank_edge_map, - edge_map_type) >= 0); + throw_assert(data::append_rank_edge_map(rank, size, dst_start, src_start, dst_blk_ptr, dst_idx, dst_ptr, src_idx, + attr_namespaces, edge_attr_map, node_rank_map, num_edges, prj_rank_edge_map, + edge_map_type) >= 0, + "error in append_rank_edge_map"); mpi::MPI_DEBUG(io_comm, "scatter_read_projection: read ", num_edges, " edges from projection ", src_pop_name, " -> ", dst_pop_name); @@ -180,7 +182,13 @@ namespace neuroh5 MPI_Comm_free(&io_comm); - 
throw_assert_nomsg(MPI_Bcast(&total_read_blocks, 1, MPI_SIZE_T, io_rank_root, all_comm) == MPI_SUCCESS); + MPI_Request bcast_req; + throw_assert(MPI_Ibcast(&total_read_blocks, 1, MPI_SIZE_T, io_rank_root, all_comm, + &bcast_req) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Wait(&bcast_req, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "error in MPI_Wait"); + throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, recvcounts, rdispls, recvbuf) >= 0); } @@ -203,11 +211,23 @@ namespace neuroh5 data::serialize_data(edge_attr_names, sendbuf); sendbuf_size = sendbuf.size(); } + + MPI_Request bcast_req; throw_assert_nomsg(MPI_Barrier(all_comm) == MPI_SUCCESS); - throw_assert_nomsg(MPI_Bcast(&sendbuf_size, 1, MPI_UINT32_T, 0, all_comm) == MPI_SUCCESS); + throw_assert(MPI_Ibcast(&sendbuf_size, 1, MPI_UINT32_T, 0, all_comm, + &bcast_req) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Wait(&bcast_req, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "error in MPI_Wait"); sendbuf.resize(sendbuf_size); - throw_assert_nomsg(MPI_Bcast(&sendbuf[0], sendbuf_size, MPI_CHAR, 0, all_comm) == MPI_SUCCESS); + throw_assert(MPI_Ibcast(&sendbuf[0], sendbuf_size, MPI_CHAR, 0, all_comm, + &bcast_req) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Wait(&bcast_req, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "error in MPI_Wait"); + + mpi::MPI_DEBUG(all_comm, "scatter_read_projection: sendbuf size is ", sendbuf_size); if (rank != 0) { @@ -215,6 +235,7 @@ namespace neuroh5 } edge_attr_names_vector.push_back(edge_attr_names); + mpi::MPI_DEBUG(all_comm, "scatter_read_projection: deserialized edge attr names"); } } diff --git a/src/hdf5/create_file_toplevel.cc b/src/hdf5/create_file_toplevel.cc index e3625b6..d4c7f9b 100644 --- a/src/hdf5/create_file_toplevel.cc +++ b/src/hdf5/create_file_toplevel.cc @@ -1,4 +1,5 @@ +#include #include #include diff --git a/src/hdf5/file_access.cc b/src/hdf5/file_access.cc index ab82a50..4313515 100644 --- 
a/src/hdf5/file_access.cc
+++ b/src/hdf5/file_access.cc
@@ -1,4 +1,4 @@
-
+#include
 #include
 #include
 #include
diff --git a/src/hdf5/read_projection_datasets.cc b/src/hdf5/read_projection_datasets.cc
index 05fcafa..57ca34d 100644
--- a/src/hdf5/read_projection_datasets.cc
+++ b/src/hdf5/read_projection_datasets.cc
@@ -77,67 +77,68 @@ namespace neuroh5
       MPI_Comm_rank(comm, (int*)&rank);
 
       // Only rank 0 prints
-      if (rank == 0) {
-        os << "\n";
-        os << "==========================================================\n";
-        os << "                  RANK ASSIGNMENT TABLE                   \n";
-        os << "==========================================================\n";
-
-        // Table header
-        os << std::left << std::setw(6) << "Rank"
-           << std::setw(14) << "Dst Block Start"
-           << std::setw(14) << "Dst Block Count"
-           << std::setw(14) << "Src Idx Start"
-           << std::setw(14) << "Src Idx Count"
-           << std::setw(14) << "Dst Ptr Start"
-           << std::setw(14) << "Dst Ptr Count"
-           << "Dst Indices\n";
-        os << std::string(100, '-') << "\n";
-
-        // Print information for each rank
-        for (unsigned int r = 0; r < size; r++)
-          {
-            os << std::left << std::setw(6) << r
-               << std::setw(14) << assignments.dst_block_start[r]
-               << std::setw(14) << assignments.dst_block_count[r]
-               << std::setw(14) << assignments.src_idx_start[r]
-               << std::setw(14) << assignments.src_idx_count[r]
-               << std::setw(14) << assignments.dst_ptr_start[r]
-               << std::setw(14) << assignments.dst_ptr_count[r];
-
-            // Print destination indices (limit to first 5 if there are many)
-            os << "[";
-            size_t num_indices = assignments.local_dst_indices[r].size();
-            size_t display_count = std::min(num_indices, static_cast<size_t>(5));
-
-            for (size_t i = 0; i < display_count; i++)
-              {
-                if (i > 0) os << ", ";
-                os << assignments.local_dst_indices[r][i];
-              }
-
-            if (num_indices > display_count)
-              {
-                os << ", ... 
(" << (num_indices - display_count) << " more)"; - } - os << "]\n"; + if (rank == 0) + { + os << "\n"; + os << "==========================================================\n"; + os << " RANK ASSIGNMENT TABLE \n"; + os << "==========================================================\n"; + + // Table header + os << std::left << std::setw(6) << "Rank" + << std::setw(14) << "Dst Blk Start" + << std::setw(14) << "Dst Blk Count" + << std::setw(14) << "Src Idx Start" + << std::setw(14) << "Src Idx Count" + << std::setw(14) << "Dst Ptr Start" + << std::setw(14) << "Dst Ptr Count" + << "Dst Indices\n"; + os << std::string(100, '-') << "\n"; + + // Print information for each rank + for (unsigned int r = 0; r < size; r++) + { + os << std::left << std::setw(6) << r + << std::setw(14) << assignments.dst_block_start[r] + << std::setw(14) << assignments.dst_block_count[r] + << std::setw(14) << assignments.src_idx_start[r] + << std::setw(14) << assignments.src_idx_count[r] + << std::setw(14) << assignments.dst_ptr_start[r] + << std::setw(14) << assignments.dst_ptr_count[r]; + + // Print destination indices (limit to first 5 if there are many) + os << "["; + size_t num_indices = assignments.local_dst_indices[r].size(); + size_t display_count = std::min(num_indices, static_cast(5)); + + for (size_t i = 0; i < display_count; i++) + { + if (i > 0) os << ", "; + os << assignments.local_dst_indices[r][i]; + } + + if (num_indices > display_count) + { + os << ", ... 
(" << (num_indices - display_count) << " more)"; + } + os << "]\n"; + } + + // Print summary statistics + hsize_t total_blocks = 0; + hsize_t total_edges = 0; + + for (unsigned int r = 0; r < size; r++) + { + total_blocks += assignments.dst_block_count[r]; + total_edges += assignments.src_idx_count[r]; + } + + os << std::string(100, '-') << "\n"; + os << "Total Blocks: " << total_blocks << "\n"; + os << "Total Edges: " << total_edges << "\n"; + os << "==========================================================\n\n"; } - - // Print summary statistics - hsize_t total_blocks = 0; - hsize_t total_edges = 0; - - for (unsigned int r = 0; r < size; r++) - { - total_blocks += assignments.dst_block_count[r]; - total_edges += assignments.src_idx_count[r]; - } - - os << std::string(100, '-') << "\n"; - os << "Total Blocks: " << total_blocks << "\n"; - os << "Total Edges: " << total_edges << "\n"; - os << "==========================================================\n\n"; - } // Make sure all ranks wait until printing is done throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, @@ -228,19 +229,26 @@ namespace neuroh5 MPI_Comm_size(comm, (int*)&size); MPI_Comm_rank(comm, (int*)&rank); + MPI_Request bcast_req[6]; + int bcast_req_count = 0; + // Broadcast simple members - throw_assert(MPI_Bcast(rank_assignments.dst_block_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - throw_assert(MPI_Bcast(rank_assignments.dst_block_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - throw_assert(MPI_Bcast(rank_assignments.src_idx_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - throw_assert(MPI_Bcast(rank_assignments.src_idx_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - throw_assert(MPI_Bcast(rank_assignments.dst_ptr_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - 
throw_assert(MPI_Bcast(rank_assignments.dst_ptr_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); + throw_assert(MPI_Ibcast(rank_assignments.dst_block_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(rank_assignments.dst_block_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(rank_assignments.src_idx_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(rank_assignments.src_idx_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(rank_assignments.dst_ptr_start.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(rank_assignments.dst_ptr_count.data(), size, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + + // Wait for broadcasts to complete + throw_assert(MPI_Waitall(bcast_req_count, bcast_req, MPI_STATUSES_IGNORE) == MPI_SUCCESS, + "error in MPI_Waitall"); // For the destination indices, we'll create a flattened array and index structure vector indices_counts(size, 0); @@ -278,10 +286,17 @@ namespace neuroh5 } // Broadcast the counts and displacements - throw_assert(MPI_Bcast(indices_counts.data(), size, MPI_INT, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - throw_assert(MPI_Bcast(indices_displs.data(), size, MPI_INT, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); + bcast_req_count = 0; + throw_assert(MPI_Ibcast(indices_counts.data(), size, MPI_INT, 0, comm, + &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + 
throw_assert(MPI_Ibcast(indices_displs.data(), size, MPI_INT, 0, comm, + &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + + // Wait for count and displacement broadcasts to complete + throw_assert(MPI_Waitall(bcast_req_count, bcast_req, MPI_STATUSES_IGNORE) == MPI_SUCCESS, + "error in MPI_Waitall"); // Resize the local destination indices for this rank if (rank != 0) @@ -295,12 +310,17 @@ namespace neuroh5 rank_assignments.local_dst_indices[rank].data() : nullptr; - throw_assert(MPI_Scatterv(rank == 0 ? all_indices.data() : nullptr, + MPI_Request scatter_req; + + throw_assert(MPI_Iscatterv(rank == 0 ? all_indices.data() : nullptr, indices_counts.data(), indices_displs.data(), MPI_NODE_IDX_T, recv_buffer, indices_counts[rank], MPI_NODE_IDX_T, - 0, comm + 0, comm, &scatter_req ) == MPI_SUCCESS, "error in MPI_Scatterv"); + throw_assert(MPI_Wait(&scatter_req, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "error in MPI_Wait"); + } // Function to assign destination blocks to ranks @@ -653,14 +673,10 @@ namespace neuroh5 full_dst_blk_ptr, full_dst_idx, full_dst_ptr, total_num_edges, total_read_blocks); + // Step 2: Assign destination blocks to ranks assign_blocks_to_ranks(full_dst_blk_ptr, full_dst_idx, full_dst_ptr, rank_assignments, size); - if (debug_enabled) - { - print_rank_assignments(comm, rank_assignments); - } - // Step 3: Distribute appropriate parts of pointer arrays to all ranks distribute_ptr_arrays(comm, rank, rank_assignments, dst_blk_ptr, dst_idx, dst_ptr, full_dst_blk_ptr, full_dst_idx, full_dst_ptr); @@ -672,15 +688,30 @@ namespace neuroh5 distribute_ptr_arrays(comm, rank, rank_assignments, dst_blk_ptr, dst_idx, dst_ptr, vector(), vector(), vector()); } + + if (debug_enabled) + { + print_rank_assignments(comm, rank_assignments); + } // Step 4: Broadcast total edges and blocks to all ranks - throw_assert(MPI_Bcast(&total_num_edges, 1, MPI_SIZE_T, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); - 
throw_assert(MPI_Bcast(&total_read_blocks, 1, MPI_UNSIGNED_LONG_LONG, 0, comm) == MPI_SUCCESS, - "error in MPI_Bcast"); + MPI_Request bcast_req[2]; + int bcast_req_count = 0; + + throw_assert(MPI_Ibcast(&total_num_edges, 1, MPI_SIZE_T, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + throw_assert(MPI_Ibcast(&total_read_blocks, 1, MPI_UNSIGNED_LONG_LONG, 0, comm, &bcast_req[bcast_req_count++]) == MPI_SUCCESS, + "error in MPI_Ibcast"); + + // Wait for broadcasts to complete + throw_assert(MPI_Waitall(bcast_req_count, bcast_req, MPI_STATUSES_IGNORE) == MPI_SUCCESS, + "error in MPI_Waitall"); // Step 5: Distribute assignments to all ranks distribute_assignments(comm, rank_assignments); + block_base = rank_assignments.dst_block_start[rank]; + edge_base = rank_assignments.src_idx_start[rank]; + // Step 6: Each rank reads its portion of src_idx based on its assignment ierr = read_projection_src_idx(comm, file_name, src_pop_name, dst_pop_name,