diff --git a/README.md b/README.md index a02d819159..b71a8c0f07 100644 --- a/README.md +++ b/README.md @@ -215,7 +215,7 @@ The following environment variables can be set to alter default behavior of the | DEPTHAI_ALLOW_FACTORY_FLASHING | Internal use only | | DEPTHAI_LIBUSB_ANDROID_JAVAVM | JavaVM pointer that is passed to libusb for rootless Android interaction with devices. Interpreted as decimal value of uintptr_t | | DEPTHAI_CRASHDUMP | Directory in which to save the crash dump. | -| DEPTHAI_CRASHDUMP_TIMEOUT | Specifies the duration in seconds to wait for device reboot when obtaining a crash dump. Crash dump retrieval disabled if 0. | +| DEPTHAI_CRASHDUMP_TIMEOUT | Specifies the duration in milliseconds to wait for device reboot when obtaining a crash dump. Crash dump retrieval disabled if 0. | | DEPTHAI_ENABLE_ANALYTICS_COLLECTION | Enables automatic analytics collection (pipeline schemas) used to improve the library | | DEPTHAI_DISABLE_CRASHDUMP_COLLECTION | Disables automatic crash dump collection used to improve the library | | DEPTHAI_HUB_API_KEY | API key for the Luxonis Hub | diff --git a/bindings/python/src/DeviceBindings.cpp b/bindings/python/src/DeviceBindings.cpp index f208555fae..d5a7eee266 100644 --- a/bindings/python/src/DeviceBindings.cpp +++ b/bindings/python/src/DeviceBindings.cpp @@ -25,30 +25,6 @@ PYBIND11_MAKE_OPAQUE(std::unordered_map); // Searches for available devices (as Device constructor) // but pooling, to check for python interrupts, and releases GIL in between -template -static auto deviceSearchHelper(Args&&... args) { - bool found; - dai::DeviceInfo deviceInfo; - // releases python GIL - py::gil_scoped_release release; - std::tie(found, deviceInfo) = DEVICE::getAnyAvailableDevice(DEVICE::getDefaultSearchTime(), []() { - py::gil_scoped_acquire acquire; - if(PyErr_CheckSignals() != 0) throw py::error_already_set(); - }); - - // if no devices found, then throw - if(!found) { - auto numConnected = DEVICE::getAllAvailableDevices().size(); - if(numConnected > 0) { - throw std::runtime_error("No available devices (" + std::to_string(numConnected) + " connected, but in use)"); - } else { - throw std::runtime_error("No available devices"); - } - } - - return deviceInfo; -} - // static std::vector deviceGetQueueEventsHelper(dai::Device& d, const std::vector& queueNames, std::size_t maxNumEvents, // std::chrono::microseconds timeout){ // using namespace std::chrono; diff --git a/bindings/python/src/DeviceBindings.hpp b/bindings/python/src/DeviceBindings.hpp index a962249d35..d2670c7756 100644 --- a/bindings/python/src/DeviceBindings.hpp +++ b/bindings/python/src/DeviceBindings.hpp @@ -1,8 +1,33 @@ #pragma once // pybind +#include "depthai/device/Device.hpp" #include "pybind11_common.hpp" +template +static auto deviceSearchHelper(Args&&... args) { + bool found; + dai::DeviceInfo deviceInfo; + // releases python GIL + py::gil_scoped_release release; + std::tie(found, deviceInfo) = DEVICE::getAnyAvailableDevice(DEVICE::getDefaultSearchTime(), []() { + py::gil_scoped_acquire acquire; + if(PyErr_CheckSignals() != 0) throw py::error_already_set(); + }); + + // if no devices found, then throw + if(!found) { + auto numConnected = DEVICE::getAllAvailableDevices().size(); + if(numConnected > 0) { + throw std::runtime_error("No available devices (" + std::to_string(numConnected) + " connected, but in use)"); + } else { + throw std::runtime_error("No available devices"); + } + } + + return deviceInfo; +} + struct DeviceBindings { static void bind(pybind11::module& m, void* pCallstack); }; diff --git a/bindings/python/src/pipeline/PipelineBindings.cpp b/bindings/python/src/pipeline/PipelineBindings.cpp index 58965de9f9..2ca4139e5b 100644 --- a/bindings/python/src/pipeline/PipelineBindings.cpp +++ b/bindings/python/src/pipeline/PipelineBindings.cpp @@ -4,6 +4,7 @@ #include #include +#include "DeviceBindings.hpp" #include "node/NodeBindings.hpp" // depthai @@ -63,7 +64,7 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { py::class_ globalProperties(m, "GlobalProperties", DOC(dai, GlobalProperties)); py::class_ recordConfig(m, "RecordConfig", DOC(dai, RecordConfig)); py::class_ recordVideoConfig(recordConfig, "VideoEncoding", DOC(dai, RecordConfig, VideoEncoding)); - py::class_ pipeline(m, "Pipeline", DOC(dai, Pipeline, 2)); + py::class_> pipeline(m, "Pipeline", DOC(dai, Pipeline, 2)); /////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////// @@ -102,20 +103,70 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { .def_readwrite("compressionLevel", &RecordConfig::compressionLevel, DOC(dai, RecordConfig, compressionLevel)); // bind pipeline - pipeline.def(py::init(), py::arg("createImplicitDevice") = true, DOC(dai, Pipeline, Pipeline)) + pipeline + .def(py::init([](bool createImplicitDevice) { + if(createImplicitDevice) { + auto deviceInfo = deviceSearchHelper(); + auto device = std::make_shared(deviceInfo); + auto pipeline = std::make_shared(device); + return pipeline; + } + + assert(createImplicitDevice == false); + return std::make_shared(createImplicitDevice); + }), + py::arg("createImplicitDevice") = true, + DOC(dai, Pipeline, Pipeline)) .def(py::init>(), py::arg("defaultDevice"), DOC(dai, Pipeline, Pipeline)) // Python only methods .def("__enter__", - [](Pipeline& p) -> Pipeline& { - setImplicitPipeline(&p); + [](std::shared_ptr p) -> std::shared_ptr { + setImplicitPipeline(p.get()); return p; }) .def("__exit__", - [](Pipeline& d, py::object type, py::object value, py::object traceback) { - py::gil_scoped_release release; + [](std::shared_ptr pipeline, py::object type, py::object value, py::object traceback) { delImplicitPipeline(); - d.stop(); - d.wait(); + + // Check how we got here (whether through an exception or the end of the with block was simply reached) + bool isException = !type.is_none(); + bool isKeyboardInterrupt = type.is(py::handle(PyExc_KeyboardInterrupt)); + + // Spawn a thread to clean up the pipeline + std::atomic threadRunning{true}; + auto cleanupThread = std::thread([pipeline, &threadRunning, isException]() { + // In case of an exception, we want the device to close as quickly as possible + if(isException) { + pipeline->closeQuick(); + } + pipeline->stop(); + pipeline->wait(); + threadRunning = false; + }); + + // While the thread above is cleaning up, check for interrupts + // https://docs.python.org/3/c-api/exceptions.html#c.PyErr_CheckSignals + py::gil_scoped_release release; + while(threadRunning) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + py::gil_scoped_acquire acquire; + if(PyErr_CheckSignals() != 0) { + // If we are handling a KeyboardInterrupt and another KeyboardInterrupt is thrown, continue + // Otherwise, rethrow the exception + if(PyErr_ExceptionMatches(PyExc_KeyboardInterrupt) && isKeyboardInterrupt) { + try { + throw py::error_already_set(); + } catch(const py::error_already_set& e) { + // Expect this to be thrown, but do nothing + } + continue; + } else { + throw py::error_already_set(); + } + } + } + + cleanupThread.join(); }) //.def(py::init()) .def("getDefaultDevice", &Pipeline::getDefaultDevice, DOC(dai, Pipeline, getDefaultDevice)) @@ -211,7 +262,7 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { return node; }, py::keep_alive<1, 0>()) - .def("start", &Pipeline::start) + .def("start", &Pipeline::start, py::call_guard(), DOC(dai, Pipeline, start)) .def("wait", [](Pipeline& p) { py::gil_scoped_release release; diff --git a/include/depthai/device/DeviceBase.hpp b/include/depthai/device/DeviceBase.hpp index 3a8afb371c..b1f5e8d9d2 100644 --- a/include/depthai/device/DeviceBase.hpp +++ b/include/depthai/device/DeviceBase.hpp @@ -754,6 +754,11 @@ class DeviceBase { */ void close(); + /** + * @brief Close the device as quickly as possible without performing any cleanup tasks or other checks. Users should preferablly use close() instead. + */ + void closeQuick(); + /** * Is the device already closed (or disconnected) * @@ -816,6 +821,13 @@ class DeviceBase { */ virtual void closeImpl(); + /** + * @brief Whether the device should be closed as quickly as possible. Long-running methods ought to check this flag every so often and return if true, all + * long-running operations should be aborted. + * @note It is a responsibility of the developer to ensure that this flag is checked frequently. + */ + std::atomic shouldCloseQuickly{false}; + protected: // protected functions void init(); diff --git a/include/depthai/pipeline/Pipeline.hpp b/include/depthai/pipeline/Pipeline.hpp index 83457a05f9..bbc6e9c319 100644 --- a/include/depthai/pipeline/Pipeline.hpp +++ b/include/depthai/pipeline/Pipeline.hpp @@ -228,8 +228,17 @@ class PipelineImpl : public std::enable_shared_from_this { void resetConnections(); void disconnectXLinkHosts(); + inline void closeQuick() { + shouldCloseQuickly = true; + if(!isHostOnly()) { + defaultDevice->closeQuick(); + } + } + private: // Resource + std::atomic shouldCloseQuickly{false}; + std::vector loadResource(fs::path uri); std::vector loadResourceCwd(fs::path uri, fs::path cwd, bool moveAsset = false); }; @@ -503,6 +512,10 @@ class Pipeline { impl()->addTask(std::move(task)); } + void closeQuick() { + impl()->closeQuick(); + } + /// Record and Replay void enableHolisticRecord(const RecordConfig& config); void enableHolisticReplay(const std::string& pathToRecording); diff --git a/src/device/DeviceBase.cpp b/src/device/DeviceBase.cpp index 9e909afa3b..4e07fbeb98 100644 --- a/src/device/DeviceBase.cpp +++ b/src/device/DeviceBase.cpp @@ -58,7 +58,7 @@ const std::string MAGIC_FACTORY_FLASHING_VALUE = "413424129"; const std::string MAGIC_FACTORY_PROTECTED_FLASHING_VALUE = "868632271"; constexpr int DEVICE_SEARCH_FIRST_TIMEOUT_MS = 30; -const unsigned int DEFAULT_CRASHDUMP_TIMEOUT = 9000; +const unsigned int DEFAULT_CRASHDUMP_TIMEOUT_MS = 9000; const unsigned int RPC_READ_TIMEOUT = 10000; // local static function @@ -401,11 +401,13 @@ void DeviceBase::close() { } } +void DeviceBase::closeQuick() { + shouldCloseQuickly = true; +} + unsigned int getCrashdumpTimeout(XLinkProtocol_t protocol) { - int defaultTimeout = - DEFAULT_CRASHDUMP_TIMEOUT + (protocol == X_LINK_TCP_IP ? device::XLINK_TCP_WATCHDOG_TIMEOUT.count() : device::XLINK_USB_WATCHDOG_TIMEOUT.count()); - int timeoutSeconds = utility::getEnvAs("DEPTHAI_CRASHDUMP_TIMEOUT", defaultTimeout); - int timeoutMs = timeoutSeconds * 1000; + std::chrono::milliseconds protocolTimeout = (protocol == X_LINK_TCP_IP ? device::XLINK_TCP_WATCHDOG_TIMEOUT : device::XLINK_USB_WATCHDOG_TIMEOUT); + int timeoutMs = utility::getEnvAs("DEPTHAI_CRASHDUMP_TIMEOUT", DEFAULT_CRASHDUMP_TIMEOUT_MS + protocolTimeout.count()); return timeoutMs; } @@ -416,7 +418,7 @@ void DeviceBase::closeImpl() { bool shouldGetCrashDump = false; // Check if the device is RVC3 - in case it is, crash dump retrieval is done differently bool isRvc2 = deviceInfo.platform == X_LINK_MYRIAD_X; - if(!dumpOnly && isRvc2) { + if(!dumpOnly && isRvc2 && !shouldCloseQuickly) { pimpl->logger.debug("Device about to be closed..."); try { if(hasCrashDump()) { @@ -434,9 +436,8 @@ void DeviceBase::closeImpl() { shouldGetCrashDump = true; } } - bool waitForGate = true; - if(!isRvc2) { + if(!isRvc2 && !shouldCloseQuickly) { // Check if the device is still alive and well, if yes, don't wait for gate, crash dump not relevant try { waitForGate = !pimpl->rpcClient->call("isRunning").as(); @@ -445,15 +446,22 @@ void DeviceBase::closeImpl() { pimpl->logger.debug("isRunning call error: {}", ex.what()); } } - // Close connection first; causes Xlink internal calls to unblock semaphore waits and // return error codes, which then allows queues to unblock // always manage ownership because other threads (e.g. watchdog) are running and need to // keep the shared_ptr valid (even if closed). Otherwise leads to using null pointers, // invalid memory, etc. which hard crashes main app - connection->close(); - - if(gate && !waitForGate) { + try { + std::lock_guard lock(watchdogMtx); + if(watchdogRunning) { + if(pimpl->rpcClient->call("isRunning").as()) { + connection->close(); + } + } + } catch(const std::exception& ex) { + pimpl->logger.debug("isRunning call error: {}", ex.what()); + } + if(gate && !waitForGate && !shouldCloseQuickly) { gate->destroySession(); } { @@ -461,14 +469,11 @@ void DeviceBase::closeImpl() { watchdogRunning = false; watchdogCondVar.notify_all(); } - if(watchdogThread.joinable()) watchdogThread.join(); - // Stop various threads timesyncRunning = false; loggingRunning = false; profilingRunning = false; - // Then stop timesync if(timesyncThread.joinable()) timesyncThread.join(); // And at the end stop logging thread @@ -477,20 +482,18 @@ void DeviceBase::closeImpl() { if(profilingThread.joinable()) profilingThread.join(); // At the end stop the monitor thread if(monitorThread.joinable()) monitorThread.join(); - // If the device was operated through gate, wait for the session to end - if(gate && waitForGate) { + if(gate && waitForGate && !shouldCloseQuickly) { auto crashDump = gate->waitForSessionEnd(); if(crashDump) { logCollection::logCrashDump(pipelineSchema, crashDump.value(), deviceInfo); } } - // Close rpcStream pimpl->rpcStream = nullptr; pimpl->rpcClient = nullptr; - if(!dumpOnly) { + if(!dumpOnly && !shouldCloseQuickly) { auto timeout = getCrashdumpTimeout(deviceInfo.protocol); // Get crash dump if needed if(shouldGetCrashDump && timeout > 0) { @@ -501,11 +504,20 @@ void DeviceBase::closeImpl() { do { DeviceInfo rebootingDeviceInfo; std::tie(found, rebootingDeviceInfo) = XLinkConnection::getDeviceById(deviceInfo.getDeviceId(), X_LINK_ANY_STATE, false); + if(shouldCloseQuickly) { + break; + } if(found && (rebootingDeviceInfo.state == X_LINK_UNBOOTED || rebootingDeviceInfo.state == X_LINK_BOOTLOADER)) { pimpl->logger.trace("Found rebooting device in {}ns", duration_cast(steady_clock::now() - t1).count()); DeviceBase rebootingDevice(config, rebootingDeviceInfo, firmwarePath, true); + if(shouldCloseQuickly) { + break; + } if(rebootingDevice.hasCrashDump()) { auto dump = rebootingDevice.getCrashDump(); + if(shouldCloseQuickly) { + break; + } logCollection::logCrashDump(pipelineSchema, dump, deviceInfo); } else { pimpl->logger.warn("Device crashed, but no crash dump could be extracted."); @@ -513,6 +525,9 @@ void DeviceBase::closeImpl() { gotDump = true; break; } + if(shouldCloseQuickly) { + break; + } } while(!found && steady_clock::now() - t1 < std::chrono::milliseconds(timeout)); if(!gotDump) { pimpl->logger.error("Device likely crashed but did not reboot in time to get the crash dump"); @@ -520,7 +535,6 @@ void DeviceBase::closeImpl() { } else if(shouldGetCrashDump) { pimpl->logger.warn("Device crashed. Crash dump retrieval disabled."); } - pimpl->logger.debug("Device closed, {}", duration_cast(steady_clock::now() - t1).count()); } } @@ -961,6 +975,9 @@ void DeviceBase::monitorCallback(std::chrono::milliseconds watchdogTimeout, Prev // Ping with a period half of that of the watchdog timeout std::unique_lock lock(watchdogMtx); watchdogCondVar.wait_for(lock, watchdogTimeout, [this]() { return !watchdogRunning; }); + if(shouldCloseQuickly) { + break; + } // Check if wd was pinged in the specified watchdogTimeout time. decltype(lastWatchdogPingTime) prevPingTime; { @@ -1036,16 +1053,30 @@ void DeviceBase::monitorCallback(std::chrono::milliseconds watchdogTimeout, Prev auto reconnected = false; for(attempts = 0; attempts < maxReconnectionAttempts; attempts++) { if(reconnectionCallback) reconnectionCallback(ReconnectionStatus::RECONNECTING); - if(std::get<0>(getAnyAvailableDevice(reconnectTimeout))) { - init2(prev.cfg, prev.pathToMvcmd, prev.hasPipeline, true); - if(hasCrashDump()) { - auto dump = getCrashDump(); - logCollection::logCrashDump(pipelineSchema, dump, deviceInfo); + + auto start_time = std::chrono::steady_clock::now(); + constexpr std::chrono::milliseconds attemptTimeout = std::chrono::milliseconds(200); + bool extra = false; + + while(std::chrono::steady_clock::now() - start_time < reconnectTimeout) { + if(std::get<0>(getAnyAvailableDevice(attemptTimeout))) { + init2(prev.cfg, prev.pathToMvcmd, prev.hasPipeline, true); + if(hasCrashDump()) { + auto dump = getCrashDump(); + logCollection::logCrashDump(pipelineSchema, dump, deviceInfo); + } + auto shared = pipelinePtr.lock(); + if(!shared) throw std::runtime_error("Pipeline was destroyed"); + shared->resetConnections(); + reconnected = true; + break; + } + if(shouldCloseQuickly) { + extra = true; + break; } - auto shared = pipelinePtr.lock(); - if(!shared) throw std::runtime_error("Pipeline was destroyed"); - shared->resetConnections(); - reconnected = true; + } + if(extra) { break; } pimpl->logger.warn("Reconnection unsuccessful, trying again. Attempts left: {}\n", maxReconnectionAttempts - attempts - 1);