-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_workflow_status_bug.py
More file actions
157 lines (130 loc) Β· 6.23 KB
/
test_workflow_status_bug.py
File metadata and controls
157 lines (130 loc) Β· 6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
"""
Test script to reproduce the exact workflow status update bug.
This simulates: Claudio opens session -> User1 changes workflow status to RESOLVED
"""
import sys
import os
sys.path.append('backend/lambda')
import boto3
from workflow_tracker import WorkflowTracker
import json
from datetime import datetime, timezone
def test_workflow_status_bug():
"""Test the exact scenario: Claudio opens, User1 resolves via workflow status."""
print("π TESTING WORKFLOW STATUS UPDATE BUG")
print("=" * 60)
# Test data matching the user's scenario
test_arn = "arn:aws:securityhub:il-central-1:381492112996:security-control/EC2.182/finding/test-workflow-bug"
claudio_email = "claudio@chinicz.com"
user1_email = "user1@chinicz.com"
# Initialize workflow tracker
table_name = "security-hub-findings-prod-workflow-tracker"
tracker = WorkflowTracker(table_name)
# Extract finding ID
extracted_finding_id = tracker.extract_finding_id(test_arn)
print(f"Testing with finding ID: '{extracted_finding_id}'")
# Clean up any existing session
try:
tracker.delete_session(extracted_finding_id)
print("Cleaned up any existing session")
except:
pass
# Step 1: Claudio opens session (session start API)
print(f"\nStep 1: Claudio opens session...")
open_timestamp = datetime.now(timezone.utc).isoformat()
try:
tracker.create_session(extracted_finding_id, claudio_email, open_timestamp)
print(f"β
Session created by Claudio")
# Verify session after creation
session_after_create = tracker.get_session_by_finding_id(extracted_finding_id)
if session_after_create:
print(f"After creation:")
print(f" Opener: {session_after_create.get('opener_email')}")
print(f" Resolver: {session_after_create.get('resolver_email', 'NOT_SET')}")
print(f" Open timestamp: {session_after_create.get('open_timestamp')}")
print(f" Resolution timestamp: {session_after_create.get('resolution_timestamp', 'NOT_SET')}")
else:
print("β Session not found after creation")
return
except Exception as e:
print(f"β Error creating session: {e}")
return
# Step 2: User1 changes workflow status to RESOLVED (via PUT /findings API)
print(f"\nStep 2: User1 changes workflow status to RESOLVED...")
resolution_timestamp = datetime.now(timezone.utc).isoformat()
try:
# This simulates what happens in lambda_function.py when workflow status is updated
print(f"Calling ensure_audit_trail_preservation with:")
print(f" finding_id: {extracted_finding_id}")
print(f" resolver_email: {user1_email}")
print(f" timestamp: {resolution_timestamp}")
tracker.ensure_audit_trail_preservation(extracted_finding_id, user1_email, resolution_timestamp)
print(f"β
Audit trail preservation completed")
except Exception as e:
print(f"β Error in audit trail preservation: {e}")
return
# Step 3: Check final state
print(f"\nStep 3: Checking final state...")
try:
final_session = tracker.get_session_by_finding_id(extracted_finding_id)
if final_session:
opener_final = final_session.get('opener_email')
resolver_final = final_session.get('resolver_email')
open_ts = final_session.get('open_timestamp')
res_ts = final_session.get('resolution_timestamp')
print(f"Final state:")
print(f" Opener: {opener_final}")
print(f" Resolver: {resolver_final}")
print(f" Open timestamp: {open_ts}")
print(f" Resolution timestamp: {res_ts}")
# Check for the reported issues
if opener_final != claudio_email:
print(f"β BUG FOUND: Opener changed from {claudio_email} to {opener_final}")
else:
print(f"β
Opener correctly preserved: {opener_final}")
if resolver_final != user1_email:
print(f"β BUG FOUND: Resolver should be {user1_email}, got {resolver_final}")
else:
print(f"β
Resolver correctly set: {resolver_final}")
if opener_final == resolver_final:
print(f"β BUG FOUND: Opener and resolver are the same: {opener_final}")
else:
print(f"β
Opener and resolver are different")
if open_ts == res_ts:
print(f"β BUG FOUND: Open and resolution timestamps are identical: {open_ts}")
else:
print(f"β
Timestamps are different")
else:
print("β Session not found after workflow status update")
except Exception as e:
print(f"β Error retrieving final session: {e}")
# Step 4: Check what's actually in DynamoDB using raw client
print(f"\nStep 4: Raw DynamoDB check...")
try:
dynamodb = boto3.client('dynamodb')
response = dynamodb.get_item(
TableName=table_name,
Key={'finding_id': {'S': extracted_finding_id}}
)
if 'Item' in response:
item = response['Item']
print("Raw DynamoDB item:")
print(f" finding_id: {item.get('finding_id', {}).get('S', 'NOT_FOUND')}")
print(f" opener_email: {item.get('opener_email', {}).get('S', 'NOT_FOUND')}")
print(f" resolver_email: {item.get('resolver_email', {}).get('S', 'NOT_FOUND')}")
print(f" open_timestamp: {item.get('open_timestamp', {}).get('S', 'NOT_FOUND')}")
print(f" resolution_timestamp: {item.get('resolution_timestamp', {}).get('S', 'NOT_FOUND')}")
else:
print("β Item not found in raw DynamoDB query")
except Exception as e:
print(f"β Error in raw DynamoDB query: {e}")
# Cleanup
print(f"\nCleaning up...")
try:
tracker.delete_session(extracted_finding_id)
print("β
Test session deleted")
except Exception as e:
print(f"Error cleaning up: {e}")
if __name__ == "__main__":
test_workflow_status_bug()