Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Grasshopper/config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"scan_interval_secs": 86400,
"scan_interval_secs": 120,
"low_limit": 0,
"high_limit": 4194303,
"batch_broadcast_size": 10000,
Expand All @@ -25,5 +25,10 @@
"port": 5000,
"certfile": null,
"keyfile": null
}
},
"scavenge_enabled": true,
"scavenge_margin": 10,
"scavenge_max_range": 25,
"scavenge_gap_threshold": 5,
"scavenge_gap_max_range": null
}
66 changes: 54 additions & 12 deletions Grasshopper/grasshopper/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def grasshopper(config_path: str, **kwargs: Any) -> "Grasshopper":
if not config:
_log.info("Using Agent defaults for starting configuration.")

scan_interval_secs: int = config.get("scan_interval_secs", seconds_in_day)
scan_interval_secs: int = config.get("scan_interval_secs", 86400)
low_limit: int = config.get("low_limit", 0)
high_limit: int = config.get("high_limit", 4194303)
device_broadcast_full_step_size: int = config.get(
Expand Down Expand Up @@ -278,6 +278,12 @@ def configure(
},
)

self.scavenge_enabled: bool = contents.get("scavenge_enabled", True)
self.scavenge_margin: int = contents.get("scavenge_margin", 10)
self.scavenge_max_range: int = contents.get("scavenge_max_range", 25)
self.scavenge_gap_threshold: int = contents.get("scavenge_gap_threshold", 5)
self.scavenge_gap_max_range: int = contents.get("scavenge_gap_max_range", None)

if self.webapp_settings.get("enabled", False):
if self.http_server_process is not None:
self._stop_server()
Expand Down Expand Up @@ -458,26 +464,41 @@ def who_is_broadcast(self) -> None:
def extract_datetime(filename: str) -> datetime:
"""Convert a timestamped filename to a datetime object."""
datetime_str = filename.replace(".ttl", "")
# Replace first two hyphens after 'T' back to colons for time parsing
# This handles the format: YYYY-MM-DDTHH-MM-SS -> YYYY-MM-DDTHH:MM:SS
if "T" in datetime_str:
date_part, time_part = datetime_str.split("T")
time_part = time_part.replace("-", ":", 2) # Replace first 2 hyphens in time
datetime_str = f"{date_part}T{time_part}"
return datetime.fromisoformat(datetime_str)

def is_valid_filename(filename: str) -> bool:
"""Check if a filename matches the timestamped TTL format."""
pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.ttl$"
# Pattern matches YYYY-MM-DDTHH-MM-SS.ttl (note hyphens instead of colons)
pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.ttl$"
return bool(re.match(pattern, filename))

def find_latest_file(directory: str) -> Optional[str]:
"""Find the most recent timestamped TTL file in a directory."""
if not os.path.exists(directory):
_log.debug(f"TTL directory does not exist: {directory}")
return None

files = [
f
for f in os.listdir(directory)
if os.path.isfile(os.path.join(directory, f))
]
_log.debug(f"Found {len(files)} files in {directory}: {files[:5]}...") # Show first 5

valid_files = [f for f in files if is_valid_filename(f)]
_log.debug(f"Found {len(valid_files)} valid TTL files: {valid_files[:3]}...") # Show first 3

if not valid_files:
return None

latest_file = max(valid_files, key=extract_datetime)
_log.debug(f"Selected latest file: {latest_file}")
return latest_file

try:
Expand All @@ -486,26 +507,40 @@ def find_latest_file(directory: str) -> Optional[str]:
return

base_rdf_path = os.path.join(self.agent_data_path, "ttl/base.ttl")
recent_ttl_file = find_latest_file(
os.path.join(self.agent_data_path, "ttl")
)
ttl_directory = os.path.join(self.agent_data_path, "ttl")
recent_ttl_file = find_latest_file(ttl_directory)

prev_graph: Graph = Graph()
graph: Graph = Graph()

if os.path.exists(base_rdf_path):
_log.debug(f"Loading base graph from: {base_rdf_path}")
graph.parse(base_rdf_path, format="ttl")
_log.debug(f"Base graph loaded with {len(graph)} triples")

if recent_ttl_file:
prev_graph.parse(
os.path.join(self.agent_data_path, f"ttl/{recent_ttl_file}"),
format="ttl",
)
recent_ttl_path = os.path.join(ttl_directory, recent_ttl_file)
_log.info(f"Loading previous graph from: {recent_ttl_path}")
try:
prev_graph.parse(recent_ttl_path, format="ttl")
_log.info(f"Previous graph loaded successfully with {len(prev_graph)} triples")
except Exception as e:
_log.error(f"Failed to parse previous graph file {recent_ttl_path}: {e}")
prev_graph = Graph() # Reset to empty graph
else:
_log.info("No previous TTL files found for scavenge scanning")

now = datetime.now()

bbmds = self.config_retrieve_bbmd_devices()
subnets = self.config_retrieve_subnets()

_log.info(f"Initializing scanner with scavenge_enabled={self.scavenge_enabled}")
if prev_graph and len(prev_graph) > 0:
_log.info(f"Previous graph loaded with {len(prev_graph)} triples")
else:
_log.info("No previous graph available - will perform full scan")

scanner = bacpypes3_scanner(
self.bacpypes_settings,
prev_graph,
Expand All @@ -515,13 +550,18 @@ def find_latest_file(directory: str) -> Optional[str]:
self.device_broadcast_full_step_size,
self.low_limit,
self.high_limit,
self.scavenge_enabled,
self.scavenge_margin,
self.scavenge_max_range,
self.scavenge_gap_threshold,
self.scavenge_gap_max_range,
)
# This is a wrapper that returns None, but gevent.spawn expects a callable
# Using type ignore as this is a valid pattern even though the types don't
# align perfectly
# Spawn a task to run the async function
gevent.spawn(
self.run_async_function(scanner.get_device_and_router, graph) # type: ignore
self.run_async_function(scanner.get_device_and_router_with_scavenge, graph) # type: ignore
) # type: ignore

rdf_path = os.path.join(
Expand Down Expand Up @@ -640,10 +680,12 @@ def _start_server(

q: Queue = Queue()
processing_task_q: Queue = Queue()
finished_task_q: Queue = Queue()
app.state.task_queue = q
app.state.processing_task_queue = processing_task_q
app.state.finished_task_queue = finished_task_q

worker = Process(target=process_compare_rdf_queue, args=(q, processing_task_q))
worker = Process(target=process_compare_rdf_queue, args=(q, processing_task_q, finished_task_q))
worker.daemon = True
worker.start()
print(f"[serve_app] queue worker PID={worker.pid}")
Expand Down Expand Up @@ -900,4 +942,4 @@ def main() -> None:
main() # main() returns None, but we want to exit with code 0
sys.exit(0)
except KeyboardInterrupt:
pass
pass
Loading
Loading