-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_session_workflow_debug.py
More file actions
134 lines (110 loc) Β· 5.55 KB
/
test_session_workflow_debug.py
File metadata and controls
134 lines (110 loc) Β· 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""
Test script to debug the complete session tracking workflow.
This simulates the exact sequence: session start -> workflow update -> session lookup
"""
import sys
import os
sys.path.append('backend/lambda')
from workflow_tracker import WorkflowTracker
import json
from datetime import datetime, timezone
def test_complete_workflow():
"""Test the complete session tracking workflow to identify the issue."""
print("π TESTING COMPLETE SESSION TRACKING WORKFLOW")
print("=" * 60)
# Test data from user's debug logs
test_arn = "arn:aws:securityhub:il-central-1:381492112996:security-control/EC2.182/finding/f7d43128-5721-4883-ba59-22d37621dbdc"
opener_email = "claudio@chinicz.com"
resolver_email = "user1@chinicz.com"
# Initialize workflow tracker (using actual table name from environment)
table_name = "security-hub-findings-prod-workflow-tracker"
tracker = WorkflowTracker(table_name)
# Step 1: Extract finding ID (as done in session start)
extracted_finding_id = tracker.extract_finding_id(test_arn)
print(f"Extracted finding ID: '{extracted_finding_id}'")
# Step 2: Check if session already exists
print(f"\nStep 2: Checking for existing session...")
try:
existing_session = tracker.get_session_by_finding_id(extracted_finding_id)
print(f"Existing session: {existing_session}")
if existing_session:
print("β οΈ Session already exists! This might be the issue.")
print(f"Opener: {existing_session.get('opener_email')}")
print(f"Resolver: {existing_session.get('resolver_email')}")
print(f"Open timestamp: {existing_session.get('open_timestamp')}")
print(f"Resolution timestamp: {existing_session.get('resolution_timestamp')}")
# Clean up for testing
print("Deleting existing session for clean test...")
tracker.delete_session(extracted_finding_id)
print("Session deleted.")
except Exception as e:
print(f"Error checking existing session: {e}")
# Step 3: Create new session (simulate session start)
print(f"\nStep 3: Creating new session...")
open_timestamp = datetime.now(timezone.utc).isoformat()
try:
tracker.create_session(extracted_finding_id, opener_email, open_timestamp)
print(f"β
Session created successfully")
print(f"Finding ID: '{extracted_finding_id}'")
print(f"Opener: {opener_email}")
print(f"Timestamp: {open_timestamp}")
except Exception as e:
print(f"β Error creating session: {e}")
return
# Step 4: Verify session was created
print(f"\nStep 4: Verifying session creation...")
try:
created_session = tracker.get_session_by_finding_id(extracted_finding_id)
print(f"Retrieved session: {created_session}")
if created_session:
print("β
Session found after creation")
else:
print("β Session NOT found after creation - this is the problem!")
return
except Exception as e:
print(f"β Error retrieving session: {e}")
return
# Step 5: Simulate workflow status update (ensure_audit_trail_preservation)
print(f"\nStep 5: Simulating workflow status update...")
resolution_timestamp = datetime.now(timezone.utc).isoformat()
try:
# This is what happens when workflow status is updated to RESOLVED
tracker.ensure_audit_trail_preservation(extracted_finding_id, resolver_email, resolution_timestamp)
print(f"β
Audit trail preservation completed")
except Exception as e:
print(f"β Error in audit trail preservation: {e}")
return
# Step 6: Verify final state
print(f"\nStep 6: Verifying final session state...")
try:
final_session = tracker.get_session_by_finding_id(extracted_finding_id)
print(f"Final session: {final_session}")
if final_session:
print("π FINAL SESSION STATE:")
print(f" Opener email: {final_session.get('opener_email')}")
print(f" Resolver email: {final_session.get('resolver_email')}")
print(f" Open timestamp: {final_session.get('open_timestamp')}")
print(f" Resolution timestamp: {final_session.get('resolution_timestamp')}")
# Check for issues
if final_session.get('opener_email') == resolver_email:
print("β ISSUE: Opener email was overwritten with resolver email!")
if final_session.get('open_timestamp') == final_session.get('resolution_timestamp'):
print("β ISSUE: Open and resolution timestamps are the same!")
if final_session.get('opener_email') != opener_email:
print(f"β ISSUE: Expected opener '{opener_email}', got '{final_session.get('opener_email')}'")
if final_session.get('resolver_email') != resolver_email:
print(f"β ISSUE: Expected resolver '{resolver_email}', got '{final_session.get('resolver_email')}'")
else:
print("β Session NOT found after workflow update!")
except Exception as e:
print(f"β Error retrieving final session: {e}")
# Cleanup
print(f"\nCleaning up test session...")
try:
tracker.delete_session(extracted_finding_id)
print("β
Test session deleted")
except Exception as e:
print(f"Error cleaning up: {e}")
if __name__ == "__main__":
test_complete_workflow()