Skip to content

Commit 6adb76b

Browse files
Iustin-Mituangela-cncf
authored andcommitted
Pull request #28: attack based sample script
Merge in ISGAPPSEC/cyperf-api-wrapper from feature/ISGAPPSEC2-34308-create-an-attack-focused-api-wrapper-sample to main Squashed commit of the following: commit ff8f9110ef2b469fb296d2da59b4ac7806cddd13 Author: iustmitu <[email protected]> Date: Wed Jul 2 11:05:00 2025 +0300 fixed typo commit aaba2277a2674c924bc045811c9bc2a097dbbbb5 Author: iustmitu <[email protected]> Date: Tue Jul 1 15:00:46 2025 +0300 deleted pass commit 7c4d7fccf26b566833a834465835dbf8b8067797 Author: iustmitu <[email protected]> Date: Wed May 21 14:25:55 2025 +0300 attack based sample script
1 parent a010ec4 commit 6adb76b

File tree

1 file changed

+217
-0
lines changed

1 file changed

+217
-0
lines changed
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
import cyperf
2+
from cyperf import *
3+
from cyperf.utils import create_api_client_cli, TestRunner
4+
from time import sleep
5+
import os
6+
7+
if __name__ == "__main__":
8+
import urllib3; urllib3.disable_warnings()
9+
10+
# Enter a context with an instance of the API client
11+
with create_api_client_cli(verify_ssl=False) as api_client:
12+
13+
# Get some strikes
14+
api_application_resources_instance = cyperf.ApplicationResourcesApi(api_client)
15+
take = 3 # int | The number of search results to return (optional)
16+
skip = 0 # int | The number of search results to skip (optional)
17+
search_col = 'Name' # str | A list of comma-separated columns used to search for the supplied values (optional)
18+
search_val = 'Google Chrome' # str | The keywords used to filter the items (optional)
19+
categories = 'Browser:Chrome'
20+
21+
strikes = None
22+
try:
23+
print(f"Finding first {take} strikes with \'{search_val}\' in their name, from the \'Browser\' category...")
24+
api_application_resources_response = api_application_resources_instance.get_resources_strikes(take=take, skip=skip, search_col=search_col, search_val=search_val, categories=categories)
25+
strikes = api_application_resources_response.data
26+
if len(strikes) != take:
27+
raise ValueError(f"Couldn't find {take} strikes.")
28+
29+
print(f"{len(strikes)} strikes found:")
30+
for strike in strikes:
31+
print(f" {strike.name}")
32+
print()
33+
34+
except Exception as e:
35+
print("Exception when calling ApplicationResourcesApi->get_resources_strikes: %s\n" % e)
36+
37+
38+
39+
all_files = []
40+
# Add the attacks
41+
for strike in strikes:
42+
43+
# Find the pre-canned empty config
44+
api_config_instance = cyperf.ConfigurationsApi(api_client)
45+
take = 1 # int | The number of search results to return (optional)
46+
skip = None # int | The number of search results to skip (optional)
47+
search_col = 'displayName' # str | A list of comma-separated columns used to search for the supplied values (optional)
48+
search_val = 'Cyperf Empty Config' # str | The keywords used to filter the items (optional)
49+
filter_mode = None # str | The operator applied to the supplied values (optional)
50+
sort = None # str | A list of comma-separated field:direction pairs used to sort the items where direction must be asc or dsc (optional)
51+
config = api_config_instance.get_configs(take=1,
52+
skip=skip,
53+
search_col=search_col,
54+
search_val=search_val,
55+
filter_mode=filter_mode,
56+
sort=sort)
57+
58+
if len(config.data) == 0:
59+
raise ValueError("Couldn't find the specified configuration.")
60+
61+
# Load a pre-canned config
62+
api_session_instance = cyperf.SessionsApi(api_client)
63+
application = None # str | The user-friendly name for the application that controls this session (optional)
64+
config_name = None # str | The display name of the configuration loaded in the session (optional)
65+
config_url = config.data[0].config_url # str | The external URL of the configuration loaded in the session (optional)
66+
index = None # int | The session's index (optional) (readonly)
67+
name = None # str | The user-visible name of the session (optional)
68+
owner = None # str | The user-visible name of the session's owner (optional) (readonly)
69+
sessions = [cyperf.Session(application=application,
70+
config_name=config_name,
71+
configUrl=config_url,
72+
index=index,
73+
name=name,
74+
owner=owner)]
75+
# Create a session
76+
session = None
77+
try:
78+
print("Creating empty session...")
79+
api_session_response = api_session_instance.create_sessions(sessions=sessions)
80+
session = api_session_response[0]
81+
print("Session created.\n")
82+
except Exception as e:
83+
print("Exception when calling SessionsApi->create_sessions: %s\n" % e) # Add an app profile
84+
85+
# Create an attack profile
86+
print("Creating a Attack Profile...")
87+
attack_profile_name = "Custom Attack Profile"
88+
session.config.config.attack_profiles.append(cyperf.AttackProfile(id="1", name="My Attack Profile", attacks=[]))
89+
session.config.config.attack_profiles.update()
90+
print(attack_profile_name + " created succesfully.\n")
91+
92+
take = 1 # int | The number of search results to return (optional)
93+
skip = 0 # int | The number of search results to skip (optional)
94+
search_col = 'Name' # str | A list of comma-separated columns used to search for the supplied values (optional)
95+
search_val = strike.name # str | The keywords used to filter the items (optional)
96+
97+
attack = None
98+
try:
99+
print(f"Finding the attack with the same name as the strike...")
100+
api_application_resources_response = api_application_resources_instance.get_resources_attacks(take=take, skip=skip, search_col=search_col, search_val=search_val, categories=categories)
101+
attack = api_application_resources_response.data[0]
102+
print("Attack found.")
103+
104+
except Exception as e:
105+
print("Exception when calling ApplicationResourcesApi->get_resources_attacks: %s\n" % e)
106+
107+
session.config.config.attack_profiles[0].attacks.append(Attack(id = "1", name = strike.name, external_resource_url = attack.id))
108+
session.config.config.attack_profiles[0].attacks.update()
109+
110+
111+
print(f"Attack {strike.name} added successfully.\n")
112+
113+
# Set the objective to single iteration
114+
print("Setting the Iteration Count to 1...")
115+
session.config.config.attack_profiles[0].objectives_and_timeline.timeline_segments[0].iteration_count = 1
116+
session.config.config.attack_profiles[0].objectives_and_timeline.update()
117+
print("Iteration Count setted to 1 successfully.\n")
118+
119+
# Create IP Networks
120+
client_ip_network = IPNetwork(name="IP Network 1", id="1", agentAssignments=AgentAssignments(by_tag=[]), minAgents=1)
121+
server_ip_network = IPNetwork(name="IP Network 2", id="2", agentAssignments=AgentAssignments(by_tag=[]), minAgents=1)
122+
123+
# Append the IP Networks to the Network Profile
124+
session.config.config.network_profiles[0].ip_network_segment = [client_ip_network, server_ip_network]
125+
session.config.config.network_profiles[0].ip_network_segment.update()
126+
print("Client and Server network segments added successfully.\n")
127+
128+
# Get available agents
129+
api_agents_instance = cyperf.AgentsApi(api_client)
130+
agents = api_agents_instance.get_agents(exclude_offline='true')
131+
132+
if len(agents) < 2:
133+
raise ValueError("Expected at least 2 active agents")
134+
135+
# Create an agent map
136+
agent_map = {
137+
'IP Network 1': [agents[0].id, agents[0].ip],
138+
'IP Network 2': [agents[1].id, agents[1].ip]
139+
}
140+
141+
# Assign the agents
142+
print("Assigning agents ...")
143+
for net_profile in session.config.config.network_profiles:
144+
for ip_net in net_profile.ip_network_segment:
145+
if ip_net.name in agent_map:
146+
agent_ip = agent_map[ip_net.name][1]
147+
print(f" Agent {agent_ip} assigned to {ip_net.name}.")
148+
capture_settings = None # str | The capture settings of the agent that is assigned (optional)
149+
interfaces = None # List[str] | The names of the assigned test interfaces for the agent (optional)
150+
links = None # List[APILink] | (optional)
151+
agent_id = agent_map[ip_net.name][0]
152+
agentDetails = [cyperf.AgentAssignmentDetails(agent_id=agent_id,
153+
capture_setting=capture_settings,
154+
id=agent_id,
155+
interfaces=interfaces,
156+
links=links)]
157+
158+
if not ip_net.agent_assignments:
159+
by_id = None # List[AgentAssignmentDetails] | The agents statically assigned to the current test configuration (optional)
160+
by_port = None # List[AgentAssignmentByPort] | The ports assigned to the current test configuration (optional)
161+
by_tag = [] # List[str] | The tags according to which the agents are dynamically assigned
162+
links = None # List[APILink] | (optional)
163+
ip_net.agent_assignments = cyperf.AgentAssignments(by_id=by_id,
164+
by_port=by_port,
165+
by_tag=by_tag,
166+
links=links)
167+
168+
ip_net.agent_assignments.by_id.extend(agentDetails)
169+
ip_net.update()
170+
print("Assigning agents completed.\n")
171+
172+
# Start traffic
173+
print("Starting the test ...")
174+
api_test_operation_instance = cyperf.TestOperationsApi(api_client)
175+
api_test_operation_response = api_test_operation_instance.start_test_run_start(session_id=session.id)
176+
api_test_operation_response.await_completion()
177+
178+
# Wait for the test to be finished
179+
print("Test running ...")
180+
session.refresh()
181+
while session.test.status != 'STOPPED':
182+
sleep(5)
183+
session.refresh()
184+
print("Test finished successfully.\n")
185+
186+
# Download the test results
187+
print("Downloading test results ...")
188+
api_reports_instance = cyperf.ReportsApi(api_client)
189+
generate_csv_operation = cyperf.GenerateCSVReportsOperation()
190+
api_test_results_response = api_reports_instance.start_result_generate_csv(result_id=session.test.test_id, generate_csv_reports_operation=generate_csv_operation)
191+
file_path = api_test_results_response.await_completion()
192+
193+
last_separator_index = file_path.rfind("\\")
194+
directory = file_path[:last_separator_index]
195+
file_name = file_path[last_separator_index + 1:]
196+
197+
print(f"Saved as: '{file_name}' at {directory}\n")
198+
199+
# Add the file path to the list
200+
all_files.append(file_path)
201+
202+
203+
print("Aggregating all CSV files ...")
204+
aggregated_file_path = os.path.join(directory, "aggregated_all_strikes_attack.csv")
205+
206+
with open(aggregated_file_path, 'w', encoding='ISO-8859-1') as outfile:
207+
for i, file_path in enumerate(all_files):
208+
with open(file_path, 'r', encoding='ISO-8859-1') as infile:
209+
if i == 0:
210+
outfile.write(infile.read())
211+
else:
212+
next(infile)
213+
outfile.write(infile.read())
214+
215+
print(f"Aggregated results saved as: '{aggregated_file_path}'")
216+
217+

0 commit comments

Comments
 (0)