Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 92 additions & 8 deletions src/pyFAI/opencl/test/test_collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
__contact__ = "jerome.kieffer@esrf.eu"
__license__ = "MIT"
__copyright__ = "2013 European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "07/11/2024"
__date__ = "12/11/2024"

import logging
import numpy
Expand All @@ -51,11 +51,11 @@

@unittest.skipIf(UtilsTest.opencl is False, "User request to skip OpenCL tests")
@unittest.skipUnless(ocl, "PyOpenCl is missing")
class TestReduction(unittest.TestCase):
class TestGroupFunction(unittest.TestCase):

@classmethod
def setUpClass(cls):
super(TestReduction, cls).setUpClass()
super(TestGroupFunction, cls).setUpClass()

if ocl:
cls.ctx = ocl.create_context()
Expand All @@ -74,8 +74,8 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
super(TestReduction, cls).tearDownClass()
print("Maximum valid workgroup size %s on device %s" % (cls.max_valid_wg, cls.ctx.devices[0]))
super(TestGroupFunction, cls).tearDownClass()
# print("Maximum valid workgroup size %s on device %s" % (cls.max_valid_wg, cls.ctx.devices[0]))
cls.ctx = None
cls.queue = None

Expand All @@ -88,8 +88,8 @@ def setUp(self):
self.data_d = pyopencl.array.to_device(self.queue, self.data)
self.sum_d = pyopencl.array.zeros_like(self.data_d)
self.program = pyopencl.Program(self.ctx, get_opencl_code("pyfai:openCL/collective/reduction.cl")+
get_opencl_code("pyfai:openCL/collective/scan.cl")
).build()
get_opencl_code("pyfai:openCL/collective/scan.cl")+
get_opencl_code("pyfai:openCL/collective/comb_sort.cl")).build()

def tearDown(self):
self.img = self.data = None
Expand Down Expand Up @@ -230,10 +230,94 @@ def test_Blelloch_multipass(self):
logger.info("Wg: %s result: cumsum good: %s", wg, good)
self.assertTrue(good, "calculation is correct for WG=%s" % wg)


@unittest.skipUnless(ocl, "pyopencl is missing")
def test_sort(self):
    """
    Test the in-place sort of float32 values, one segment per workgroup.

    A shuffled ramp of ``self.shape`` values is split into consecutive
    segments of ``wg`` elements, for ``wg`` = 32, 64, ... The kernel
    ``test_combsort_float`` is launched with one workgroup per segment and
    the buffer read back from the device must match the result of sorting
    every segment independently with numpy.

    If the kernel launch raises for a given size, the error is logged and
    the remaining (larger) sizes are not tested.
    """
    data = numpy.arange(self.shape).astype(numpy.float32)
    numpy.random.shuffle(data)

    # NOTE(review): reshape((-1, wg)) below presumably relies on self.shape
    # being a power of two -- confirm against setUp.
    maxi = int(round(numpy.log2(self.shape))) + 1
    for i in range(5, maxi):
        wg = 1 << i
        # The segment length may exceed what the device accepts as workgroup size
        wg_size = min(wg, self.max_valid_wg)

        ref = data.reshape((-1, wg))
        nb_lines = ref.shape[0]
        # Cumulative end index of each segment: wg, 2*wg, ...
        positions = ((numpy.arange(nb_lines) + 1) * wg).astype(numpy.int32)
        positions_d = pyopencl.array.to_device(self.queue, positions)
        # Fresh upload for every size since the device buffer is compared
        # against the reference after the kernel has run.
        data_d = pyopencl.array.to_device(self.queue, data)
        try:
            evt = self.program.test_combsort_float(self.queue, (nb_lines, wg_size), (1, wg_size),
                                                   data_d.data,
                                                   positions_d.data,
                                                   # one 4-byte int of local memory per work-item
                                                   pyopencl.LocalMemory(4 * wg_size))
            evt.wait()
        except Exception as error:
            logger.error("Error %s on WG=%s: test_sort", error, wg)
            break
        else:
            res = data_d.get()
            ref = numpy.sort(ref)
            good = numpy.allclose(res, ref.ravel())
            logger.info("Wg: %s result: sort OK %s", wg, good)
            if not good:
                # Diagnostics go through the logger rather than stdout
                obtained = res.reshape(ref.shape)
                logger.error("test_sort WG=%s\nobtained:\n%s\nexpected:\n%s\nmismatch at: %s",
                             wg, obtained, ref, numpy.where(obtained - ref))
            self.assertTrue(good, "calculation is correct for WG=%s" % wg)

