forked from alpaka-group/alpaka3
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemFenceProducerConsumer.cpp
More file actions
330 lines (289 loc) · 12.3 KB
/
memFenceProducerConsumer.cpp
File metadata and controls
330 lines (289 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/* Copyright 2022 Jan Stephan, Mehmet Yusufoglu, René Widera
* SPDX-License-Identifier: MPL-2.0
*/
/** @file
* Unit tests for non-blocking memory visibility fences (memFence).
* Contains:
* - ProducerConsumerKernel: publication pattern using device-scope fences.
* - BlockSharedMemOrderKernel: shared-memory ordering using block-scope fences.
*/
#include <alpaka/alpaka.hpp>
#include <catch2/catch_template_test_macros.hpp>
#include <catch2/catch_test_macros.hpp>
using namespace alpaka;
using TestApis = std::decay_t<decltype(onHost::allBackends(onHost::enabledApis, exec::enabledExecutors))>;
// Producer-Consumer kernel documentation:
// - Producer (thread 0) publishes a payload (value = iteration index) to global memory,
// then issues a device-scope memFence to ensure the data write becomes visible
// before setting the corresponding ready flag to 1.
// - Consumer (thread 1) busy-waits on the ready flag becoming 1, then issues its own
// device-scope fence before reading the payload. This is like a typical acquire
// after producer's release.
//
// If the fence were missing on producer side, a reordering (or visibility delay) could
// allow the consumer to observe ready==1 but still see an old payload value. This would
// not be caught by a simple correctness test, so we count mismatches. Reordering is not
// guaranteed to manifest every run on all hardware; this encodes the canonical
// correctness pattern and will fail if a backend ever weakens ordering.
struct ProducerConsumerKernel
{
template<class Acc, class TVal, class TFlag, class TMis>
ALPAKA_FN_ACC void operator()(
Acc const& acc,
TVal payload,
TFlag readyFlags,
TMis mismatches,
uint32_t const iterations) const
{
using namespace alpaka::onAcc;
auto [tid] = acc.getIdxWithin(alpaka::onAcc::origin::grid, alpaka::onAcc::unit::threads);
// Only two active participants: 0 = producer, 1 = consumer.
if(!(tid == 0 || tid == 2))
// other threads exit fast.
return;
for(uint32_t i = 0; i < iterations; ++i)
{
// only for tid == 0, first thread is producer
if(tid == 0u)
{
// Publish payload value first.
atomicExch(acc, &payload[i], i);
// Ensure payload write is visible before flag store.
memFence(acc, scope::device);
atomicExch(acc, &readyFlags[i], 1u);
}
// consumer, only for tid == 1, second thread is consumer
else
{
// Spin until flag is set at each iteration.
while(atomicCas(acc, &readyFlags[i], 0u, 0u) == 0u)
{ /* busy wait */
}
// Acquire fence: prevents compiler/hardware from speculatively reading
// payload before observing the flag. This is the "Acquire" part; which is completing the
// release-acquire pair The busy‑wait loop guarantees thread 1 doesn’t leave the loop until it
// has observed flag==1, but it does not by itself force any refresh/invalidation of the cache
// line holding payload array values, nor prevent the compiler from reordering a payload-read
// above the while-loop unless the flag read is treated as a dependency (atomic/volatile) and
// an acquire fence follows.
memFence(acc, scope::device);
auto v = payload[i];
if(v != i)
{
// Increment mismatch counter (benign race: only consumer writes on mismatch).
onAcc::atomicAdd(acc, mismatches.data(), 1u);
}
}
}
}
};
TEMPLATE_LIST_TEST_CASE("memFence producer-consumer publication", "[memFence][producer-consumer]", TestApis)
{
auto cfg = TestType::makeDict();
auto deviceSpec = cfg[object::deviceSpec];
auto exec = cfg[object::exec];
auto selector = onHost::makeDeviceSelector(deviceSpec);
if(!selector.isAvailable())
{
WARN("No device available for " << deviceSpec.getName());
return;
}
auto device = selector.makeDevice(0);
auto queue = device.makeQueue();
// modest to keep test fast.
constexpr uint32_t iterations = 10u;
auto payload = onHost::alloc<uint32_t>(device, Vec{iterations});
auto flags = onHost::alloc<uint32_t>(device, Vec{iterations});
auto mis = onHost::alloc<uint32_t>(device, Vec{1u});
auto hFlags = onHost::allocHostLike(flags);
auto hMis = onHost::allocHostLike(mis);
// Init device buffers.
meta::ndLoopIncIdx(Vec{iterations}, [&](auto i) { hFlags[i] = 0u; });
hMis[Vec{0u}] = 0u;
// Copy initial state to device.
onHost::memcpy(queue, flags, hFlags);
onHost::memcpy(queue, mis, hMis);
/* Launch with exactly three logical threads using a FrameSpec (extent = 3, frameSize = 1).
* We would for the test only require 2 threads but for unknown reason on OneAPi with an Intel GPU (tested with ARC
* A770 + icpx 2025.2) the code deadlock. The reason for it is that it looks like that OneApi is starting 2 native
* sycl threads blocked groups of 1 thread within a single group and not two independent groups. Due to the reason
* that if conditions are executed lock step first the branch with the busy wait is executed and then never the
* branch for thread with id zero. Using 3 groups aka 3 thread blocks and working with thread 0 and 2 worked fine.
*
* @todo: find out if this strange behaviour is an bug (most likely) or somewhere documented in SYCL or OneApi.
*/
queue.enqueue(
exec,
onHost::ThreadSpec{3, 1},
KernelBundle{ProducerConsumerKernel{}, payload, flags, mis, iterations});
onHost::wait(queue);
// Copy mismatch counter back.
onHost::memcpy(queue, hMis, mis);
onHost::wait(queue);
CHECK(hMis[Vec{0u}] == 0u);
}
struct DeviceFenceTestKernelWriter
{
template<typename TAcc>
ALPAKA_FN_ACC auto operator()(TAcc const& acc, int volatile* vars) const -> void
{
auto const [idx] = acc.getIdxWithin(alpaka::onAcc::origin::grid, alpaka::onAcc::unit::threads);
// Use a single writer thread
if(idx == 0)
{
vars[0] = 10;
onAcc::memFence(acc, onAcc::scope::Device{});
vars[1] = 20;
}
}
};
struct DeviceFenceTestKernelReader
{
template<typename TAcc>
ALPAKA_FN_ACC auto operator()(TAcc const& acc, auto successFlag, int volatile* vars) const -> void
{
auto const [idx] = acc.getIdxWithin(alpaka::onAcc::origin::grid, alpaka::onAcc::unit::threads);
// Use a single reader thread
if(idx == 0)
{
auto const b = vars[1];
onAcc::memFence(acc, onAcc::scope::Device{});
auto const a = vars[0];
// If the fence is working correctly, the following case can never happen
if(a == 1 && b == 20)
{
// mark failure atomically to handle concurrent writes
onAcc::atomicExch(acc, &successFlag[0], 0u);
}
}
}
};
struct DeviceFenceTestKernel
{
template<typename TAcc>
ALPAKA_FN_ACC auto operator()(TAcc const& acc, auto successFlag, int volatile* vars) const -> void
{
auto const [idx] = acc.getIdxWithin(alpaka::onAcc::origin::grid, alpaka::onAcc::unit::threads);
// Global thread 0 is producer
if(idx == 0)
{
vars[0] = 10;
onAcc::memFence(acc, onAcc::scope::Device{});
vars[1] = 20;
}
auto const b = vars[1];
onAcc::memFence(acc, onAcc::scope::Device{});
auto const a = vars[0];
// If the fence is working correctly, the following case can never happen
if(a == 1 && b == 20)
{
// mark failure atomically to handle concurrent writes
onAcc::atomicExch(acc, &successFlag[0], 0u);
}
}
};
// -------------------------------------------------------------------------------------------------
// Block shared-memory ordering test:
// Validates that writes to dynamic shared memory from one thread become visible to sibling threads
// in the same block after a block-scope fence. Pattern mirrors legacy BlockFenceTestKernel from the
// original alpaka repo but adapted to memFence API and Catch2 style.
struct BlockSharedMemOrderKernel
{
// number of bytes of dynamic shared memory required by this kernel
static constexpr ::std::uint32_t dynSharedMemBytes = 2u * sizeof(int);
ALPAKA_FN_ACC void operator()(auto const& acc, auto successFlag) const
{
using namespace alpaka::onAcc;
// need space for 2 ints
auto* shared = getDynSharedMem<int>(acc);
for(auto [tid] : onAcc::makeIdxMap(acc, onAcc::worker::threadsInGrid, onAcc::range::threadsInGrid))
{
// Initialize once by the producer thread with id 0.
if(tid == 0u)
{
// A
shared[0] = 1;
// B
shared[1] = 2;
}
syncBlockThreads(acc);
// Producer thread with id 0 updates A then fences then updates B.
if(tid == 0u)
{
// publish new A
shared[0] = 10;
// ensure visibility of A before B write
memFence(acc, scope::block);
// publish B
shared[1] = 20;
}
// allow consumer threads to observe writes after fence ordering
syncBlockThreads(acc);
// All threads perform the read/validation (any non-producer could be consumer)
auto b = shared[1];
// acquire side
memFence(acc, scope::block);
auto a = shared[0];
// Forbidden outcome: observe updated B (20) but stale A (1)
if(a == 1 && b == 20)
{
// mark failure atomically to handle concurrent writes
onAcc::atomicExch(acc, &successFlag[0], 0u);
}
}
}
};
TEMPLATE_LIST_TEST_CASE("memFence block shared-memory ordering", "[memFence][block-shared]", TestApis)
{
auto cfg = TestType::makeDict();
auto deviceSpec = cfg[object::deviceSpec];
auto exec = cfg[object::exec];
auto selector = onHost::makeDeviceSelector(deviceSpec);
if(!selector.isAvailable())
{
WARN("No device available for " << deviceSpec.getName());
return;
}
auto device = selector.makeDevice(0);
auto queue = device.makeQueue();
// success flag: 1 = pass, 0 = failure detected
auto flag = onHost::allocUnified<uint32_t>(device, Vec{1u});
int const numElements = 2;
auto vars_host = onHost::allocMapped<int>(device, numElements);
auto vars_dev = onHost::alloc<int>(device, numElements);
vars_host[0] = 1;
vars_host[1] = 2;
{
flag[0u] = 1u;
queue.enqueue(exec, onHost::FrameSpec{1, 2}, KernelBundle{BlockSharedMemOrderKernel{}, flag});
onHost::wait(queue);
CHECK(flag[0u] == 1u);
}
{
// Run a single kernel, testing a memory fence in global memory across threads in different blocks
onHost::memcpy(queue, vars_dev, vars_host);
onHost::wait(queue);
flag[0u] = 1u;
// Device-scope variant, use thread specification to guarantee that we have two thread blocks
// A frame specification is allowed silently to change the number of real thread blocks and the block size
queue.enqueue(exec, onHost::ThreadSpec{2, 1}, KernelBundle{DeviceFenceTestKernel{}, flag, vars_dev.data()});
onHost::wait(queue);
CHECK(flag[0u] == 1u);
}
{
// Run two kernels in parallel, in two different queues on the same device, testing a memory fence
// in global memory across threads in different grids
onHost::memcpy(queue, vars_dev, vars_host);
onHost::wait(queue);
auto queue1 = device.makeQueue();
flag[0u] = 1u;
queue.enqueue(exec, onHost::ThreadSpec{1, 1}, KernelBundle{DeviceFenceTestKernelWriter{}, vars_dev.data()});
queue1.enqueue(
exec,
onHost::ThreadSpec{1, 1},
KernelBundle{DeviceFenceTestKernelReader{}, flag, vars_dev.data()});
onHost::wait(queue);
onHost::wait(queue1);
CHECK(flag[0u] == 1u);
}
}