Skip to content

Commit 6dc372f

Browse files
authored
Streaming (#18)
* Adding support for streaming * Updating kernel attributes * Updates to kernel members * Minor readability update * Current progress on streaming * Fix to test env in bazelrc to lessen overhead * Enabling pipelining conformance tests * Successful streaming conformance * One more attempt to enable pipelining * Fixing uninitialized parameter that caused overflow
1 parent d1c579e commit 6dc372f

File tree

12 files changed

+408
-119
lines changed

12 files changed

+408
-119
lines changed

.bazelrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ build --experimental_ui_max_stdouterr_bytes=-1
1111

1212
# Test settings
1313
test --test_output=errors # Show errors from tests
14-
test --test_env=PATH=/usr/local/bin:/usr/bin:/bin # Example of setting environment variables for tests
14+
# test --test_env=PATH=/usr/local/bin:/usr/bin:/bin # Example of setting environment variables for tests
1515

1616
# General settings
1717
build --jobs=4 # Use 4 parallel jobs for builds

framework/include/vx_graph.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#define VX_GRAPH_H
1818

1919
#include <COREVX/execution_queue.hpp>
20+
#include <atomic>
2021

2122
#include "vx_internal.h"
2223
#include "vx_reference.h"
@@ -171,6 +172,12 @@ class Graph : public Reference
171172
vx_status pipelineValidateRefsList(
172173
const vx_graph_parameter_queue_params_t graph_parameters_queue_param);
173174

175+
/**
176+
* @brief Streaming loop function
177+
*
178+
*/
179+
void streamingLoop();
180+
174181
/**
175182
* @brief Destruct function for the Graph object
176183
* @ingroup group_int_graph
@@ -207,9 +214,9 @@ class Graph : public Reference
207214
/*! \brief the max buffers that can be enqueued */
208215
vx_uint32 numBufs;
209216
/*! \brief The internal data ref queue */
210-
ExecutionQueue<vx_reference, VX_INT_MAX_QUEUE_DEPTH> queue;
217+
ExecutionQueue<vx_reference, VX_INT_MAX_PARAM_QUEUE_DEPTH> queue;
211218
/*! \brief references that can be queued into data ref queue */
212-
vx_reference refs_list[VX_INT_MAX_QUEUE_DEPTH];
219+
vx_reference refs_list[VX_INT_MAX_PARAM_QUEUE_DEPTH];
213220
#endif
214221
} parameters[VX_INT_MAX_PARAMS];
215222
/*! \brief The number of graph parameters. */
@@ -228,6 +235,16 @@ class Graph : public Reference
228235
/*! \brief The number of times to schedule a graph */
229236
vx_size scheduleCount;
230237
#endif
238+
#ifdef OPENVX_USE_STREAMING
239+
/*! \brief This indicates that the graph is streaming enabled */
240+
std::atomic<vx_bool> isStreamingEnabled;
241+
/*! \brief This indicates that the graph is currently streaming */
242+
std::atomic<vx_bool> isStreaming;
243+
/*! \brief The index of the trigger node */
244+
vx_uint32 triggerNodeIndex;
245+
/*! \brief The thread used for streaming */
246+
vx_thread streamingThread;
247+
#endif
231248
};
232249

233250
#endif /* VX_GRAPH_H */

framework/include/vx_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,10 @@ typedef struct vx_sem_t
329329
*/
330330
typedef sem_t vx_sem_t;
331331
#endif
332+
/*! \brief A C++ STL thread
333+
* \ingroup group_int_osal
334+
*/
335+
typedef std::thread vx_thread;
332336
/*! \brief A POSIX thread
333337
* \ingroup group_int_osal
334338
*/

framework/include/vx_kernel.h

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,27 +142,27 @@ class Kernel : public Reference
142142
*/
143143
static void printKernel(vx_kernel kernel);
144144