@unittest.skipUnless(ocl, "pyopencl is missing")
def test_sort4(self):
    """
    Test the in-place sort of float4 records, one segment per workgroup.

    Every record holds four identical float32 components (s0..s3). The
    shuffled records are split into consecutive segments of ``wg``
    elements, for ``wg`` = 32, 64, ... The kernel ``test_combsort_float4``
    is launched with one workgroup per segment and the buffer read back
    from the device must match the result of sorting every segment
    independently on its ``s0`` field with numpy.

    If the kernel launch raises for a given size, the error is logged and
    the remaining (larger) sizes are not tested.
    """
    float4 = numpy.dtype([("s0", "<f4"), ("s1", "<f4"), ("s2", "<f4"), ("s3", "<f4")])
    data = numpy.arange(self.shape).astype(numpy.float32)
    data = numpy.outer(data, numpy.ones(4, numpy.float32)).view(float4)
    numpy.random.shuffle(data)

    # NOTE(review): reshape((-1, wg)) below presumably relies on self.shape
    # being a power of two -- confirm against setUp.
    maxi = int(round(numpy.log2(self.shape))) + 1
    for i in range(5, maxi):
        wg = 1 << i
        # The segment length may exceed what the device accepts as workgroup size
        wg_size = min(wg, self.max_valid_wg)

        ref = data.reshape((-1, wg))
        nb_lines = ref.shape[0]
        # Cumulative end index of each segment: wg, 2*wg, ...
        positions = ((numpy.arange(nb_lines) + 1) * wg).astype(numpy.int32)
        positions_d = pyopencl.array.to_device(self.queue, positions)
        # Fresh upload for every size since the device buffer is compared
        # against the reference after the kernel has run.
        data_d = pyopencl.array.to_device(self.queue, data)
        try:
            evt = self.program.test_combsort_float4(self.queue, (nb_lines, wg_size), (1, wg_size),
                                                    data_d.data,
                                                    positions_d.data,
                                                    # one 4-byte int of local memory per work-item
                                                    pyopencl.LocalMemory(4 * wg_size))
            evt.wait()
        except Exception as error:
            logger.error("Error %s on WG=%s: test_sort4", error, wg)
            break
        else:
            res = data_d.get()
            ref = numpy.sort(ref, order="s0")
            # Structured arrays do not support arithmetic: compare (and
            # diagnose) on their plain float32 views.
            res_f = res.view(numpy.float32).ravel()
            ref_f = ref.view(numpy.float32).ravel()
            good = numpy.allclose(res_f, ref_f)
            logger.info("Wg: %s result: sort OK %s", wg, good)
            if not good:
                logger.error("test_sort4 WG=%s\nobtained:\n%s\nexpected:\n%s\nmismatching flat indices: %s",
                             wg, res_f.reshape((-1, 4)), ref_f.reshape((-1, 4)),
                             numpy.where(res_f != ref_f)[0])
            self.assertTrue(good, "calculation is correct for WG=%s" % wg)



def suite():
loader = unittest.defaultTestLoader.loadTestsFromTestCase
testSuite = unittest.TestSuite()
testSuite.addTest(loader(TestReduction))
testSuite.addTest(loader(TestGroupFunction))
return testSuite


Expand Down
67 changes: 50 additions & 17 deletions src/pyFAI/resources/openCL/collective/comb_sort.cl
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,46 @@ int inline first_step(int step, int size, float ratio)
return step;
}

/* Put elements[i] and elements[j] in ascending order.
 *
 * Both values are loaded, and only when elements[i] > elements[j] are they
 * stored back exchanged. Plain loads and stores are used; no atomic
 * operation is involved.
 *
 * :param elements: buffer holding the values to order
 * :param i: index of the element expected to hold the smaller value
 * :param j: index of the element expected to hold the larger value
 * :return: 1 if the two elements were exchanged, 0 otherwise
 */
