diff --git a/README.md b/README.md index 779bfe0208..90b6f662e7 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,7 @@ The following environment variables can be set to alter default behavior of the | DEPTHAI_CRASHDUMP_TIMEOUT | Specifies the duration in milliseconds to wait for device reboot when obtaining a crash dump. Crash dump retrieval disabled if 0. | | DEPTHAI_ENABLE_ANALYTICS_COLLECTION | Enables automatic analytics collection (pipeline schemas) used to improve the library | | DEPTHAI_DISABLE_CRASHDUMP_COLLECTION | Disables automatic crash dump collection used to improve the library | +| DEPTHAI_HUB_EVENTS_BASE_URL | URL for the Luxonis Hub | | DEPTHAI_HUB_API_KEY | API key for the Luxonis Hub | | DEPTHAI_ZOO_INTERNET_CHECK | (Default) 1 - perform internet check, if available, download the newest model version 0 - skip internet check and use cached model | | DEPTHAI_ZOO_INTERNET_CHECK_TIMEOUT | (Default) 1000 - timeout in milliseconds for the internet check | diff --git a/bindings/python/src/utility/EventsManagerBindings.cpp b/bindings/python/src/utility/EventsManagerBindings.cpp index 19e3dfc209..e09d314413 100644 --- a/bindings/python/src/utility/EventsManagerBindings.cpp +++ b/bindings/python/src/utility/EventsManagerBindings.cpp @@ -21,53 +21,112 @@ void EventsManagerBindings::bind(pybind11::module& m, void* pCallstack) { #ifdef DEPTHAI_ENABLE_EVENTS_MANAGER using namespace dai::utility; - py::class_>(m, "EventData") - .def(py::init(), py::arg("data"), py::arg("fileName"), py::arg("mimeType")) - .def(py::init(), py::arg("fileUrl")) - .def(py::init&, std::string>(), py::arg("imgFrame"), py::arg("fileName")) - .def(py::init&, std::string>(), py::arg("encodedFrame"), py::arg("fileName")) - .def(py::init&, std::string>(), py::arg("nnData"), py::arg("fileName")); + py::class_>(m, "FileGroup") + .def(py::init<>()) + .def("addFile", + static_cast(&FileGroup::addFile), + py::arg("fileName"), + py::arg("data"), + py::arg("mimeType"), + DOC(dai, utility, FileGroup, addFile)) + .def("addFile", + static_cast(&FileGroup::addFile), + py::arg("fileName"), + py::arg("filePath"), + DOC(dai, utility, FileGroup, addFile)) + .def("addFile", + static_cast&, const std::shared_ptr&)>(&FileGroup::addFile), + py::arg("fileName"), + py::arg("imgFrame"), + DOC(dai, utility, FileGroup, addFile)) + .def("addFile", + static_cast&, const std::shared_ptr&)>(&FileGroup::addFile), + py::arg("fileName"), + py::arg("encodedFrame"), + DOC(dai, utility, FileGroup, addFile)) + //.def("addFile", + // static_cast&)>(&FileGroup::addFile), + // py::arg("fileName"), + // py::arg("nnData"), + // DOC(dai, utility, FileGroup, addFile)) + .def("addFile", + static_cast&, const std::shared_ptr&)>(&FileGroup::addFile), + py::arg("fileName"), + py::arg("imgDetections"), + DOC(dai, utility, FileGroup, addFile)) + .def("addImageDetectionsPair", + static_cast&, const std::shared_ptr&, const std::shared_ptr&)>( + &FileGroup::addImageDetectionsPair), + py::arg("fileName"), + py::arg("imgFrame"), + py::arg("imgDetections"), + DOC(dai, utility, FileGroup, addImageDetectionsPair)) + .def("addImageDetectionsPair", + static_cast&, const std::shared_ptr&, const std::shared_ptr&)>( + &FileGroup::addImageDetectionsPair), + py::arg("fileName"), + py::arg("encodedFrame"), + py::arg("imgDetections"), + DOC(dai, utility, FileGroup, addImageDetectionsPair)); + //.def("addImageNNDataPair", + // static_cast&, const std::shared_ptr&)>(&FileGroup::addImageNNDataPair), + // py::arg("fileName"), + // py::arg("imgFrame"), + // py::arg("nnData"), + // DOC(dai, utility, FileGroup, addImageNNDataPair)) + //.def( + // "addImageNNDataPair", + // static_cast&, const std::shared_ptr&)>(&FileGroup::addImageNNDataPair), + // py::arg("fileName"), + // py::arg("encodedFrame"), + // py::arg("nnData"), + // DOC(dai, utility, FileGroup, addImageNNDataPair)); py::class_(m, "EventsManager") .def(py::init<>()) - .def(py::init(), py::arg("url"), py::arg("uploadCachedOnStart") = false, py::arg("publishInterval") = 10.0) - .def("setUrl", &EventsManager::setUrl, py::arg("url"), DOC(dai, utility, EventsManager, setUrl)) - .def("setSourceAppId", &EventsManager::setSourceAppId, py::arg("sourceAppId"), DOC(dai, utility, EventsManager, setSourceAppId)) - .def("setSourceAppIdentifier", - &EventsManager::setSourceAppIdentifier, - py::arg("sourceAppIdentifier"), - DOC(dai, utility, EventsManager, setSourceAppIdentifier)) + .def(py::init(), py::arg("uploadCachedOnStart") = false) .def("setToken", &EventsManager::setToken, py::arg("token"), DOC(dai, utility, EventsManager, setToken)) - .def("setQueueSize", &EventsManager::setQueueSize, py::arg("queueSize"), DOC(dai, utility, EventsManager, setQueueSize)) .def("setLogResponse", &EventsManager::setLogResponse, py::arg("logResponse"), DOC(dai, utility, EventsManager, setLogResponse)) - .def("setDeviceSerialNumber", - &EventsManager::setDeviceSerialNumber, - py::arg("deviceSerialNumber"), - DOC(dai, utility, EventsManager, setDeviceSerialNumber)) .def("setVerifySsl", &EventsManager::setVerifySsl, py::arg("verifySsl"), DOC(dai, utility, EventsManager, setVerifySsl)) .def("setCacheDir", &EventsManager::setCacheDir, py::arg("cacheDir"), DOC(dai, utility, EventsManager, setCacheDir)) .def("setCacheIfCannotSend", &EventsManager::setCacheIfCannotSend, py::arg("cacheIfCannotUpload"), DOC(dai, utility, EventsManager, setCacheIfCannotSend)) - .def("checkConnection", &EventsManager::checkConnection, DOC(dai, utility, EventsManager, checkConnection)) - .def("uploadCachedData", &EventsManager::uploadCachedData, DOC(dai, utility, EventsManager, uploadCachedData)) .def("sendEvent", &EventsManager::sendEvent, py::arg("name"), - py::arg("imgFrame").none(true) = nullptr, - py::arg("data") = std::vector>(), py::arg("tags") = std::vector(), - py::arg("extraData") = std::unordered_map(), + py::arg("extras") = std::unordered_map(), py::arg("deviceSerialNo") = "", + py::arg("associateFiles") = std::vector(), DOC(dai, utility, EventsManager, sendEvent)) .def("sendSnap", - &EventsManager::sendSnap, + static_cast, + const std::vector&, + const std::unordered_map&, + const std::string&)>(&EventsManager::sendSnap), + py::arg("name"), + py::arg("fileGroup") = std::shared_ptr(), + py::arg("tags") = std::vector(), + py::arg("extras") = std::unordered_map(), + py::arg("deviceSerialNo") = "", + DOC(dai, utility, EventsManager, sendSnap)) + .def("sendSnap", + static_cast&, + const std::shared_ptr, + const std::optional>&, + const std::vector&, + const std::unordered_map&, + const std::string&)>(&EventsManager::sendSnap), py::arg("name"), - py::arg("imgFrame").none(true) = nullptr, - py::arg("data") = std::vector>(), + py::arg("fileName"), + py::arg("imgFrame"), + py::arg("imgDetections"), py::arg("tags") = std::vector(), - py::arg("extraData") = std::unordered_map(), + py::arg("extras") = std::unordered_map(), py::arg("deviceSerialNo") = "", DOC(dai, utility, EventsManager, sendSnap)); #endif diff --git a/examples/cpp/Events/CMakeLists.txt b/examples/cpp/Events/CMakeLists.txt index f9824550ad..f5298decdc 100644 --- a/examples/cpp/Events/CMakeLists.txt +++ b/examples/cpp/Events/CMakeLists.txt @@ -6,4 +6,5 @@ cmake_minimum_required(VERSION 3.10) if(DEPTHAI_ENABLE_EVENTS_MANAGER) dai_add_example(events events.cpp OFF OFF) + dai_add_example(events_file_group events_file_group.cpp OFF OFF) endif() \ No newline at end of file diff --git a/examples/cpp/Events/events.cpp b/examples/cpp/Events/events.cpp index dba3019a50..9c58b27aaf 100644 --- a/examples/cpp/Events/events.cpp +++ b/examples/cpp/Events/events.cpp @@ -1,43 +1,80 @@ #include #include +#include #include #include "depthai/depthai.hpp" #include "depthai/utility/EventsManager.hpp" -int main(int argc, char* argv[]) { +// Helper function to normalize frame coordinates +cv::Rect frameNorm(const cv::Mat& frame, const dai::Point2f& topLeft, const dai::Point2f& bottomRight) { + float width = frame.cols, height = frame.rows; + return cv::Rect(cv::Point(topLeft.x * width, topLeft.y * height), cv::Point(bottomRight.x * width, bottomRight.y * height)); +} + +int main() { dai::Pipeline pipeline(true); + // Enter you hub team's api-key auto eventsManager = std::make_shared(); - eventsManager->setLogResponse(true); - // Color camera node + eventsManager->setToken(""); + auto camRgb = pipeline.create()->build(); - auto* preview = camRgb->requestOutput(std::make_pair(256, 256)); + auto detectionNetwork = pipeline.create(); - auto previewQ = preview->createOutputQueue(); + dai::NNModelDescription modelDescription; + modelDescription.model = "yolov6-nano"; + detectionNetwork->build(camRgb, modelDescription); + auto labelMap = detectionNetwork->getClasses(); - pipeline.start(); - bool sent = false; - eventsManager->sendEvent("test", nullptr, {}, {"tag1", "tag2"}, {{"key1", "value1"}}); + // Create output queues + auto qRgb = detectionNetwork->passthrough.createOutputQueue(); + auto qDet = detectionNetwork->out.createOutputQueue(); - std::this_thread::sleep_for(std::chrono::milliseconds(7000)); + pipeline.start(); - auto fileData = std::make_shared("abc", "test_bin.txt", "text/plain"); - std::vector> data; - data.emplace_back(fileData); - eventsManager->sendEvent("testdata", nullptr, data, {"tag3", "tag4"}, {{"key8", "value8"}}); while(pipeline.isRunning()) { - auto rgb = previewQ->get(); + if(cv::waitKey(1) == 'q') { + break; + } - // Do something with the data - // ... + auto inRgb = qRgb->get(); + auto inDet = qDet->get(); + if(inRgb == nullptr || inDet == nullptr) { + continue; + } + + // Display the video stream and detections + cv::Mat frame = inRgb->getCvFrame(); + if(!frame.empty()) { + // Display detections + for(const auto& detection : inDet->detections) { + auto bbox = frameNorm(frame, dai::Point2f(detection.xmin, detection.ymin), dai::Point2f(detection.xmax, detection.ymax)); + + // Draw label + cv::putText( + frame, labelMap.value()[detection.label], cv::Point(bbox.x + 10, bbox.y + 20), cv::FONT_HERSHEY_TRIPLEX, 0.5, cv::Scalar(255, 255, 255)); + + // Draw confidence + cv::putText(frame, + std::to_string(static_cast(detection.confidence * 100)) + "%", + cv::Point(bbox.x + 10, bbox.y + 40), + cv::FONT_HERSHEY_TRIPLEX, + 0.5, + cv::Scalar(255, 255, 255)); + + // Draw rectangle + cv::rectangle(frame, bbox, cv::Scalar(255, 0, 0), 2); + } + + // Show the frame + cv::imshow("rgb", frame); + } - if(!sent) { - eventsManager->sendSnap("rgb", rgb, {}, {"tag11", "tag12"}, {{"key", "value"}}); - sent = true; + // Trigger sendSnap() + if(cv::waitKey(1) == 's') { + eventsManager->sendSnap("ImageDetection", std::nullopt, inRgb, inDet, {"EventsExample", "C++"}, {{"key_0", "value_0"}, {"key_1", "value_1"}}); } - // - std::this_thread::sleep_for(std::chrono::milliseconds(200)); } return EXIT_SUCCESS; diff --git a/examples/cpp/Events/events_file_group.cpp b/examples/cpp/Events/events_file_group.cpp new file mode 100644 index 0000000000..286ea0ebf2 --- /dev/null +++ b/examples/cpp/Events/events_file_group.cpp @@ -0,0 +1,98 @@ +#include +#include +#include +#include + +#include "depthai/depthai.hpp" +#include "depthai/utility/EventsManager.hpp" + +// Helper function to normalize frame coordinates +cv::Rect frameNorm(const cv::Mat& frame, const dai::Point2f& topLeft, const dai::Point2f& bottomRight) { + float width = frame.cols, height = frame.rows; + return cv::Rect(cv::Point(topLeft.x * width, topLeft.y * height), cv::Point(bottomRight.x * width, bottomRight.y * height)); +} + +int main() { + dai::Pipeline pipeline(true); + + // Enter you hub team's api-key + auto eventsManager = std::make_shared(); + eventsManager->setToken(""); + + auto camRgb = pipeline.create()->build(); + auto detectionNetwork = pipeline.create(); + + dai::NNModelDescription modelDescription; + modelDescription.model = "yolov6-nano"; + detectionNetwork->build(camRgb, modelDescription); + auto labelMap = detectionNetwork->getClasses(); + + // Create output queues + auto qRgb = detectionNetwork->passthrough.createOutputQueue(); + auto qDet = detectionNetwork->out.createOutputQueue(); + + pipeline.start(); + + int counter = 0; + while(pipeline.isRunning()) { + if(cv::waitKey(1) == 'q') { + break; + } + + auto inRgb = qRgb->get(); + auto inDet = qDet->get(); + if(inRgb == nullptr || inDet == nullptr) { + continue; + } + + // Display the video stream and detections + cv::Mat frame = inRgb->getCvFrame(); + if(!frame.empty()) { + // Display detections + for(const auto& detection : inDet->detections) { + auto bbox = frameNorm(frame, dai::Point2f(detection.xmin, detection.ymin), dai::Point2f(detection.xmax, detection.ymax)); + + // Draw label + cv::putText( + frame, labelMap.value()[detection.label], cv::Point(bbox.x + 10, bbox.y + 20), cv::FONT_HERSHEY_TRIPLEX, 0.5, cv::Scalar(255, 255, 255)); + + // Draw confidence + cv::putText(frame, + std::to_string(static_cast(detection.confidence * 100)) + "%", + cv::Point(bbox.x + 10, bbox.y + 40), + cv::FONT_HERSHEY_TRIPLEX, + 0.5, + cv::Scalar(255, 255, 255)); + + // Draw rectangle + cv::rectangle(frame, bbox, cv::Scalar(255, 0, 0), 2); + } + + // Show the frame + cv::imshow("rgb", frame); + } + + // Suppose we are only interested in the detections with confidence between 50% and 60% + auto borderDetections = std::make_shared(); + for(const auto& detection : inDet->detections) { + if(detection.confidence > 0.5f && detection.confidence < 0.6f) { + borderDetections->detections.emplace_back(detection); + } + } + + // Are there any border detections + if(borderDetections->detections.size() > 0) { + std::string fileName = "ImageDetection_"; + std::stringstream ss; + ss << fileName << counter; + + auto fileGroup = std::make_shared(); + fileGroup->addImageDetectionsPair(ss.str(), inRgb, borderDetections); + eventsManager->sendSnap("LowConfidenceDetection", fileGroup, {"EventsExample", "C++"}, {{"key_0", "value_0"}, {"key_1", "value_1"}}); + + counter++; + } + } + + return EXIT_SUCCESS; +} diff --git a/examples/python/Events/events.py b/examples/python/Events/events.py index fdedb29ca1..27544e3ee5 100644 --- a/examples/python/Events/events.py +++ b/examples/python/Events/events.py @@ -8,37 +8,67 @@ # Create pipeline with dai.Pipeline() as pipeline: - # Define sources and outputs - camRgb = pipeline.create(dai.node.Camera).build() - # Properties + # Enter you hub team's api-key + eventMan = dai.EventsManager() + eventMan.setToken("") - qRgb = camRgb.requestOutput((256,256)).createOutputQueue() + cameraNode = pipeline.create(dai.node.Camera).build() + detectionNetwork = pipeline.create(dai.node.DetectionNetwork).build(cameraNode, dai.NNModelDescription("yolov6-nano")) + labelMap = detectionNetwork.getClasses() - eventMan = dai.EventsManager() - eventMan.setLogResponse(True) + # Create output queues + qRgb = detectionNetwork.passthrough.createOutputQueue() + qDet = detectionNetwork.out.createOutputQueue() - eventMan.sendEvent("test1", None, [], ["tag1", "tag2"], {"key1": "value1"}) - time.sleep(2) - fileData = dai.EventData(b'Hello, world!', "hello.txt", "text/plain") - eventMan.sendEvent("test2", None, [fileData], ["tag1", "tag2"], {"key1": "value1"}) pipeline.start() - frame = None - counter = 0 + + # nn data, being the bounding box locations, are in <0..1> range - they need to be normalized with frame width/height + def frameNorm(frame, bbox): + normVals = np.full(len(bbox), frame.shape[0]) + normVals[::2] = frame.shape[1] + return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int) - eventSent = False while pipeline.isRunning(): + if cv2.waitKey(1) == ord("q"): + pipeline.stop() + break + inRgb: dai.ImgFrame = qRgb.get() - if inRgb is not None: - frame = inRgb.getCvFrame() - if not eventSent: - eventMan.sendSnap("rgb", inRgb, [], ["tag1", "tag2"], {"key1": "value1"}) - eventSent = True + inDet: dai.ImgDetections = qDet.get() + if inRgb is None or inDet is None: + continue + # Display the video stream and detections + color = (255, 0, 0) + frame = inRgb.getCvFrame() if frame is not None: + for detection in inDet.detections: + bbox = frameNorm( + frame, + (detection.xmin, detection.ymin, detection.xmax, detection.ymax), + ) + cv2.putText( + frame, + labelMap[detection.label], + (bbox[0] + 10, bbox[1] + 20), + cv2.FONT_HERSHEY_TRIPLEX, + 0.5, + 255, + ) + cv2.putText( + frame, + f"{int(detection.confidence * 100)}%", + (bbox[0] + 10, bbox[1] + 40), + cv2.FONT_HERSHEY_TRIPLEX, + 0.5, + 255, + ) + cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + # Show the frame cv2.imshow("rgb", frame) - if cv2.waitKey(1) == ord("q"): - pipeline.stop() - break + # Trigger sendSnap() + if cv2.waitKey(1) == ord("s"): + eventMan.sendSnap("ImageDetection", None, inRgb, inDet, ["EventsExample", "Python"], {"key_0" : "value_0", "key_1" : "value_1"}) \ No newline at end of file diff --git a/examples/python/Events/events_file_group.py b/examples/python/Events/events_file_group.py new file mode 100644 index 0000000000..3cb21aea4e --- /dev/null +++ b/examples/python/Events/events_file_group.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +import cv2 +import depthai as dai +import numpy as np +import time + + +# Create pipeline +with dai.Pipeline() as pipeline: + # Enter you hub team's api-key + eventMan = dai.EventsManager() + eventMan.setToken("") + + cameraNode = pipeline.create(dai.node.Camera).build() + detectionNetwork = pipeline.create(dai.node.DetectionNetwork).build(cameraNode, dai.NNModelDescription("yolov6-nano")) + labelMap = detectionNetwork.getClasses() + + # Create output queues + qRgb = detectionNetwork.passthrough.createOutputQueue() + qDet = detectionNetwork.out.createOutputQueue() + + pipeline.start() + + + # nn data, being the bounding box locations, are in <0..1> range - they need to be normalized with frame width/height + def frameNorm(frame, bbox): + normVals = np.full(len(bbox), frame.shape[0]) + normVals[::2] = frame.shape[1] + return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int) + + + counter = 0 + while pipeline.isRunning(): + if cv2.waitKey(1) == ord("q"): + pipeline.stop() + break + + inRgb: dai.ImgFrame = qRgb.get() + inDet: dai.ImgDetections = qDet.get() + if inRgb is None or inDet is None: + continue + + # Display the video stream and detections + color = (255, 0, 0) + frame = inRgb.getCvFrame() + if frame is not None: + for detection in inDet.detections: + bbox = frameNorm( + frame, + (detection.xmin, detection.ymin, detection.xmax, detection.ymax), + ) + cv2.putText( + frame, + labelMap[detection.label], + (bbox[0] + 10, bbox[1] + 20), + cv2.FONT_HERSHEY_TRIPLEX, + 0.5, + 255, + ) + cv2.putText( + frame, + f"{int(detection.confidence * 100)}%", + (bbox[0] + 10, bbox[1] + 40), + cv2.FONT_HERSHEY_TRIPLEX, + 0.5, + 255, + ) + cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + # Show the frame + cv2.imshow("rgb", frame) + + # Suppose we are only interested in the detections with confidence between 50% and 60% + borderDetectionsList = [] + for detection in inDet.detections: + if detection.confidence > 0.5 and detection.confidence < 0.6: + borderDetectionsList.append(detection) + + # Are there any border detections + if len(borderDetectionsList) > 0: + borderDetections = dai.ImgDetections() + borderDetections.detections = borderDetectionsList + fileName = f"ImageDetection_{counter}" + + fileGroup = dai.FileGroup() + fileGroup.addImageDetectionsPair(fileName, inRgb, borderDetections) + eventMan.sendSnap("LowConfidenceDetection", fileGroup, ["EventsExample", "Python"], {"key_0" : "value_0", "key_1" : "value_1"}) + + counter += 1 \ No newline at end of file diff --git a/include/depthai/utility/EventsManager.hpp b/include/depthai/utility/EventsManager.hpp index 17b0d4410d..09ba9caa12 100644 --- a/include/depthai/utility/EventsManager.hpp +++ b/include/depthai/utility/EventsManager.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -11,6 +12,7 @@ #include "depthai/pipeline/datatype/ADatatype.hpp" #include "depthai/pipeline/datatype/EncodedFrame.hpp" +#include "depthai/pipeline/datatype/ImgDetections.hpp" #include "depthai/pipeline/datatype/ImgFrame.hpp" #include "depthai/pipeline/datatype/NNData.hpp" @@ -18,95 +20,112 @@ namespace dai { namespace proto { namespace event { class Event; +class FileUploadGroupResult; +enum PrepareFileUploadClass : int; } // namespace event } // namespace proto + namespace utility { -enum class EventDataType { DATA, FILE_URL, IMG_FRAME, ENCODED_FRAME, NN_DATA }; -class EventData { + +class FileData { public: - EventData(const std::string& data, const std::string& fileName, const std::string& mimeType); - explicit EventData(std::string fileUrl); - explicit EventData(const std::shared_ptr& imgFrame, std::string fileName); - explicit EventData(const std::shared_ptr& encodedFrame, std::string fileName); - explicit EventData(const std::shared_ptr& nnData, std::string fileName); - bool toFile(const std::string& path); + FileData(std::string data, std::string fileName, std::string mimeType); + explicit FileData(std::filesystem::path filePath, std::string fileName); + explicit FileData(const std::shared_ptr& imgFrame, std::string fileName); + explicit FileData(const std::shared_ptr& encodedFrame, std::string fileName); + // explicit FileData(const std::shared_ptr& nnData, std::string fileName); + explicit FileData(const std::shared_ptr& imgDetections, std::string fileName); + bool toFile(const std::filesystem::path& inputPath); private: - std::string fileName; std::string mimeType; + std::string fileName; std::string data; - EventDataType type; + uint64_t size; + std::string checksum; + proto::event::PrepareFileUploadClass classification; + friend class EventsManager; +}; + +class FileGroup { + public: + void addFile(std::string fileName, std::string data, std::string mimeType); + void addFile(std::string fileName, std::filesystem::path filePath); + void addFile(const std::optional& fileName, const std::shared_ptr& imgFrame); + void addFile(const std::optional& fileName, const std::shared_ptr& encodedFrame); + // void addFile(std::string fileName, const std::shared_ptr& nnData); + void addFile(const std::optional& fileName, const std::shared_ptr& imgDetections); + void addImageDetectionsPair(const std::optional& fileName, + const std::shared_ptr& imgFrame, + const std::shared_ptr& imgDetections); + void addImageDetectionsPair(const std::optional& fileName, + const std::shared_ptr& encodedFrame, + const std::shared_ptr& imgDetections); + // void addImageNNDataPair(std::string fileName, const std::shared_ptr& imgFrame, const std::shared_ptr& imgDetections); + // void addImageNNDataPair(std::string fileName, const std::shared_ptr& encodedFrame, const std::shared_ptr& imgDetections); + + private: + std::vector> fileData; friend class EventsManager; }; + class EventsManager { public: - explicit EventsManager(std::string url = "https://events-ingest.cloud.luxonis.com", bool uploadCachedOnStart = false, float publishInterval = 10.0); + explicit EventsManager(bool uploadCachedOnStart = false); ~EventsManager(); /** * Send an event to the events service * @param name Name of the event - * @param imgFrame Image frame to send - * @param data List of EventData objects to send * @param tags List of tags to send - * @param extraData Extra data to send + * @param extras Extra data to send * @param deviceSerialNo Device serial number + * @param associateFiles List of associate files with ids * @return bool */ bool sendEvent(const std::string& name, - const std::shared_ptr& imgFrame = nullptr, - std::vector> data = {}, const std::vector& tags = {}, - const std::unordered_map& extraData = {}, - const std::string& deviceSerialNo = ""); + const std::unordered_map& extras = {}, + const std::string& deviceSerialNo = "", + const std::vector& associateFiles = {}); /** - * Send a snap to the events service. Snaps should be used for sending images and other large files. + * Send a snap to the events service. Snaps should be used for sending images and other files. * @param name Name of the snap - * @param imgFrame Image frame to send - * @param data List of EventData objects to send + * @param fileGroup FileGroup containing FileData objects to send * @param tags List of tags to send - * @param extraData Extra data to send + * @param extras Extra data to send * @param deviceSerialNo Device serial number * @return bool */ bool sendSnap(const std::string& name, - const std::shared_ptr& imgFrame = nullptr, - std::vector> data = {}, + const std::shared_ptr fileGroup, const std::vector& tags = {}, - const std::unordered_map& extraData = {}, + const std::unordered_map& extras = {}, const std::string& deviceSerialNo = ""); - - void setDeviceSerialNumber(const std::string& deviceSerialNumber); - /** - * Set the URL of the events service. By default, the URL is set to https://events-ingest.cloud.luxonis.com - * @param url URL of the events service - * @return void - */ - void setUrl(const std::string& url); /** - * Set the source app ID. By default, the source app ID is taken from the environment variable AGENT_APP_ID - * @param sourceAppId Source app ID - * @return void - */ - void setSourceAppId(const std::string& sourceAppId); - /** - * Set the source app identifier. By default, the source app identifier is taken from the environment variable AGENT_APP_IDENTIFIER - * @param sourceAppIdentifier Source app identifier - * @return void + * Send a snap to the events service, with an ImgFrame and ImgDetections pair as files + * @param name Name of the snap + * @param fileName File name used to create FileData + * @param imgFrame ImgFrame to send + * @param imgDetections ImgDetections to sent + * @param tags List of tags to send + * @param extras Extra data to send + * @param deviceSerialNo Device serial number + * @return bool */ - void setSourceAppIdentifier(const std::string& sourceAppIdentifier); + bool sendSnap(const std::string& name, + const std::optional& fileName, + const std::shared_ptr imgFrame, + const std::optional>& imgDetections = std::nullopt, + const std::vector& tags = {}, + const std::unordered_map& extras = {}, + const std::string& deviceSerialNo = ""); /** * Set the token for the events service. By default, the token is taken from the environment variable DEPTHAI_HUB_API_KEY * @param token Token for the events service * @return void */ void setToken(const std::string& token); - /** - * Set the queue size for the amount of events that can be added and sent. By default, the queue size is set to 10 - * @param queueSize Queue size - * @return void - */ - void setQueueSize(uint64_t queuSize); /** * Set whether to log the responses from the server. By default, logResponse is set to false * @param logResponse bool @@ -119,26 +138,12 @@ class EventsManager { * @return void */ void setVerifySsl(bool verifySsl); - - /** - * Check if the device is connected to Hub. Performs a simple GET request to the URL/health endpoint - * @return bool - */ - bool checkConnection(); - - /** - * Upload cached data to the events service - * @return void - */ - void uploadCachedData(); - /** * Set the cache directory for storing cached data. By default, the cache directory is set to /internal/private * @param cacheDir Cache directory * @return void */ void setCacheDir(const std::string& cacheDir); - /** * Set whether to cache data if it cannot be sent. By default, cacheIfCannotSend is set to false * @param cacheIfCannotSend bool @@ -147,33 +152,108 @@ class EventsManager { void setCacheIfCannotSend(bool cacheIfCannotSend); private: - struct EventMessage { + struct SnapData { std::shared_ptr event; - std::vector> data; - std::string cachePath; + std::shared_ptr fileGroup; }; - static std::string createUUID(); - void sendEventBuffer(); - void sendFile(const std::shared_ptr& file, const std::string& url); + + struct UploadRetryPolicy { + int maxAttempts = 10; + float factor = 2.0f; + std::chrono::milliseconds baseDelay{100}; + }; + + /** + * Fetch the configuration limits and quotas for snaps & events + * @return bool + */ + bool fetchConfigurationLimits(); + /** + * Prepare a batch of file groups from inputSnapBatch + */ + void uploadFileBatch(std::deque> inputSnapBatch); + /** + * Upload a prepared group of files from snapData, using prepareGroupResult + */ + bool uploadGroup(std::shared_ptr snapData, dai::proto::event::FileUploadGroupResult prepareGroupResult); + /** + * Upload a file from fileData using the chosen uploadUrl + */ + bool uploadFile(std::shared_ptr fileData, std::string uploadUrl); + /** + * Upload events from eventBuffer in batch + */ + void uploadEventBatch(); + /** + * Validate the input event by checking that its fields adhere to defined limitations + * @param inputEvent Input event to be validated + * @return bool + */ + bool validateEvent(const proto::event::Event& inputEvent); + /** + * Cache events from the eventBuffer to the filesystem + */ void cacheEvents(); + /** + * Cache snapData from the inputSnapBatch to the filesystem + */ + void cacheSnapData(std::deque>& inputSnapBatch); + /** + * Upload cached data to the events service + * @return void + */ + void uploadCachedData(); + /** + * Check if there's any cached data in the filesystem + */ bool checkForCachedData(); + /** + * Clear cached data in the filesystem (if any) + */ + void clearCachedData(const std::filesystem::path& directory); + std::string token; - std::string deviceSerialNumber; std::string url; std::string sourceAppId; std::string sourceAppIdentifier; - uint64_t queueSize; - std::unique_ptr eventBufferThread; - std::vector> eventBuffer; - std::mutex eventBufferMutex; float publishInterval; bool logResponse; bool verifySsl; std::string cacheDir; bool cacheIfCannotSend; - std::atomic stopEventBuffer; + std::unique_ptr uploadThread; + std::deque> eventBuffer; + std::deque> snapBuffer; + std::deque> uploadFileBatchFutures; + std::mutex eventBufferMutex; + std::mutex snapBufferMutex; + std::mutex stopThreadConditionMutex; + std::atomic stopUploadThread; + std::atomic configurationLimitsFetched; std::condition_variable eventBufferCondition; - std::mutex eventBufferConditionMutex; + + uint64_t maxFileSizeBytes; + uint64_t remainingStorageBytes; + uint64_t warningStorageBytes; + uint64_t bytesPerHour; + uint32_t uploadsPerHour; + uint32_t maxGroupsPerBatch; + uint32_t maxFilesPerGroup; + uint32_t eventsPerHour; + uint32_t snapsPerHour; + uint32_t eventsPerRequest; + + UploadRetryPolicy uploadRetryPolicy; + + static constexpr int EVENT_BUFFER_MAX_SIZE = 300; + + static constexpr int EVENT_VALIDATION_NAME_LENGTH = 56; + static constexpr int EVENT_VALIDATION_MAX_TAGS = 20; + static constexpr int EVENT_VALIDATION_TAG_LENGTH = 56; + static constexpr int EVENT_VALIDATION_MAX_EXTRAS = 25; + static constexpr int EVENT_VALIDATION_EXTRA_KEY_LENGTH = 40; + static constexpr int EVENT_VALIDATION_EXTRA_VALUE_LENGTH = 100; + static constexpr int EVENT_VALIDATION_MAX_ASSOCIATE_FILES = 20; }; } // namespace utility } // namespace dai diff --git a/protos/Event.proto b/protos/Event.proto index e3c99b995a..ba5db555af 100644 --- a/protos/Event.proto +++ b/protos/Event.proto @@ -1,58 +1,140 @@ syntax = "proto3"; +import "ImgDetections.proto"; package dai.proto.event; -message BatchUploadEvents { - repeated Event events = 1; +message BatchPrepareFileUpload { + repeated PrepareFileUploadGroup groups = 1; } -message Event { - // prevents dual uploads, can be a local ID for example - optional string nonce = 1; +message PrepareFileUploadGroup { + repeated PrepareFileUpload files = 1; +} - // timestamp (seconds since 1970) when event originated (useful especially with offline/later upload) - int64 created_at = 2; +message PrepareFileUpload { + string checksum = 1; + string mime_type = 2; + int64 size = 3; + string filename = 4; + PrepareFileUploadClass classification = 5; +} - // name to identify event - string name = 3; +enum PrepareFileUploadClass { + UNKNOWN_FILE = 0; + IMAGE_COLOR = 1; + IMAGE_STEREO_LEFT = 2; + IMAGE_STEREO_RIGHT = 3; + DISPARITY = 4; + VIDEO = 5; + POINTCLOUD = 6; + ANNOTATION = 7; +} + +message BatchFileUploadResult { + repeated FileUploadGroupResult groups = 1; +} + +message FileUploadGroupResult { + optional RejectedFileGroup rejected = 1; + repeated FileUploadResult files = 2; +} + +message RejectedFileGroup { + RejectedFileGroupReason reason = 1; +} + +enum RejectedFileGroupReason { + GROUP_UNEXPECTED_ERROR = 0; + FILE_INPUT_VALIDATION = 1; + STORAGE_QUOTA_EXCEEDED = 2; +} + +message FileUploadResult { + oneof result { + AcceptedFile accepted = 1; + RejectedFile rejected = 2; + } +} - // arbitrary tags, include tag "snap" for event to be processed snap - repeated string tags = 4; +message AcceptedFile { + string upload_url = 1; + string id = 2; +} - // arbitrary key/value data - map extras = 5; +message RejectedFile { + RejectedFileReason reason = 1; + string message = 2; +} - // how many files to wait to upload, before event is considered complete, - // notified about, snap created, ... - int32 expect_files_num = 6; +enum RejectedFileReason { + FILE_UNEXPECTED_ERROR = 0; + INPUT_VALIDATION = 1; +} - // serial number of source device - optional string source_serial_number = 7; +message BatchUploadEvents { + repeated Event events = 1; +} - // ID of sending Hub application - // (to be provided by agent as ENV - `AGENT_APP_ID` - eg `01916edb-3ded-793a-b6ad-cd4395768425`) - optional string source_app_id = 8; +message Event { + int64 created_at = 1; + string name = 2; + repeated string tags = 3; + map extras = 4; + optional string source_serial_number = 5; + optional string source_app_id = 6; + optional string source_app_identifier = 7; + repeated AssociateFile associate_files = 8; +} - // Identifier of sending Hub application - // (to be provided by agent as ENV - `AGENT_APP_IDENTIFIER` - eg `com.luxonis.counter-app`) - optional string source_app_identifier = 9; +message AssociateFile { + string id = 1; } + message BatchUploadEventsResult { - repeated EventResult events = 1; + repeated EventResult events = 1; } message EventResult { - string nonce = 1; oneof result { - AcceptedEvent accepted = 2; - IngestError error = 3; + AcceptedEvent accepted = 1; + RejectedEvent rejected = 2; } } message AcceptedEvent { - repeated string file_upload_urls = 1; + string id = 1; } -message IngestError { - string message = 1; +message RejectedEvent { + RejectedEventReason reason = 1; + string message = 2; } + +enum RejectedEventReason { + GENERIC_EVENT_REJECTED = 0; + EVENT_VALIDATION = 1; +} + +message FileLimits { + uint64 max_file_size_bytes = 1; + uint64 remaining_storage_bytes = 2; + uint64 bytes_per_hour_rate = 3; + uint32 uploads_per_hour_rate = 4; + uint32 groups_per_allocation = 5; + uint32 files_per_group_in_allocation = 6; +} + +message EventLimits { + uint32 events_per_hour_rate = 1; + uint32 snaps_per_hour_rate = 2; + uint32 events_per_request = 3; +} + +message ApiUsage { + FileLimits files = 1; + EventLimits events = 2; +} + +message SnapAnnotations { + optional img_detections.ImgDetections detections = 1; +} \ No newline at end of file diff --git a/src/utility/EventsManager.cpp b/src/utility/EventsManager.cpp index a42bd8062b..75b42e2f72 100644 --- a/src/utility/EventsManager.cpp +++ b/src/utility/EventsManager.cpp @@ -1,5 +1,7 @@ #include "depthai/utility/EventsManager.hpp" +#include + #include #include #include @@ -15,45 +17,139 @@ namespace dai { namespace utility { -using std::move; - -EventData::EventData(const std::string& data, const std::string& fileName, const std::string& mimeType) - : fileName(fileName), mimeType(mimeType), data(data), type(EventDataType::DATA) {} - -EventData::EventData(std::string fileUrl) : data(std::move(fileUrl)), type(EventDataType::FILE_URL) { - fileName = std::filesystem::path(data).filename().string(); - static std::map mimeTypes = {{".html", "text/html"}, - {".htm", "text/html"}, - {".css", "text/css"}, - {".js", "application/javascript"}, - {".png", "image/png"}, - {".jpg", "image/jpeg"}, - {".jpeg", "image/jpeg"}, - {".gif", "image/gif"}, - {".svg", "image/svg+xml"}, - {".json", "application/json"}, - {".txt", "text/plain"}}; - auto ext = std::filesystem::path(data).extension().string(); - auto it = mimeTypes.find(ext); - mimeType = "application/octet-stream"; - if(it != mimeTypes.end()) { + +template +void addToFileData(std::vector>& container, Args&&... args) { + container.emplace_back(std::make_shared(std::forward(args)...)); +} + +void FileGroup::addFile(std::string fileName, std::string data, std::string mimeType) { + addToFileData(fileData, std::move(data), std::move(fileName), std::move(mimeType)); +} + +void FileGroup::addFile(std::string fileName, std::filesystem::path filePath) { + addToFileData(fileData, std::move(filePath), std::move(fileName)); +} + +void FileGroup::addFile(const std::optional& fileName, const std::shared_ptr& imgFrame) { + std::string dataFileName = fileName.value_or("Image"); + addToFileData(fileData, imgFrame, std::move(dataFileName)); +} + +void FileGroup::addFile(const std::optional& fileName, const std::shared_ptr& encodedFrame) { + std::string dataFileName = fileName.value_or("Image"); + addToFileData(fileData, encodedFrame, std::move(dataFileName)); +} + +// void FileGroup::addFile(std::string fileName, const std::shared_ptr& nnData) { +// addToFileData(fileData, nnData, std::move(fileName)); +// } + +void FileGroup::addFile(const std::optional& fileName, const std::shared_ptr& imgDetections) { + std::string dataFileName = fileName.value_or("Detections"); + addToFileData(fileData, imgDetections, std::move(dataFileName)); +} + +void FileGroup::addImageDetectionsPair(const std::optional& fileName, + const std::shared_ptr& imgFrame, + const std::shared_ptr& imgDetections) { + std::string dataFileName = fileName.value_or("ImageDetection"); + addToFileData(fileData, imgFrame, dataFileName); + addToFileData(fileData, imgDetections, std::move(dataFileName)); +} + +void FileGroup::addImageDetectionsPair(const std::optional& fileName, + const std::shared_ptr& encodedFrame, + const std::shared_ptr& imgDetections) { + std::string dataFileName = fileName.value_or("ImageDetection"); + addToFileData(fileData, encodedFrame, dataFileName); + addToFileData(fileData, imgDetections, std::move(dataFileName)); +} + +// void FileGroup::addImageNNDataPair(std::string fileName, const std::shared_ptr& imgFrame, const std::shared_ptr& nnData) { +// addToFileData(fileData, imgFrame, std::move(fileName)); +// addToFileData(fileData, nnData, std::move(fileName)); +// } + +// void FileGroup::addImageNNDataPair(std::string fileName, const std::shared_ptr& encodedFrame, const std::shared_ptr& nnData) { +// addToFileData(fileData, encodedFrame, std::move(fileName)); +// addToFileData(fileData, nnData, std::move(fileName)); +// } + +std::string calculateSHA256Checksum(const std::string& data) { + unsigned char digest[SHA256_DIGEST_LENGTH]; + SHA256(reinterpret_cast(data.data()), data.size(), digest); + + std::ostringstream oss; + for(int i = 0; i < SHA256_DIGEST_LENGTH; ++i) { + oss << std::hex << std::setw(2) << std::setfill('0') << static_cast(digest[i]); + } + return oss.str(); +} + +FileData::FileData(std::string data, std::string fileName, std::string mimeType) + : mimeType(std::move(mimeType)), + fileName(std::move(fileName)), + data(std::move(data)), + size(data.size()), + checksum(calculateSHA256Checksum(data)), + classification(proto::event::PrepareFileUploadClass::UNKNOWN_FILE) {} + +FileData::FileData(std::filesystem::path filePath, std::string fileName) : fileName(std::move(fileName)) { + static const std::unordered_map mimeTypeExtensionMap = {{".html", "text/html"}, + {".htm", "text/html"}, + {".css", "text/css"}, + {".js", "text/javascript"}, + {".png", "image/png"}, + {".jpg", "image/jpeg"}, + {".jpeg", "image/jpeg"}, + {".gif", "image/gif"}, + {".svg", "image/svg+xml"}, + {".json", "application/json"}, + {".txt", "text/plain"}}; + // Read the data + std::ifstream fileStream(filePath, std::ios::binary | std::ios::ate); + if(!fileStream) { + logger::error("File: {} doesn't exist", filePath.string()); + return; + } + std::streamsize fileSize = fileStream.tellg(); + data.resize(static_cast(fileSize)); + fileStream.seekg(0, std::ios::beg); + fileStream.read(data.data(), fileSize); + size = data.size(); + checksum = calculateSHA256Checksum(data); + // Determine the mime type + auto it = mimeTypeExtensionMap.find(filePath.extension().string()); + if(it != mimeTypeExtensionMap.end()) { mimeType = it->second; + } else { + mimeType = "application/octet-stream"; + } + static const std::unordered_set imageMimeTypes = {"image/jpeg", "image/png", "image/webp", "image/bmp", "image/tiff"}; + if(imageMimeTypes.find(mimeType) != imageMimeTypes.end()) { + classification = proto::event::PrepareFileUploadClass::IMAGE_COLOR; + } else { + classification = proto::event::PrepareFileUploadClass::UNKNOWN_FILE; } } -EventData::EventData(const std::shared_ptr& imgFrame, std::string fileName) - : fileName(std::move(fileName)), mimeType("image/jpeg"), type(EventDataType::IMG_FRAME) { +FileData::FileData(const std::shared_ptr& imgFrame, std::string fileName) + : mimeType("image/jpeg"), fileName(std::move(fileName)), classification(proto::event::PrepareFileUploadClass::IMAGE_COLOR) { // Convert ImgFrame to bytes cv::Mat cvFrame = imgFrame->getCvFrame(); - std::vector buf; - cv::imencode(".jpg", cvFrame, buf); + std::vector buffer; + cv::imencode(".jpg", cvFrame, buffer); + std::stringstream ss; - ss.write((const char*)buf.data(), buf.size()); + ss.write((const char*)buffer.data(), buffer.size()); data = ss.str(); + size = data.size(); + checksum = calculateSHA256Checksum(data); } -EventData::EventData(const std::shared_ptr& encodedFrame, std::string fileName) - : fileName(std::move(fileName)), type(EventDataType::ENCODED_FRAME) { +FileData::FileData(const std::shared_ptr& encodedFrame, std::string fileName) + : mimeType("image/jpeg"), fileName(std::move(fileName)), classification(proto::event::PrepareFileUploadClass::IMAGE_COLOR) { // Convert EncodedFrame to bytes if(encodedFrame->getProfile() != EncodedFrame::Profile::JPEG) { logger::error("Only JPEG encoded frames are supported"); @@ -62,106 +158,406 @@ EventData::EventData(const std::shared_ptr& encodedFrame, std::str std::stringstream ss; ss.write((const char*)encodedFrame->getData().data(), encodedFrame->getData().size()); data = ss.str(); - mimeType = "image/jpeg"; + size = data.size(); + checksum = calculateSHA256Checksum(data); } -EventData::EventData(const std::shared_ptr& nnData, std::string fileName) - : fileName(std::move(fileName)), mimeType("application/octet-stream"), type(EventDataType::NN_DATA) { - // Convert NNData to bytes - std::stringstream ss; - ss.write((const char*)nnData->data->getData().data(), nnData->data->getData().size()); - data = ss.str(); +// FileData::FileData(const std::shared_ptr& nnData, std::string fileName) +// : mimeType("application/octet-stream"), fileName(std::move(fileName)), classification(proto::event::PrepareFileUploadClass::UNKNOWN_FILE) { +// // Convert NNData to bytes +// std::stringstream ss; +// ss.write((const char*)nnData->data->getData().data(), nnData->data->getData().size()); +// data = ss.str(); +// size = data.size(); +// checksum = calculateSHA256Checksum(data); +// } + +FileData::FileData(const std::shared_ptr& imgDetections, std::string fileName) + : mimeType("application/x-protobuf; proto=SnapAnnotation"), + fileName(std::move(fileName)), + classification(proto::event::PrepareFileUploadClass::ANNOTATION) { + // Serialize imgDetections object, add it to SnapAnnotation proto + proto::event::SnapAnnotations snapAnnotation; + proto::img_detections::ImgDetections imgDetectionsProto; + + if(imgDetections) { + std::vector imgDetectionsSerialized = imgDetections->serializeProto(); + if(imgDetectionsProto.ParseFromArray(imgDetectionsSerialized.data(), imgDetectionsSerialized.size())) { + *snapAnnotation.mutable_detections() = imgDetectionsProto; + } else { + logger::error("Failed to parse ImgDetections proto from serialized bytes"); + return; + } + } + if(!snapAnnotation.SerializeToString(&data)) { + logger::error("Failed to serialize SnapAnnotations proto object to string"); + return; + } + size = data.size(); + checksum = calculateSHA256Checksum(data); } -bool EventData::toFile(const std::string& path) { - // check if filename is not empty +bool FileData::toFile(const std::filesystem::path& inputPath) { if(fileName.empty()) { logger::error("Filename is empty"); return false; } - std::filesystem::path p(path); - if(type == EventDataType::FILE_URL) { - // get the filename from the url - std::filesystem::copy(data, p / fileName); - } else { - std::string extension = mimeType == "image/jpeg" ? ".jpg" : ".txt"; - // check if file exists, if yes, append a number to the filename - std::string fileNameTmp = fileName; - int i = 0; - while(std::filesystem::exists(p / (fileNameTmp + extension))) { - logger::warn("File {} already exists, appending number to filename", fileNameTmp); - fileNameTmp = fileName + "_" + std::to_string(i); - i++; - } - std::ofstream fileStream(p / (fileNameTmp + extension), std::ios::binary); - fileStream.write(data.data(), data.size()); + std::string extension = mimeType == "image/jpeg" ? ".jpg" : ".txt"; + // Choose a unique filename + std::filesystem::path target = inputPath / (fileName + extension); + for(int i = 1; std::filesystem::exists(target); ++i) { + logger::warn("File {} exists, trying a new name", target.string()); + target = inputPath / (fileName + "_" + std::to_string(i) + extension); + } + std::ofstream fileStream(target, std::ios::binary); + if(!fileStream) { + logger::error("Failed to open file for writing: {}", target.string()); + return false; + } + fileStream.write(data.data(), static_cast(data.size())); + if(!fileStream) { + logger::error("Failed to write all data to: {}", target.string()); + return false; } return true; } -EventsManager::EventsManager(std::string url, bool uploadCachedOnStart, float publishInterval) - : url(std::move(url)), - queueSize(10), - publishInterval(publishInterval), + +EventsManager::EventsManager(bool uploadCachedOnStart) + : publishInterval(30.0f), logResponse(false), verifySsl(true), cacheDir("/internal/private"), cacheIfCannotSend(false), - stopEventBuffer(false) { - sourceAppId = utility::getEnvAs("OAKAGENT_APP_VERSION", ""); + stopUploadThread(false), + configurationLimitsFetched(false), + warningStorageBytes(52428800) { + auto appId = utility::getEnvAs("OAKAGENT_APP_ID", ""); + auto containerId = utility::getEnvAs("OAKAGENT_CONTAINER_ID", ""); + sourceAppId = appId == "" ? containerId : appId; sourceAppIdentifier = utility::getEnvAs("OAKAGENT_APP_IDENTIFIER", ""); + url = utility::getEnvAs("DEPTHAI_HUB_EVENTS_BASE_URL", "https://events.cloud.luxonis.com"); token = utility::getEnvAs("DEPTHAI_HUB_API_KEY", ""); - eventBufferThread = std::make_unique([this]() { - while(!stopEventBuffer) { - sendEventBuffer(); - std::unique_lock lock(eventBufferMutex); - eventBufferCondition.wait_for(lock, std::chrono::seconds(static_cast(this->publishInterval))); + // Thread handling preparation and uploads + uploadThread = std::make_unique([this]() { + // Fetch configuration limits when starting the new thread + configurationLimitsFetched = fetchConfigurationLimits(); + auto currentTime = std::chrono::steady_clock::now(); + auto nextTime = currentTime + std::chrono::hours(1); + while(!stopUploadThread) { + // Hourly check for fetching configuration and limits + currentTime = std::chrono::steady_clock::now(); + if(currentTime >= nextTime) { + fetchConfigurationLimits(); + nextTime += std::chrono::hours(1); + if(remainingStorageBytes <= warningStorageBytes) { + logger::warn("Current remaining storage is running low: {} MB", remainingStorageBytes / (1024 * 1024)); + } + } + // Prepare the batch first to reduce contention + std::deque> snapBatch; + { + std::lock_guard lock(snapBufferMutex); + const std::size_t size = std::min(snapBuffer.size(), maxGroupsPerBatch); + snapBatch.insert(snapBatch.end(), std::make_move_iterator(snapBuffer.begin()), std::make_move_iterator(snapBuffer.begin() + size)); + snapBuffer.erase(snapBuffer.begin(), snapBuffer.begin() + size); + } + + uploadFileBatchFutures.emplace_back( + std::async(std::launch::async, [&, inputSnapBatch = std::move(snapBatch)]() mutable { uploadFileBatch(std::move(inputSnapBatch)); })); + // Clean up finished futures + for(auto iterator = uploadFileBatchFutures.begin(); iterator != uploadFileBatchFutures.end();) { + if(iterator->wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + iterator->get(); + iterator = uploadFileBatchFutures.erase(iterator); + } else { + ++iterator; + } + } + + uploadEventBatch(); + std::unique_lock lock(stopThreadConditionMutex); + eventBufferCondition.wait_for(lock, std::chrono::seconds(static_cast(this->publishInterval)), [this]() { return stopUploadThread.load(); }); } }); - checkConnection(); + // Upload or clear previously cached data on start if(uploadCachedOnStart) { uploadCachedData(); + } else { + clearCachedData(cacheDir); } } EventsManager::~EventsManager() { - stopEventBuffer = true; - { - std::unique_lock lock(eventBufferMutex); - eventBufferCondition.notify_one(); + stopUploadThread = true; + eventBufferCondition.notify_all(); + if(uploadThread && uploadThread->joinable()) { + uploadThread->join(); + } +} + +bool EventsManager::fetchConfigurationLimits() { + logger::info("Fetching configuration limits"); + if(token.empty()) { + logger::warn("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use the setToken method"); + return false; + } + auto header = cpr::Header(); + header["Authorization"] = "Bearer " + token; + cpr::Url requestUrl = static_cast(this->url + "/v2/api-usage"); + int retryAttempt = 0; + while(!stopUploadThread) { + cpr::Response response = cpr::Get( + cpr::Url{requestUrl}, + cpr::Header{header}, + cpr::VerifySsl(verifySsl), + cpr::ProgressCallback( + [&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool { + (void)userdata; + (void)downloadTotal; + (void)downloadNow; + (void)uploadTotal; + (void)uploadNow; + if(stopUploadThread) { + return false; + } + return true; + })); + if(response.status_code != cpr::status::HTTP_OK) { + logger::error("Failed to fetch configuration limits, status code: {}", response.status_code); + + // Apply exponential backoff + auto factor = std::pow(uploadRetryPolicy.factor, ++retryAttempt); + std::chrono::milliseconds duration = std::chrono::milliseconds(uploadRetryPolicy.baseDelay.count() * static_cast(factor)); + logger::info("Retrying to fetch configuration limits, (attempt {} in {} ms)", retryAttempt, duration.count()); + + std::unique_lock lock(stopThreadConditionMutex); + eventBufferCondition.wait_for(lock, duration, [this]() { return stopUploadThread.load(); }); + } else { + logger::info("Configuration limits fetched successfully"); + auto apiUsage = std::make_unique(); + apiUsage->ParseFromString(response.text); + if(logResponse) { + logger::info("ApiUsage response: \n{}", apiUsage->DebugString()); + } + // TO DO: Use this data + maxFileSizeBytes = apiUsage->files().max_file_size_bytes(); // + remainingStorageBytes = apiUsage->files().remaining_storage_bytes(); // + bytesPerHour = apiUsage->files().bytes_per_hour_rate(); + uploadsPerHour = apiUsage->files().uploads_per_hour_rate(); + maxGroupsPerBatch = apiUsage->files().groups_per_allocation(); // + maxFilesPerGroup = apiUsage->files().files_per_group_in_allocation(); // + eventsPerHour = apiUsage->events().events_per_hour_rate(); + snapsPerHour = apiUsage->events().snaps_per_hour_rate(); + eventsPerRequest = apiUsage->events().events_per_request(); // + + return true; + } + } + return false; +} + +void EventsManager::uploadFileBatch(std::deque> inputSnapBatch) { + // Prepare files for upload + auto fileGroupBatchPrepare = std::make_unique(); + if(inputSnapBatch.empty()) { + return; + } + if(token.empty()) { + logger::warn("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use the setToken method"); + return; + } + // Fill the batch with the groups from inputSnapBatch and their corresponding files + for(size_t i = 0; i < inputSnapBatch.size(); ++i) { + auto fileGroup = std::make_unique(); + for(auto& file : inputSnapBatch.at(i)->fileGroup->fileData) { + auto addedFile = fileGroup->add_files(); + addedFile->set_checksum(file->checksum); + addedFile->set_mime_type(file->mimeType); + addedFile->set_size(file->size); + addedFile->set_filename(file->fileName); + addedFile->set_classification(file->classification); + } + fileGroupBatchPrepare->add_groups()->Swap(fileGroup.get()); + } + + int retryAttempt = 0; + while(!stopUploadThread) { + std::string serializedBatch; + fileGroupBatchPrepare->SerializeToString(&serializedBatch); + cpr::Url requestUrl = static_cast(this->url + "/v2/files/prepare-batch"); + cpr::Response response = cpr::Post( + cpr::Url{requestUrl}, + cpr::Body{serializedBatch}, + cpr::Header{{"Authorization", "Bearer " + token}}, + cpr::VerifySsl(verifySsl), + cpr::ProgressCallback( + [&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool { + (void)userdata; + (void)downloadTotal; + (void)downloadNow; + (void)uploadTotal; + (void)uploadNow; + if(stopUploadThread) { + return false; + } + return true; + })); + if(response.status_code != cpr::status::HTTP_OK && response.status_code != cpr::status::HTTP_CREATED) { + logger::error("Failed to prepare a batch of file groups, status code: {}", response.status_code); + // Apply exponential backoff + auto factor = std::pow(uploadRetryPolicy.factor, ++retryAttempt); + std::chrono::milliseconds duration = std::chrono::milliseconds(uploadRetryPolicy.baseDelay.count() * static_cast(factor)); + logger::info("Retrying to prepare a batch of file groups (attempt {} in {} ms)", retryAttempt, duration.count()); + + std::unique_lock lock(stopThreadConditionMutex); + eventBufferCondition.wait_for(lock, duration, [this]() { return stopUploadThread.load(); }); + // After retrying a defined number of times, we can determine the connection is not established, cache if enabled + if(retryAttempt >= uploadRetryPolicy.maxAttempts) { + if(cacheIfCannotSend) { + cacheSnapData(inputSnapBatch); + } else { + logger::warn("Caching is not enabled, dropping snapBatch"); + } + return; + } + } else { + logger::info("Batch of file groups has been successfully prepared"); + auto prepareBatchResults = std::make_unique(); + prepareBatchResults->ParseFromString(response.text); + if(logResponse) { + logger::info("BatchFileUploadResult response: \n{}", prepareBatchResults->DebugString()); + } + + // Upload groups of files + std::vector> groupUploadResults; + for(int i = 0; i < prepareBatchResults->groups_size(); i++) { + auto snapData = inputSnapBatch.at(i); + auto prepareGroupResult = prepareBatchResults->groups(i); + // Skip rejected groups + if(prepareGroupResult.has_rejected()) { + std::string rejectionReason = dai::proto::event::RejectedFileGroupReason_descriptor() + ->FindValueByNumber(static_cast(prepareGroupResult.rejected().reason())) + ->name(); + logger::info("A group has been rejected because of {}", rejectionReason); + continue; + } + // Handle groups asynchronously + groupUploadResults.emplace_back( + std::async(std::launch::async, [&, snap = std::move(snapData), group = std::move(prepareGroupResult)]() mutable { + return uploadGroup(std::move(snap), std::move(group)); + })); + } + // Wait for all of the reponses, indicating the finish of group uploads + for(auto& uploadResult : groupUploadResults) { + if(!uploadResult.valid() || !uploadResult.get()) { + logger::info("Failed to upload all of the groups in the given batch"); + // File upload was unsuccesful, cache if enabled + if(cacheIfCannotSend) { + cacheSnapData(inputSnapBatch); + } else { + logger::warn("Caching is not enabled, dropping snapBatch"); + } + return; + } + } + return; + } + } +} + +bool EventsManager::uploadGroup(std::shared_ptr snapData, dai::proto::event::FileUploadGroupResult prepareGroupResult) { + std::vector> fileUploadResults; + for(int i = 0; i < prepareGroupResult.files_size(); i++) { + auto prepareFileResult = prepareGroupResult.files(i); + if(prepareFileResult.result_case() == proto::event::FileUploadResult::kAccepted) { + // Add an associate file to the event + auto associateFile = snapData->event->add_associate_files(); + associateFile->set_id(prepareFileResult.accepted().id()); + // Upload files asynchronously + fileUploadResults.emplace_back(std::async( + std::launch::async, + [&, fileData = std::move(snapData->fileGroup->fileData.at(i)), uploadUrl = std::move(prepareFileResult.accepted().upload_url())]() mutable { + return uploadFile(std::move(fileData), std::move(uploadUrl)); + })); + } else { + return false; + } } - if(eventBufferThread->joinable()) { - eventBufferThread->join(); + // Wait for all of the results, indicating the finish of file uploads + for(auto& uploadResult : fileUploadResults) { + if(!uploadResult.valid() || !uploadResult.get()) { + logger::info("Failed to upload all of the files in the given group"); + return false; + } } + // Once all of the files are uploaded, the event can be sent + std::lock_guard lock(eventBufferMutex); + eventBuffer.push_back(std::move(snapData->event)); + return true; +} + +bool EventsManager::uploadFile(std::shared_ptr fileData, std::string uploadUrl) { + logger::info("Uploading file {} to: {}", fileData->fileName, uploadUrl); + auto header = cpr::Header(); + header["Content-Type"] = fileData->mimeType; + for(int i = 0; i < uploadRetryPolicy.maxAttempts && !stopUploadThread; ++i) { + cpr::Response response = cpr::Put( + cpr::Url{uploadUrl}, + cpr::Body{fileData->data}, + cpr::Header{header}, + cpr::VerifySsl(verifySsl), + cpr::ProgressCallback( + [&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool { + (void)userdata; + (void)downloadTotal; + (void)downloadNow; + (void)uploadTotal; + (void)uploadNow; + if(stopUploadThread) { + return false; + } + return true; + })); + if(response.status_code != cpr::status::HTTP_OK && response.status_code != cpr::status::HTTP_CREATED) { + logger::error("Failed to upload file {}, status code: {}", fileData->fileName, response.status_code); + if(logResponse) { + logger::info("Response {}", response.text); + } + // Apply exponential backoff + auto factor = std::pow(uploadRetryPolicy.factor, i + 1); + std::chrono::milliseconds duration = std::chrono::milliseconds(uploadRetryPolicy.baseDelay.count() * static_cast(factor)); + logger::info("Retrying upload of file {}, (attempt {}/{}) in {} ms", fileData->fileName, i + 1, uploadRetryPolicy.maxAttempts, duration.count()); + + std::unique_lock lock(stopThreadConditionMutex); + eventBufferCondition.wait_for(lock, duration, [this]() { return stopUploadThread.load(); }); + } else { + return true; + } + } + return false; } -void EventsManager::sendEventBuffer() { - auto batchEvent = std::make_unique(); +void EventsManager::uploadEventBatch() { + auto eventBatch = std::make_unique(); { std::lock_guard lock(eventBufferMutex); if(eventBuffer.empty()) { return; } if(token.empty()) { - logger::warn("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use setToken method"); + logger::warn("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use the setToken method"); return; } - if(!checkConnection()) { - if(cacheIfCannotSend) { - cacheEvents(); - } - return; - } - for(auto& eventM : eventBuffer) { - auto& event = eventM->event; - batchEvent->add_events()->Swap(event.get()); + for(size_t i = 0; i < eventBuffer.size() && i < eventsPerRequest; ++i) { + eventBatch->add_events()->CopyFrom(*eventBuffer.at(i).get()); } } - std::string serializedEvent; - batchEvent->SerializeToString(&serializedEvent); - cpr::Url reqUrl = static_cast(this->url + "/v1/events"); - cpr::Response r = cpr::Post( - cpr::Url{reqUrl}, - cpr::Body{serializedEvent}, + std::string serializedBatch; + eventBatch->SerializeToString(&serializedBatch); + cpr::Url requestUrl = static_cast(this->url + "/v2/events"); + cpr::Response response = cpr::Post( + cpr::Url{requestUrl}, + cpr::Body{serializedBatch}, cpr::Header{{"Authorization", "Bearer " + token}}, cpr::VerifySsl(verifySsl), cpr::ProgressCallback( @@ -171,304 +567,331 @@ void EventsManager::sendEventBuffer() { (void)downloadNow; (void)uploadTotal; (void)uploadNow; - if(stopEventBuffer) { + if(stopUploadThread) { return false; } return true; })); - if(r.status_code != cpr::status::HTTP_OK) { - logger::error("Failed to send event: {} {}", r.text, r.status_code); + if(response.status_code != cpr::status::HTTP_OK) { + logger::error("Failed to send event, status code: {}", response.status_code); + // In case the eventBuffer gets too full (dropped connection), cache the events or drop them + if(eventBuffer.size() >= EVENT_BUFFER_MAX_SIZE) { + if(cacheIfCannotSend) { + cacheEvents(); + } else { + logger::warn("EventBuffer is full and caching is not enabled, dropping events"); + std::lock_guard lock(eventBufferMutex); + eventBuffer.clear(); + } + } } else { logger::info("Event sent successfully"); if(logResponse) { - logger::info("Response: {}", r.text); - } - // upload files - auto batchUploadEventResult = std::make_unique(); - batchUploadEventResult->ParseFromString(r.text); - for(int i = 0; i < batchUploadEventResult->events_size(); i++) { - auto eventResult = batchUploadEventResult->events(i); - if(eventResult.accepted().file_upload_urls_size() > 0) { - for(int j = 0; j < eventResult.accepted().file_upload_urls().size(); j++) { - cpr::Url fileUrl = static_cast(this->url + eventResult.accepted().file_upload_urls(j)); - - sendFile(eventBuffer[i]->data[j], fileUrl.str()); - } - } + auto eventBatchUploadResults = std::make_unique(); + eventBatchUploadResults->ParseFromString(response.text); + logger::info("BatchUploadEvents response: \n{}", eventBatchUploadResults->DebugString()); } - for(auto& eventM : eventBuffer) { - if(!eventM->cachePath.empty() && std::filesystem::exists(eventM->cachePath)) { - std::filesystem::remove_all(eventM->cachePath); - } - } - eventBuffer.clear(); + std::lock_guard lock(eventBufferMutex); + eventBuffer.erase(eventBuffer.begin(), eventBuffer.begin() + eventBatch->events_size()); } } bool EventsManager::sendEvent(const std::string& name, - const std::shared_ptr& imgFrame, - std::vector> data, const std::vector& tags, - const std::unordered_map& extraData, - const std::string& deviceSerialNo) { - // Create event + const std::unordered_map& extras, + const std::string& deviceSerialNo, + const std::vector& associateFiles) { + // Check if the configuration and limits have already been fetched + if(!configurationLimitsFetched) { + logger::error("The configuration and limits have not been successfully fetched, event not send"); + return false; + } + + // Create an event auto event = std::make_unique(); - event->set_nonce(createUUID()); - event->set_name(name); event->set_created_at(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()); + event->set_name(name); for(const auto& tag : tags) { event->add_tags(tag); } auto* extrasData = event->mutable_extras(); - for(const auto& entry : extraData) { + for(const auto& entry : extras) { extrasData->insert({entry.first, entry.second}); } - - if(imgFrame != nullptr) { - auto fileData = std::make_shared(imgFrame, "img.jpg"); - data.push_back(fileData); - } - event->set_expect_files_num(data.size()); - - event->set_source_serial_number(deviceSerialNo.empty() ? deviceSerialNumber : deviceSerialNo); + event->set_source_serial_number(deviceSerialNo); event->set_source_app_id(sourceAppId); event->set_source_app_identifier(sourceAppIdentifier); - // Add event to buffer - if(eventBuffer.size() <= queueSize) { - std::lock_guard lock(eventBufferMutex); - auto eventMessage = std::make_unique(); - eventMessage->data = std::move(data); - eventMessage->event = std::move(event); - eventBuffer.push_back(std::move(eventMessage)); - } else { - logger::warn("Event buffer is full, dropping event"); + for(const auto& file : associateFiles) { + auto addedFile = event->add_associate_files(); + addedFile->set_id(file); + } + if(!validateEvent(*event)) { + logger::error("Failed to send event, validation failed"); return false; } + + // Add event to eventBuffer + std::lock_guard lock(eventBufferMutex); + eventBuffer.push_back(std::move(event)); return true; } bool EventsManager::sendSnap(const std::string& name, - const std::shared_ptr& imgFrame, - std::vector> data, + const std::shared_ptr fileGroup, const std::vector& tags, - const std::unordered_map& extraData, + const std::unordered_map& extras, const std::string& deviceSerialNo) { - std::vector tagsTmp = tags; - tagsTmp.emplace_back("snap"); - // exactly one image needs to be sent, either from imgFrame or from data - bool send = false; - if(imgFrame != nullptr && !data.empty()) { - logger::error("For sending snap, provide either imgFrame or single image in data list, not both. Use sendEvent for multiple files"); + // Check if the configuration and limits have already been fetched + if(!configurationLimitsFetched) { + logger::error("The configuration and limits have not been successfully fetched, snap not send"); + return false; + } + + // Prepare snapData + auto snapData = std::make_unique(); + snapData->fileGroup = fileGroup; + // Create an event + snapData->event = std::make_unique(); + snapData->event->set_created_at(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()); + snapData->event->set_name(name); + snapData->event->add_tags("snap"); + for(const auto& tag : tags) { + snapData->event->add_tags(tag); + } + auto* extrasData = snapData->event->mutable_extras(); + for(const auto& entry : extras) { + extrasData->insert({entry.first, entry.second}); + } + snapData->event->set_source_serial_number(deviceSerialNo); + snapData->event->set_source_app_id(sourceAppId); + snapData->event->set_source_app_identifier(sourceAppIdentifier); + if(!validateEvent(*snapData->event)) { + logger::error("Failed to send snap, validation failed"); return false; - } else if(imgFrame == nullptr && data.empty()) { - logger::error("No image or data provided"); + } + if(fileGroup->fileData.size() > maxFilesPerGroup) { + logger::error("Failed to send snap, the number of files in a file group {} exceeds {}", fileGroup->fileData.size(), maxFilesPerGroup); return false; - } else if(imgFrame == nullptr && !data.empty()) { - if(data.size() > 1) { - logger::error("More than one file provided in data. For sendings snaps, only one image file is allowed. Use sendEvent for multiple files"); - return false; - } - if(data[0]->mimeType == "image/jpeg" || data[0]->mimeType == "image/png" || data[0]->mimeType == "image/webp") { - send = true; - } - if(send == false) { - logger::error("Only image files are allowed for snaps"); + } else if(fileGroup->fileData.empty()) { + logger::error("Failed to send snap, the file group is empty"); + return false; + } + for(const auto& file : fileGroup->fileData) { + if(file->size >= maxFileSizeBytes) { + logger::error("Failed to send snap, file: {} is bigger then the configured maximum size: {}", file->fileName, maxFileSizeBytes); return false; } - } else { - send = true; } - if(send) { - return sendEvent(name, imgFrame, data, tagsTmp, extraData, deviceSerialNo); - } - return false; + // Add the snap to snapBuffer + std::lock_guard lock(snapBufferMutex); + snapBuffer.push_back(std::move(snapData)); + return true; } -void EventsManager::sendFile(const std::shared_ptr& file, const std::string& url) { - // if file struct contains byte data, send it, along with filename and mime type - // if it file url, send it directly via url - logger::info("Uploading file: to {}", url); - auto header = cpr::Header{{"Authorization", "Bearer " + token}}; - cpr::Multipart fileM{}; - if(file->type != EventDataType::FILE_URL) { - fileM = cpr::Multipart{{"file", cpr::Buffer{file->data.begin(), file->data.end(), file->fileName}, file->mimeType}}; - header["File-Size"] = std::to_string(std::size(file->data)); +bool EventsManager::sendSnap(const std::string& name, + const std::optional& fileName, + const std::shared_ptr imgFrame, + const std::optional>& imgDetections, + const std::vector& tags, + const std::unordered_map& extras, + const std::string& deviceSerialNo) { + // Create a FileGroup and send a snap containing it + auto fileGroup = std::make_shared(); + if(imgDetections.has_value()) { + fileGroup->addImageDetectionsPair(fileName, imgFrame, imgDetections.value()); } else { - fileM = cpr::Multipart{{ - "file", - cpr::File{file->data}, - }}; - header["File-Size"] = std::to_string(std::filesystem::file_size(file->data)); - } - cpr::Response r = cpr::Post( - cpr::Url{url}, - cpr::Multipart{fileM}, - cpr::Header{header}, - cpr::VerifySsl(verifySsl), + fileGroup->addFile(fileName, imgFrame); + } - cpr::ProgressCallback( - [&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool { - (void)userdata; - (void)downloadTotal; - (void)downloadNow; - (void)uploadTotal; - (void)uploadNow; - if(stopEventBuffer) { - return false; - } - return true; - })); - if(r.status_code != cpr::status::HTTP_OK) { - logger::error("Failed to upload file: {} error code {}", r.text, r.status_code); + return sendSnap(name, fileGroup, tags, extras, deviceSerialNo); +} + +bool EventsManager::validateEvent(const proto::event::Event& inputEvent) { + // Name + const auto& name = inputEvent.name(); + if(name.empty()) { + logger::error("Invalid event name: empty string"); + return false; + } + if(name.length() > EVENT_VALIDATION_NAME_LENGTH) { + logger::error("Invalid event name: length {} exceeds {}", name.length(), EVENT_VALIDATION_NAME_LENGTH); + return false; + } + + // Tags + if(inputEvent.tags_size() > EVENT_VALIDATION_MAX_TAGS) { + logger::error("Invalid event tags: number of tags {} exceeds {}", inputEvent.tags_size(), EVENT_VALIDATION_MAX_TAGS); + return false; + } + for(int i = 0; i < inputEvent.tags_size(); ++i) { + const auto& tag = inputEvent.tags(i); + if(tag.empty()) { + logger::error("Invalid event tags: tag[{}] empty string", i); + return false; + } + if(tag.length() > EVENT_VALIDATION_TAG_LENGTH) { + logger::error("Invalid event tags: tag[{}] length {} exceeds {}", i, tag.length(), EVENT_VALIDATION_TAG_LENGTH); + return false; + } + } + + // Event extras + if(inputEvent.extras_size() > EVENT_VALIDATION_MAX_EXTRAS) { + logger::error("Invalid event extras: number of extras {} exceeds {}", inputEvent.extras_size(), EVENT_VALIDATION_MAX_EXTRAS); + return false; } - if(logResponse) { - logger::info("Response: {}", r.text); + int index = 0; + for(const auto& extra : inputEvent.extras()) { + const auto& key = extra.first; + const auto& value = extra.second; + if(key.empty()) { + logger::error("Invalid event extras: extra[{}] key empty string", index); + return false; + } + if(key.length() > EVENT_VALIDATION_EXTRA_KEY_LENGTH) { + logger::error("Invalid event extras: extra[{}] key length {} exceeds {}", index, key.length(), EVENT_VALIDATION_EXTRA_KEY_LENGTH); + return false; + } + if(value.length() > EVENT_VALIDATION_EXTRA_VALUE_LENGTH) { + logger::error("Invalid event extras: extra[{}] value length {} exceeds {}", index, value.length(), EVENT_VALIDATION_EXTRA_VALUE_LENGTH); + return false; + } + index++; } + + // Associate files + if(inputEvent.associate_files_size() > EVENT_VALIDATION_MAX_ASSOCIATE_FILES) { + logger::error( + "Invalid associate files: number of associate files {} exceeds {}", inputEvent.associate_files_size(), EVENT_VALIDATION_MAX_ASSOCIATE_FILES); + return false; + } + + return true; } void EventsManager::cacheEvents() { + // Create a unique directory and save the protobuf message for each event in the eventBuffer logger::info("Caching events"); - // for each event, create a unique directory, save protobuf message and associated files std::lock_guard lock(eventBufferMutex); - for(auto& eventM : eventBuffer) { - auto& event = eventM->event; - auto& data = eventM->data; - std::filesystem::path p(cacheDir); - p = p / ("event_" + event->name() + "_" + event->nonce()); - std::string eventDir = p.string(); + for(const auto& event : eventBuffer) { + std::filesystem::path path(cacheDir); + path = path / ("event_" + event->name() + "_" + std::to_string(event->created_at())); + std::string eventDir = path.string(); logger::info("Caching event to {}", eventDir); if(!std::filesystem::exists(cacheDir)) { std::filesystem::create_directories(cacheDir); } std::filesystem::create_directory(eventDir); - std::ofstream eventFile(p / "event.pb", std::ios::binary); + std::ofstream eventFile(path / "event.pb", std::ios::binary); event->SerializeToOstream(&eventFile); - for(auto& file : data) { - file->toFile(eventDir); - } } eventBuffer.clear(); } -void EventsManager::uploadCachedData() { - // iterate over all directories in cacheDir, read event.pb and associated files, and send them - logger::info("Uploading cached data"); - if(!checkConnection()) { - return; +void EventsManager::cacheSnapData(std::deque>& inputSnapBatch) { + // Create a unique directory and save the snapData + logger::info("Caching snapData"); + for(const auto& snap : inputSnapBatch) { + std::filesystem::path path(cacheDir); + path = path / ("snap_" + snap->event->name() + "_" + std::to_string(snap->event->created_at())); + std::string snapDir = path.string(); + logger::info("Caching snap to {}", snapDir); + if(!std::filesystem::exists(cacheDir)) { + std::filesystem::create_directories(cacheDir); + } + std::filesystem::create_directory(snapDir); + std::ofstream eventFile(path / "snap.pb", std::ios::binary); + snap->event->SerializeToOstream(&eventFile); + for(auto& file : snap->fileGroup->fileData) { + file->toFile(path); + } } - // check if cacheDir exists - if(!std::filesystem::exists(cacheDir)) { - logger::warn("Cache directory does not exist"); +} + +void EventsManager::uploadCachedData() { + // Iterate over the directories in cacheDir, add events and snapsData to buffers + if(!checkForCachedData()) { + logger::warn("Cache directory is empty, no cached data will be uploaded"); return; } + logger::info("Uploading cached data"); + for(const auto& entry : std::filesystem::directory_iterator(cacheDir)) { - if(entry.is_directory()) { - const auto& eventDir = entry.path(); - std::ifstream eventFile(eventDir / "event.pb", std::ios::binary); - proto::event::Event event; - event.ParseFromIstream(&eventFile); - std::vector> data; - for(const auto& fileEntry : std::filesystem::directory_iterator(eventDir)) { - if(fileEntry.is_regular_file() && fileEntry.path() != eventDir / "event.pb") { - auto fileData = std::make_shared(fileEntry.path().string()); - data.push_back(fileData); + if(!entry.is_directory()) { + continue; + } + + if(entry.path().filename().string().rfind("event", 0) == 0) { + std::ifstream eventFile(entry.path() / "event.pb", std::ios::binary); + auto event = std::make_shared(); + event->ParseFromIstream(&eventFile); + std::lock_guard lock(eventBufferMutex); + eventBuffer.push_back(std::move(event)); + // Clear entries added to the eventBuffer + clearCachedData(entry.path()); + + } else if(entry.path().filename().string().rfind("snap", 0) == 0) { + std::ifstream snapFile(entry.path() / "snap.pb", std::ios::binary); + auto snapData = std::make_unique(); + auto event = std::make_shared(); + auto fileGroup = std::make_shared(); + event->ParseFromIstream(&snapFile); + for(const auto& fileEntry : std::filesystem::directory_iterator(entry.path())) { + if(fileEntry.is_regular_file() && fileEntry.path() != entry.path() / "snap.pb") { + auto fileData = std::make_shared(fileEntry.path(), fileEntry.path().filename().string()); + fileGroup->fileData.push_back(fileData); } } - std::lock_guard lock(eventBufferMutex); - auto eventPtr = std::make_shared(event); - auto eventMessage = std::make_shared(); - eventMessage->event = eventPtr; - eventMessage->data = data; - eventMessage->cachePath = eventDir.string(); - eventBuffer.push_back(eventMessage); + snapData->event = event; + snapData->fileGroup = fileGroup; + std::lock_guard lock(snapBufferMutex); + snapBuffer.push_back(std::move(snapData)); + // Clear entries added to the snapBuffer + clearCachedData(entry.path()); } } } bool EventsManager::checkForCachedData() { - // check if cacheDir exists if(!std::filesystem::exists(cacheDir)) { - logger::warn("Cache directory does not exist"); return false; } return std::any_of( std::filesystem::directory_iterator(cacheDir), std::filesystem::directory_iterator(), [](const auto& entry) { return entry.is_directory(); }); } -void EventsManager::setCacheDir(const std::string& cacheDir) { - this->cacheDir = cacheDir; -} - -void EventsManager::setUrl(const std::string& url) { - this->url = url; -} - -void EventsManager::setSourceAppId(const std::string& sourceAppId) { - this->sourceAppId = sourceAppId; +void EventsManager::clearCachedData(const std::filesystem::path& directory) { + if(!checkForCachedData()) { + return; + } + std::error_code ec; + std::filesystem::remove_all(directory, ec); + if(ec) { + logger::error("Failed to remove cache directory {}: {}", directory.string(), ec.message()); + } else { + logger::info("Cleared cache directory {}", directory.string()); + } } -void EventsManager::setSourceAppIdentifier(const std::string& sourceAppIdentifier) { - this->sourceAppIdentifier = sourceAppIdentifier; +void EventsManager::setCacheDir(const std::string& cacheDir) { + this->cacheDir = cacheDir; } void EventsManager::setToken(const std::string& token) { this->token = token; } -bool EventsManager::checkConnection() { - cpr::Response r = cpr::Get(cpr::Url{url + "/health"}, cpr::VerifySsl(verifySsl)); - if(r.status_code != cpr::status::HTTP_OK) { - logger::error("Failed to connect to events service: {} {}", r.text, r.status_code); - return false; - } - logger::info("Connected to events service"); - return true; -} -std::string EventsManager::createUUID() { - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution<> dis(0, 15); - std::uniform_int_distribution<> dis2(8, 11); - - std::stringstream ss; - int i = 0; - ss << std::hex; - for(i = 0; i < 8; i++) { - ss << dis(gen); - } - ss << "-"; - for(i = 0; i < 4; i++) { - ss << dis(gen); - } - ss << "-4"; - for(i = 0; i < 3; i++) { - ss << dis(gen); - } - ss << "-"; - ss << dis2(gen); - for(i = 0; i < 3; i++) { - ss << dis(gen); - } - ss << "-"; - for(i = 0; i < 12; i++) { - ss << dis(gen); - }; - return ss.str(); -} -void EventsManager::setQueueSize(uint64_t queueSize) { - this->queueSize = queueSize; -} void EventsManager::setLogResponse(bool logResponse) { this->logResponse = logResponse; } -void EventsManager::setDeviceSerialNumber(const std::string& deviceSerialNumber) { - this->deviceSerialNumber = deviceSerialNumber; -} + void EventsManager::setVerifySsl(bool verifySsl) { this->verifySsl = verifySsl; } + void EventsManager::setCacheIfCannotSend(bool cacheIfCannotSend) { this->cacheIfCannotSend = cacheIfCannotSend; } + } // namespace utility } // namespace dai