Skip to content

Commit e313b1e

Browse files
authored
Implemented event queueing (#13)
* Implemented event queueing * Fixing conformance error
1 parent 29ec632 commit e313b1e

File tree

6 files changed

+468
-14
lines changed

6 files changed

+468
-14
lines changed

BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ cc_library(
1212
]),
1313
hdrs = glob([
1414
"include/**",
15-
"framework/include/*.h",
15+
"framework/include/*.h*",
1616
]),
1717
includes = [
1818
"include",

framework/include/vx_context.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <memory>
2020

21+
#include "vx_event_queue.hpp"
2122
#include "vx_internal.h"
2223
#include "vx_reference.h"
2324

@@ -207,7 +208,7 @@ class Context : public Reference
207208
/*! \brief The number of available targets in the implementation */
208209
vx_uint32 num_targets;
209210
/*! \brief The list of implemented targets */
210-
vx_target targets[VX_INT_MAX_NUM_TARGETS];
211+
vx_target targets[VX_INT_MAX_NUM_TARGETS];
211212
/*! \brief The list of priority sorted target indexes */
212213
vx_uint32 priority_targets[VX_INT_MAX_NUM_TARGETS];
213214
/*! \brief The log callback for errors */
@@ -266,6 +267,10 @@ class Context : public Reference
266267
cl_context opencl_context;
267268
cl_command_queue opencl_command_queue;
268269
#endif
270+
#ifdef OPENVX_USE_EVENTS
271+
/*! \brief The event queue for the context */
272+
EventQueue event_queue;
273+
#endif
269274
};
270275

