Skip to content

Commit 29249f5

Browse files
committed
Updating fastmon agent and utils from the example agent in swf-testbed
1 parent 0544a3e commit 29249f5

File tree

2 files changed

+91
-112
lines changed

2 files changed

+91
-112
lines changed

src/swf_fastmon_agent/fastmon_utils.py

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
import random
1010
import re
1111
from datetime import datetime, timedelta
12-
from pathlib import Path
1312
from typing import List, Dict, Any
1413

14+
1515
# File status constants (matching Django FileStatus choices)
1616
class FileStatus:
1717
REGISTERED = 'registered'
@@ -22,14 +22,9 @@ class FileStatus:
2222

2323

2424
def validate_config(config: dict) -> None:
25-
"""Validate the configuration parameters."""
25+
"""Validate the configuration parameters for message-driven agent."""
2626
required_keys = [
27-
"watch_directories",
28-
"file_patterns",
29-
"check_interval",
30-
"lookback_time",
3127
"selection_fraction",
32-
"default_run_number",
3328
]
3429

3530
for key in required_keys:
@@ -135,12 +130,13 @@ def extract_run_number(file_path: Path, default_run_number: int) -> int:
135130
return default_run_number
136131

137132

138-
def calculate_checksum(file_path: Path, logger: logging.Logger) -> str:
133+
134+
def calculate_checksum(file_path: str, logger: logging.Logger) -> str:
139135
"""
140136
Calculate MD5 checksum of file.
141137
142138
Args:
143-
file_path: Path to the file
139+
file_path: Path to the file as string
144140
logger: Logger instance
145141
146142
Returns:
@@ -277,7 +273,8 @@ def record_stf_file(file_path: Path, config: dict, agent, logger: logging.Logger
277273
raise
278274

279275

280-
def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: logging.Logger) -> List[Dict[str, Any]]:
276+
def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: logging.Logger, agent_name: str) -> List[
277+
Dict[str, Any]]:
281278
"""
282279
Simulate creation of Time Frame (TF) subsamples from a Super Time Frame (STF) file.
283280
@@ -313,12 +310,12 @@ def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: loggi
313310
"tf_filename": tf_filename,
314311
"file_size_bytes": tf_size,
315312
"sequence_number": sequence_number,
316-
"stf_parent": stf_file.get("file_id", "fd6dc0c4-a55a-43fb-a023-3eaccb8dd35a"), # TODO: how to link to parent STF file?
313+
"stf_parent": stf_file.get("filename"), # Use unique filename as parent identifier
317314
"metadata": {
318315
"simulation": True,
319316
"created_from": stf_file.get('filename'),
320317
"tf_size_fraction": tf_size_fraction,
321-
"agent_name": config.get("agent_name", "swf-fastmon-agent"),
318+
"agent_name": agent_name,
322319
"state": stf_file.get('state'),
323320
"substate": stf_file.get('substate'),
324321
"start": stf_file.get('start'),
@@ -340,7 +337,6 @@ def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: log
340337
Record a Time Frame (TF) file in the database using REST API.
341338
342339
Args:
343-
stf_file: Parent STF file data dictionary
344340
tf_metadata: TF metadata dictionary from simulate_tf_subsamples
345341
config: Configuration dictionary
346342
agent: BaseAgent instance for API access
@@ -352,7 +348,7 @@ def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: log
352348
try:
353349
# Prepare FastMonFile data for API
354350
tf_file_data = {
355-
"stf_file": tf_metadata.get("stf_parent", None),
351+
"stf_file": tf_metadata.get("stf_parent", None), # STF filename as parent identifier
356352
"tf_filename": tf_metadata["tf_filename"],
357353
"file_size_bytes": tf_metadata["file_size_bytes"],
358354
"status": FileStatus.REGISTERED,
@@ -361,31 +357,32 @@ def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: log
361357

362358
# Create TF file record via FastMonFile API
363359
tf_file = agent.call_monitor_api('post', '/fastmon-files/', tf_file_data)
364-
logger.debug(f"Recorded TF file: {tf_metadata['tf_filename']} -> {tf_file['tf_file_id']}")
360+
tf_file_id = tf_file.get('tf_file_id') or tf_file.get('id') or 'unknown'
361+
logger.debug(f"Recorded TF file: {tf_metadata['tf_filename']} -> {tf_file_id}")
365362
return tf_file
366-
363+
367364
except Exception as e:
368365
logger.error(f"Error recording TF file {tf_metadata['tf_filename']}: {e}")
369366
return {}
370367

371368

372-
def create_sse_tf_message(tf_file: Dict[str, Any], stf_file: Dict[str, Any], agent_name: str) -> Dict[str, Any]:
369+
def create_tf_message(tf_file: Dict[str, Any], stf_file: Dict[str, Any], agent_name: str) -> Dict[str, Any]:
373370
"""
374-
Create an SSE-compatible message for TF file registration notifications.
375-
371+
Create a message for TF file registration notifications.
372+
376373
Args:
377374
tf_file: TF file data from the FastMonFile API
378375
stf_file: Parent STF file data
379376
agent_name: Name of the agent sending the message
380377
381378
Returns:
382-
SSE message dictionary ready for broadcasting
379+
Message dictionary ready for broadcasting
383380
"""
384381
from datetime import datetime
385-
386-
# Extract run number from STF file metadata
387-
run_number = stf_file.get('run', {}).get('run_number') if isinstance(stf_file.get('run'), dict) else stf_file.get('run')
388-
382+
383+
# Extract run number from message data
384+
run_number = stf_file.get('run_id')
385+
389386
message = {
390387
"msg_type": "tf_file_registered",
391388
"processed_by": agent_name,
@@ -402,18 +399,18 @@ def create_sse_tf_message(tf_file: Dict[str, Any], stf_file: Dict[str, Any], age
402399
return message
403400

404401

405-
def create_sse_status_message(agent_name: str, status: str, message_text: str, run_id: str = None) -> Dict[str, Any]:
402+
def create_status_message(agent_name: str, status: str, message_text: str, run_id: str = None) -> Dict[str, Any]:
406403
"""
407-
Create an SSE-compatible status message for agent notifications.
408-
404+
Create a status message for agent notifications.
405+
409406
Args:
410407
agent_name: Name of the agent sending the message
411408
status: Status of the operation (e.g., 'started', 'completed', 'error')
412409
message_text: Human-readable message describing the status
413410
run_id: Optional run identifier
414411
415412
Returns:
416-
SSE message dictionary ready for broadcasting
413+
Message dictionary ready for broadcasting
417414
"""
418415
from datetime import datetime
419416

0 commit comments

Comments
 (0)