Binary file added commit__Cleaning.png
146 changes: 118 additions & 28 deletions include/graphblas/nonblocking/coordinates.hpp
@@ -27,14 +27,29 @@
#ifndef _H_GRB_NONBLOCKING_COORDINATES
#define _H_GRB_NONBLOCKING_COORDINATES

#ifndef NDEBUG
# define ASSERT(condition, message) \
do { \
if (! (condition)) { \
std::cerr << "Assertion `" #condition "` failed in " << __FILE__ \
<< " line " << __LINE__ << ": " << message << std::endl; \
std::terminate(); \
} \
} while (false)
#else
# define ASSERT(condition, message) do { } while (false)
#endif
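// Usage sketch for the ASSERT macro above (illustrative values only): the
// message operand is streamed into std::cerr, so it may chain << expressions:
//
//   ASSERT( tile_id < num_tiles,
//     "tile_id = " << tile_id << ", num_tiles = " << num_tiles );
//
// In NDEBUG builds the macro expands to an empty statement, so neither the
// condition nor the message is evaluated.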


#include <stdexcept> //std::runtime_error
#include <iostream> //std::cerr, used by the ASSERT macro above
#include <exception> //std::terminate, used by the ASSERT macro above
#include <vector>
#if defined _DEBUG && ! defined NDEBUG
#include <set>
#endif

#include <stddef.h> //size_t
#include <assert.h>
#include <cstddef> //size_t
#include <cassert>
#include <algorithm>

#include <graphblas/rc.hpp>
#include <graphblas/backends.hpp>
@@ -48,6 +63,9 @@

#include <graphblas/nonblocking/init.hpp>
#include <graphblas/nonblocking/analytic_model.hpp>
#include <iomanip>

// #define _LOCAL_DEBUG


namespace grb {
@@ -71,6 +89,10 @@ namespace grb {
typedef bool ArrayType;


// TODO: Remove me
bool _debug_is_counting_sort_done = false;


private:

bool * __restrict__ _assigned;
@@ -90,6 +112,9 @@ namespace grb {
config::VectorIndexType * __restrict__ local_new_nnzs;
config::VectorIndexType * __restrict__ pref_sum;

std::vector<config::VectorIndexType> counting_sum;


// the analytic model used during the execution of a pipeline
AnalyticModel analytic_model;

@@ -479,6 +504,60 @@ namespace grb {
pref_sum = _buffer + num_tiles * ( tile_size + 2 );
}

bool should_use_bitmask_asyncSubsetInit(
const size_t /* num_tiles */,
const size_t /* tile_id */,
const size_t lower_bound,
const size_t upper_bound
) const noexcept {
assert( _cap > 0 );
assert( _n <= _cap );
assert( lower_bound <= upper_bound );
return nonzeroes() * (upper_bound - lower_bound) > size();
}
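// Illustrative reading of the heuristic above (made-up values, not from
// this change set): with size() == 4096 and a tile range of 512, i.e.
// 8 tiles, the bitmask variant is selected once nonzeroes() * 512 > 4096,
// that is, once the vector holds more than 8 nonzeroes, roughly one per
// tile. Below that density, a single O(nonzeroes()) pass over the global
// stack is expected to be cheaper than scanning every tile's full bitmask
// range.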

void _asyncSubsetInit_bitmask(
const size_t lower_bound,
const size_t upper_bound,
const size_t /*tile_id*/,
config::VectorIndexType *local_nnzs,
config::VectorIndexType *local_stack
) noexcept {
assert( _cap > 0 );

for( size_t i = lower_bound; i < upper_bound; ++i ) {
if( _assigned[ i ] ) {
local_stack[ (*local_nnzs)++ ] = i - lower_bound;
}
}
}
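// Example of the scan above (made-up values): with lower_bound == 100,
// upper_bound == 104, and _assigned set at global indices 101 and 103, the
// local stack receives the tile-relative offsets { 1, 3 } and *local_nnzs
// ends up as 2.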

void _asyncSubsetInit_search(
const size_t num_tiles,
const std::vector<size_t> &lower_bounds,
const std::vector<size_t> &upper_bounds
) noexcept {
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {
*(local_buffer[ tile_id ]) = 0;
}
for( size_t i = 0; i < nonzeroes(); ++i ) {
const size_t k = _stack[ i ];
assert( _assigned[ k ] );

// Find the tile id of the element
const size_t tile_id = getTileId( k, num_tiles, lower_bounds, upper_bounds );
assert( tile_id < num_tiles );
assert( k < upper_bounds[tile_id] );
assert( k >= lower_bounds[tile_id] );

const size_t lower_bound = lower_bounds[tile_id];

config::VectorIndexType *local_nnzs = local_buffer[ tile_id ];
config::VectorIndexType *local_stack = local_buffer[ tile_id ] + 1;
local_stack[ (*local_nnzs)++ ] = k - lower_bound;
}
}
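// The loop above is effectively the scatter phase of a counting sort: a
// single sequential pass over the global stack distributes each nonzero to
// its tile's local buffer, so the total work is O(nonzeroes()) independent
// of num_tiles. Sketch of the resulting layout (made-up values): with two
// tiles of width 4 and _stack == { 5, 1, 6 },
//   tile 0 ends with count 1 and entries { 1 }, and
//   tile 1 ends with count 2 and entries { 1, 2 } (i.e. 5 - 4 and 6 - 4).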

/**
* Initialises a Coordinate instance that refers to a subset of this
* coordinates instance. Multiple disjoint subsets may be retrieved
@@ -494,37 +573,48 @@
* (exclusive).
*/
void asyncSubsetInit(
const size_t lower_bound,
const size_t upper_bound
const size_t num_tiles,
const std::vector<size_t> &lower_bound,
const std::vector<size_t> &upper_bound
) noexcept {
if( _cap == 0 ) {
return;
}
(void) num_tiles;
if( _cap == 0 ) { return; }

const size_t tile_id = lower_bound / analytic_model.getTileSize();
#ifdef GRB_ALREADY_DENSE_OPTIMIZATION
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {
config::VectorIndexType *local_nnzs = local_buffer[ tile_id ];
config::VectorIndexType *local_stack = local_buffer[ tile_id ] + 1;

config::VectorIndexType *local_nnzs = local_buffer[ tile_id ];
config::VectorIndexType *local_stack = local_buffer[ tile_id ] + 1;

*local_nnzs = 0;
if( upper_bound - lower_bound < _n ) {
for( size_t i = lower_bound; i < upper_bound; ++i ) {
if( _assigned[ i ] ) {
local_stack[ (*local_nnzs)++ ] = i - lower_bound;
}
}
} else {
for( size_t i = 0; i < _n; ++i ) {
const size_t k = _stack[ i ];
if( lower_bound <= k && k < upper_bound ) {
assert( _assigned[ k ] );
local_stack[ (*local_nnzs)++ ] = k - lower_bound;
}
}
*local_nnzs = 0;
_asyncSubsetInit_bitmask( lower_bound[tile_id], upper_bound[tile_id], tile_id, local_nnzs, local_stack );
}
#else
_asyncSubsetInit_search( num_tiles, lower_bound, upper_bound );
#endif
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {
local_new_nnzs[ tile_id ] = 0;
}
}

// the number of new nonzeroes is initialized here
local_new_nnzs[ tile_id ] = 0;
static size_t getTileId(
size_t k,
const size_t num_tiles,
const std::vector< size_t > &lower_bounds,
const std::vector< size_t > &upper_bounds
) {
ASSERT( num_tiles > 0, "num_tiles = " << num_tiles );
(void) num_tiles;
(void) lower_bounds;
(void) upper_bounds;

const auto tile_size = upper_bounds[0] - lower_bounds[0];
ASSERT( tile_size > 0, "tile_size = " << tile_size );
const size_t tile_id = k / tile_size;

ASSERT(tile_id < num_tiles, "tile_id = " << tile_id << ", num_tiles = " << num_tiles);
ASSERT(k < upper_bounds[tile_id], "k = " << k << ", tile_id = " << tile_id << ", upper_bounds[tile_id] = " << upper_bounds[tile_id]);
ASSERT(k >= lower_bounds[tile_id], "k = " << k << ", tile_id = " << tile_id << ", lower_bounds[tile_id] = " << lower_bounds[tile_id]);
return tile_id;
}
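// Note that the division above assumes every tile except possibly the last
// has the width upper_bounds[0] - lower_bounds[0]; the trailing ASSERTs
// guard that assumption. A hedged alternative for non-uniform tiles (a
// hypothetical variant, not part of this change) would binary-search the
// exclusive upper bounds for the first one greater than k:
//
//   const auto it = std::upper_bound(
//     upper_bounds.begin(), upper_bounds.end(), k );
//   const size_t tile_id = std::distance( upper_bounds.begin(), it );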

/**
103 changes: 61 additions & 42 deletions src/graphblas/nonblocking/pipeline.cpp
@@ -30,6 +30,8 @@
#include <graphblas/nonblocking/pipeline.hpp>
#include <graphblas/nonblocking/analytic_model.hpp>

// #define _LOCAL_DEBUG


using namespace grb::internal;

@@ -829,6 +831,10 @@ grb::RC Pipeline::execution() {

lower_bound.resize( num_tiles );
upper_bound.resize( num_tiles );
for( std::set< internal::Coordinates< nonblocking > * >::iterator vt = vbegin(); vt != vend(); ++vt ) {
auto* coords = *vt;
coords->_debug_is_counting_sort_done = false;
}

// if all vectors are already dense and there is no out-of-place operation to
// make them sparse we avoid paying the overhead for updating the coordinates
@@ -861,31 +867,43 @@
0, containers_size, tile_size, tile_id, num_tiles
);
assert( lower_bound[ tile_id ] <= upper_bound[ tile_id ] );
}
}

#if defined(_DEBUG) || defined(_LOCAL_DEBUG)
fprintf( stderr, "Pipeline::execution(2): check if any of the coordinates will use the search-variant of asyncSubsetInit:\n" );
#endif

#ifndef GRB_ALREADY_DENSE_OPTIMIZATION
for(
std::set< internal::Coordinates< nonblocking > * >::iterator vt = vbegin();
vt != vend(); ++vt
) {
if ( (**vt).size() != getContainersSize() ) {
continue;
}
std::vector< Coordinates< nonblocking > * > accessed_coordinates_vec( accessed_coordinates.begin(), accessed_coordinates.end() );
#pragma omp parallel for schedule(dynamic) num_threads(nthreads)
for( Coordinates< nonblocking > * vt : accessed_coordinates_vec ) {
if( (*vt).size() != getContainersSize() ) { continue; }

(**vt).asyncSubsetInit( lower_bound[ tile_id ], upper_bound[ tile_id ] );
}
(*vt).asyncSubsetInit( num_tiles, lower_bound, upper_bound );
}
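// The loop above parallelises over coordinate containers rather than over
// tiles: each container rebuilds all of its per-tile local views in one
// asyncSubsetInit call. This is safe under the assumption, implied by the
// surrounding code, that distinct Coordinates instances never share the
// local buffers they write to.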
#endif
#pragma omp parallel for schedule(dynamic) num_threads(nthreads)
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {

RC local_ret = SUCCESS;
for( std::vector< stage_type >::iterator pt = pbegin();
pt != pend(); ++pt
) {
local_ret = local_ret
? local_ret
: (*pt)( *this, lower_bound[ tile_id ], upper_bound[ tile_id ] );
}
if( local_ret != SUCCESS ) {
ret = local_ret;
}
// // compute the lower and upper bounds
// config::OMP::localRange(
// lower_bound[ tile_id ], upper_bound[ tile_id ],
// 0, containers_size, tile_size, tile_id, num_tiles
// );
// assert( lower_bound[ tile_id ] <= upper_bound[ tile_id ] );


RC local_ret = SUCCESS;
for( std::vector< stage_type >::iterator pt = pbegin();
pt != pend(); ++pt
) {
local_ret = local_ret
? local_ret
: (*pt)( *this, lower_bound[ tile_id ], upper_bound[ tile_id ] );
}
if( local_ret != SUCCESS ) {
ret = local_ret;
}
}
} else {
@@ -911,41 +929,42 @@
(**vt).localCoordinatesInit( am );
}

#pragma omp parallel for schedule( dynamic ) num_threads( nthreads )
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {

config::OMP::localRange(
lower_bound[ tile_id ], upper_bound[ tile_id ],
0, containers_size, tile_size, tile_id, num_tiles
);
assert( lower_bound[ tile_id ] <= upper_bound[ tile_id ] );

for(
std::set< internal::Coordinates< nonblocking > * >::iterator vt = vbegin();
vt != vend(); ++vt
) {
{ // Initialise the lower and upper bounds
#pragma omp parallel for schedule( dynamic ) num_threads( nthreads )
for( size_t tile_id = 0; tile_id < num_tiles; ++tile_id ) {
config::OMP::localRange(
lower_bound[tile_id], upper_bound[tile_id],
0, containers_size, tile_size, tile_id, num_tiles
);
assert(lower_bound[tile_id] <= upper_bound[tile_id]);
}
}

// skip the initialization of coordinates of a different size, which can
// happen only for the input of vxm_generic, since that input is read-only
// in the current design; that is, no stage of the same pipeline can
// overwrite it
if ( (**vt).size() != getContainersSize() ) {
{
#if defined(_DEBUG) || defined(_LOCAL_DEBUG)
fprintf( stderr, "Pipeline::execution(2): check if any of the coordinates will use the search-variant of asyncSubsetInit:\n" );
#endif
std::vector< Coordinates< nonblocking > * > accessed_coordinates_vec( accessed_coordinates.begin(), accessed_coordinates.end() );
#pragma omp parallel for schedule(dynamic) num_threads(nthreads)
for( Coordinates< nonblocking > * vt : accessed_coordinates_vec ) {
if ( (*vt).size() != getContainersSize() ) {
continue;
}

#ifdef GRB_ALREADY_DENSE_OPTIMIZATION
if( (**vt).isDense() && (
!contains_out_of_place_primitive || !outOfPlaceOutput( *vt )
if( (*vt).isDense() && (
!contains_out_of_place_primitive || !outOfPlaceOutput( vt )
) ) {
continue;
}
#endif

(**vt).asyncSubsetInit( lower_bound[ tile_id ], upper_bound[ tile_id ] );
initialized_coordinates = true;
(*vt).asyncSubsetInit( num_tiles, lower_bound, upper_bound );
}
initialized_coordinates = true;
}
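// As in the dense fast path earlier in this function, initialisation now
// runs per container in parallel; under GRB_ALREADY_DENSE_OPTIMIZATION,
// containers that are already dense and cannot be sparsified by an
// out-of-place stage are skipped, since their local views need no rebuild.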


// even if only one vector is sparse, we cannot reuse memory because the first
// two arguments that we pass to the lambda functions determine whether we
// reuse memory or not and they cannot vary for different vectors