Skip to content

Commit eb2242b

Browse files
authored
Merge pull request #254 from Helene/topo_refresher
Add monitor observing if MetaData refresh is required
2 parents 7c5ce66 + 30fe5be commit eb2242b

File tree

6 files changed

+135
-11
lines changed

6 files changed

+135
-11
lines changed

source/collector.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from threading import Thread
3131
from metadata import MetadataHandler
3232
from bridgeLogger import getBridgeLogger
33+
from refresher import TopoRefreshManager
3334
from utils import classattributes, cond_execution_time, get_runtime_statistics
3435

3536

@@ -73,15 +74,8 @@ def parse_tags(self, filtersMap, defaultLabels):
7374
break
7475
# detected zimon key, do we need refresh local TOPO?
7576
if not found:
76-
cache_size = len(local_cache)
77-
local_cache.union(ident)
78-
updated_size = len(local_cache)
79-
if updated_size > cache_size:
80-
logger.trace(MSG['NewKeyDetected'].format('|'.join(ident)))
81-
md = MetadataHandler()
82-
Thread(name='AdHocMetaDataUpdate', target=md.update).start()
83-
else:
84-
logger.trace(MSG['NewKeyAlreadyReported'].format('|'.join(ident)))
77+
topoRefresher = TopoRefreshManager()
78+
topoRefresher.update_local_cache(ident)
8579

8680
constructedTags = dict(zip(defaultLabels, ident))
8781
if len(self.columnInfo.keys) == 1:

source/messages.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
'CollectorErr': 'Failed to initialize connection to pmcollector: {}, quitting',
3636
'MetaError': 'Metadata could not be retrieved. Check log file for more details, quitting',
3737
'MetaSuccess': 'Successfully retrieved MetaData',
38+
'RequestMetaUpdate': 'MetaData update requested',
39+
'ElapsedTimeSinceLastUpdate': 'Elapsed time:{} since last MetaData update',
3840
'QueryError': 'Query request could not be proceed. Reason: {}',
3941
'SearchErr': 'Search for {} did cause exception: {}',
4042
'LookupErr': 'Lookup for metric {} did not return any results',
@@ -66,6 +68,8 @@
6668
'FileAddedToWatch': 'The file {} added to watching files.',
6769
'StartWatchingFiles': 'Start watching file changes in the path {}.',
6870
'StopWatchingFiles': 'Stop watching file changes in the path {}.',
71+
'StartMonitoringTopoChanges': 'Start monitoring if a Metadata refresh needed.',
72+
'StopMonitoringTopoChanges': 'Stop monitoring Metadata refresh requests.',
6973
'PathNoCfgFiles': 'The path does not contain any configuration files.',
7074
'NewKeyDetected': 'Detected not known single ts identifiers {}.',
7175
'NewKeyAlreadyReported': 'Single ts identifiers {} already reported as not known.',

source/metadata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@
2525
from queryHandler.QueryHandler import QueryHandler2 as QueryHandler
2626
from queryHandler.Topo import Topo
2727
from queryHandler import SensorConfig
28-
from utils import execution_time
28+
from utils import execution_time, synchronized
2929
from messages import ERR, MSG
3030
from metaclasses import Singleton
3131
from time import time, sleep
3232
from datetime import datetime
33+
from threading import Lock
3334

3435

35-
local_cache = []
36+
topoUpdateLock = Lock()
3637

3738

3839
class MetadataHandler(metaclass=Singleton):
@@ -189,6 +190,7 @@ def GET(self, **params):
189190
return resp
190191

