Skip to content

Conversation

@asahtik
Copy link
Contributor

@asahtik asahtik commented Sep 24, 2025

Purpose

Implements pipeline events which should simplify debugging stuck pipelines and help profile the pipeline.

Specification

None / not applicable

Dependencies & Potential Impact

None / not applicable

Deployment Plan

None / not applicable

Testing & Validation

None / not applicable

@asahtik asahtik marked this pull request as draft September 24, 2025 07:12
src/pipeline/node/UVCBindings.cpp
src/pipeline/node/ToFBindings.cpp
src/pipeline/node/PointCloudBindings.cpp
src/pipeline/node/SyncBindings.cpp
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SyncBindings were duplicated (line 96)

Copy link
Collaborator

@moratom moratom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will play around more with it, so this just an initial review;

  • The general design looks good
  • The example seems to produce strange results (shared locally)
  • I think we should create a design document to doucment this as it's quite involved PipelineEvent agregations both in device context and in host context, the merger between the two, ...
  • Maybe good to add an example of how to subscribe to events for a particular node


std::string name;
std::string alias;
std::string device;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be the deviceId?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add bool deviceNode as well, to be explicit.

while(mainLoop()) {
std::shared_ptr<dai::ImgFrame> imgFrame = nullptr;
{
auto blockEvent = this->inputBlockEvent();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mainLoop sends a main loop event and returns isRunning(), blockEvent sends the start event in the constructor and the end event in the destructor.

.def_readwrite("queueSize", &PipelineEvent::queueSize, DOC(dai, PipelineEvent, queueSize))
.def_readwrite("interval", &PipelineEvent::interval, DOC(dai, PipelineEvent, interval))
.def_readwrite("type", &PipelineEvent::type, DOC(dai, PipelineEvent, type))
.def_readwrite("source", &PipelineEvent::source, DOC(dai, PipelineEvent, source))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need bindings, should work "automagically"

Comment on lines +109 to +114
.def("getTimestamp", &PipelineState::Buffer::getTimestamp, DOC(dai, Buffer, getTimestamp))
.def("getTimestampDevice", &PipelineState::Buffer::getTimestampDevice, DOC(dai, Buffer, getTimestampDevice))
.def("getSequenceNum", &PipelineState::Buffer::getSequenceNum, DOC(dai, Buffer, getSequenceNum))
.def("setTimestamp", &PipelineState::setTimestamp, DOC(dai, Buffer, setTimestamp))
.def("setTimestampDevice", &PipelineState::setTimestampDevice, DOC(dai, Buffer, setTimestampDevice))
.def("setSequenceNum", &PipelineState::setSequenceNum, DOC(dai, Buffer, setSequenceNum));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for PipelineEvent bindings

std::unordered_map<std::string, std::filesystem::path> recordReplayFilenames;
bool removeRecordReplayFiles = true;
std::string defaultDeviceId;
bool pipelineOnHost = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this signify? If the pipeline has any host nodes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline debugging and record & replay sections should be skipped when the pipeline is being built on device. This flag specifies if it's building on host. I added a comment and changed the name to make it more clear.

Comment on lines 500 to 503
void buildDevice() {
impl()->pipelineOnHost = false;
impl()->build();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used on RVC4

Comment on lines +45 to +72
# If pipeline debugging is disabled, this will raise an exception
pipelineState = pipeline.getPipelineState().nodes().detailed()
for nodeId, nodeState in pipelineState.nodeStates.items():
nodeName = pipeline.getNode(nodeId).getName()
print(f"\n# State for node {pipeline.getNode(nodeId).getName()} ({nodeId}):")
print(f"## State: {nodeState.state}")
print(f"## mainLoopTiming: {'invalid' if not nodeState.mainLoopTiming.isValid() else ''}")
if(nodeState.mainLoopTiming.isValid()):
print("-----")
print(nodeState.mainLoopTiming)
print("-----")
print(f"## inputsGetTiming: {'invalid' if not nodeState.inputsGetTiming.isValid() else ''}")
if(nodeState.inputsGetTiming.isValid()):
print("-----")
print(nodeState.inputsGetTiming)
print("-----")
print(f"## outputsSendTiming: {'invalid' if not nodeState.outputsSendTiming.isValid() else ''}")
if(nodeState.outputsSendTiming.isValid()):
print("-----")
print(nodeState.outputsSendTiming)
print("-----")
print(f"## inputStates: {'empty' if not nodeState.inputStates else ''}")
for inputName, inputState in nodeState.inputStates.items():
if inputState.isValid():
print(f"### {inputName}:\n-----{inputState}\n-----")
else:
print(f"### {inputName}: invalid")
print(f"## outputStates: {'empty' if not nodeState.outputStates else ''}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we print json version of the object instead? Likely easier to digest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes it more clear what data is in the pipeline state and how the user can interact with it. Also just printing the serialized json doesn't handle invalid timings so there'd be a lot more junk.

Comment on lines 540 to 742

if(defaultDevice) {
auto recordPath = std::filesystem::path(utility::getEnvAs<std::string>("DEPTHAI_RECORD", ""));
auto replayPath = std::filesystem::path(utility::getEnvAs<std::string>("DEPTHAI_REPLAY", ""));
if(pipelineOnHost) {
if(defaultDevice) {
auto recordPath = std::filesystem::path(utility::getEnvAs<std::string>("DEPTHAI_RECORD", ""));
auto replayPath = std::filesystem::path(utility::getEnvAs<std::string>("DEPTHAI_REPLAY", ""));

if(defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_RVC4) {
try {
if(defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_RVC4) {
try {
#ifdef DEPTHAI_MERGED_TARGET
if(enableHolisticRecordReplay) {
switch(recordConfig.state) {
case RecordConfig::RecordReplayState::RECORD:
recordPath = recordConfig.outputDir;
replayPath = "";
break;
case RecordConfig::RecordReplayState::REPLAY:
recordPath = "";
replayPath = recordConfig.outputDir;
break;
case RecordConfig::RecordReplayState::NONE:
enableHolisticRecordReplay = false;
break;
if(enableHolisticRecordReplay) {
switch(recordConfig.state) {
case RecordConfig::RecordReplayState::RECORD:
recordPath = recordConfig.outputDir;
replayPath = "";
break;
case RecordConfig::RecordReplayState::REPLAY:
recordPath = "";
replayPath = recordConfig.outputDir;
break;
case RecordConfig::RecordReplayState::NONE:
enableHolisticRecordReplay = false;
break;
}
}
}

defaultDeviceId = defaultDevice->getDeviceId();

if(!recordPath.empty() && !replayPath.empty()) {
Logging::getInstance().logger.warn("Both DEPTHAI_RECORD and DEPTHAI_REPLAY are set. Record and replay disabled.");
} else if(!recordPath.empty()) {
if(enableHolisticRecordReplay || utility::checkRecordConfig(recordPath, recordConfig)) {
if(platform::checkWritePermissions(recordPath)) {
if(utility::setupHolisticRecord(parent,
defaultDeviceId,
recordConfig,
recordReplayFilenames,
defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X)) {
recordConfig.state = RecordConfig::RecordReplayState::RECORD;
Logging::getInstance().logger.info("Record enabled.");
defaultDeviceId = defaultDevice->getDeviceId();

if(!recordPath.empty() && !replayPath.empty()) {
Logging::getInstance().logger.warn("Both DEPTHAI_RECORD and DEPTHAI_REPLAY are set. Record and replay disabled.");
} else if(!recordPath.empty()) {
if(enableHolisticRecordReplay || utility::checkRecordConfig(recordPath, recordConfig)) {
if(platform::checkWritePermissions(recordPath)) {
if(utility::setupHolisticRecord(parent,
defaultDeviceId,
recordConfig,
recordReplayFilenames,
defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X)) {
recordConfig.state = RecordConfig::RecordReplayState::RECORD;
Logging::getInstance().logger.info("Record enabled.");
} else {
Logging::getInstance().logger.warn("Could not set up holistic record. Record and replay disabled.");
}
} else {
Logging::getInstance().logger.warn("Could not set up holistic record. Record and replay disabled.");
Logging::getInstance().logger.warn("DEPTHAI_RECORD path does not have write permissions. Record disabled.");
}
} else {
Logging::getInstance().logger.warn("DEPTHAI_RECORD path does not have write permissions. Record disabled.");
Logging::getInstance().logger.warn("Could not successfully parse DEPTHAI_RECORD. Record disabled.");
}
} else {
Logging::getInstance().logger.warn("Could not successfully parse DEPTHAI_RECORD. Record disabled.");
}
} else if(!replayPath.empty()) {
if(platform::checkPathExists(replayPath)) {
if(platform::checkWritePermissions(replayPath)) {
if(utility::setupHolisticReplay(parent,
replayPath,
defaultDeviceId,
recordConfig,
recordReplayFilenames,
defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X)) {
recordConfig.state = RecordConfig::RecordReplayState::REPLAY;
if(platform::checkPathExists(replayPath, true)) {
removeRecordReplayFiles = false;
} else if(!replayPath.empty()) {
if(platform::checkPathExists(replayPath)) {
if(platform::checkWritePermissions(replayPath)) {
if(utility::setupHolisticReplay(parent,
replayPath,
defaultDeviceId,
recordConfig,
recordReplayFilenames,
defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_2
|| defaultDevice->getDeviceInfo().platform == XLinkPlatform_t::X_LINK_MYRIAD_X)) {
recordConfig.state = RecordConfig::RecordReplayState::REPLAY;
if(platform::checkPathExists(replayPath, true)) {
removeRecordReplayFiles = false;
}
Logging::getInstance().logger.info("Replay enabled.");
} else {
Logging::getInstance().logger.warn("Could not set up holistic replay. Record and replay disabled.");
}
Logging::getInstance().logger.info("Replay enabled.");
} else {
Logging::getInstance().logger.warn("Could not set up holistic replay. Record and replay disabled.");
Logging::getInstance().logger.warn("DEPTHAI_REPLAY path does not have write permissions. Replay disabled.");
}
} else {
Logging::getInstance().logger.warn("DEPTHAI_REPLAY path does not have write permissions. Replay disabled.");
Logging::getInstance().logger.warn("DEPTHAI_REPLAY path does not exist or is invalid. Replay disabled.");
}
} else {
Logging::getInstance().logger.warn("DEPTHAI_REPLAY path does not exist or is invalid. Replay disabled.");
}
}
#else
recordConfig.state = RecordConfig::RecordReplayState::NONE;
if(!recordPath.empty() || !replayPath.empty()) {
Logging::getInstance().logger.warn("Merged target is required to use holistic record/replay.");
}
recordConfig.state = RecordConfig::RecordReplayState::NONE;
if(!recordPath.empty() || !replayPath.empty()) {
Logging::getInstance().logger.warn("Merged target is required to use holistic record/replay.");
}
#endif
} catch(std::runtime_error& e) {
Logging::getInstance().logger.warn("Could not set up record / replay: {}", e.what());
} catch(std::runtime_error& e) {
Logging::getInstance().logger.warn("Could not set up record / replay: {}", e.what());
}
} else if(enableHolisticRecordReplay || !recordPath.empty() || !replayPath.empty()) {
throw std::runtime_error("Holistic record/replay is only supported on RVC2 devices for now.");
}
} else if(enableHolisticRecordReplay || !recordPath.empty() || !replayPath.empty()) {
throw std::runtime_error("Holistic record/replay is only supported on RVC2 devices for now.");
}
}

// Go through the build stages sequentially
// Run first build stage for all nodes
for(const auto& node : getAllNodes()) {
node->buildStage1();
}

if(pipelineOnHost) {
// Create pipeline event aggregator node and link
enablePipelineDebugging = enablePipelineDebugging || utility::getEnvAs<bool>("DEPTHAI_PIPELINE_DEBUGGING", false);
if(enablePipelineDebugging) {
// Check if any nodes are on host or device
bool hasHostNodes = false;
bool hasDeviceNodes = false;
for(const auto& node : getAllNodes()) {
if(std::string(node->getName()) == std::string("NodeGroup") || std::string(node->getName()) == std::string("DeviceNodeGroup")) continue;

if(node->runOnHost()) {
hasHostNodes = true;
} else {
hasDeviceNodes = true;
}
}
std::shared_ptr<node::internal::PipelineEventAggregation> hostEventAgg = nullptr;
std::shared_ptr<node::internal::PipelineEventAggregation> deviceEventAgg = nullptr;
if(hasHostNodes) {
hostEventAgg = parent.create<node::internal::PipelineEventAggregation>();
hostEventAgg->setRunOnHost(true);
}
if(hasDeviceNodes) {
deviceEventAgg = parent.create<node::internal::PipelineEventAggregation>();
deviceEventAgg->setRunOnHost(false);
}
for(auto& node : getAllNodes()) {
if(std::string(node->getName()) == std::string("NodeGroup") || std::string(node->getName()) == std::string("DeviceNodeGroup")) continue;

auto threadedNode = std::dynamic_pointer_cast<ThreadedNode>(node);
if(threadedNode) {
if(node->runOnHost() && hostEventAgg && node->id != hostEventAgg->id) {
threadedNode->pipelineEventOutput.link(hostEventAgg->inputs[fmt::format("{} - {}", node->getName(), node->id)]);
} else if(!node->runOnHost() && deviceEventAgg && node->id != deviceEventAgg->id) {
threadedNode->pipelineEventOutput.link(deviceEventAgg->inputs[fmt::format("{} - {}", node->getName(), node->id)]);
}
}
}
auto stateMerge = parent.create<node::PipelineStateMerge>()->build(hasDeviceNodes, hasHostNodes);
if(deviceEventAgg) {
deviceEventAgg->out.link(stateMerge->inputDevice);
stateMerge->outRequest.link(deviceEventAgg->request);
}
if(hostEventAgg) {
hostEventAgg->out.link(stateMerge->inputHost);
stateMerge->outRequest.link(hostEventAgg->request);
}
pipelineStateOut = stateMerge->out.createOutputQueue(1, false);
pipelineStateRequest = stateMerge->request.createInputQueue();
}
}

{
auto allNodes = getAllNodes();
if(std::find_if(allNodes.begin(), allNodes.end(), [](const std::shared_ptr<Node>& n) { return strcmp(n->getName(), "PipelineEventAggregation") == 0; })
== allNodes.end()) {
for(auto& node : allNodes) node->pipelineEventDispatcher->sendEvents = false;
}
}

isBuild = true;

// Go through the build stages sequentially
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should split this in a separate functions as the build() is already hard to follow.

@asahtik asahtik removed the testable PR is ready to be tested label Nov 12, 2025
@asahtik asahtik added the testable PR is ready to be tested label Nov 12, 2025
@asahtik asahtik removed the testable PR is ready to be tested label Nov 12, 2025
@asahtik asahtik added the testable PR is ready to be tested label Nov 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

testable PR is ready to be tested

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants