Commit bbdf062

Update the fastmon agent to use the most recent version of the swf-monitor DB schema, and refactor it so that continuous and message-driven modes share the same workflow
1 parent c5e186a commit bbdf062

5 files changed: +210 -25 lines changed

CLAUDE.md

Lines changed: 118 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,31 @@ The project is designed to work with PostgreSQL databases and ActiveMQ messaging
2828
- **Code Formatter**: Black
2929
- **License**: Apache 2.0
3030
- **Environment Variable**: `SWF_HOME` automatically set to parent directory containing all swf-* repos (via swf-testbed CLI)
31+
- **Architecture**: Extends BaseAgent from swf-common-lib for standardized agent behavior
32+
33+
## Recent Infrastructure Updates (2025-11)
34+
35+
### BaseAgent Integration
36+
The agent now inherits from **BaseAgent** (swf-common-lib) providing:
37+
- Automatic environment setup and .env loading
38+
- REST logging to swf-monitor
39+
- Sequential agent ID generation
40+
- Enhanced heartbeat with workflow metadata
41+
- Automatic subscriber registration
42+
- Connection resilience with auto-reconnection
43+
44+
### Workflow Tracking
45+
Integrated with swf-monitor's workflow tracking system:
46+
- Creates workflow stages via `/api/workflow-stages/`
47+
- Tracks statuses: `fastmon_received`, `fastmon_processing`, `fastmon_complete`
48+
- Records input/output messages and processing times
49+
- Enables end-to-end workflow visibility
50+
51+
### MQ Communications
52+
Updated to use swf-common-lib's mq_comms module:
53+
- Requires `client_id` parameter for durable subscriptions
54+
- SSL support with `MQ_CAFILE` environment variable
55+
- Standardized error handling and reconnection logic
3156

3257
## Project Structure
3358

@@ -115,8 +140,44 @@ This project integrates with:
115140
- **mypy**: Static type checking (>=1.0.0)
116141
- **django-stubs**: Django type stubs (>=1.13.0)
117142

118-
### Database Environment Variables
119-
Django settings support standard environment variables:
143+
### Environment Variables Configuration
144+
145+
The agent requires a `.env` file with the following variables:
146+
147+
**Monitor Connection:**
148+
- `SWF_MONITOR_URL` - HTTPS URL for authenticated API calls (required)
149+
- `SWF_MONITOR_HTTP_URL` - HTTP URL for REST logging (optional)
150+
- `SWF_API_TOKEN` - Authentication token for swf-monitor API (required)
151+
152+
**ActiveMQ Configuration:**
153+
- `ACTIVEMQ_HOST` - ActiveMQ broker host (default: localhost)
154+
- `ACTIVEMQ_PORT` - STOMP port (default: 61612)
155+
- `ACTIVEMQ_USER` - ActiveMQ username (required)
156+
- `ACTIVEMQ_PASSWORD` - ActiveMQ password (required)
157+
- `ACTIVEMQ_USE_SSL` - Enable SSL connections (true/false)
158+
- `ACTIVEMQ_SSL_CA_CERTS` - Path to CA certificate file
159+
160+
**MQ Communications (swf-common-lib):**
161+
- `MQ_USER` - Message queue username (required)
162+
- `MQ_PASSWD` - Message queue password (required)
163+
- `MQ_HOST` - Message queue host (required)
164+
- `MQ_PORT` - Message queue port (required)
165+
- `MQ_CAFILE` - SSL CA certificate path (required for SSL)
166+
167+
**Logging:**
168+
- `SWF_LOG_LEVEL` - Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
169+
- `SWF_STOMP_DEBUG` - Enable STOMP protocol debugging (true/false)
170+
- `SWF_AGENT_QUIET` - Minimal output mode (true/false)
171+
172+
**Agent Configuration:**
173+
- `FASTMON_MODE` - Operation mode: `message` (default) or `continuous`
174+
- `FASTMON_SELECTION_FRACTION` - STF sampling fraction (0.0-1.0, default: 0.1)
175+
- `FASTMON_TF_FILES_PER_STF` - TF files per STF (default: 7)
176+
177+
See `.env.example` for a complete template with all available options.
178+
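As a rough illustration of how the `FASTMON_*` variables listed under Agent Configuration might be read and validated, the sketch below applies the documented defaults. The helper name `load_fastmon_settings` and the keys of the returned dictionary are hypothetical; they are not taken from the agent's code.

```python
import os


def load_fastmon_settings(env=None):
    """Read FASTMON_* settings (hypothetical helper) using the documented defaults."""
    env = os.environ if env is None else env

    # Operation mode: 'message' (default) or 'continuous'
    mode = env.get("FASTMON_MODE", "message").strip().lower()
    if mode not in ("message", "continuous"):
        raise ValueError(f"FASTMON_MODE must be 'message' or 'continuous', got {mode!r}")

    # STF sampling fraction, documented range 0.0-1.0
    fraction = float(env.get("FASTMON_SELECTION_FRACTION", "0.1"))
    if not 0.0 <= fraction <= 1.0:
        raise ValueError(f"FASTMON_SELECTION_FRACTION must be within 0.0-1.0, got {fraction}")

    # Number of TF files derived from each STF
    tf_files_per_stf = int(env.get("FASTMON_TF_FILES_PER_STF", "7"))
    if tf_files_per_stf < 1:
        raise ValueError(f"FASTMON_TF_FILES_PER_STF must be at least 1, got {tf_files_per_stf}")

    return {
        "mode": mode,
        "selection_fraction": fraction,
        "tf_files_per_stf": tf_files_per_stf,
    }
```

Passing a plain dictionary instead of relying on `os.environ` keeps the helper easy to exercise in tests, for example `load_fastmon_settings({"FASTMON_MODE": "continuous"})`.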
179+
### Database Environment Variables (Django Legacy)
180+
Legacy Django settings (if needed for local development):
120181
- `POSTGRES_HOST` (default: localhost)
121182
- `POSTGRES_PORT` (default: 5432)
122183
- `POSTGRES_DB` (default: epic_monitoring)
@@ -129,17 +190,71 @@ Django settings support standard environment variables:
129190
- Database credentials (`.pgpass`) are excluded from version control
130191
- Log files are excluded from commits
131192

193+
## API Integration
194+
195+
The agent integrates with swf-monitor REST API endpoints:
196+
197+
### Core Endpoints Used
198+
- `POST /api/runs/` - Create/retrieve run records
199+
- `POST /api/stf-files/` - Register STF files (development mode only)
200+
- `POST /api/fastmon-files/` - Register TF files (primary endpoint)
201+
- `POST /api/workflow-stages/` - Create workflow stage tracking
202+
- `PATCH /api/workflow-stages/{id}/` - Update stage status and timestamps
203+
- `POST /api/subscribers/` - Auto-register as ActiveMQ subscriber (via BaseAgent)
204+
205+
### FastMonFile API Schema
206+
```json
207+
{
208+
"stf_file": "parent_stf_filename.stf",
209+
"tf_filename": "tf_001.tf",
210+
"file_size_bytes": 1234567,
211+
"status": "registered",
212+
"metadata": {
213+
"simulation": true,
214+
"created_from": "stf_filename.stf",
215+
"agent_name": "swf-fastmon-agent-1"
216+
}
217+
}
218+
```
219+
220+
### Workflow Stage Tracking
221+
The agent creates and updates workflow stages for each STF processed:
222+
```python
223+
# Create stage
224+
stage_data = {
225+
'workflow': workflow_id,
226+
'agent_name': 'swf-fastmon-agent-1',
227+
'agent_type': 'fastmon',
228+
'status': 'fastmon_received',
229+
'input_message': {...}
230+
}
231+
232+
# Update during processing
233+
{'status': 'fastmon_processing', 'started_at': '2025-11-19T10:30:00Z'}
234+
235+
# Mark complete
236+
{'status': 'fastmon_complete', 'completed_at': '2025-11-19T10:30:15Z', 'output_message': {...}}
237+
```
238+
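The example timestamps above end in `Z`, which denotes UTC. One way to produce payloads in that shape is sketched below. The function names are hypothetical, and whether swf-monitor requires UTC timestamps is an assumption rather than something this commit states.

```python
from datetime import datetime, timezone


def utc_timestamp(moment=None):
    """Format a datetime as an ISO 8601 UTC string with a trailing 'Z'.

    A naive datetime is interpreted as local time by astimezone().
    """
    if moment is None:
        moment = datetime.now(timezone.utc)
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def processing_update(started=None):
    """PATCH body that moves a stage to 'fastmon_processing'."""
    return {"status": "fastmon_processing", "started_at": utc_timestamp(started)}


def complete_update(output_message, completed=None):
    """PATCH body that marks a stage as 'fastmon_complete'."""
    return {
        "status": "fastmon_complete",
        "completed_at": utc_timestamp(completed),
        "output_message": output_message,
    }
```

Building the bodies in small pure functions keeps the status strings in one place and makes the timestamp format easy to check without a running monitor.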
132239
## Development Commands
133240

134241
### System Initialization
135242
```bash
136243
cd $SWF_PARENT_DIR/swf-testbed
137244
source .venv/bin/activate # or conda activate your_env_name
138245
pip install -e $SWF_PARENT_DIR/swf-common-lib $SWF_PARENT_DIR/swf-monitor $SWF_PARENT_DIR/swf-fastmon-agent .
139-
# CRITICAL: Set up Django environment
246+
247+
# CRITICAL: Set up environment configuration
248+
cd $SWF_PARENT_DIR/swf-fastmon-agent
249+
cp .env.example .env
250+
# Edit .env with actual values for SWF_MONITOR_URL, SWF_API_TOKEN, ActiveMQ credentials, etc.
251+
252+
# Set up Django environment (swf-monitor)
140253
cp $SWF_PARENT_DIR/swf-monitor/.env.example $SWF_PARENT_DIR/swf-monitor/.env
141254
# Edit .env to set DB_PASSWORD='your_db_password' and SECRET_KEY
142255
cd $SWF_PARENT_DIR/swf-monitor/src && python manage.py migrate
256+
257+
# Initialize testbed
143258
cd $SWF_PARENT_DIR/swf-testbed && swf-testbed init
144259
```
145260

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,9 @@ dependencies = [
1414
"psycopg2-binary>=2.9.0",
1515
"typer>=0.9.0",
1616
"stomp.py>=8.1.0",
17-
# This is a dependency from our other local package
18-
#"swf-common-lib",
19-
#"supervisor"
17+
"requests>=2.31.0",
18+
"python-json-logger>=2.0.0",
19+
"swf-common-lib",
2020
]
2121
[project.optional-dependencies]
2222
dev = [

requirements.txt

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,12 @@ Django>=4.2,<5.0
55
psycopg>=3.2.0
66
psycopg2-binary>=2.9.0
77

8+
# HTTP and API
9+
requests>=2.31.0
10+
11+
# Logging
12+
python-json-logger>=2.0.0
13+
814
# Development and testing
915
pytest>=7.0.0
1016
pytest-django>=4.5.0
@@ -13,4 +19,4 @@ flake8>=4.0.0
1319

1420
# CLI and messaging
1521
typer>=0.9.0
16-
stomp.py>=8.1.0
22+
stomp.py>=8.1.0

src/swf_fastmon_agent/fastmon_utils.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -311,7 +311,8 @@ def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: loggi
311311
"tf_filename": tf_filename,
312312
"file_size_bytes": tf_size,
313313
"sequence_number": sequence_number,
314-
"stf_parent": stf_file.get("filename"), # Use unique filename as parent identifier
314+
"stf_file_id": stf_file.get("file_id"), # UUID for foreign key reference
315+
"stf_parent": stf_file.get("filename"), # Keep filename for reference
315316
"metadata": {
316317
"simulation": True,
317318
"created_from": stf_file.get('filename'),
@@ -349,7 +350,7 @@ def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: log
349350
try:
350351
# Prepare FastMonFile data for API
351352
tf_file_data = {
352-
"stf_file": tf_metadata.get("stf_parent", None), # STF filename as parent identifier
353+
"stf_file": tf_metadata.get("stf_file_id"), # UUID foreign key to StfFile
353354
"tf_filename": tf_metadata["tf_filename"],
354355
"file_size_bytes": tf_metadata["file_size_bytes"],
355356
"status": FileStatus.REGISTERED,

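With this hunk, `stf_file` carries the STF's UUID (`stf_file_id`) rather than its filename, whereas the FastMonFile JSON example in CLAUDE.md still shows a filename in that field. Because `tf_metadata.get("stf_file_id")` silently yields `None` when the upstream message has no `file_id`, a payload builder that fails early may be worth considering. The sketch below is hypothetical: `build_fastmon_payload` is not part of the commit, and the `"registered"` status string is taken from the CLAUDE.md example on the assumption that it matches `FileStatus.REGISTERED`.

```python
def build_fastmon_payload(tf_metadata, status="registered"):
    """Build a FastMonFile request body (hypothetical helper), rejecting a missing STF UUID."""
    stf_file_id = tf_metadata.get("stf_file_id")
    if not stf_file_id:
        # Without the UUID the foreign key to StfFile cannot be resolved.
        raise ValueError(
            "tf_metadata has no 'stf_file_id'; the stf_ready message may lack 'file_id'"
        )
    return {
        "stf_file": stf_file_id,
        "tf_filename": tf_metadata["tf_filename"],
        "file_size_bytes": tf_metadata["file_size_bytes"],
        "status": status,
        "metadata": tf_metadata.get("metadata", {}),
    }
```

Raising here surfaces the problem at the agent, with a clear message, instead of leaving it to a validation error returned by the API.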
src/swf_fastmon_agent/main.py

Lines changed: 79 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -10,11 +10,13 @@
1010
"""
1111

1212
import sys
13+
import os
14+
import time
1315
import json
1416
from datetime import datetime
1517

1618
from swf_common_lib.base_agent import BaseAgent, setup_environment
17-
import fastmon_utils as fastmon_utils
19+
from swf_fastmon_agent import fastmon_utils
1820

1921

2022
class FastMonitorAgent(BaseAgent):
@@ -42,6 +44,9 @@ def __init__(self, config: dict, debug=False):
4244

4345
self.logger.info("Fast Monitor Agent initialized successfully")
4446

47+
# Set destination for broadcasting TF file notifications
48+
self.destination = os.getenv('ACTIVEMQ_FASTMON_TOPIC', 'epictopic')
49+
4550
self.config = config
4651

4752
# Validate configuration
@@ -51,7 +56,12 @@ def __init__(self, config: dict, debug=False):
5156
# Fast monitoring specific state
5257
self.stf_messages_processed = 0
5358
self.last_message_time = None
54-
self.processing_stats = {'total_stf_messages': 0, 'total_tf_files_created': 0}
59+
self.files_processed = 0 # For continuous monitoring mode
60+
self.processing_stats = {
61+
'total_stf_messages': 0,
62+
'total_tf_files_created': 0,
63+
'total_files': 0 # For continuous monitoring mode
64+
}
5565

5666

5767
def _emulate_stf_registration_and_sampling(self):
@@ -88,20 +98,27 @@ def _emulate_stf_registration_and_sampling(self):
8898
stf_file = fastmon_utils.record_stf_file(file_path, self.config, self, self.logger)
8999
self.files_processed += 1
90100

91-
# Simulate TF subsamples for this STF file
92-
tf_subsamples = fastmon_utils.simulate_tf_subsamples(stf_file, file_path, self.config, self.logger)
93-
94-
# Record each TF file in the FastMonFile table
95-
tf_files_created = 0
96-
for tf_metadata in tf_subsamples:
97-
tf_file = fastmon_utils.record_tf_file(stf_file, tf_metadata, self.config, self, self.logger)
98-
if tf_file:
99-
tf_files_created += 1
100-
# Send notification to clients about new TF file
101-
self.send_tf_file_notification(tf_file, stf_file)
102-
tf_files_registered.append(tf_file)
101+
# Create mock stf_ready message (matching format from data agent)
102+
message_data = {
103+
"msg_type": "stf_ready",
104+
"filename": stf_file.get('stf_filename'),
105+
"file_id": stf_file.get('file_id'), # UUID for foreign key
106+
"run_id": stf_file.get('run'),
107+
"file_url": stf_file.get('metadata', {}).get('file_url', ''),
108+
"checksum": stf_file.get('checksum', ''),
109+
"size_bytes": stf_file.get('file_size_bytes'),
110+
"start": stf_file.get('metadata', {}).get('creation_time', ''),
111+
"end": stf_file.get('metadata', {}).get('modification_time', ''),
112+
"state": "physics",
113+
"substate": "running",
114+
"processed_by": self.agent_name
115+
}
116+
117+
# Use the same sample_timeframes method as message-driven mode
118+
tf_files = self.sample_timeframes(message_data)
119+
tf_files_registered.extend(tf_files)
103120

104-
self.logger.info(f"Registered {tf_files_created} TF subsamples for STF file {stf_file['filename']}")
121+
self.logger.info(f"Processed STF file {stf_file['stf_filename']} -> {len(tf_files)} TF files")
105122

106123
# Report successful processing
107124
self.report_agent_status('OK', f'Emulating {len(tf_files_registered)} fast monitoring files')
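The mock `stf_ready` message assembled in the hunk above could be factored into a pure function, so the field mapping can be unit-tested without a monitor connection. A minimal sketch follows. `build_stf_ready_message` is a hypothetical name, and the mapping simply mirrors the hunk, except that a `metadata` value of `None` is treated as an empty dictionary.

```python
def build_stf_ready_message(stf_file, agent_name):
    """Map a registered STF record to an stf_ready message (hypothetical helper)."""
    # 'or {}' also covers a record whose metadata key is present but None
    metadata = stf_file.get("metadata") or {}
    return {
        "msg_type": "stf_ready",
        "filename": stf_file.get("stf_filename"),
        "file_id": stf_file.get("file_id"),  # UUID used as the foreign key
        "run_id": stf_file.get("run"),
        "file_url": metadata.get("file_url", ""),
        "checksum": stf_file.get("checksum", ""),
        "size_bytes": stf_file.get("file_size_bytes"),
        "start": metadata.get("creation_time", ""),
        "end": metadata.get("modification_time", ""),
        "state": "physics",
        "substate": "running",
        "processed_by": agent_name,
    }
```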
@@ -173,22 +190,68 @@ def sample_timeframes(self, message_data):
173190
self.logger.error("No filename provided in message")
174191
return tf_files_registered
175192

193+
# Track workflow stage (optional - controlled by FASTMON_TRACK_WORKFLOW env var)
194+
workflow_id = message_data.get('workflow_id')
195+
stage_id = None
196+
track_workflow = os.getenv('FASTMON_TRACK_WORKFLOW', 'false').lower() == 'true'
197+
198+
if workflow_id and track_workflow:
199+
try:
200+
# Create workflow stage entry for fast monitoring
201+
stage_data = {
202+
'workflow': workflow_id,
203+
'agent_name': self.agent_name,
204+
'agent_type': 'fastmon',
205+
'status': 'fastmon_received',
206+
'input_message': message_data
207+
}
208+
stage = self.call_monitor_api('POST', '/workflow-stages/', stage_data)
209+
stage_id = stage.get('id')
210+
self.logger.debug(f"Created workflow stage {stage_id} for workflow {workflow_id}")
211+
212+
# Update to processing status
213+
self.call_monitor_api('PATCH', f'/workflow-stages/{stage_id}/', {
214+
'status': 'fastmon_processing',
215+
'started_at': datetime.now().isoformat()
216+
})
217+
except Exception as e:
218+
self.logger.warning(f"Could not create workflow stage: {e}")
219+
220+
# Simulate TF subsamples from STF data
176221
tf_subsamples = fastmon_utils.simulate_tf_subsamples(message_data, self.config, self.logger, self.agent_name)
177222

178-
# Record each TF file in the FastMonFile table
223+
# Record each TF file in the FastMonFile table and send notifications
179224
# TODO: register in bulk
180225
tf_files_created = 0
181226
for tf_metadata in tf_subsamples:
182227
self.logger.debug(f"Processing {tf_metadata}")
183228
tf_file = fastmon_utils.record_tf_file(tf_metadata, self.config, self, self.logger)
184229
if tf_file:
185230
tf_files_created += 1
231+
# Send notification to clients about new TF file
232+
self.send_tf_file_notification(tf_file, message_data)
186233
tf_files_registered.append(tf_file)
187234

188235
# Update TF creation stats
189236
self.processing_stats['total_tf_files_created'] += tf_files_created
190237

191238
self.logger.info(f"Registered {tf_files_created} TF subsamples for STF file {message_data.get('filename')}")
239+
240+
# Mark workflow stage as complete
241+
if stage_id:
242+
try:
243+
output_message = {
244+
'tf_files_created': tf_files_created,
245+
'tf_filenames': [tf.get('tf_filename') for tf in tf_files_registered if tf]
246+
}
247+
self.call_monitor_api('PATCH', f'/workflow-stages/{stage_id}/', {
248+
'status': 'fastmon_complete',
249+
'completed_at': datetime.now().isoformat(),
250+
'output_message': output_message
251+
})
252+
except Exception as e:
253+
self.logger.warning(f"Could not update workflow stage: {e}")
254+
192255
return tf_files_registered
193256

194257
def start_continuous_monitoring(self):
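The workflow tracking added to `sample_timeframes` is best-effort: it is enabled only when `FASTMON_TRACK_WORKFLOW` is `true`, and any API failure is logged as a warning without interrupting TF processing. The sketch below isolates that pattern. `tracking_enabled` and `open_stage` are hypothetical names, and `call_api` stands in for `self.call_monitor_api`, whose exact signature is assumed from how the diff calls it.

```python
import logging
import os
from datetime import datetime, timezone

logger = logging.getLogger("fastmon-sketch")


def tracking_enabled(env=None):
    """Return True only when FASTMON_TRACK_WORKFLOW is set to 'true' (case-insensitive)."""
    env = os.environ if env is None else env
    return env.get("FASTMON_TRACK_WORKFLOW", "false").lower() == "true"


def open_stage(call_api, workflow_id, agent_name, message_data):
    """Create a stage and mark it as processing; return its id, or None on failure."""
    stage_id = None
    try:
        stage = call_api("POST", "/workflow-stages/", {
            "workflow": workflow_id,
            "agent_name": agent_name,
            "agent_type": "fastmon",
            "status": "fastmon_received",
            "input_message": message_data,
        })
        stage_id = stage.get("id")
        call_api("PATCH", f"/workflow-stages/{stage_id}/", {
            "status": "fastmon_processing",
            # Timezone-aware UTC timestamp; the diff uses a naive datetime.now()
            "started_at": datetime.now(timezone.utc).isoformat(),
        })
    except Exception as exc:
        # Tracking must never block sampling, so log and continue
        logger.warning("Could not create workflow stage: %s", exc)
    return stage_id
```

As in the diff, a stage id obtained from a successful POST is kept even if the follow-up PATCH fails, so the completion update can still be attempted later.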