int compare_and_swap(global volatile float* elements, int i, int j)
{
    float first = elements[i];
    float second = elements[j];
    // A scalar comparison evaluates to 1 or 0, which is also the return value
    int swapped = (first > second);
    if (swapped)
    {
        elements[i] = second;
        elements[j] = first;
    }
    return swapped;
}

/* Put elements[i] and elements[j] in ascending order of their s0 component.
 *
 * Both float4 values are loaded, and only when elements[i].s0 > elements[j].s0
 * are the complete float4 stored back exchanged. The components s1, s2 and s3
 * take no part in the comparison. Plain loads and stores are used; no atomic
 * operation is involved.
 *
 * :param elements: buffer holding the float4 records to order
 * :param i: index of the record expected to hold the smaller s0
 * :param j: index of the record expected to hold the larger s0
 * :return: 1 if the two records were exchanged, 0 otherwise
 */
int compare_and_swap_float4(global volatile float4* elements, int i, int j)
{
    float4 first = elements[i];
    float4 second = elements[j];
    // A scalar comparison evaluates to 1 or 0, which is also the return value
    int swapped = (first.s0 > second.s0);
    if (swapped)
    {
        elements[i] = second;
        elements[j] = first;
    }
    return swapped;
}



// returns the number of swaps performed
int passe(global volatile float* elements,
int size,
int step,
local volatile int* shared)
local int* shared)
{
int wg = get_local_size(0);
int tid = get_local_id(0);
int wg = get_local_size(1);
int tid = get_local_id(1);
int cnt = 0;
int i, j, k;
barrier(CLK_GLOBAL_MEM_FENCE);
Expand Down Expand Up @@ -76,7 +106,7 @@ int passe(global volatile float* elements,
if (step==1)
{
shared[tid] = cnt;
return sum_reduction(shared);
return sum_int_reduction(shared);
}
else
return 0;
Expand All @@ -88,10 +118,10 @@ int passe(global volatile float* elements,
int passe_float4(global volatile float4* elements,
int size,
int step,
local volatile int* shared)
local int* shared)
{
int wg = get_local_size(0);
int tid = get_local_id(0);
int wg = get_local_size(1);
int tid = get_local_id(1);
int cnt = 0;
int i, j, k;

Expand Down Expand Up @@ -135,19 +165,20 @@ int passe_float4(global volatile float4* elements,
if (step==1)
{
shared[tid] = cnt;
return sum_reduction(shared);
return sum_int_reduction(shared);
}
else
return 0;
}

// workgroup: (wg, 1) grid:(wg, nb_lines), shared wg*sizeof(int)
kernel void test_combsort_many(global volatile float* elements,
global int* positions,
local int* shared)
// workgroup: (1, wg)
// grid: (nb_lines, wg)
// shared: wg*sizeof(int)
kernel void test_combsort_float(global volatile float* elements,
global int* positions,
local int* shared)
{
local volatile int shared[1024];
int gid = get_group_id(1);
int gid = get_group_id(0);
int step = 11; // magic value
float ratio=1.3f; // magic value
int cnt;
Expand All @@ -171,12 +202,14 @@ kernel void test_combsort_many(global volatile float* elements,

}

// workgroup: (wg, 1) grid:(wg, nb_lines), shared wg*sizeof(int)
// workgroup: (1, wg)
// grid: (nb_lines, wg)
// shared: wg*sizeof(int)
kernel void test_combsort_float4(global volatile float4* elements,
global int* positions,
local volatile int* shared)
local int* shared)
{
int gid = get_group_id(1);
int gid = get_group_id(0);
int step = 11; // magic value
float ratio=1.3f; // magic value
int cnt;
Expand Down
4 changes: 2 additions & 2 deletions src/pyFAI/resources/openCL/collective/reduction.cl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

int inline sum_int_reduction(local int* shared)
{
int wg = get_local_size(0);
int tid = get_local_id(0);
int wg = get_local_size(0) * get_local_size(1);
int tid = get_local_id(0) + get_local_size(0)*get_local_id(1);

// local reduction based implementation
for (int stride=wg>>1; stride>0; stride>>=1)
Expand Down