145-
/*! \brief */
145+
/*! \brief The name of the kernel */
146146
vx_char name[VX_MAX_KERNEL_NAME];
147-
/*! \brief */
147+
/*! \brief The kernel enum ID */
148148
vx_enum enumeration;
149-
/*! \brief */
149+
/*! \brief The kernel function pointer */
150150
vx_kernel_f function;
151-
/*! \brief */
151+
/*! \brief The kernel signature */
152152
vx_signature_t signature;
153-
/*! Indicates that the kernel is not yet enabled. */
153+
/*! \brief Indicates that the kernel is not yet enabled. */
154154
vx_bool enabled;
155-
/*! Indicates that this kernel is added by user. */
155+
/*! \brief Indicates that this kernel is added by user. */
156156
vx_bool user_kernel;
157-
/*! \brief */
157+
/*! \brief The kernel validate function pointer */
158158
vx_kernel_validate_f validate;
159-
/*! \brief */
159+
/*! \brief The kernel input validate function pointer */
160160
vx_kernel_input_validate_f validate_input;
161-
/*! \brief */
161+
/*! \brief The kernel output validate function pointer */
162162
vx_kernel_output_validate_f validate_output;
163-
/*! \brief */
163+
/*! \brief The kernel init function pointer */
164164
vx_kernel_initialize_f initialize;
165-
/*! \brief */
165+
/*! \brief The kernel deinit function pointer */
166166
vx_kernel_deinitialize_f deinitialize;
167167
/*! \brief The collection of attributes of a kernel */
168168
vx_kernel_attr_t attributes;
@@ -172,9 +172,15 @@ class Kernel : public Reference
172172
/*! \brief The tiling function pointer interface */
173173
vx_tiling_kernel_f tilingfast_function;
174174
vx_tiling_kernel_f tilingflexible_function;
175-
#endif
175+
#endif /* OPENVX_KHR_TILING */
176176
/*! \brief The pointer to the kernel object deinitializer. */
177177
vx_kernel_object_deinitialize_f kernel_object_deinitialize;
178+
/*! \brief The kernel's input depth required to start */
179+
vx_uint32 input_depth;
180+
/*! \brief The kernel's output depth required to start */
181+
vx_uint32 output_depth;
182+
/*! \brief Indicates whether kernel has piped up */
183+
vx_uint32 pipeUpCounter;
178184
};
179185

180186
#endif /* VX_KERNEL_H */

framework/include/vx_node.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#define VX_NODE_H
1818

1919
#include <VX/vx.h>
20+
#include <vector>
2021

2122
/*!
2223
* \file
@@ -132,6 +133,8 @@ class Node : public Reference
132133
vx_bool is_replicated;
133134
/*! \brief The replicated parameters flags */
134135
vx_bool replicated_flags[VX_INT_MAX_PARAMS];
136+
/*! \brief The node state */
137+
vx_node_state_e state;
135138
};
136139

137140
#endif /* VX_NODE_H */

framework/src/vx_graph.cpp

Lines changed: 108 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -230,21 +230,33 @@ void vxContaminateGraphs(vx_reference ref)
230230
/* INTERNAL FUNCTIONS */
231231
/******************************************************************************/
232232

