7 changes: 6 additions & 1 deletion sycl/include/sycl/handler.hpp
@@ -2839,10 +2839,15 @@ class __SYCL_EXPORT handler {
   /// Executes a command_graph.
   ///
   /// \param Graph Executable command_graph to run
+#ifdef __INTEL_PREVIEW_BREAKING_CHANGES
+  void
+  ext_oneapi_graph(const ext::oneapi::experimental::command_graph<
+                   ext::oneapi::experimental::graph_state::executable> &Graph);
+#else
   void ext_oneapi_graph(ext::oneapi::experimental::command_graph<
                             ext::oneapi::experimental::graph_state::executable>
                             Graph);
-
+#endif
   /// Copies data from host to device, where \p Src is a USM pointer and \p Dest
   /// is an opaque image memory handle. An exception is thrown if either \p Src
   /// is nullptr or \p Dest is incomplete. The behavior is undefined if
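The hunk above gates a signature change behind __INTEL_PREVIEW_BREAKING_CHANGES: with the macro defined, ext_oneapi_graph takes the executable graph by const reference instead of by value. A minimal standalone sketch of the same preview-macro pattern, using illustrative names rather than the real SYCL types:

// Sketch only: PREVIEW_BREAKING_CHANGES and the types below are stand-ins.
#include <iostream>

struct executable_graph {}; // stand-in for command_graph<graph_state::executable>

class handler_sketch {
public:
#ifdef PREVIEW_BREAKING_CHANGES
  // New signature: pass by const reference, avoiding a copy of the wrapper.
  void run_graph(const executable_graph &Graph) {
    (void)Graph;
    std::cout << "by const reference\n";
  }
#else
  // Old signature: pass the wrapper by value.
  void run_graph(executable_graph Graph) {
    (void)Graph;
    std::cout << "by value\n";
  }
#endif
};

int main() {
  handler_sketch h;
  h.run_graph(executable_graph{}); // picks whichever overload was compiled in
}

Callers compile unchanged either way; only builds that opt into the preview macro see the new signature.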
62 changes: 32 additions & 30 deletions sycl/source/detail/graph/graph_impl.cpp
@@ -835,7 +835,7 @@ void exec_graph_impl::createCommandBuffers(
     throw sycl::exception(errc::invalid, "Failed to create UR command-buffer");
   }
 
-  Partition->MCommandBuffers[Device] = OutCommandBuffer;
+  Partition->MCommandBuffer = OutCommandBuffer;
 
   for (node_impl &Node : Partition->schedule()) {
     // Some nodes are not scheduled like other nodes, and only their
@@ -910,13 +910,12 @@ exec_graph_impl::~exec_graph_impl() {
 
     for (const auto &Partition : MPartitions) {
       Partition->MSchedule.clear();
-      for (const auto &Iter : Partition->MCommandBuffers) {
-        if (auto CmdBuf = Iter.second; CmdBuf) {
-          ur_result_t Res = Adapter.call_nocheck<
-              sycl::detail::UrApiKind::urCommandBufferReleaseExp>(CmdBuf);
-          (void)Res;
-          assert(Res == UR_RESULT_SUCCESS);
-        }
+      if (Partition->MCommandBuffer) {
+        ur_result_t Res = Adapter.call_nocheck<
+            sycl::detail::UrApiKind::urCommandBufferReleaseExp>(
+            Partition->MCommandBuffer);
+        (void)Res;
+        assert(Res == UR_RESULT_SUCCESS);
       }
     }
   } catch (std::exception &e) {
@@ -988,11 +987,9 @@ EventImplPtr exec_graph_impl::enqueuePartitionWithScheduler(
                            Partition->MAccessors.end());
   }
 
-  auto CommandBuffer = Partition->MCommandBuffers[Queue.get_device()];
-
   std::unique_ptr<sycl::detail::CG> CommandGroup =
       std::make_unique<sycl::detail::CGExecCommandBuffer>(
-          CommandBuffer, nullptr, std::move(CGData));
+          Partition->MCommandBuffer, nullptr, std::move(CGData));
 
   EventImplPtr SchedulerEvent = sycl::detail::Scheduler::getInstance().addCG(
       std::move(CommandGroup), Queue, EventNeeded);
@@ -1009,25 +1006,28 @@ EventImplPtr exec_graph_impl::enqueuePartitionDirectly(
     std::shared_ptr<partition> &Partition, sycl::detail::queue_impl &Queue,
     std::vector<detail::EventImplPtr> &WaitEvents, bool EventNeeded) {
 
+  ur_event_handle_t *UrEnqueueWaitList = nullptr;
+  size_t UrEnqueueWaitListSize = 0;
+
   // Create a list containing all the UR event handles in WaitEvents. WaitEvents
   // is assumed to be safe for scheduler bypass and any host-task events that it
   // contains can be ignored.
   std::vector<ur_event_handle_t> UrEventHandles{};
-  UrEventHandles.reserve(WaitEvents.size());
-  for (auto &SyclWaitEvent : WaitEvents) {
-    if (auto URHandle = SyclWaitEvent->getHandle()) {
-      UrEventHandles.push_back(URHandle);
-    }
-  }
+  if (!WaitEvents.empty()) {
+    UrEventHandles.reserve(WaitEvents.size());
+    for (auto &SyclWaitEvent : WaitEvents) {
+      if (auto URHandle = SyclWaitEvent->getHandle()) {
+        UrEventHandles.push_back(URHandle);
+      }
+    }
 
-  auto CommandBuffer = Partition->MCommandBuffers[Queue.get_device()];
-  const size_t UrEnqueueWaitListSize = UrEventHandles.size();
-  const ur_event_handle_t *UrEnqueueWaitList =
-      UrEnqueueWaitListSize == 0 ? nullptr : UrEventHandles.data();
+    UrEnqueueWaitList = UrEventHandles.data();
+    UrEnqueueWaitListSize = UrEventHandles.size();
+  }
 
   if (!EventNeeded) {
     Queue.getAdapter().call<sycl::detail::UrApiKind::urEnqueueCommandBufferExp>(
-        Queue.getHandleRef(), CommandBuffer, UrEnqueueWaitListSize,
+        Queue.getHandleRef(), Partition->MCommandBuffer, UrEnqueueWaitListSize,
         UrEnqueueWaitList, nullptr);
     return nullptr;
   } else {
@@ -1037,7 +1037,7 @@ EventImplPtr exec_graph_impl::enqueuePartitionDirectly(
     NewEvent->setSubmissionTime();
     ur_event_handle_t UrEvent = nullptr;
     Queue.getAdapter().call<sycl::detail::UrApiKind::urEnqueueCommandBufferExp>(
-        Queue.getHandleRef(), CommandBuffer, UrEventHandles.size(),
+        Queue.getHandleRef(), Partition->MCommandBuffer, UrEventHandles.size(),
         UrEnqueueWaitList, &UrEvent);
     NewEvent->setHandle(UrEvent);
     NewEvent->setEventFromSubmittedExecCommandBuffer(true);
@@ -1161,17 +1161,20 @@ exec_graph_impl::enqueuePartitions(sycl::detail::queue_impl &Queue,
 
 EventImplPtr
 exec_graph_impl::enqueue(sycl::detail::queue_impl &Queue,
-                         sycl::detail::CG::StorageInitHelper CGData,
+                         sycl::detail::CG::StorageInitHelper &CGData,
                          bool EventNeeded) {
   WriteLock Lock(MMutex);
 
-  cleanupExecutionEvents(MSchedulerDependencies);
-  CGData.MEvents.insert(CGData.MEvents.end(), MSchedulerDependencies.begin(),
-                        MSchedulerDependencies.end());
+  if (!MSchedulerDependencies.empty()) {
+    cleanupExecutionEvents(MSchedulerDependencies);
+    CGData.MEvents.insert(CGData.MEvents.end(), MSchedulerDependencies.begin(),
+                          MSchedulerDependencies.end());
+  }
 
   bool IsCGDataSafeForSchedulerBypass =
-      detail::Scheduler::areEventsSafeForSchedulerBypass(
-          CGData.MEvents, Queue.getContextImpl()) &&
+      (CGData.MEvents.empty() ||
+       detail::Scheduler::areEventsSafeForSchedulerBypass(
+           CGData.MEvents, Queue.getContextImpl())) &&
      CGData.MRequirements.empty();
 
   // This variable represents the returned event. It will always be nullptr if
@@ -1478,8 +1481,7 @@ void exec_graph_impl::update(nodes_range Nodes) {
   auto PartitionedNodes = getURUpdatableNodes(Nodes);
   for (auto &[PartitionIndex, NodeImpl] : PartitionedNodes) {
     auto &Partition = MPartitions[PartitionIndex];
-    auto CommandBuffer = Partition->MCommandBuffers[MDevice];
-    updateURImpl(CommandBuffer, NodeImpl);
+    updateURImpl(Partition->MCommandBuffer, NodeImpl);
   }
 }
 
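The enqueuePartitionDirectly changes above replace the per-device command-buffer lookup with the partition's single MCommandBuffer, and skip wait-list construction entirely when there are no dependencies. A minimal standalone sketch of that wait-list pattern, with stand-in names for the Unified Runtime handles and entry point:

// Sketch only: native_event_t and native_enqueue are illustrative stand-ins
// for ur_event_handle_t and urEnqueueCommandBufferExp.
#include <cstddef>
#include <memory>
#include <vector>

using native_event_t = void *;

struct event_impl {
  native_event_t handle = nullptr; // null for e.g. host-task events
  native_event_t getHandle() const { return handle; }
};

// Stand-in for a C-style enqueue taking (count, wait-list) arguments.
void native_enqueue(std::size_t NumWait, const native_event_t *WaitList) {
  (void)NumWait;
  (void)WaitList;
}

void enqueue_with_deps(const std::vector<std::shared_ptr<event_impl>> &Deps) {
  const native_event_t *WaitList = nullptr;
  std::size_t WaitListSize = 0;

  std::vector<native_event_t> Handles;
  if (!Deps.empty()) { // early-out: no allocation or loop for the common case
    Handles.reserve(Deps.size());
    for (const auto &Dep : Deps)
      if (auto H = Dep->getHandle()) // skip events with no native handle
        Handles.push_back(H);
    WaitList = Handles.data();
    WaitListSize = Handles.size();
  }
  native_enqueue(WaitListSize, WaitList); // null list + size 0 when empty
}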
13 changes: 7 additions & 6 deletions sycl/source/detail/graph/graph_impl.hpp
@@ -50,15 +50,16 @@ class dynamic_command_group_impl;
 class partition {
 public:
   /// Constructor.
-  partition() : MSchedule(), MCommandBuffers() {}
+  partition() : MSchedule() {}
 
   /// List of root nodes.
   std::set<node_impl *> MRoots;
   /// Execution schedule of nodes in the graph.
   std::list<node_impl *> MSchedule;
-  /// Map of devices to command buffers.
-  std::unordered_map<sycl::device, ur_exp_command_buffer_handle_t>
-      MCommandBuffers;
+
+  /// Command buffer associated with this partition
+  ur_exp_command_buffer_handle_t MCommandBuffer = nullptr;
 
   /// List of predecessors to this partition.
   std::vector<partition *> MPredecessors;
@@ -588,7 +589,7 @@ class graph_impl : public std::enable_shared_from_this<graph_impl> {
 };
 
 /// Class representing the implementation of command_graph<executable>.
-class exec_graph_impl {
+class exec_graph_impl : public std::enable_shared_from_this<exec_graph_impl> {
 public:
   using ReadLock = std::shared_lock<std::shared_mutex>;
   using WriteLock = std::unique_lock<std::shared_mutex>;
@@ -628,7 +629,7 @@ class exec_graph_impl {
   /// @return Returns an event if EventNeeded is true. Returns nullptr
   /// otherwise.
   EventImplPtr enqueue(sycl::detail::queue_impl &Queue,
-                       sycl::detail::CG::StorageInitHelper CGData,
+                       sycl::detail::CG::StorageInitHelper &CGData,
                        bool EventNeeded);
 
   /// Iterates through all the nodes in the graph to build the list of
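exec_graph_impl now derives from std::enable_shared_from_this, which lets a member function mint additional shared_ptr owners to itself; this pairs with handler_impl (below) dropping its owning shared_ptr. A minimal sketch of the idiom, assuming the object is already owned by a shared_ptr (illustrative names):

#include <memory>

class exec_graph : public std::enable_shared_from_this<exec_graph> {
public:
  // Hand out a shared_ptr that shares ownership with the original owner,
  // e.g. to keep the graph alive while an asynchronous submission runs.
  std::shared_ptr<exec_graph> retain_for_submission() {
    // Only valid when *this is already managed by a shared_ptr; calling
    // shared_from_this() on a plain stack/raw-owned object is UB.
    return shared_from_this();
  }
};

int main() {
  auto G = std::make_shared<exec_graph>();
  auto KeepAlive = G->retain_for_submission(); // use_count() is now 2
  (void)KeepAlive;
}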
5 changes: 1 addition & 4 deletions sycl/source/detail/handler_impl.hpp
@@ -203,10 +203,7 @@ class handler_impl {
 
   /// If we are submitting a graph using ext_oneapi_graph this will be the graph
   /// to be executed.
-  std::shared_ptr<ext::oneapi::experimental::detail::exec_graph_impl>
-      MExecGraph;
-  /// Storage for a node created from a subgraph submission.
-  std::shared_ptr<ext::oneapi::experimental::detail::node_impl> MSubgraphNode;
+  ext::oneapi::experimental::detail::exec_graph_impl *MExecGraph;
   /// Storage for the CG created when handling graph nodes added explicitly.
   std::unique_ptr<detail::CG> MGraphNodeCG;
   /// Storage for node dependencies passed when adding a graph node explicitly
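MExecGraph becomes a raw, non-owning pointer. A small sketch of the implied ownership model, assuming the submission path keeps the graph alive for the duration of the submit (for instance via shared_from_this, per the graph_impl.hpp change above); names are illustrative:

#include <memory>

struct ExecGraph {
  bool containsHostTask() const { return false; }
};

struct HandlerImpl {
  ExecGraph *MExecGraph = nullptr; // non-owning; valid only while submitting
};

int main() {
  auto Owner = std::make_shared<ExecGraph>(); // lifetime anchored elsewhere
  HandlerImpl H;
  H.MExecGraph = Owner.get(); // observer for the duration of the submission
  bool HasHostTask = H.MExecGraph->containsHostTask();
  (void)HasHostTask;
}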
38 changes: 19 additions & 19 deletions sycl/source/detail/queue_impl.cpp
@@ -350,17 +350,15 @@ queue_impl::submit_impl(const detail::type_erased_cgfo_ty &CGF,
   // Host and interop tasks, however, are not submitted to low-level runtimes
   // and require separate dependency management.
   const CGType Type = HandlerImpl.MCGType;
-  std::vector<StreamImplPtr> Streams;
-  if (Type == CGType::Kernel)
-    Streams = std::move(Handler.MStreamStorage);
 
   HandlerImpl.MEventMode = SubmitInfo.EventMode();
 
   auto isHostTask = Type == CGType::CodeplayHostTask ||
                     (Type == CGType::ExecCommandBuffer &&
                      HandlerImpl.MExecGraph->containsHostTask());
 
-  auto requiresPostProcess = SubmitInfo.PostProcessorFunc() || Streams.size();
+  auto requiresPostProcess =
+      SubmitInfo.PostProcessorFunc() || Handler.MStreamStorage.size();
   auto noLastEventPath = !isHostTask &&
                          MNoLastEventMode.load(std::memory_order_acquire) &&
                          !requiresPostProcess;
@@ -402,21 +400,23 @@ queue_impl::submit_impl(const detail::type_erased_cgfo_ty &CGF,
     handlerPostProcess(Handler, SubmitInfo.PostProcessorFunc(), Event);
   }
 
-  for (auto &Stream : Streams) {
-    // We don't want stream flushing to be blocking operation that is why submit
-    // a host task to print stream buffer. It will fire up as soon as the kernel
-    // finishes execution.
-    auto L = [&](handler &ServiceCGH) {
-      Stream->generateFlushCommand(ServiceCGH);
-    };
-    detail::type_erased_cgfo_ty CGF{L};
-    detail::EventImplPtr FlushEvent =
-        submit_impl(CGF, /*CallerNeedsEvent*/ true, Loc, IsTopCodeLoc, {});
-    if (EventImpl)
-      EventImpl->attachEventToCompleteWeak(FlushEvent);
-    if (!isInOrder()) {
-      // For in-order queue, the dependencies will be tracked by LastEvent
-      registerStreamServiceEvent(FlushEvent);
-    }
+  if (Type == CGType::Kernel) {
+    for (auto &Stream : Handler.MStreamStorage) {
+      // We don't want stream flushing to be blocking operation that is why
+      // submit a host task to print stream buffer. It will fire up as soon as
+      // the kernel finishes execution.
+      auto L = [&](handler &ServiceCGH) {
+        Stream->generateFlushCommand(ServiceCGH);
+      };
+      detail::type_erased_cgfo_ty CGF{L};
+      detail::EventImplPtr FlushEvent =
+          submit_impl(CGF, /*CallerNeedsEvent*/ true, Loc, IsTopCodeLoc, {});
+      if (EventImpl)
+        EventImpl->attachEventToCompleteWeak(FlushEvent);
+      if (!isInOrder()) {
+        // For in-order queue, the dependencies will be tracked by LastEvent
+        registerStreamServiceEvent(FlushEvent);
+      }
+    }
   }
 
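The stream-flush loop above is now gated on Type == CGType::Kernel and iterates Handler.MStreamStorage directly instead of moving the streams out up front. The flush itself stays non-blocking: it is a separate submission that completes after the kernel. A toy model of that pattern; the Queue/Event types here are stand-ins for the SYCL runtime, and work runs eagerly for brevity:

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Event {};

struct Queue {
  // Submit work ordered after the given dependencies; returns its event.
  std::shared_ptr<Event> submit(const std::function<void()> &Work,
                                std::vector<std::shared_ptr<Event>> Deps = {}) {
    (void)Deps; // a real queue would defer Work until Deps complete
    Work();
    return std::make_shared<Event>();
  }
};

int main() {
  Queue Q;
  auto KernelEvent = Q.submit([] { std::cout << "kernel runs\n"; });
  // Non-blocking flush: a follow-up submission prints the stream buffer once
  // the kernel finishes, instead of flushing inline in the original submit.
  auto FlushEvent =
      Q.submit([] { std::cout << "flush stream buffer\n"; }, {KernelEvent});
  (void)FlushEvent;
}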
16 changes: 13 additions & 3 deletions sycl/source/detail/queue_impl.hpp
@@ -737,7 +737,9 @@ class queue_impl : public std::enable_shared_from_this<queue_impl> {
 
     MEmpty.store(false, std::memory_order_release);
 
-    synchronizeWithExternalEvent(Handler);
+    if (MInOrderExternalEvent.read_unlocked()) {
+      synchronizeWithExternalEvent(Handler);
+    }
 
     auto Event = parseEvent(Handler.finalize());
 
@@ -778,7 +780,9 @@ class queue_impl : public std::enable_shared_from_this<queue_impl> {
     MEmpty = false;
     MNoLastEventMode = false;
 
-    synchronizeWithExternalEvent(Handler);
+    if (MInOrderExternalEvent.read_unlocked()) {
+      synchronizeWithExternalEvent(Handler);
+    }
 
     EventToBuildDeps = parseEvent(Handler.finalize());
     assert(EventToBuildDeps);
@@ -808,7 +812,9 @@ class queue_impl : public std::enable_shared_from_this<queue_impl> {
 
     MEmpty = false;
 
-    synchronizeWithExternalEvent(Handler);
+    if (MInOrderExternalEvent.read_unlocked()) {
+      synchronizeWithExternalEvent(Handler);
+    }
 
     EventToBuildDeps = parseEvent(Handler.finalize());
     if (EventToBuildDeps)
@@ -1010,12 +1016,16 @@ class queue_impl : public std::enable_shared_from_this<queue_impl> {
         }
       }
     }
+
     DataType read() {
       if (!MIsSet.load(std::memory_order_acquire))
        return DataType{};
      std::lock_guard<std::mutex> Lock(MDataMtx);
      return MData;
    }
+
+    // To use when the queue has already acquired a mutex lock.
+    DataType read_unlocked() { return MData; }
   };
 
   const bool MIsInorder;
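read_unlocked() skips both the MIsSet fast path and the data mutex, so it is only safe on paths where the queue already holds a lock that excludes concurrent writers. A standalone sketch of the locked/unlocked accessor pair; illustrative only, since the real member coordinates with queue-level locking not shown here:

#include <atomic>
#include <mutex>

template <typename DataType> class GuardedValue {
  std::atomic<bool> MIsSet{false};
  std::mutex MDataMtx;
  DataType MData{};

public:
  void set(const DataType &Value) {
    std::lock_guard<std::mutex> Lock(MDataMtx);
    MData = Value;
    MIsSet.store(true, std::memory_order_release);
  }

  DataType read() {
    if (!MIsSet.load(std::memory_order_acquire))
      return DataType{}; // fast path: nothing was ever set
    std::lock_guard<std::mutex> Lock(MDataMtx);
    return MData;
  }

  // Precondition: the caller already holds a lock that excludes writers;
  // calling this without that guarantee is a data race.
  DataType read_unlocked() { return MData; }
};

int main() {
  GuardedValue<int> V;
  V.set(42);
  int A = V.read();          // general-purpose, takes the mutex
  int B = V.read_unlocked(); // only valid under an external lock
  (void)A;
  (void)B;
}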
3 changes: 1 addition & 2 deletions sycl/source/detail/scheduler/commands.cpp
@@ -3845,8 +3845,7 @@ ur_result_t UpdateCommandBufferCommand::enqueueImp() {
   auto Device = MQueue->get_device();
   auto &Partitions = MGraph->getPartitions();
   for (auto &[PartitionIndex, NodeImpl] : PartitionedNodes) {
-    auto CommandBuffer = Partitions[PartitionIndex]->MCommandBuffers[Device];
-    MGraph->updateURImpl(CommandBuffer, NodeImpl);
+    MGraph->updateURImpl(Partitions[PartitionIndex]->MCommandBuffer, NodeImpl);
   }
 
   return UR_RESULT_SUCCESS;