Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 72 additions & 13 deletions src/hdf5/read_projection_datasets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,26 +330,79 @@ namespace neuroh5
const vector<NODE_IDX_T>& dst_idx,
const vector<DST_PTR_T>& dst_ptr,
RankAssignments &rank_assignments,
unsigned int size
unsigned int size,
size_t offset = 0,
size_t numitems = 0
)
{

// Calculate the range to process
hsize_t total_blocks = dst_blk_ptr.size() - 1;
hsize_t read_blocks = 0;

if (numitems > 0)
{
// Read a specific number of blocks starting from offset
if (offset < total_blocks)
{
read_blocks = std::min((hsize_t)numitems, total_blocks - offset);
} else
{
// Offset is beyond available blocks
read_blocks = 0;
}
} else
{
// Read all blocks
read_blocks = total_blocks;
offset = 0;
}

// If nothing to read, return with empty assignments
if (read_blocks == 0)
{
for (unsigned int i = 0; i < size; i++)
{
rank_assignments.dst_block_start[i] = 0;
rank_assignments.dst_block_count[i] = 0;
rank_assignments.src_idx_start[i] = 0;
rank_assignments.src_idx_count[i] = 0;
rank_assignments.dst_ptr_start[i] = 0;
rank_assignments.dst_ptr_count[i] = 0;
rank_assignments.local_dst_indices[i].clear();
}
return;
}

// Simple approach: distribute blocks evenly
hsize_t blocks_per_rank = total_blocks / size;
hsize_t remainder = total_blocks % size;
hsize_t current_block = 0;
hsize_t dst_ptr_size = dst_ptr.size();

// Calculate the end of the destination block pointer range
hsize_t last_block = offset + read_blocks - 1;
hsize_t block_ptr_end = (last_block < total_blocks) ?
dst_blk_ptr[last_block + 1] : dst_blk_ptr[total_blocks];

hsize_t current_block = offset;
hsize_t dst_ptr_end = dst_ptr.size()-1;
if (last_block < total_blocks)
{
dst_ptr_end = std::min(dst_ptr_end, block_ptr_end);
}

for (unsigned int i = 0; i < size; i++)
{
// Assign blocks to this rank
// Calculate how many blocks this rank gets
hsize_t rank_block_count = blocks_per_rank + (i < remainder ? 1 : 0);

// Assign blocks to ranks
rank_assignments.dst_block_start[i] = current_block;
rank_assignments.dst_block_count[i] = blocks_per_rank + (i < remainder ? 1 : 0);
rank_assignments.dst_block_count[i] = rank_block_count;

if (rank_assignments.dst_block_count[i] > 0)
if (rank_block_count > 0)
{
hsize_t block_end = current_block + rank_assignments.dst_block_count[i] - 1;
// Calculate the end block for this rank
hsize_t block_end = current_block + rank_block_count - 1;
hsize_t dst_ptr_start=0, dst_ptr_count=0;

// Calculate destination pointer range
Expand All @@ -360,20 +413,26 @@ namespace neuroh5

// Calculate source index range
rank_assignments.src_idx_start[i] = dst_ptr[dst_ptr_start];
rank_assignments.src_idx_count[i] = dst_ptr[min(dst_ptr_start + dst_ptr_count, dst_ptr_size-1)] - dst_ptr[dst_ptr_start];
rank_assignments.src_idx_count[i] = dst_ptr[min(dst_ptr_start + dst_ptr_count, dst_ptr_end)] - dst_ptr[dst_ptr_start];

// Store relevant destination indices
rank_assignments.local_dst_indices[i].clear();
for (hsize_t j = current_block; j <= block_end; j++)
{
rank_assignments.local_dst_indices[i].push_back(dst_idx[j]);
}
}

if (rank_assignments.dst_block_count[i] > 0)
{
rank_assignments.last_rank = i;
} else
{
// This rank gets no blocks
rank_assignments.dst_ptr_start[i] = 0;
rank_assignments.dst_ptr_count[i] = 0;
rank_assignments.src_idx_start[i] = 0;
rank_assignments.src_idx_count[i] = 0;
rank_assignments.local_dst_indices[i].clear();
}
current_block += rank_assignments.dst_block_count[i];

current_block += rank_block_count;
}
}

Expand Down
Loading