233-
Graph::Graph(vx_context context, vx_reference scope) : Reference(context, VX_TYPE_GRAPH, scope),
234-
nodes(),
235-
perf(),
236-
numNodes(0),
237-
heads(),
238-
numHeads(0),
239-
state(VX_FAILURE),
240-
verified(vx_false_e),
241-
reverify(vx_false_e),
242-
lock(),
243-
parameters(),
244-
numParams(0),
245-
shouldSerialize(vx_false_e),
246-
parentGraph(nullptr),
247-
delays()
233+
Graph::Graph(vx_context context, vx_reference scope)
234+
: Reference(context, VX_TYPE_GRAPH, scope),
235+
nodes(),
236+
perf(),
237+
numNodes(0),
238+
heads(),
239+
numHeads(0),
240+
state(VX_FAILURE),
241+
verified(vx_false_e),
242+
reverify(vx_false_e),
243+
lock(),
244+
parameters(),
245+
numParams(0),
246+
shouldSerialize(vx_false_e),
247+
parentGraph(nullptr),
248+
delays(),
249+
scheduleMode(VX_GRAPH_SCHEDULE_MODE_NORMAL),
250+
#ifdef OPENVX_USE_PIPELINING
251+
numEnqueableParams(0),
252+
scheduleCount(0),
253+
#endif /* OPENVX_USE_PIPELINING */
254+
#ifdef OPENVX_USE_STREAMING
255+
isStreamingEnabled(vx_false_e),
256+
isStreaming(vx_false_e),
257+
triggerNodeIndex(0),
258+
streamingThread()
259+
#endif /* OPENVX_USE_STREAMING */
248260
{
249261
}
250262

@@ -1723,6 +1735,35 @@ vx_status Graph::pipelineValidateRefsList(
17231735
return status;
17241736
}
17251737

1738+
void Graph::streamingLoop()
1739+
{
1740+
#ifdef OPENVX_USE_STREAMING
1741+
while (isStreaming)
1742+
{
1743+
/* Wait for trigger node event if set */
1744+
// if (triggerNodeIndex != UINT32_MAX)
1745+
// {
1746+
// /* Wait for the trigger node to complete */
1747+
// while (!nodes[triggerNodeIndex]->executed)
1748+
// {
1749+
// std::cout << "Waiting for trigger node to complete" << std::endl;
1750+
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
1751+
// /* Allow clean exit */
1752+
// if (!isStreaming) return;
1753+
// }
1754+
// /* Reset the event for the next iteration */
1755+
// nodes[triggerNodeIndex]->executed = vx_false_e;
1756+
// }
1757+
1758+
/* Schedule and wait for the graph */
1759+
vx_status status = vxScheduleGraph(this);
1760+
if (status != VX_SUCCESS) break;
1761+
status = vxWaitGraph(this);
1762+
if (status != VX_SUCCESS) break;
1763+
}
1764+
#endif /* OPENVX_USE_STREAMING */
1765+
}
1766+
17261767
void Graph::destruct()
17271768
{
17281769
while (numNodes)
@@ -2068,7 +2109,8 @@ VX_API_ENTRY vx_status VX_API_CALL vxVerifyGraph(vx_graph graph)
20682109
{
20692110
if (((graph->nodes[n]->kernel->signature.directions[p] == VX_BIDIRECTIONAL) ||
20702111
(graph->nodes[n]->kernel->signature.directions[p] == VX_INPUT)) &&
2071-
(graph->nodes[n]->parameters[p] != nullptr))
2112+
(graph->nodes[n]->parameters[p] != nullptr) &&
2113+
(graph->nodes[n]->kernel->validate_input != nullptr))
20722114
{
20732115
vx_status input_validation_status = graph->nodes[n]->kernel->validate_input((vx_node)graph->nodes[n], p);
20742116
if (input_validation_status != VX_SUCCESS)
@@ -2100,25 +2142,30 @@ VX_API_ENTRY vx_status VX_API_CALL vxVerifyGraph(vx_graph graph)
21002142
if (graph->setupOutput(n, p, &vref, &metas[p], &status, &num_errors) ==
21012143
vx_false_e)
21022144
break;
2103-
output_validation_status = graph->nodes[n]->kernel->validate_output(
2104-
(vx_node)graph->nodes[n], p, metas[p]);
2105-
if (output_validation_status == VX_SUCCESS)
2145+
if (graph->nodes[n]->kernel->validate_output != nullptr)
21062146
{
2107-
if (graph->postprocessOutput(n, p, &vref, metas[p], &status,
2108-
&num_errors) == vx_false_e)
2147+
output_validation_status = graph->nodes[n]->kernel->validate_output(
2148+
(vx_node)graph->nodes[n], p, metas[p]);
2149+
if (output_validation_status == VX_SUCCESS)
21092150
{
2110-
break;
2151+
if (graph->postprocessOutput(n, p, &vref, metas[p], &status,
2152+
&num_errors) == vx_false_e)
2153+
{
2154+
break;
2155+
}
2156+
}
2157+
else
2158+
{
2159+
status = output_validation_status;
2160+
vxAddLogEntry(reinterpret_cast<vx_reference>(graph), status,
2161+
"Node %s: parameter[%u] failed output validation! "
2162+
"(status = %d)\n",
2163+
graph->nodes[n]->kernel->name, p, status);
2164+
VX_PRINT(VX_ZONE_ERROR,
2165+
"Failed on validation of output parameter[%u] on kernel "
2166+
"%s, status=%d\n",
2167+
p, graph->nodes[n]->kernel->name, status);
21112168
}
2112-
}
2113-
else
2114-
{
2115-
status = output_validation_status;
2116-
vxAddLogEntry(reinterpret_cast<vx_reference>(graph), status, "Node %s: parameter[%u] failed output validation! (status = %d)\n",
2117-
graph->nodes[n]->kernel->name, p, status);
2118-
VX_PRINT(VX_ZONE_ERROR,"Failed on validation of output parameter[%u] on kernel %s, status=%d\n",
2119-
p,
2120-
graph->nodes[n]->kernel->name,
2121-
status);
21222169
}
21232170
}
21242171
}
@@ -2511,6 +2558,7 @@ static vx_status vxExecuteGraph(vx_graph graph, vx_uint32 depth)
25112558
vx_uint32 next_nodes[VX_INT_MAX_REF];
25122559
vx_uint32 left_nodes[VX_INT_MAX_REF];
25132560
vx_context context = vxGetContext((vx_reference)graph);
2561+
vx_uint32 max_pipeup_depth = 1;
25142562
(void)depth;
25152563

