-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_double_api_calls.py
More file actions
181 lines (147 loc) · 7.26 KB
/
test_double_api_calls.py
File metadata and controls
181 lines (147 loc) · 7.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
"""
Test script to simulate the double API call scenario.
This reproduces: Claudio opens -> User1 calls PUT /findings -> User1 calls POST /session/end
"""
import sys
import os
sys.path.append('backend/lambda')
import boto3
from workflow_tracker import WorkflowTracker
import json
from datetime import datetime, timezone
def _run_resolution_flow(tracker, finding_id, opener_email, resolver_email):
    """Run steps 1-3: open the session, update the workflow, end the session.

    Args:
        tracker: WorkflowTracker instance the calls are issued against.
        finding_id: Finding ID identifying the test session.
        opener_email: Email passed to ``create_session`` (Claudio).
        resolver_email: Email passed to the two follow-up calls (User1).

    Returns:
        True when all three steps completed; False as soon as one of them
        fails (the failure is printed, not raised).
    """
    # Step 1: Claudio opens session (Browser 1)
    print("\nStep 1: Claudio opens session...")
    open_timestamp = datetime.now(timezone.utc).isoformat()
    try:
        tracker.create_session(finding_id, opener_email, open_timestamp)
        print("✅ Session created by Claudio")

        # Verify session after creation
        session_after_create = tracker.get_session_by_finding_id(finding_id)
        if not session_after_create:
            print("❌ Session not found after creation")
            return False
        print("After creation:")
        print(f" Opener: {session_after_create.get('opener_email')}")
        print(f" Resolver: {session_after_create.get('resolver_email', 'NOT_SET')}")
    except Exception as e:
        print(f"❌ Error creating session: {e}")
        return False

    # Step 2: User1 changes workflow status (Browser 2) - First API call: PUT /findings
    print("\nStep 2: User1 calls PUT /findings (workflow status update)...")
    workflow_timestamp = datetime.now(timezone.utc).isoformat()
    try:
        # This simulates the PUT /findings API call
        tracker.ensure_audit_trail_preservation(finding_id, resolver_email, workflow_timestamp)
        print("✅ PUT /findings completed")

        # Check state after first API call
        session_after_workflow = tracker.get_session_by_finding_id(finding_id)
        if session_after_workflow:
            print("After PUT /findings:")
            print(f" Opener: {session_after_workflow.get('opener_email')}")
            print(f" Resolver: {session_after_workflow.get('resolver_email')}")
    except Exception as e:
        print(f"❌ Error in PUT /findings: {e}")
        return False

    # Step 3: User1 calls session end (Browser 2) - Second API call: POST /session/end
    print("\nStep 3: User1 calls POST /session/end...")
    session_end_timestamp = datetime.now(timezone.utc).isoformat()
    try:
        # This simulates the POST /session/end API call
        tracker.update_session_resolution(finding_id, resolver_email, session_end_timestamp)
        print("✅ POST /session/end completed")
    except Exception as e:
        print(f"❌ Error in POST /session/end: {e}")
        return False

    return True


def _report_final_state(tracker, finding_id, opener_email, resolver_email):
    """Run step 4: print the stored session and flag the reported symptoms.

    Compares the stored opener/resolver against the expected emails and
    checks that opener != resolver and open timestamp != resolution
    timestamp. Errors are printed, never raised.
    """
    print("\nStep 4: Checking final state...")
    try:
        final_session = tracker.get_session_by_finding_id(finding_id)
        if not final_session:
            print("❌ Session not found after both API calls")
            return

        opener_final = final_session.get('opener_email')
        resolver_final = final_session.get('resolver_email')
        open_ts = final_session.get('open_timestamp')
        res_ts = final_session.get('resolution_timestamp')

        print("Final state:")
        print(f" Opener: {opener_final}")
        print(f" Resolver: {resolver_final}")
        print(f" Open timestamp: {open_ts}")
        print(f" Resolution timestamp: {res_ts}")

        # Check for the reported issues
        if opener_final != opener_email:
            print(f"❌ BUG FOUND: Opener changed from {opener_email} to {opener_final}")
        else:
            print(f"✅ Opener correctly preserved: {opener_final}")

        if resolver_final != resolver_email:
            print(f"❌ BUG FOUND: Resolver should be {resolver_email}, got {resolver_final}")
        else:
            print(f"✅ Resolver correctly set: {resolver_final}")

        if opener_final == resolver_final:
            print(f"❌ BUG FOUND: Opener and resolver are the same: {opener_final}")
        else:
            print("✅ Opener and resolver are different")

        if open_ts == res_ts:
            print("❌ BUG FOUND: Open and resolution timestamps are identical")
        else:
            print("✅ Timestamps are different")
    except Exception as e:
        print(f"❌ Error retrieving final session: {e}")