271276
#endif /* VX_CONTEXT_H */
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/**
2+
* @file vx_event_queue.hpp
3+
* @brief
4+
* @version 0.1
5+
* @date 2025-05-09
6+
*
7+
* @copyright Copyright (c) 2025
8+
*
9+
*/
10+
#pragma once
11+
12+
#include <chrono>
13+
#include <condition_variable>
14+
#include <deque>
15+
#include <memory>
16+
#include <mutex>
17+
#include <optional>
18+
#include <vector>
19+
20+
#include <VX/vx_khr_pipelining.h>
21+
22+
/**
23+
* @brief Internal Event Queue Object
24+
*
25+
*/
26+
class EventQueue
27+
{
28+
// Registration structure that stores additional event parameters for a vx_reference.
29+
struct RegistrationEntry
30+
{
31+
vx_reference ref; // vx_reference being registered
32+
vx_event_type_e type; // Event type associated with this registration
33+
vx_uint32 param; // Optional extra parameter (e.g., graph parameter index)
34+
vx_uint32 app_value; // Application-defined value to associate with events
35+
};
36+
37+
public:
38+
/**
39+
* @brief Construct a new Event Queue object
40+
*
41+
* @param enabled
42+
* @param max_size
43+
*/
44+
explicit EventQueue(bool enabled = false, size_t max_size = 128)
45+
: enabled_(enabled), max_size_(max_size) {}
46+
47+
48+
/**
49+
* @brief Clear the event queue
50+
*
51+
*/
52+
void clear()
53+
{
54+
std::unique_lock lock(mutex_);
55+
queue_.clear();
56+
}
57+
58+
/**
59+
* @brief Update the event queue status
60+
*
61+
* @param status true to enable, false to disable
62+
*/
63+
vx_status status(bool status)
64+
{
65+
std::lock_guard lock(mutex_);
66+
enabled_ = status;
67+
return VX_SUCCESS;
68+
}
69+
70+
/**
71+
* @brief Check if the event queue is enabled
72+
*
73+
* @return true if enabled
74+
* @return false if disabled
75+
*/
76+
bool isEnabled() const
77+
{
78+
return enabled_;
79+
}
80+
81+
/**
82+
* @brief Push an event to the queue
83+
*
84+
* @param type Event type
85+
* @param data Optional event value
86+
* @param info Optional event info pointer
87+
* @param reg Optional vx_reference used to look up additional registration data
88+
* @return true if successful
89+
* @return false if failed
90+
*/
91+
vx_status push(vx_event_type_e type, vx_uint32 data = 0, vx_event_info_t* info = nullptr,
92+
vx_reference ref = nullptr)
93+
{
94+
vx_event_t evt{};
95+
evt.type = type;
96+
evt.timestamp = getTimestamp();
97+
evt.app_value = data;
98+
if (info != nullptr)
99+
{
100+
evt.event_info = *info;
101+
}
102+
103+
// If a reference is provided, look up additional registration info
104+
if (ref != nullptr)
105+
{
106+
const RegistrationEntry* reg = lookupRegistration(ref, type);
107+
if (reg && evt.type == reg->type)
108+
{
109+
// override the app_value with that in the registration.
110+
evt.app_value = reg->app_value;
111+
}
112+
}
113+
114+
return push(evt);
115+
}
116+
117+
/**
118+
* @brief Push an event to the queue
119+
*
120+
* @param event Event to push
121+
* @return true if successful
122+
* @return false if failed
123+
*/
124+
vx_status push(vx_event_t event)
125+
{
126+
vx_status status = VX_SUCCESS;
127+
std::unique_lock lock(mutex_);
128+
129+
if (!enabled_)
130+
{
131+
status = VX_FAILURE;
132+
}
133+
134+
if (VX_SUCCESS == status)
135+
{
136+
if (queue_.size() >= max_size_)
137+
{
138+
queue_.pop_front(); // Drop the oldest event
139+
}
140+
queue_.emplace_back(std::move(event));
141+
cv_.notify_one();
142+
}
143+
144+
return status;
145+
}
146+
147+
vx_status wait(vx_event_t* event, vx_bool do_not_block)
148+
{
149+
vx_status status = VX_SUCCESS;
150+
std::optional<vx_event_t> evt;
151+
std::unique_lock lock(mutex_);
152+
153+
if (!enabled_)
154+
{
155+
status = VX_FAILURE;
156+
}
157+
158+
if (VX_SUCCESS == status)
159+
{
160+
if (do_not_block)
161+
{
162+
evt = wait_and_pop(std::chrono::milliseconds(0));
163+
}
164+
else
165+
{
166+
evt = wait_and_pop();
167+
}
168+
169+
if (std::nullopt == evt)
170+
{
171+
status = VX_FAILURE;
172+
}
173+
}
174+
175+
if (VX_SUCCESS == status)
176+
{
177+
*event = std::move(*evt);
178+
}
179+
180+
return status;
181+
}
182+
183+
vx_status registerEvent(vx_reference ref, vx_event_type_e type, vx_uint32 param,
184+
vx_uint32 app_value)
185+
{
186+
if (ref == nullptr) return VX_ERROR_INVALID_REFERENCE;
187+
std::unique_lock lock(mutex_);
188+
189+
RegistrationEntry entry;
190+
entry.ref = ref;
191+
entry.type = type;
192+
entry.param = param;
193+
entry.app_value = app_value;
194+
195+
registrations_.push_back(entry);
196+
return VX_SUCCESS;
197+
}
198+
199+
private : bool enabled_;
200+
mutable std::mutex mutex_;
201+
std::condition_variable cv_;
202+
std::deque<vx_event_t> queue_;
203+
size_t max_size_;
204+
std::vector<RegistrationEntry> registrations_;
205+
206+
/**
207+
* @brief Lookup a registration entry matching the given vx_reference and event type.
208+
*
209+
* @param ref The vx_reference to look up.
210+
* @param type The event type.
211+
* @return const RegistrationEntry* Pointer to the matching entry, or nullptr if not found.
212+
*/
213+
const RegistrationEntry* lookupRegistration(vx_reference ref, vx_event_type_e type) const
214+
{
215+
for (const auto& entry : registrations_)
216+
{
217+
if (entry.ref == ref && entry.type == type)
218+
{
219+
return &entry;
220+
}
221+
}
222+
223+
return nullptr;
224+
}
225+
226+
/**
227+
* @brief Wait for an event and pop it from the queue
228+
*
229+
* @param timeout Timeout duration
230+
* @return std::optional<vx_event_t> Event if available, otherwise std::nullopt
231+
*/
232+
std::optional<vx_event_t> wait_and_pop(
233+
std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
234+
{
235+
std::unique_lock lock(mutex_);
236+
if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); }))
237+
{
238+
return std::nullopt; // Timeout
239+
}
240+
241+
vx_event_t evt = std::move(queue_.front());
242+
queue_.pop_front();
243+
return evt;
244+
}
245+
246+
/**
247+
* @brief Get the current timestamp in nanoseconds
248+
*
249+
* @return vx_uint64 Current timestamp
250+
*/
251+
vx_uint64 getTimestamp() const
252+
{
253+
auto now = std::chrono::steady_clock::now();
254+
return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
255+
}
256+
};

0 commit comments

Comments
 (0)