25162564
#if defined(OPENVX_USE_SMP)
@@ -2640,6 +2688,34 @@ static vx_status vxExecuteGraph(vx_graph graph, vx_uint32 depth)
26402688
next_nodes[n],
26412689
target->name, node->kernel->name);
26422690

2691+
/* Check for pipeup phase:
2692+
* If this is the first time we are executing the graph, we need to pipeup
2693+
* all nodes with kernels in the graph that need pipeup of refs.
2694+
*/
2695+
max_pipeup_depth = std::max(
2696+
{max_pipeup_depth, node->kernel->input_depth, node->kernel->output_depth});
2697+
if (node->kernel->pipeUpCounter < max_pipeup_depth - 1)
2698+
{
2699+
node->state = VX_NODE_STATE_PIPEUP;
2700+
std::cout << "max_pipeup_depth: " << max_pipeup_depth << std::endl;
2701+
node->kernel->pipeUpCounter++;
2702+
// Retain input buffers during PIPEUP
2703+
for (vx_uint32 i = 0; i < node->kernel->output_depth - 1; i++)
2704+
{
2705+
action = target->funcs.process(target, &node, 0, 1);
2706+
node->kernel->pipeUpCounter++;
2707+
}
2708+
// For source nodes, provide new output buffers during PIPEUP
2709+
for (vx_uint32 i = 0; i < node->kernel->input_depth - 1; i++)
2710+
{
2711+
action = target->funcs.process(target, &node, 0, 1);
2712+
node->kernel->pipeUpCounter++;
2713+
}
2714+
}
2715+
2716+
/* If this node was in pipeup, update its state */
2717+
node->state = VX_NODE_STATE_STEADY;
2718+
26432719
action = target->funcs.process(target, &node, 0, 1);
26442720

26452721
VX_PRINT(VX_ZONE_GRAPH, "Returned Node[%u] %s:%s Action %d\n",

0 commit comments

Comments
 (0)