Skip to content

Commit 6451ee2

Browse files
authored
Merge pull request #1657 from luxonis/feature/Snaps_cache_upload_during_runtime
Cached snaps and events are now gradually uploaded during runtime, mi…
2 parents 1e2efff + ffaecf9 commit 6451ee2

File tree

2 files changed

+116
-44
lines changed

2 files changed

+116
-44
lines changed

include/depthai/utility/EventsManager.hpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,10 @@ class EventsManager {
193193

194194
/**
195195
* Fetch the configuration limits and quotas for snaps & events
196+
* @param retryOnFail Retry fetching on failure; when true, keeps retrying until successful
196197
* @return bool
197198
*/
198-
bool fetchConfigurationLimits();
199+
bool fetchConfigurationLimits(const bool retryOnFail);
199200
/**
200201
* Prepare a batch of file groups from inputSnapBatch
201202
*/
@@ -227,10 +228,13 @@ class EventsManager {
227228
*/
228229
void cacheSnapData(std::deque<std::shared_ptr<SnapData>>& inputSnapBatch);
229230
/**
230-
* Upload cached data to the events service
231-
* @return void
231+
* Upload cached events to the events service
232+
*/
233+
void uploadCachedEvents();
234+
/**
235+
* Upload cached snaps to the events service
232236
*/
233-
void uploadCachedData();
237+
void uploadCachedSnaps();
234238
/**
235239
* Check if there's any cached data in the filesystem
236240
*/
@@ -259,6 +263,7 @@ class EventsManager {
259263
std::mutex stopThreadConditionMutex;
260264
std::atomic<bool> stopUploadThread;
261265
std::atomic<bool> configurationLimitsFetched;
266+
std::atomic<bool> connectionEstablished;
262267
std::condition_variable eventBufferCondition;
263268

264269
uint64_t maxFileSizeBytes;

src/utility/EventsManager.cpp

Lines changed: 107 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,41 @@ std::string generateLocalID(int64_t timestamp) {
1717
oss << timestamp << "_" << std::setw(6) << std::setfill('0') << counter++;
1818
return oss.str();
1919
}
20+
21+
const std::unordered_map<std::string, std::string> extensionToMimeTypeMap = {
22+
{".html", "text/html"},
23+
{".css", "text/css"},
24+
{".js", "application/javascript"},
25+
{".png", "image/png"},
26+
{".jpg", "image/jpeg"},
27+
{".jpeg", "image/jpeg"},
28+
{".gif", "image/gif"},
29+
{".webp", "image/webp"},
30+
{".bmp", "image/bmp"},
31+
{".tiff", "image/tiff"},
32+
{".svg", "image/svg+xml"},
33+
{".json", "application/json"},
34+
{".txt", "text/plain"},
35+
{".annotation", "application/x-protobuf; proto=SnapAnnotation"},
36+
{"", "application/octet-stream"},
37+
};
38+
39+
const std::unordered_map<std::string, std::string> mimeTypeToExtensionMap = {
40+
{"text/html", ".html"},
41+
{"text/css", ".css"},
42+
{"application/javascript", ".js"},
43+
{"image/png", ".png"},
44+
{"image/jpeg", ".jpg"},
45+
{"image/gif", ".gif"},
46+
{"image/webp", ".webp"},
47+
{"image/bmp", ".bmp"},
48+
{"image/tiff", ".tiff"},
49+
{"image/svg+xml", ".svg"},
50+
{"application/json", ".json"},
51+
{"text/plain", ".txt"},
52+
{"application/x-protobuf; proto=SnapAnnotation", ".annotation"},
53+
{"application/octet-stream", ""},
54+
};
2055
} // namespace
2156

2257
#include "Environment.hpp"
@@ -131,17 +166,6 @@ FileData::FileData(std::string data, std::string fileName, std::string mimeType)
131166
classification(proto::event::PrepareFileUploadClass::UNKNOWN_FILE) {}
132167

133168
FileData::FileData(std::filesystem::path filePath, std::string fileName) : fileName(std::move(fileName)) {
134-
static const std::unordered_map<std::string, std::string> mimeTypeExtensionMap = {{".html", "text/html"},
135-
{".htm", "text/html"},
136-
{".css", "text/css"},
137-
{".js", "application/javascript"},
138-
{".png", "image/png"},
139-
{".jpg", "image/jpeg"},
140-
{".jpeg", "image/jpeg"},
141-
{".gif", "image/gif"},
142-
{".svg", "image/svg+xml"},
143-
{".json", "application/json"},
144-
{".txt", "text/plain"}};
145169
// Read the data
146170
std::ifstream fileStream(filePath, std::ios::binary | std::ios::ate);
147171
if(!fileStream) {
@@ -154,15 +178,17 @@ FileData::FileData(std::filesystem::path filePath, std::string fileName) : fileN
154178
size = data.size();
155179
checksum = calculateSHA256Checksum(data);
156180
// Determine the mime type
157-
auto it = mimeTypeExtensionMap.find(filePath.extension().string());
158-
if(it != mimeTypeExtensionMap.end()) {
181+
auto it = extensionToMimeTypeMap.find(filePath.extension().string());
182+
if(it != extensionToMimeTypeMap.end()) {
159183
mimeType = it->second;
160184
} else {
161185
mimeType = "application/octet-stream";
162186
}
163187
static const std::unordered_set<std::string> imageMimeTypes = {"image/jpeg", "image/png", "image/webp", "image/bmp", "image/tiff"};
164188
if(imageMimeTypes.find(mimeType) != imageMimeTypes.end()) {
165189
classification = proto::event::PrepareFileUploadClass::IMAGE_COLOR;
190+
} else if(mimeType == "application/x-protobuf; proto=SnapAnnotation") {
191+
classification = proto::event::PrepareFileUploadClass::ANNOTATION;
166192
} else {
167193
classification = proto::event::PrepareFileUploadClass::UNKNOWN_FILE;
168194
}
@@ -239,7 +265,11 @@ bool FileData::toFile(const std::filesystem::path& inputPath) {
239265
logger::error("Filename is empty");
240266
return false;
241267
}
242-
std::string extension = mimeType == "image/jpeg" ? ".jpg" : ".txt";
268+
std::string extension;
269+
auto mimeIt = mimeTypeToExtensionMap.find(mimeType);
270+
if(mimeIt != mimeTypeToExtensionMap.end()) {
271+
extension = mimeIt->second;
272+
}
243273
// Choose a unique filename
244274
std::filesystem::path target = inputPath / (fileName + extension);
245275
for(int i = 1; std::filesystem::exists(target); ++i) {
@@ -284,18 +314,15 @@ EventsManager::EventsManager(bool uploadCachedOnStart)
284314
// Thread handling preparation and uploads
285315
uploadThread = std::make_unique<std::thread>([this]() {
286316
// Fetch configuration limits when starting the new thread
287-
configurationLimitsFetched = fetchConfigurationLimits();
288-
auto currentTime = std::chrono::steady_clock::now();
289-
auto nextTime = currentTime + std::chrono::hours(1);
317+
configurationLimitsFetched = fetchConfigurationLimits(true);
290318
while(!stopUploadThread) {
291-
// Hourly check for fetching configuration and limits
292-
currentTime = std::chrono::steady_clock::now();
293-
if(currentTime >= nextTime) {
294-
fetchConfigurationLimits();
295-
nextTime += std::chrono::hours(1);
296-
if(remainingStorageBytes <= warningStorageBytes) {
297-
logger::warn("Current remaining storage is running low: {} MB", remainingStorageBytes / (1024 * 1024));
298-
}
319+
connectionEstablished = fetchConfigurationLimits(false);
320+
if(remainingStorageBytes <= warningStorageBytes) {
321+
logger::warn("Current remaining storage is running low: {} MB", remainingStorageBytes / (1024 * 1024));
322+
}
323+
// Add cached snaps (if any) to the snapBuffer
324+
if(connectionEstablished) {
325+
uploadCachedSnaps();
299326
}
300327
// Prepare the batch first to reduce contention
301328
std::deque<std::shared_ptr<SnapData>> snapBatch;
@@ -323,10 +350,8 @@ EventsManager::EventsManager(bool uploadCachedOnStart)
323350
eventBufferCondition.wait_for(lock, std::chrono::seconds(static_cast<int>(this->publishInterval)), [this]() { return stopUploadThread.load(); });
324351
}
325352
});
326-
// Upload or clear previously cached data on start
327-
if(uploadCachedOnStart) {
328-
uploadCachedData();
329-
} else {
353+
// If upload on start is not set, clear the previously cached data when starting
354+
if(!uploadCachedOnStart) {
330355
clearCachedData(cacheDir);
331356
}
332357
}
@@ -339,7 +364,7 @@ EventsManager::~EventsManager() {
339364
}
340365
}
341366

342-
bool EventsManager::fetchConfigurationLimits() {
367+
bool EventsManager::fetchConfigurationLimits(const bool retryOnFail) {
343368
logger::info("Fetching configuration limits");
344369
if(token.empty()) {
345370
logger::warn("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use the setToken method");
@@ -368,6 +393,10 @@ bool EventsManager::fetchConfigurationLimits() {
368393
}));
369394
if(response.status_code != cpr::status::HTTP_OK) {
370395
logger::error("Failed to fetch configuration limits, status code: {}", response.status_code);
396+
// Fetching should be retried indefinetly only on startup
397+
if(!retryOnFail) {
398+
return false;
399+
}
371400

372401
// Apply exponential backoff
373402
auto factor = std::pow(uploadRetryPolicy.factor, ++retryAttempt);
@@ -628,6 +657,11 @@ bool EventsManager::uploadFile(std::shared_ptr<FileData> fileData, std::string u
628657
}
629658

630659
void EventsManager::uploadEventBatch() {
660+
// Add cached events, if any
661+
if(connectionEstablished) {
662+
uploadCachedEvents();
663+
}
664+
631665
auto eventBatch = std::make_unique<proto::event::BatchUploadEvents>();
632666
{
633667
std::lock_guard<std::mutex> lock(eventBufferMutex);
@@ -915,8 +949,11 @@ void EventsManager::cacheEvents() {
915949
logger::info("Caching events");
916950
std::lock_guard<std::mutex> lock(eventBufferMutex);
917951
for(const auto& eventData : eventBuffer) {
918-
std::filesystem::path path(cacheDir);
919-
path = path / ("event_" + eventData->event->name() + "_" + std::to_string(eventData->event->created_at()));
952+
std::filesystem::path inputPath(cacheDir);
953+
std::filesystem::path path = inputPath / (fmt::format("event_{}_{}", eventData->event->name(), eventData->event->created_at()));
954+
for(int i = 1; std::filesystem::exists(path); ++i) {
955+
path = inputPath / (fmt::format("event_{}_{}_{}", eventData->event->name(), eventData->event->created_at(), i));
956+
}
920957
std::string eventDir = path.string();
921958
logger::info("Caching event to {}", eventDir);
922959
if(!std::filesystem::exists(cacheDir)) {
@@ -933,8 +970,12 @@ void EventsManager::cacheSnapData(std::deque<std::shared_ptr<SnapData>>& inputSn
933970
// Create a unique directory and save the snapData
934971
logger::info("Caching snapData");
935972
for(const auto& snap : inputSnapBatch) {
936-
std::filesystem::path path(cacheDir);
937-
path = path / ("snap_" + snap->eventData->event->name() + "_" + std::to_string(snap->eventData->event->created_at()));
973+
std::filesystem::path inputPath(cacheDir);
974+
std::filesystem::path path = inputPath / ("snap_" + snap->eventData->event->name() + "_" + std::to_string(snap->eventData->event->created_at()));
975+
for(int i = 1; std::filesystem::exists(path); ++i) {
976+
path =
977+
inputPath / ("snap_" + snap->eventData->event->name() + "_" + std::to_string(snap->eventData->event->created_at()) + "_" + std::to_string(i));
978+
}
938979
std::string snapDir = path.string();
939980
logger::info("Caching snap to {}", snapDir);
940981
if(!std::filesystem::exists(cacheDir)) {
@@ -949,13 +990,12 @@ void EventsManager::cacheSnapData(std::deque<std::shared_ptr<SnapData>>& inputSn
949990
}
950991
}
951992

952-
void EventsManager::uploadCachedData() {
953-
// Iterate over the directories in cacheDir, add events and snapsData to buffers
993+
void EventsManager::uploadCachedEvents() {
994+
// Iterate over the directories in cacheDir, add events to eventBuffer
954995
if(!checkForCachedData()) {
955-
logger::warn("Cache directory is empty, no cached data will be uploaded");
956996
return;
957997
}
958-
logger::info("Uploading cached data");
998+
logger::info("Uploading cached events");
959999

9601000
for(const auto& entry : std::filesystem::directory_iterator(cacheDir)) {
9611001
if(!entry.is_directory()) {
@@ -967,13 +1007,34 @@ void EventsManager::uploadCachedData() {
9671007
auto eventData = std::make_unique<EventData>();
9681008
auto event = std::make_shared<proto::event::Event>();
9691009
event->ParseFromIstream(&eventFile);
1010+
1011+
// Cached events should be added only if there is enough space in the upcoming request
9701012
std::lock_guard<std::mutex> lock(eventBufferMutex);
1013+
if(eventBuffer.size() >= eventsPerRequest) {
1014+
return;
1015+
}
1016+
9711017
eventData->event = event;
9721018
eventBuffer.push_back(std::move(eventData));
9731019
// Clear entries added to the eventBuffer
9741020
clearCachedData(entry.path());
1021+
}
1022+
}
1023+
}
1024+
1025+
void EventsManager::uploadCachedSnaps() {
1026+
// Iterate over the directories in cacheDir, add snaps to snapBuffer
1027+
if(!checkForCachedData()) {
1028+
return;
1029+
}
1030+
logger::info("Uploading cached snaps");
9751031

976-
} else if(entry.path().filename().string().rfind("snap", 0) == 0) {
1032+
for(const auto& entry : std::filesystem::directory_iterator(cacheDir)) {
1033+
if(!entry.is_directory()) {
1034+
continue;
1035+
}
1036+
1037+
if(entry.path().filename().string().rfind("snap", 0) == 0) {
9771038
std::ifstream snapFile(entry.path() / "snap.pb", std::ios::binary);
9781039
auto snapData = std::make_unique<SnapData>();
9791040
auto eventData = std::make_shared<EventData>();
@@ -982,14 +1043,20 @@ void EventsManager::uploadCachedData() {
9821043
event->ParseFromIstream(&snapFile);
9831044
for(const auto& fileEntry : std::filesystem::directory_iterator(entry.path())) {
9841045
if(fileEntry.is_regular_file() && fileEntry.path() != entry.path() / "snap.pb") {
985-
auto fileData = std::make_shared<FileData>(fileEntry.path(), fileEntry.path().filename().string());
1046+
auto fileData = std::make_shared<FileData>(fileEntry.path(), fileEntry.path().stem().string());
9861047
fileGroup->fileData.push_back(fileData);
9871048
}
9881049
}
9891050
snapData->eventData = eventData;
9901051
snapData->eventData->event = event;
9911052
snapData->fileGroup = fileGroup;
1053+
1054+
// Cached snaps should be added only if there is enough space in the upcoming request
9921055
std::lock_guard<std::mutex> lock(snapBufferMutex);
1056+
if(snapBuffer.size() >= maxGroupsPerBatch) {
1057+
return;
1058+
}
1059+
9931060
snapBuffer.push_back(std::move(snapData));
9941061
// Clear entries added to the snapBuffer
9951062
clearCachedData(entry.path());

0 commit comments

Comments
 (0)