Skip to content

Commit f84a5fb

Browse files
feature: add host functions workers
* add common host function worker interface * add worker as a single thread per csr with 3 modes * add logic for waiting on internal tag, check gpu hang * if tag is in pending state, read callback data, run callback and signal completion * threads will exit the work loop once stop request is called in finish * add multi thread unit tests Related-To: NEO-14577 Signed-off-by: Kamil Kopryk <[email protected]>
1 parent ae05e05 commit f84a5fb

24 files changed

+901
-6
lines changed

level_zero/core/source/cmdlist/cmdlist_hw.inl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,7 @@ void CommandListCoreFamily<gfxCoreFamily>::dispatchHostFunction(
18641864
auto csr = getCsr(false);
18651865
csr->ensureHostFunctionDataInitialization();
18661866
this->commandContainer.addToResidencyContainer(csr->getHostFunctionDataAllocation());
1867+
csr->signalHostFunctionWorker();
18671868
NEO::HostFunctionHelper::programHostFunction<GfxFamily>(*this->commandContainer.getCommandStream(), csr->getHostFunctionData(), userHostFunctionAddress, userDataAddress);
18681869
} else {
18691870
addHostFunctionToPatchCommands(userHostFunctionAddress, userDataAddress);

level_zero/core/source/cmdqueue/cmdqueue_hw_gen12lp.inl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ void CommandQueueHw<gfxCoreFamily>::patchCommands(CommandList &commandList, uint
199199
case CommandToPatch::HostFunctionEntry:
200200
csr->ensureHostFunctionDataInitialization();
201201
csr->makeResidentHostFunctionAllocation();
202+
csr->signalHostFunctionWorker();
202203
NEO::HostFunctionHelper::programHostFunctionAddress<GfxFamily>(nullptr, commandToPatch.pCommand, csr->getHostFunctionData(), commandToPatch.baseAddress);
203204
break;
204205

level_zero/core/source/cmdqueue/cmdqueue_xe_hp_core_and_later.inl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ void CommandQueueHw<gfxCoreFamily>::patchCommands(CommandList &commandList, uint
287287
case CommandToPatch::HostFunctionEntry:
288288
csr->ensureHostFunctionDataInitialization();
289289
csr->makeResidentHostFunctionAllocation();
290+
csr->signalHostFunctionWorker();
290291
NEO::HostFunctionHelper::programHostFunctionAddress<GfxFamily>(nullptr, commandToPatch.pCommand, csr->getHostFunctionData(), commandToPatch.baseAddress);
291292
break;
292293

level_zero/core/test/unit_tests/sources/cmdqueue/test_cmdqueue_2.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,6 +1156,10 @@ HWTEST_F(HostFunctionsCmdPatchTests, givenHostFunctionPatchCommandsWhenPatchComm
11561156
NEO::CommandStreamReceiver *csr = nullptr;
11571157
device->getCsrForOrdinalAndIndex(&csr, 0u, 0u, ZE_COMMAND_QUEUE_PRIORITY_NORMAL, 0, false);
11581158
auto commandQueue = std::make_unique<MockCommandQueueHw<FamilyType::gfxCoreFamily>>(device, csr, &desc);
1159+
MockCommandStreamReceiver mockCsr(*neoDevice->executionEnvironment, neoDevice->getRootDeviceIndex(), neoDevice->getDeviceBitfield());
1160+
const auto oldCsr = commandQueue->csr;
1161+
commandQueue->csr = &mockCsr;
1162+
11591163
auto commandList = std::make_unique<WhiteBox<::L0::CommandListCoreFamily<FamilyType::gfxCoreFamily>>>();
11601164
commandList->commandsToPatch.clear();
11611165

@@ -1197,6 +1201,8 @@ HWTEST_F(HostFunctionsCmdPatchTests, givenHostFunctionPatchCommandsWhenPatchComm
11971201
commandQueue->patchCommands(*commandList, 0, false, nullptr);
11981202

11991203
EXPECT_NE(nullptr, commandQueue->csr->getHostFunctionDataAllocation());
1204+
EXPECT_EQ(1u, mockCsr.createHostFunctionWorkerCounter);
1205+
EXPECT_EQ(1u, mockCsr.signalHostFunctionWorkerCounter);
12001206

12011207
auto &hostFunctionDataFromCsr = commandQueue->csr->getHostFunctionData();
12021208

@@ -1220,6 +1226,8 @@ HWTEST_F(HostFunctionsCmdPatchTests, givenHostFunctionPatchCommandsWhenPatchComm
12201226
// internal tag wait - semaphore wait
12211227
EXPECT_EQ(static_cast<uint32_t>(HostFunctionTagStatus::completed), internalTagMiWait.getSemaphoreDataDword());
12221228
EXPECT_EQ(reinterpret_cast<uint64_t>(hostFunctionDataFromCsr.internalTag), internalTagMiWait.getSemaphoreGraphicsAddress());
1229+
1230+
commandQueue->csr = oldCsr;
12231231
}
12241232

12251233
} // namespace ult

shared/source/command_stream/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,17 @@ set(NEO_CORE_COMMAND_STREAM
3030
${CMAKE_CURRENT_SOURCE_DIR}/definitions${BRANCH_DIR_SUFFIX}stream_properties.inl
3131
${CMAKE_CURRENT_SOURCE_DIR}/device_command_stream.h
3232
${CMAKE_CURRENT_SOURCE_DIR}/host_function.h
33+
${CMAKE_CURRENT_SOURCE_DIR}/host_function.cpp
3334
${CMAKE_CURRENT_SOURCE_DIR}/host_function.inl
3435
${CMAKE_CURRENT_SOURCE_DIR}/host_function_enablers.inl
36+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_cv.h
37+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_cv.cpp
38+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_interface.h
39+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_interface.cpp
40+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_counting_semaphore.cpp
41+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_counting_semaphore.h
42+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_atomic.h
43+
${CMAKE_CURRENT_SOURCE_DIR}/host_function_worker_atomic.cpp
3544
${CMAKE_CURRENT_SOURCE_DIR}/linear_stream.cpp
3645
${CMAKE_CURRENT_SOURCE_DIR}/linear_stream.h
3746
${CMAKE_CURRENT_SOURCE_DIR}/preemption.cpp

shared/source/command_stream/command_stream_receiver.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include "shared/source/command_container/implicit_scaling.h"
1111
#include "shared/source/command_stream/aub_subcapture_status.h"
12+
#include "shared/source/command_stream/host_function_worker_interface.h"
1213
#include "shared/source/command_stream/scratch_space_controller.h"
1314
#include "shared/source/command_stream/submission_status.h"
1415
#include "shared/source/command_stream/submissions_aggregator.h"
@@ -97,6 +98,7 @@ CommandStreamReceiver::CommandStreamReceiver(ExecutionEnvironment &executionEnvi
9798
auto &compilerProductHelper = rootDeviceEnvironment.getHelper<CompilerProductHelper>();
9899
this->heaplessModeEnabled = compilerProductHelper.isHeaplessModeEnabled(hwInfo);
99100
this->heaplessStateInitEnabled = compilerProductHelper.isHeaplessStateInitEnabled(heaplessModeEnabled);
101+
this->hostFunctionWorkerMode = debugManager.flags.HostFunctionWorkMode.get();
100102
}
101103

102104
CommandStreamReceiver::~CommandStreamReceiver() {
@@ -235,6 +237,25 @@ WaitStatus CommandStreamReceiver::waitForTaskCountAndCleanTemporaryAllocationLis
235237
return waitForTaskCountAndCleanAllocationList(requiredTaskCount, TEMPORARY_ALLOCATION);
236238
}
237239

240+
void CommandStreamReceiver::createHostFunctionWorker() {
241+
242+
if (this->hostFunctionWorker != nullptr) {
243+
return;
244+
}
245+
246+
this->hostFunctionWorker = HostFunctionFactory::createHostFunctionWorker(this->hostFunctionWorkerMode,
247+
this->isAubMode(),
248+
this->downloadAllocationImpl,
249+
this->getHostFunctionDataAllocation(),
250+
&this->getHostFunctionData());
251+
252+
this->hostFunctionWorker->start();
253+
}
254+
255+
IHostFunctionWorker *CommandStreamReceiver::getHostFunctionWorker() {
256+
return this->hostFunctionWorker;
257+
}
258+
238259
void CommandStreamReceiver::ensureCommandBufferAllocation(LinearStream &commandStream, size_t minimumRequiredSize, size_t additionalAllocationSize) {
239260
if (commandStream.getAvailableSpace() >= minimumRequiredSize) {
240261
return;
@@ -419,6 +440,10 @@ void CommandStreamReceiver::cleanupResources() {
419440
tagsMultiAllocation = nullptr;
420441
}
421442

443+
if (hostFunctionWorker) {
444+
cleanupHostFunctionWorker();
445+
}
446+
422447
if (hostFunctionDataMultiAllocation) {
423448
hostFunctionDataAllocation = nullptr;
424449

@@ -464,6 +489,12 @@ void CommandStreamReceiver::cleanupResources() {
464489
ownedPrivateAllocations.clear();
465490
}
466491

492+
void CommandStreamReceiver ::cleanupHostFunctionWorker() {
493+
hostFunctionWorker->finish();
494+
delete hostFunctionWorker;
495+
hostFunctionWorker = nullptr;
496+
}
497+
467498
WaitStatus CommandStreamReceiver::waitForCompletionWithTimeout(const WaitParams &params, TaskCountType taskCountToWait) {
468499
bool printWaitForCompletion = debugManager.flags.LogWaitingForCompletion.get();
469500
if (printWaitForCompletion) {
@@ -697,6 +728,10 @@ void *CommandStreamReceiver::getIndirectHeapCurrentPtr(IndirectHeapType heapType
697728
return nullptr;
698729
}
699730

731+
void CommandStreamReceiver::signalHostFunctionWorker() {
732+
hostFunctionWorker->submit();
733+
}
734+
700735
void CommandStreamReceiver::ensureHostFunctionDataInitialization() {
701736
if (!this->hostFunctionInitialized.load(std::memory_order_acquire)) {
702737
initializeHostFunctionData();
@@ -717,6 +752,9 @@ void CommandStreamReceiver::initializeHostFunctionData() {
717752
this->hostFunctionData.entry = reinterpret_cast<decltype(HostFunctionData::entry)>(ptrOffset(hostFunctionBuffer, HostFunctionHelper::entryOffset));
718753
this->hostFunctionData.userData = reinterpret_cast<decltype(HostFunctionData::userData)>(ptrOffset(hostFunctionBuffer, HostFunctionHelper::userDataOffset));
719754
this->hostFunctionData.internalTag = reinterpret_cast<decltype(HostFunctionData::internalTag)>(ptrOffset(hostFunctionBuffer, HostFunctionHelper::internalTagOffset));
755+
756+
createHostFunctionWorker();
757+
720758
this->hostFunctionInitialized.store(true, std::memory_order_release);
721759
}
722760

@@ -968,12 +1006,14 @@ bool CommandStreamReceiver::createPreemptionAllocation() {
9681006
std::unique_lock<CommandStreamReceiver::MutexType> CommandStreamReceiver::obtainUniqueOwnership() {
9691007
return std::unique_lock<CommandStreamReceiver::MutexType>(this->ownershipMutex);
9701008
}
1009+
9711010
std::unique_lock<CommandStreamReceiver::MutexType> CommandStreamReceiver::tryObtainUniqueOwnership() {
9721011
return std::unique_lock<CommandStreamReceiver::MutexType>(this->ownershipMutex, std::try_to_lock);
9731012
}
9741013
std::unique_lock<CommandStreamReceiver::MutexType> CommandStreamReceiver::obtainHostPtrSurfaceCreationLock() {
9751014
return std::unique_lock<CommandStreamReceiver::MutexType>(this->hostPtrSurfaceCreationMutex);
9761015
}
1016+
9771017
AllocationsList &CommandStreamReceiver::getTemporaryAllocations() { return internalAllocationStorage->getTemporaryAllocations(); }
9781018
AllocationsList &CommandStreamReceiver::getAllocationsForReuse() { return internalAllocationStorage->getAllocationsForReuse(); }
9791019
AllocationsList &CommandStreamReceiver::getDeferredAllocations() { return internalAllocationStorage->getDeferredAllocations(); }

shared/source/command_stream/command_stream_receiver.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class KmdNotifyHelper;
6767
class GfxCoreHelper;
6868
class ProductHelper;
6969
class ReleaseHelper;
70+
class IHostFunctionWorker;
7071
enum class WaitStatus;
7172
struct AubSubCaptureStatus;
7273
class SharedPoolAllocation;
@@ -148,6 +149,8 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
148149
MOCKABLE_VIRTUAL WaitStatus waitForTaskCount(TaskCountType requiredTaskCount);
149150
WaitStatus waitForTaskCountAndCleanAllocationList(TaskCountType requiredTaskCount, uint32_t allocationUsage);
150151
MOCKABLE_VIRTUAL WaitStatus waitForTaskCountAndCleanTemporaryAllocationList(TaskCountType requiredTaskCount);
152+
MOCKABLE_VIRTUAL void createHostFunctionWorker();
153+
IHostFunctionWorker *getHostFunctionWorker();
151154

152155
LinearStream &getCS(size_t minRequiredSize = 1024u);
153156
OSInterface *getOSInterface() const;
@@ -270,7 +273,6 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
270273
MOCKABLE_VIRTUAL bool createPerDssBackedBuffer(Device &device);
271274
[[nodiscard]] MOCKABLE_VIRTUAL std::unique_lock<MutexType> obtainUniqueOwnership();
272275
[[nodiscard]] MOCKABLE_VIRTUAL std::unique_lock<MutexType> tryObtainUniqueOwnership();
273-
274276
bool peekTimestampPacketWriteEnabled() const { return timestampPacketWriteEnabled; }
275277

276278
bool isLatestTaskCountFlushed() {
@@ -567,6 +569,7 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
567569
bool isLatestFlushIsTaskCountUpdateOnly() const { return latestFlushIsTaskCountUpdateOnly; }
568570

569571
MOCKABLE_VIRTUAL uint32_t getContextGroupId() const;
572+
MOCKABLE_VIRTUAL void signalHostFunctionWorker();
570573

571574
void ensureHostFunctionDataInitialization();
572575
HostFunctionData &getHostFunctionData();
@@ -584,6 +587,7 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
584587
TaskCountType taskLevel, DispatchFlags &dispatchFlags, Device &device) = 0;
585588

586589
void cleanupResources();
590+
void cleanupHostFunctionWorker();
587591
void printDeviceIndex();
588592
void checkForNewResources(TaskCountType submittedTaskCount, TaskCountType allocationTaskCount, GraphicsAllocation &gfxAllocation);
589593
bool checkImplicitFlushForGpuIdle();
@@ -610,6 +614,7 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
610614
std::unique_ptr<TagAllocatorBase> timestampPacketAllocator;
611615
std::unique_ptr<Thread> userPauseConfirmation;
612616
std::unique_ptr<IndirectHeap> globalStatelessHeap;
617+
IHostFunctionWorker *hostFunctionWorker = nullptr;
613618

614619
ResidencyContainer residencyAllocations;
615620
PrivateAllocsToReuseContainer ownedPrivateAllocations;
@@ -664,7 +669,6 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
664669
std::atomic<TaskCountType> taskCount{0};
665670

666671
std::atomic<uint32_t> numClients = 0u;
667-
668672
DispatchMode dispatchMode = DispatchMode::immediateDispatch;
669673
SamplerCacheFlushState samplerCacheFlushRequired = SamplerCacheFlushState::samplerCacheFlushNotRequired;
670674
PreemptionMode lastPreemptionMode = PreemptionMode::Initial;
@@ -673,7 +677,7 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
673677
uint32_t lastSentL3Config = 0;
674678
uint32_t latestSentStatelessMocsConfig;
675679
uint64_t lastSentSliceCount;
676-
680+
int32_t hostFunctionWorkerMode = -1;
677681
uint32_t requiredScratchSlot0Size = 0;
678682
uint32_t requiredScratchSlot1Size = 0;
679683
uint32_t lastAdditionalKernelExecInfo;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (C) 2025 Intel Corporation
3+
*
4+
* SPDX-License-Identifier: MIT
5+
*
6+
*/
7+
8+
#include "shared/source/command_stream/host_function.h"
9+
10+
#include "shared/source/command_stream/command_stream_receiver.h"
11+
#include "shared/source/command_stream/host_function_worker_atomic.h"
12+
#include "shared/source/command_stream/host_function_worker_counting_semaphore.h"
13+
#include "shared/source/command_stream/host_function_worker_cv.h"
14+
#include "shared/source/command_stream/host_function_worker_interface.h"
15+
16+
namespace NEO::HostFunctionFactory {
17+
18+
IHostFunctionWorker *createHostFunctionWorker(int32_t hostFunctionWorkerMode,
19+
bool isAubMode,
20+
const std::function<void(GraphicsAllocation &)> &downloadAllocationImpl,
21+
GraphicsAllocation *allocation,
22+
HostFunctionData *data) {
23+
24+
bool skipHostFunctionExecution = isAubMode;
25+
26+
switch (hostFunctionWorkerMode) {
27+
default:
28+
case 0:
29+
return new HostFunctionWorkerCountingSemaphore(skipHostFunctionExecution, downloadAllocationImpl, allocation, data);
30+
case 1:
31+
return new HostFunctionWorkerCV(skipHostFunctionExecution, downloadAllocationImpl, allocation, data);
32+
case 2:
33+
return new HostFunctionWorkerAtomic(skipHostFunctionExecution, downloadAllocationImpl, allocation, data);
34+
}
35+
}
36+
37+
} // namespace NEO::HostFunctionFactory

shared/source/command_stream/host_function.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99

1010
#include <cstddef>
1111
#include <cstdint>
12+
#include <functional>
1213

1314
namespace NEO {
1415

1516
class LinearStream;
17+
class CommandStreamReceiver;
18+
class IHostFunctionWorker;
19+
class GraphicsAllocation;
1620

1721
struct HostFunctionData {
1822
volatile uint64_t *entry = nullptr;
@@ -47,4 +51,13 @@ struct HostFunctionHelper {
4751
static void programWaitForHostFunctionCompletion(LinearStream *commandStream, void *cmdBuffer, const HostFunctionData &hostFunctionData);
4852
};
4953

54+
namespace HostFunctionFactory {
55+
IHostFunctionWorker *createHostFunctionWorker(int32_t hostFunctionWorkerMode,
56+
bool isAubMode,
57+
const std::function<void(GraphicsAllocation &)> &downloadAllocationImpl,
58+
GraphicsAllocation *allocation,
59+
HostFunctionData *data);
60+
61+
}
62+
5063
} // namespace NEO
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (C) 2025 Intel Corporation
3+
*
4+
* SPDX-License-Identifier: MIT
5+
*
6+
*/
7+
8+
#include "shared/source/command_stream/host_function_worker_atomic.h"
9+
10+
#include "shared/source/command_stream/host_function.h"
11+
12+
namespace NEO {
13+
HostFunctionWorkerAtomic::HostFunctionWorkerAtomic(bool skipHostFunctionExecution,
14+
const std::function<void(GraphicsAllocation &)> &downloadAllocationImpl,
15+
GraphicsAllocation *allocation,
16+
HostFunctionData *data)
17+
: IHostFunctionWorker(skipHostFunctionExecution, downloadAllocationImpl, allocation, data) {
18+
}
19+
20+
HostFunctionWorkerAtomic::~HostFunctionWorkerAtomic() = default;
21+
22+
void HostFunctionWorkerAtomic::start() {
23+
24+
std::lock_guard<std::mutex> lg{workerMutex};
25+
if (!worker) {
26+
worker = std::make_unique<std::jthread>([this](std::stop_token st) {
27+
this->workerLoop(st);
28+
});
29+
}
30+
}
31+
32+
void HostFunctionWorkerAtomic::finish() {
33+
std::lock_guard<std::mutex> lg{workerMutex};
34+
if (worker) {
35+
worker->request_stop();
36+
pending.fetch_add(1u);
37+
pending.notify_one();
38+
worker.reset(nullptr);
39+
}
40+
}
41+
42+
void HostFunctionWorkerAtomic::submit() noexcept {
43+
pending.fetch_add(1, std::memory_order_release);
44+
pending.notify_one();
45+
}
46+
47+
void HostFunctionWorkerAtomic::workerLoop(std::stop_token st) noexcept {
48+
49+
while (true) {
50+
51+
while (pending.load(std::memory_order_acquire) == 0) {
52+
pending.wait(0, std::memory_order_acquire);
53+
}
54+
55+
if (st.stop_requested()) {
56+
return;
57+
}
58+
59+
pending.fetch_sub(1, std::memory_order_acq_rel);
60+
61+
bool sucess = this->runHostFunction(st);
62+
if (!sucess) [[unlikely]] {
63+
return;
64+
}
65+
}
66+
}
67+
68+
} // namespace NEO

0 commit comments

Comments
 (0)