2020#include " CCDB/CcdbApi.h"
2121#include " CommonConstants/LHCConstants.h"
2222#include " Framework/Signpost.h"
23- #include < typeinfo>
2423#include < TError.h>
2524#include < TMemFile.h>
26- #include < functional>
2725
2826O2_DECLARE_DYNAMIC_LOG (ccdb);
2927
@@ -159,6 +157,55 @@ CCDBHelpers::ParserResult CCDBHelpers::parseRemappings(char const* str)
159157 }
160158}
161159
160+ void initialiseHelper (CCDBFetcherHelper& helper, ConfigParamRegistry const & options, std::vector<o2::framework::OutputRoute> const & outputRoutes)
161+ {
162+ std::unordered_map<std::string, bool > accountedSpecs;
163+ auto defHost = options.get <std::string>(" condition-backend" );
164+ auto checkRate = options.get <int >(" condition-tf-per-query" );
165+ auto checkMult = options.get <int >(" condition-tf-per-query-multiplier" );
166+ helper.timeToleranceMS = options.get <int64_t >(" condition-time-tolerance" );
167+ helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int >::max ();
168+ helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1 ;
169+ LOGP (info, " CCDB Backend at: {}, validity check for every {} TF{}" , defHost, helper.queryPeriodGlo , helper.queryPeriodFactor == 1 ? std::string{} : fmt::format (" , (query for high-rate objects downscaled by {})" , helper.queryPeriodFactor ));
170+ LOGP (info, " Hook to enable signposts for CCDB messages at {}" , (void *)&private_o2_log_ccdb->stacktrace );
171+ auto remapString = options.get <std::string>(" condition-remap" );
172+ CCDBHelpers::ParserResult result = CCDBHelpers::parseRemappings (remapString.c_str ());
173+ if (!result.error .empty ()) {
174+ throw runtime_error_f (" Error while parsing remapping string %s" , result.error .c_str ());
175+ }
176+ helper.remappings = result.remappings ;
177+ helper.apis [" " ].init (defHost); // default backend
178+ LOGP (info, " Initialised default CCDB host {}" , defHost);
179+ //
180+ for (auto & entry : helper.remappings ) { // init api instances for every host seen in the remapping
181+ if (helper.apis .find (entry.second ) == helper.apis .end ()) {
182+ helper.apis [entry.second ].init (entry.second );
183+ LOGP (info, " Initialised custom CCDB host {}" , entry.second );
184+ }
185+ LOGP (info, " {} is remapped to {}" , entry.first , entry.second );
186+ }
187+ helper.createdNotBefore = std::to_string (options.get <int64_t >(" condition-not-before" ));
188+ helper.createdNotAfter = std::to_string (options.get <int64_t >(" condition-not-after" ));
189+
190+ for (auto & route : outputRoutes) {
191+ if (route.matcher .lifetime != Lifetime::Condition) {
192+ continue ;
193+ }
194+ auto specStr = DataSpecUtils::describe (route.matcher );
195+ if (accountedSpecs.find (specStr) != accountedSpecs.end ()) {
196+ continue ;
197+ }
198+ accountedSpecs[specStr] = true ;
199+ helper.routes .push_back (route);
200+ LOGP (info, " The following route is a condition {}" , DataSpecUtils::describe (route.matcher ));
201+ for (auto & metadata : route.matcher .metadata ) {
202+ if (metadata.type == VariantType::String) {
203+ LOGP (info, " - {}: {}" , metadata.name , metadata.defaultValue .asString ());
204+ }
205+ }
206+ }
207+ }
208+
162209auto getOrbitResetTime (std::pmr::vector<char > const & v) -> Long64_t
163210{
164211 Int_t previousErrorLevel = gErrorIgnoreLevel ;
@@ -261,19 +308,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
261308 LOGP (detail, " ******** Default entry used for {} ********" , path);
262309 }
263310 helper->mapURL2UUID [path].lastCheckedTF = timingInfo.tfCounter ;
264- if (etag.empty ()) {
265- helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
266- helper->mapURL2UUID [path].cachePopulatedAt = timestampToUse;
267- helper->mapURL2UUID [path].cacheMiss ++;
268- helper->mapURL2UUID [path].minSize = std::min (v.size (), helper->mapURL2UUID [path].minSize );
269- helper->mapURL2UUID [path].maxSize = std::max (v.size (), helper->mapURL2UUID [path].maxSize );
270- api.appendFlatHeader (v, headers);
271- auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB );
272- helper->mapURL2DPLCache [path] = cacheId;
273- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
274- continue ;
275- }
276- if (v.size ()) { // but should be overridden by fresh object
311+ if (etag.empty () || v.size ()) { // but should be overridden by fresh object
277312 // somewhere here pruneFromCache should be called
278313 helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
279314 helper->mapURL2UUID [path].cachePopulatedAt = timestampToUse;
@@ -283,15 +318,16 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
283318 helper->mapURL2UUID [path].maxSize = std::max (v.size (), helper->mapURL2UUID [path].maxSize );
284319 api.appendFlatHeader (v, headers);
285320 auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB );
321+ if (v.size ()) {
322+ // one could modify the adoptContainer to take optional old cacheID to clean:
323+ // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
324+ }
286325 helper->mapURL2DPLCache [path] = cacheId;
287326 O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " populateCacheWith" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
288- // one could modify the adoptContainer to take optional old cacheID to clean:
289- // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
290327 continue ;
291- } else {
292- // Only once the etag is actually used, we get the information on how long the object is valid
293- helper->mapURL2UUID [path].cacheValidUntil = headers[" Cache-Valid-Until" ].empty () ? 0 : std::stoul (headers[" Cache-Valid-Until" ]);
294328 }
329+ // Only once the etag is actually used, we get the information on how long the object is valid
330+ helper->mapURL2UUID [path].cacheValidUntil = headers[" Cache-Valid-Until" ].empty () ? 0 : std::stoul (headers[" Cache-Valid-Until" ]);
295331 }
296332 // cached object is fine
297333 auto cacheId = helper->mapURL2DPLCache [path];
@@ -307,51 +343,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
307343{
308344 return adaptStateful ([](CallbackService& callbacks, ConfigParamRegistry const & options, DeviceSpec const & spec) {
309345 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
310- std::unordered_map<std::string, bool > accountedSpecs;
311- auto defHost = options.get <std::string>(" condition-backend" );
312- auto checkRate = options.get <int >(" condition-tf-per-query" );
313- auto checkMult = options.get <int >(" condition-tf-per-query-multiplier" );
314- helper->timeToleranceMS = options.get <int64_t >(" condition-time-tolerance" );
315- helper->queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int >::max ();
316- helper->queryPeriodFactor = checkMult > 0 ? checkMult : 1 ;
317- LOGP (info, " CCDB Backend at: {}, validity check for every {} TF{}" , defHost, helper->queryPeriodGlo , helper->queryPeriodFactor == 1 ? std::string{} : fmt::format (" , (query for high-rate objects downscaled by {})" , helper->queryPeriodFactor ));
318- LOGP (info, " Hook to enable signposts for CCDB messages at {}" , (void *)&private_o2_log_ccdb->stacktrace );
319- auto remapString = options.get <std::string>(" condition-remap" );
320- ParserResult result = CCDBHelpers::parseRemappings (remapString.c_str ());
321- if (!result.error .empty ()) {
322- throw runtime_error_f (" Error while parsing remapping string %s" , result.error .c_str ());
323- }
324- helper->remappings = result.remappings ;
325- helper->apis [" " ].init (defHost); // default backend
326- LOGP (info, " Initialised default CCDB host {}" , defHost);
327- //
328- for (auto & entry : helper->remappings ) { // init api instances for every host seen in the remapping
329- if (helper->apis .find (entry.second ) == helper->apis .end ()) {
330- helper->apis [entry.second ].init (entry.second );
331- LOGP (info, " Initialised custom CCDB host {}" , entry.second );
332- }
333- LOGP (info, " {} is remapped to {}" , entry.first , entry.second );
334- }
335- helper->createdNotBefore = std::to_string (options.get <int64_t >(" condition-not-before" ));
336- helper->createdNotAfter = std::to_string (options.get <int64_t >(" condition-not-after" ));
337-
338- for (auto &route : spec.outputs ) {
339- if (route.matcher .lifetime != Lifetime::Condition) {
340- continue ;
341- }
342- auto specStr = DataSpecUtils::describe (route.matcher );
343- if (accountedSpecs.find (specStr) != accountedSpecs.end ()) {
344- continue ;
345- }
346- accountedSpecs[specStr] = true ;
347- helper->routes .push_back (route);
348- LOGP (info, " The following route is a condition {}" , DataSpecUtils::describe (route.matcher ));
349- for (auto & metadata : route.matcher .metadata ) {
350- if (metadata.type == VariantType::String) {
351- LOGP (info, " - {}: {}" , metadata.name , metadata.defaultValue .asString ());
352- }
353- }
354- }
346+ initialiseHelper (*helper, options, spec.outputs );
355347 // / Add a callback on stop which dumps the statistics for the caching per
356348 // / path
357349 callbacks.set <CallbackService::Id::Stop>([helper]() {
@@ -398,18 +390,9 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
398390 // FIXME: I should send a dummy message.
399391 return ;
400392 }
401- if (etag.empty ()) {
402- helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
403- helper->mapURL2UUID [path].cacheMiss ++;
404- helper->mapURL2UUID [path].minSize = std::min (v.size (), helper->mapURL2UUID [path].minSize );
405- helper->mapURL2UUID [path].maxSize = std::max (v.size (), helper->mapURL2UUID [path].maxSize );
406- newOrbitResetTime = getOrbitResetTime (v);
407- api.appendFlatHeader (v, headers);
408- auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone );
409- helper->mapURL2DPLCache [path] = cacheId;
410- O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " fetchFromCCDB" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
411- } else if (v.size ()) { // but should be overridden by fresh object
412- // somewhere here pruneFromCache should be called
393+
394+ if (etag.empty () || v.size ()) {
395+ // Overwrite on cache miss
413396 helper->mapURL2UUID [path].etag = headers[" ETag" ]; // update uuid
414397 helper->mapURL2UUID [path].cacheMiss ++;
415398 helper->mapURL2UUID [path].minSize = std::min (v.size (), helper->mapURL2UUID [path].minSize );
@@ -418,9 +401,12 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
418401 api.appendFlatHeader (v, headers);
419402 auto cacheId = allocator.adoptContainer (output, std::move (v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone );
420403 helper->mapURL2DPLCache [path] = cacheId;
404+ if (v.size ()) { // but should be overridden by fresh object
405+ // somewhere here pruneFromCache should be called
406+ // one could modify the adoptContainer to take optional old cacheID to clean:
407+ // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
408+ }
421409 O2_SIGNPOST_EVENT_EMIT (ccdb, sid, " fetchFromCCDB" , " Caching %{public}s for %{public}s (DPL id %" PRIu64 " )" , path.data (), headers[" ETag" ].data (), cacheId.value );
422- // one could modify the adoptContainer to take optional old cacheID to clean:
423- // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
424410 }
425411 // cached object is fine
426412 }
0 commit comments