191192
@execution_time()
193+
@synchronized(topoUpdateLock)
192194
def update(self, refresh_all=False):
193195
'''Read the topology from ZIMon and update
194196
the tables for metrics, keys, key elements (tag keys)

source/refresher.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
'''
2+
##############################################################################
3+
# Copyright 2023 IBM Corp.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
##############################################################################
17+
18+
Created on Dec 9, 2024
19+
20+
@author: HWASSMAN
21+
'''
22+
23+
import cherrypy
24+
import time
25+
from bridgeLogger import getBridgeLogger
26+
from utils import synchronized
27+
from metaclasses import Singleton
28+
from metadata import MetadataHandler
29+
from messages import MSG
30+
from threading import Thread, Lock
31+
32+
LOCK = Lock()
33+
34+
35+
class TopoRefreshManager(object, metaclass=Singleton):
    """Monitor that requests a MetaData refresh after unknown keys were reported.

    Callers report identifiers they could not resolve through
    ``update_local_cache``. A background thread started by ``start_monitor``
    wakes up every ``refresh_delay_secs`` seconds and, if something was
    reported, invokes the callback passed to the constructor once the last
    MetaData update is old enough or enough new key elements piled up.

    NOTE(review): the class uses the ``Singleton`` metaclass, so presumably
    every ``TopoRefreshManager()`` call yields the instance configured first;
    confirm against ``metaclasses.Singleton``.
    """

    running = False
    thread = None
    # Seconds the monitor thread sleeps between two checks.
    refresh_delay_secs = 30
    # A refresh is requested when the last MetaData update is older than this.
    update_age_limit_secs = 300
    # A refresh is also requested when more than this many elements are cached.
    new_keys_limit = 100

    def __init__(self, call_func_on_change=None, *args, **kwargs):
        """Store the refresh callback and reset the local cache.

        Args:
            call_func_on_change: optional callable invoked by ``monitor``
                when a MetaData refresh is due.
            *args: positional arguments forwarded to ``call_func_on_change``.
            **kwargs: keyword arguments forwarded to ``call_func_on_change``.
        """
        self._cached_stamp = {}
        self.logger = getBridgeLogger()
        self.update_required = False
        self.new_keys = set()
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    def start_monitor(self):
        """Start the monitor loop in its own thread (no-op if one exists)."""
        self.running = True
        if not self.thread:
            self.thread = Thread(name='TopoRefreshManager', target=self.monitor)
            self.thread.start()
            cherrypy.engine.log('Started custom thread %r.' % self.thread.name)
            self.logger.debug(MSG['StartMonitoringTopoChanges'])

    @synchronized(LOCK)
    def update_local_cache(self, key):
        """Remember the elements of an unknown key and flag a pending refresh.

        Args:
            key: iterable of identifier strings; its elements are merged into
                the cache and joined with '|' for the log message.
        """
        cache_size = len(self.new_keys)
        self.new_keys = self.new_keys.union(key)
        updated_size = len(self.new_keys)
        if updated_size > cache_size:
            # At least one element was not cached before.
            self.logger.trace(MSG['NewKeyDetected'].format('|'.join(key)))
            self.update_required = True
        else:
            self.logger.trace(MSG['NewKeyAlreadyReported'].format('|'.join(key)))

    @synchronized(LOCK)
    def clear_local_cache(self):
        """Drop all cached keys and reset the pending-refresh flag."""
        self.new_keys.clear()
        self.update_required = False

    def monitor(self):
        """Loop until ``running`` is cleared, requesting refreshes when due.

        Each cycle sleeps ``refresh_delay_secs`` first, so a stop request is
        noticed only after the current sleep has finished.
        """
        while self.running:
            try:
                time.sleep(self.refresh_delay_secs)
                if self.update_required:
                    md = MetadataHandler()
                    # NOTE(review): getUpdateTime is read as an attribute, so
                    # it is presumably a property returning epoch seconds.
                    elapsed_time = time.time() - md.getUpdateTime
                    self.logger.trace(MSG['ElapsedTimeSinceLastUpdate'].format(elapsed_time))
                    # Compare the NUMBER of cached elements; comparing the set
                    # itself with an int raises TypeError.
                    if (elapsed_time > self.update_age_limit_secs
                            or len(self.new_keys) > self.new_keys_limit):
                        self.logger.info(MSG['RequestMetaUpdate'])
                        if self.call_func_on_change is not None:
                            self.call_func_on_change(*self.args, **self.kwargs)
                        self.clear_local_cache()
            except KeyboardInterrupt:
                self.logger.details(MSG['StopMonitoringTopoChanges'])
                break
            except Exception as e:
                # Best effort: log the failure and keep the monitor alive.
                self.logger.warning(
                    MSG['UnexpecterError'].format(type(e).__name__))

    def stop_monitor(self):
        """Ask the monitor loop to end and wait for its thread to finish."""
        try:
            self.running = False
            if self.thread:
                self.thread.join()
                cherrypy.engine.log('Stopped custom thread %r.' % self.thread.name)
                self.thread = None
        except KeyboardInterrupt:
            print(f"Recived KeyboardInterrupt during stopping the thread {self.thread.name}")

source/utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import time
2424
import copy
25+
from threading import Lock
2526
from typing import Callable, TypeVar, Any
2627
from functools import wraps
2728
from messages import MSG
@@ -88,6 +89,18 @@ def no_outer(f: Callable[..., T]) -> Callable[..., T]:
8889
return outer if enabled else no_outer
8990

9091

92+
def synchronized(lock: Lock):
    """Build a decorator that runs the decorated function while holding ``lock``.

    Args:
        lock: lock acquired for the duration of every call.

    Returns:
        A decorator. The callable it produces acquires ``lock``, invokes the
        original function with the given arguments, and returns its result.
        The ``with`` statement releases the lock on return and on exception.
    """
    def decorate(target):
        @wraps(target)
        def locked_call(*args, **kwargs):
            with lock:
                return target(*args, **kwargs)
        return locked_call
    return decorate
102+
103+
91104
def get_runtime_statistics(enabled: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
92105
""" Conditionally executes the passed through function f with profiling."""
93106

source/zimonGrafanaIntf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from opentsdb import OpenTsdbApi
4141
from prometheus import PrometheusExporter
4242
from profiler import Profiler
43+
from refresher import TopoRefreshManager
4344
from watcher import ConfigWatcher
4445
from cherrypy import _cperror
4546
from cherrypy.lib.cpstats import StatsPage
@@ -392,6 +393,9 @@ def main(argv):
392393
watcher = ConfigWatcher(files_to_watch, refresh_metadata, refresh_all=True)
393394
cherrypy.engine.subscribe('start', watcher.start_watch)
394395
cherrypy.engine.subscribe('stop', watcher.stop_watch)
396+
refresher = TopoRefreshManager(refresh_metadata, refresh_all=False)
397+
cherrypy.engine.subscribe('start', refresher.start_monitor)
398+
cherrypy.engine.subscribe('stop', refresher.stop_monitor)
395399
cherrypy.engine.start()
396400
cherrypy.engine.log('test')
397401
logger.info("%s", MSG['ConnApplications'].format(",\n ".join(registered_apps)))

0 commit comments

Comments
 (0)