4 changes: 3 additions & 1 deletion llvm/include/llvm/SYCLLowerIR/DeviceConfigFile.td
@@ -98,6 +98,7 @@ def Aspectext_oneapi_clock_sub_group : Aspect<"ext_oneapi_clock_sub_group">;
def Aspectext_oneapi_clock_work_group : Aspect<"ext_oneapi_clock_work_group">;
def Aspectext_oneapi_clock_device : Aspect<"ext_oneapi_clock_device">;
def Aspectext_oneapi_is_integrated_gpu : Aspect<"ext_oneapi_is_integrated_gpu">;
def Aspectext_oneapi_device_wait : Aspect<"ext_oneapi_device_wait">;

// Deprecated aspects
def AspectInt64_base_atomics : Aspect<"int64_base_atomics">;
@@ -176,7 +177,8 @@ def : TargetInfo<"__TestAspectList",
Aspectext_oneapi_clock_sub_group,
Aspectext_oneapi_clock_work_group,
Aspectext_oneapi_clock_device,
Aspectext_oneapi_is_integrated_gpu],
Aspectext_oneapi_is_integrated_gpu,
Aspectext_oneapi_device_wait],
[]>;
// This definition serves the only purpose of testing whether the deprecated aspect list defined in here and in SYCL RT
// match.
15 changes: 15 additions & 0 deletions sycl/include/sycl/device.hpp
@@ -365,6 +365,21 @@ class __SYCL_STANDALONE_DEBUG __SYCL_EXPORT device
return profile.c_str();
}

/// Synchronizes with all queues associated with the device.
void ext_oneapi_wait();

/// Dispatches all unconsumed asynchronous exceptions for all queues or
/// contexts associated with the queues.
void ext_oneapi_throw_asynchronous();

/// Synchronizes with all queues associated with the device, then dispatches
/// all unconsumed asynchronous exceptions for all queues or contexts
/// associated with the queues.
void ext_oneapi_wait_and_throw() {
ext_oneapi_wait();
ext_oneapi_throw_asynchronous();
}

// TODO: Remove this diagnostics when __SYCL_WARN_IMAGE_ASPECT is removed.
#if defined(__clang__)
#pragma clang diagnostic pop
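A minimal usage sketch of the new entry points in `device.hpp`, assuming a backend that implements the extension and reports the `ext_oneapi_device_wait` aspect added in this patch (the selector and queue setup are ordinary SYCL, not part of the change):

```cpp
#include <sycl/sycl.hpp>

int main() {
  sycl::device Dev{sycl::default_selector_v};

  // Only rely on the device-level wait when the backend reports the aspect
  // added by this change.
  if (!Dev.has(sycl::aspect::ext_oneapi_device_wait))
    return 0;

  sycl::queue Q1{Dev};
  sycl::queue Q2{Dev};
  Q1.submit([](sycl::handler &CGH) { CGH.single_task([] { /* ... */ }); });
  Q2.submit([](sycl::handler &CGH) { CGH.host_task([] { /* ... */ }); });

  // Synchronize with every queue associated with the device, then dispatch
  // any unconsumed asynchronous exceptions, instead of waiting per queue.
  Dev.ext_oneapi_wait_and_throw();
  return 0;
}
```

Per the header above, `ext_oneapi_wait_and_throw()` is simply `ext_oneapi_wait()` followed by `ext_oneapi_throw_asynchronous()`.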
2 changes: 2 additions & 0 deletions sycl/include/sycl/exception_list.hpp
@@ -24,6 +24,7 @@ inline namespace _V1 {
// Forward declaration
namespace detail {
class queue_impl;
class device_impl;
}

/// A list of asynchronous exceptions.
@@ -46,6 +47,7 @@ class __SYCL_EXPORT exception_list {

private:
friend class detail::queue_impl;
friend class detail::device_impl;
void PushBack(const_reference Value);
void PushBack(value_type &&Value);
void Clear() noexcept;
1 change: 1 addition & 0 deletions sycl/include/sycl/info/aspects.def
@@ -84,3 +84,4 @@ __SYCL_ASPECT(ext_oneapi_clock_sub_group, 91)
__SYCL_ASPECT(ext_oneapi_clock_work_group, 92)
__SYCL_ASPECT(ext_oneapi_clock_device, 93)
__SYCL_ASPECT(ext_oneapi_is_integrated_gpu, 94)
__SYCL_ASPECT(ext_oneapi_device_wait, 95)
28 changes: 28 additions & 0 deletions sycl/source/detail/device_impl.cpp
@@ -503,6 +503,34 @@ device_impl::getImmediateProgressGuarantee(
return forward_progress_guarantee::weakly_parallel;
}

void device_impl::wait() const {
// First, flush all associated queues of their not-yet-enqueued commands and
// host tasks.
for (const std::weak_ptr<queue_impl> &WQueue : MQueues) {
std::shared_ptr<queue_impl> Queue = WQueue.lock();
assert(Queue && "Queue should never be dangling in the list of queues "
"associated with the device!");
Queue->waitForRuntimeLevelCmdsAndClear();
}

// Then we synchronize the entire device.
getAdapter().call<detail::UrApiKind::urDeviceWaitExp>(getHandleRef());
}

void device_impl::throwAsynchronous() {
std::lock_guard<std::mutex> Lock(MAsyncExceptionsMutex);
for (auto &ExceptionsEntryIt : MAsyncExceptions) {
exception_list Exceptions = std::move(ExceptionsEntryIt.second);
std::shared_ptr<queue_impl> Queue = ExceptionsEntryIt.first.lock();
if (Queue && Queue->getAsynchHandler()) {
Queue->getAsynchHandler()(std::move(Exceptions));
} else {
// If the queue is dead, use the default handler.
defaultAsyncHandler(std::move(Exceptions));
}
}
}

#ifndef __INTEL_PREVIEW_BREAKING_CHANGES
#define EXPORT_GET_INFO(PARAM) \
template <> \
59 changes: 59 additions & 0 deletions sycl/source/detail/device_impl.hpp
@@ -1597,6 +1597,10 @@ class device_impl : public std::enable_shared_from_this<device_impl> {
get_info_impl_nocheck<UR_DEVICE_INFO_IS_INTEGRATED_GPU>().value_or(
0);
}
CASE(ext_oneapi_device_wait) {
return get_info_impl_nocheck<UR_DEVICE_INFO_DEVICE_WAIT_SUPPORT_EXP>()
.value_or(0);
}
else {
return false; // This device aspect has not been implemented yet.
}
@@ -2267,6 +2271,47 @@ class device_impl : public std::enable_shared_from_this<device_impl> {
return {};
}

/// Adds an exception to the list of asynchronous exceptions.
///
/// \param QueueWeakPtr is a weak pointer referring to the queue to report
/// the asynchronous exceptions for.
/// \param ExceptionPtr is a pointer to the exception to be added.
void reportAsyncException(std::weak_ptr<queue_impl> QueueWeakPtr,
const std::exception_ptr &ExceptionPtr) {
std::lock_guard<std::mutex> Lock(MAsyncExceptionsMutex);
MAsyncExceptions[QueueWeakPtr].PushBack(ExceptionPtr);
}

/// Extracts all unconsumed asynchronous exceptions for a given queue.
///
/// \param QueueWeakPtr is a weak pointer referring to the queue to extract
/// unconsumed asynchronous exceptions for.
exception_list flushAsyncExceptions(std::weak_ptr<queue_impl> QueueWeakPtr) {
std::lock_guard<std::mutex> Lock(MAsyncExceptionsMutex);
auto ExceptionsEntryIt = MAsyncExceptions.find(QueueWeakPtr);
if (ExceptionsEntryIt == MAsyncExceptions.end())
return exception_list{};
exception_list Exceptions = std::move(ExceptionsEntryIt->second);
MAsyncExceptions.erase(ExceptionsEntryIt);
return Exceptions;
}

/// Synchronizes with all queues on the device.
void wait() const;

/// Dispatches all unconsumed asynchronous exceptions to the appropriate
/// handlers.
void throwAsynchronous();

void registerQueue(const std::weak_ptr<queue_impl> &Q) {
std::lock_guard<std::mutex> Lock(MQueuesMutex);
MQueues.insert(Q);
}

void unregisterQueue(const std::weak_ptr<queue_impl> &Q) {
std::lock_guard<std::mutex> Lock(MQueuesMutex);
MQueues.erase(Q);
}

private:
ur_device_handle_t MDevice = 0;
// This is used for getAdapter so should be above other properties.
@@ -2277,6 +2322,20 @@ class device_impl : public std::enable_shared_from_this<device_impl> {

const ur_device_handle_t MRootDevice;

// Each device tracks a list of its active queues to allow synchronization
// with host tasks and not-yet-enqueued commands.
std::mutex MQueuesMutex;
std::set<std::weak_ptr<queue_impl>,
std::owner_less<std::weak_ptr<queue_impl>>>
MQueues;

// Asynchronous exceptions are captured at the device level until flushed,
// either by queues, events, or a synchronization on the device itself.
std::mutex MAsyncExceptionsMutex;
std::map<std::weak_ptr<queue_impl>, exception_list,
std::owner_less<std::weak_ptr<queue_impl>>>
MAsyncExceptions;

// Order of caches matters! UR must come before SYCL info descriptors (because
// get_info calls get_info_impl but the opposite never happens) and both
// should come before aspects.
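The new `MQueues` and `MAsyncExceptions` members key a `std::set`/`std::map` by `std::weak_ptr<queue_impl>` using `std::owner_less`. A standalone sketch of that container pattern, with a hypothetical `Queue` type standing in for `queue_impl`:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <set>

struct Queue {}; // hypothetical stand-in for detail::queue_impl

int main() {
  // owner_less orders weak_ptrs by their control block, so keys remain
  // orderable and findable even after the pointee has been destroyed.
  std::set<std::weak_ptr<Queue>, std::owner_less<std::weak_ptr<Queue>>> Queues;
  std::map<std::weak_ptr<Queue>, int, std::owner_less<std::weak_ptr<Queue>>>
      PerQueue;

  auto Q = std::make_shared<Queue>();
  std::weak_ptr<Queue> WQ = Q;
  Queues.insert(WQ);
  PerQueue[WQ] = 1;

  Q.reset(); // the queue "dies"

  // The expired weak_ptr still identifies the entries, which is what lets the
  // device flush exceptions for a queue that no longer exists.
  assert(Queues.count(WQ) == 1);
  assert(PerQueue.find(WQ) != PerQueue.end());
  return 0;
}
```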
27 changes: 24 additions & 3 deletions sycl/source/detail/event_impl.cpp
@@ -211,8 +211,9 @@ void event_impl::initHostProfilingInfo() {
MHostProfilingInfo->setDevice(&Device);
}

void event_impl::setSubmittedQueue(std::weak_ptr<queue_impl> SubmittedQueue) {
MSubmittedQueue = std::move(SubmittedQueue);
void event_impl::setSubmittedQueue(queue_impl *SubmittedQueue) {
MSubmittedQueue = SubmittedQueue->weak_from_this();
MSubmittedDevice = &SubmittedQueue->getDeviceImpl();
}

#ifdef XPTI_ENABLE_INSTRUMENTATION
@@ -308,8 +309,28 @@ void event_impl::wait(bool *Success) {
void event_impl::wait_and_throw() {
wait();

if (std::shared_ptr<queue_impl> SubmittedQueue = MSubmittedQueue.lock())
if (std::shared_ptr<queue_impl> SubmittedQueue = MSubmittedQueue.lock()) {
SubmittedQueue->throw_asynchronous();
return;
}

// If the queue has died, we rely on finding its exceptions through the
// device.
if (MSubmittedDevice == nullptr)
return;

// If MSubmittedQueue has died, still flush any exceptions associated with
// it, then use either the context async_handler or the default
// async_handler.
exception_list Exceptions =
MSubmittedDevice->flushAsyncExceptions(MSubmittedQueue);
if (Exceptions.size() == 0)
return;

if (MContext && MContext->get_async_handler())
MContext->get_async_handler()(std::move(Exceptions));
else
defaultAsyncHandler(std::move(Exceptions));
}

void event_impl::checkProfilingPreconditions() const {
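A hypothetical user-level flow that the fallback above is meant to enable, assuming the runtime parks a destroyed queue's unconsumed exceptions in the new device-level store and that the host task's exception is reported asynchronously (an illustration, not a test from this patch):

```cpp
#include <sycl/sycl.hpp>
#include <iostream>
#include <stdexcept>

int main() {
  sycl::device Dev;
  auto Handler = [](sycl::exception_list Exceptions) {
    for (const std::exception_ptr &E : Exceptions) {
      try {
        std::rethrow_exception(E);
      } catch (const std::exception &Ex) {
        std::cerr << "async: " << Ex.what() << '\n';
      }
    }
  };
  sycl::context Ctx{Dev, Handler};

  sycl::event Ev;
  {
    sycl::queue Q{Ctx, Dev};
    Ev = Q.submit([](sycl::handler &CGH) {
      CGH.host_task([] { throw std::runtime_error("late failure"); });
    });
  } // Q is destroyed here; its event and unconsumed exceptions outlive it.

  // With this change, waiting on the surviving event can still deliver the
  // exception: it is flushed from the device-level store and passed to the
  // context's async_handler (or to the default handler if the context has
  // none).
  Ev.wait_and_throw();
  return 0;
}
```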
5 changes: 3 additions & 2 deletions sycl/source/detail/event_impl.hpp
@@ -264,10 +264,10 @@ class event_impl {
MWorkerQueue = std::move(WorkerQueue);
};

/// Sets original queue used for submission.
/// Sets original queue and device used for submission.
///
/// @return
void setSubmittedQueue(std::weak_ptr<queue_impl> SubmittedQueue);
void setSubmittedQueue(queue_impl *SubmittedQueue);

/// Indicates if this event is not associated with any command and doesn't
/// have native handle.
@@ -394,6 +394,7 @@ class event_impl {

std::weak_ptr<queue_impl> MWorkerQueue;
std::weak_ptr<queue_impl> MSubmittedQueue;
device_impl *MSubmittedDevice = nullptr;

/// Dependency events prepared for waiting by backend.
std::vector<EventImplPtr> MPreparedDepsEvents;
70 changes: 43 additions & 27 deletions sycl/source/detail/queue_impl.cpp
@@ -514,7 +514,7 @@ EventImplPtr queue_impl::submit_command_to_graph(
std::unique_ptr<detail::CG> CommandGroup, sycl::detail::CGType CGType,
sycl::ext::oneapi::experimental::node_type UserFacingNodeType) {
auto EventImpl = detail::event_impl::create_completed_host_event();
EventImpl->setSubmittedQueue(weak_from_this());
EventImpl->setSubmittedQueue(this);
ext::oneapi::experimental::detail::node_impl *NodeImpl = nullptr;

// GraphImpl is read and written in this scope so we lock this graph
@@ -889,32 +889,7 @@ void queue_impl::wait(const detail::code_location &CodeLoc) {
LastEvent->wait();
}
} else if (!isInOrder()) {
std::vector<std::weak_ptr<event_impl>> WeakEvents;
{
std::lock_guard<std::mutex> Lock(MMutex);
WeakEvents.swap(MEventsWeak);
MMissedCleanupRequests.unset(
[&](MissedCleanupRequestsType &MissedCleanupRequests) {
for (auto &UpdatedGraph : MissedCleanupRequests)
doUnenqueuedCommandCleanup(UpdatedGraph);
MissedCleanupRequests.clear();
});
}

// Wait for unenqueued or host task events, starting
// from the latest submitted task in order to minimize total amount of
// calls, then handle the rest with urQueueFinish.
for (auto EventImplWeakPtrIt = WeakEvents.rbegin();
EventImplWeakPtrIt != WeakEvents.rend(); ++EventImplWeakPtrIt) {
if (std::shared_ptr<event_impl> EventImplSharedPtr =
EventImplWeakPtrIt->lock()) {
// A nullptr UR event indicates that urQueueFinish will not cover it,
// either because it's a host task event or an unenqueued one.
if (nullptr == EventImplSharedPtr->getHandle()) {
EventImplSharedPtr->wait();
}
}
}
waitForRuntimeLevelCmdsAndClear();
}

getAdapter().call<UrApiKind::urQueueFinish>(getHandleRef());
@@ -1127,6 +1102,47 @@ void queue_impl::verifyProps(const property_list &Props) const {
CheckPropertiesWithData);
}

void queue_impl::waitForRuntimeLevelCmdsAndClear() {
if (isInOrder() && !MNoLastEventMode.load(std::memory_order_relaxed)) {
// If the last event is not null and has no associated handle, we need to
// wait for it. We do not clear it, however.
EventImplPtr LastEvent;
{
std::lock_guard<std::mutex> Lock(MMutex);
LastEvent = MDefaultGraphDeps.LastEventPtr;
}
if (LastEvent && nullptr == LastEvent->getHandle())
LastEvent->wait();
} else if (!isInOrder()) {
std::vector<std::weak_ptr<event_impl>> WeakEvents;
{
std::lock_guard<std::mutex> Lock(MMutex);
WeakEvents.swap(MEventsWeak);
MMissedCleanupRequests.unset(
[&](MissedCleanupRequestsType &MissedCleanupRequests) {
for (auto &UpdatedGraph : MissedCleanupRequests)
doUnenqueuedCommandCleanup(UpdatedGraph);
MissedCleanupRequests.clear();
});
}

// Wait for unenqueued or host task events, starting
// from the latest submitted task in order to minimize total amount of
// calls, then handle the rest with urQueueFinish.
for (auto EventImplWeakPtrIt = WeakEvents.rbegin();
EventImplWeakPtrIt != WeakEvents.rend(); ++EventImplWeakPtrIt) {
if (std::shared_ptr<event_impl> EventImplSharedPtr =
EventImplWeakPtrIt->lock()) {
// A nullptr UR event indicates that urQueueFinish will not cover it,
// either because it's a host task event or an unenqueued one.
if (nullptr == EventImplSharedPtr->getHandle()) {
EventImplSharedPtr->wait();
}
}
}
}
}

} // namespace detail
} // namespace _V1
} // namespace sycl