def _probe_session_overwrite(tracker, finding_id, opener_email, resolver_email):
    """Run step 5: create a second session as User1 and report who the opener is now."""
    print("\nStep 5: Testing if User1 accidentally creates a NEW session...")
    try:
        # What if the frontend accidentally calls session start with User1's email?
        user1_session_timestamp = datetime.now(timezone.utc).isoformat()
        tracker.create_session(finding_id, resolver_email, user1_session_timestamp)
        print("⚠️ NEW session created by User1 (this would overwrite Claudio's session)")

        # Check what happened
        overwritten_session = tracker.get_session_by_finding_id(finding_id)
        if overwritten_session:
            print("After User1 session creation:")
            print(f" Opener: {overwritten_session.get('opener_email')} (should be {opener_email})")
            print(f" Resolver: {overwritten_session.get('resolver_email', 'NOT_SET')}")

            if overwritten_session.get('opener_email') == resolver_email:
                print("❌ CRITICAL BUG: User1 overwrote Claudio's session!")
                print("This would explain why both opener and resolver show as User1")
    except Exception as e:
        print(f"Error in step 5: {e}")


def test_double_api_calls():
    """Test the double API call scenario that happens in the UI.

    Sequence: Claudio opens a session, User1 issues the call that stands in
    for PUT /findings, then the call that stands in for POST /session/end.
    The stored session is then inspected, and finally a second
    ``create_session`` as User1 is attempted to see whether it replaces the
    opener.

    Results are reported via ``print`` only; nothing is returned or asserted.
    The test session is deleted on the way out even when a step aborts early,
    so a failed run does not leave a record behind.
    """
    print("🔍 TESTING DOUBLE API CALL SCENARIO")
    print("=" * 60)

    # Test data
    test_arn = "arn:aws:securityhub:il-central-1:381492112996:security-control/EC2.182/finding/double-api-test"
    claudio_email = "claudio@chinicz.com"
    user1_email = "user1@chinicz.com"

    # Initialize workflow tracker
    # NOTE(review): the table name contains "prod" - confirm this script is
    # really meant to run against that table.
    table_name = "security-hub-findings-prod-workflow-tracker"
    tracker = WorkflowTracker(table_name)

    # Extract finding ID
    extracted_finding_id = tracker.extract_finding_id(test_arn)
    print(f"Testing with finding ID: '{extracted_finding_id}'")

    # Clean up any existing session
    try:
        tracker.delete_session(extracted_finding_id)
    except Exception:
        # Best-effort: a failure here (e.g. nothing to delete) must not stop
        # the run. Narrowed from a bare ``except`` so Ctrl-C still works.
        pass

    try:
        if not _run_resolution_flow(tracker, extracted_finding_id, claudio_email, user1_email):
            # A step failed and was already reported; skip the remaining
            # checks and the conclusion, but still run the cleanup below.
            return
        _report_final_state(tracker, extracted_finding_id, claudio_email, user1_email)
        _probe_session_overwrite(tracker, extracted_finding_id, claudio_email, user1_email)
    finally:
        # Cleanup
        print("\nCleaning up...")
        try:
            tracker.delete_session(extracted_finding_id)
            print("✅ Test session deleted")
        except Exception as e:
            print(f"Error cleaning up: {e}")

    print("\n" + "=" * 60)
    print("CONCLUSION:")
    print("If the issue is that User1 appears as both opener and resolver,")
    print("it's likely because the frontend is accidentally calling the")
    print("session start API with User1's email instead of preserving")
    print("Claudio's original session.")
    print("")
    print("Check the frontend logic to ensure that when User1 resolves")
    print("a finding, it doesn't create a new session - it should only")
    print("call the session end API or workflow status update API.")
# Script entry point: run the scenario only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    test_double_api_calls()