forked from LaurieWired/GhidraMCP
-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathtest_data_operations.py
More file actions
executable file
·418 lines (344 loc) · 15.4 KB
/
test_data_operations.py
File metadata and controls
executable file
·418 lines (344 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
#!/usr/bin/env python3
"""
Comprehensive test script for data operations in GhydraMCP.
This script tests all data-related operations including:
1. Creating data items with different types
2. Renaming data items
3. Updating data types
4. Deleting data items
5. Reading memory
Tests are performed using both direct HTTP API and MCP bridge interfaces.
"""
import json
import logging
import sys
import time
import requests
import anyio
from typing import Dict, Any
from urllib.parse import quote
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_test")
# Configure default test values
GHIDRA_PORT = 8192
DEFAULT_MEMORY_ADDRESS = "08000200" # Fallback test address
def wait_for_program_loaded(port=GHIDRA_PORT, timeout=20):
"""Wait for a Ghidra program to be loaded."""
for _ in range(timeout // 2):
try:
response = requests.get(f"http://localhost:{port}/program")
if response.status_code == 200:
data = json.loads(response.text)
if data.get("success", False):
logger.info(f"Program loaded: {data['result']['name']}")
return True
except Exception as e:
logger.warning(f"Error checking program status: {e}")
logger.info("Waiting for program to load...")
time.sleep(2)
logger.error("Timed out waiting for program to load")
return False
def find_valid_addresses(port=GHIDRA_PORT) -> list:
"""Find valid memory addresses for testing by checking memory map."""
try:
response = requests.get(f"http://localhost:{port}/memory")
memory_info = json.loads(response.text)
memory_blocks = memory_info.get("result", [])
valid_addresses = []
# First try to find a RAM block
for block in memory_blocks:
if "start" in block and "name" in block and "RAM" in block["name"].upper():
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
# If no RAM blocks, try any memory block
for block in memory_blocks:
if "start" in block:
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
except Exception as e:
logger.error(f"Error getting memory map: {e}")
# Fallback addresses if cannot determine from memory map
return ["08000100", "08000104", "08000108", "0800010c", "08000110"]
def test_http_data_create():
"""Test creating data items with different types using HTTP API."""
if not wait_for_program_loaded():
return False
addresses = find_valid_addresses()
if not addresses:
logger.error("No valid addresses found for data creation test")
return False
types_to_try = ["uint", "int", "uint *", "int *", "byte", "word", "dword", "pointer"]
success_count = 0
for i, data_type in enumerate(types_to_try):
address = addresses[i % len(addresses)]
logger.info(f"Testing data type: {data_type} at address {address}")
url = f"http://localhost:{GHIDRA_PORT}/data"
payload = {
"address": address,
"type": data_type,
"newName": f"TEST_{data_type.upper()}"
}
# Add size for string types
if data_type.lower() == "string":
payload["size"] = 16
try:
response = requests.post(url, json=payload)
logger.info(f"Status: {response.status_code}")
logger.info(f"Response: {response.text}")
if response.status_code == 200 and json.loads(response.text).get("success", False):
success_count += 1
logger.info(f"Success with data type {data_type}")
except Exception as e:
logger.error(f"Error: {e}")
time.sleep(0.5)
return success_count > 0
def test_http_data_rename():
"""Test data rename operations using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[0]
test_name = f"TEST_RENAME_{int(time.time())}"
# First create a data item to rename
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_BEFORE_RENAME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for rename test")
return False
# Rename the data
rename_payload = {
"address": test_address,
"newName": test_name
}
rename_response = requests.post(create_url, json=rename_payload)
logger.info(f"Rename response: {rename_response.status_code}")
logger.info(f"Rename response: {rename_response.text}")
return rename_response.status_code == 200 and json.loads(rename_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in rename test: {e}")
return False
def test_http_data_type_change():
"""Test changing data type using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[1]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "uint",
"newName": "TEST_TYPE_CHANGE"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for type change test")
return False
# Change the type
type_url = f"http://localhost:{GHIDRA_PORT}/data/type"
type_payload = {
"address": test_address,
"type": "byte"
}
type_response = requests.post(type_url, json=type_payload)
logger.info(f"Type change response: {type_response.status_code}")
logger.info(f"Type change response: {type_response.text}")
return type_response.status_code == 200 and json.loads(type_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in type change test: {e}")
return False
def test_http_data_delete():
"""Test deleting data using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[2]
# First create a data item to delete
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_DELETE_ME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for delete test")
return False
# Delete the data
delete_url = f"http://localhost:{GHIDRA_PORT}/data/delete"
delete_payload = {
"address": test_address,
"action": "delete"
}
delete_response = requests.post(delete_url, json=delete_payload)
logger.info(f"Delete response: {delete_response.status_code}")
logger.info(f"Delete response: {delete_response.text}")
return delete_response.status_code == 200 and json.loads(delete_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in delete test: {e}")
return False
def test_http_combined_operations():
"""Test data operations that update both name and type together."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[3]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_COMBINED_ORIG"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for combined update test")
return False
# Update both name and type in one operation
update_url = f"http://localhost:{GHIDRA_PORT}/data"
update_payload = {
"address": test_address,
"newName": "TEST_COMBINED_NEW",
"type": "uint"
}
update_response = requests.post(update_url, json=update_payload)
logger.info(f"Combined update response: {update_response.status_code}")
logger.info(f"Combined update response: {update_response.text}")
return update_response.status_code == 200 and json.loads(update_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in combined update test: {e}")
return False
async def test_mcp_data_operations():
"""Test data operations using the MCP bridge."""
server_parameters = StdioServerParameters(
command=sys.executable,
args=["bridge_mcp_hydra.py"],
)
logger.info("Connecting to MCP bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
logger.info("Initializing session...")
await session.initialize()
# First set the current instance
logger.info("Setting current Ghidra instance...")
await session.call_tool(
"instances_use",
arguments={"port": 8192}
)
# Get a valid address to work with
addresses = find_valid_addresses()
test_address = addresses[4] if addresses and len(addresses) > 4 else DEFAULT_MEMORY_ADDRESS
logger.info(f"Using address {test_address} for MCP data operations test")
# Test data_create
try:
logger.info("Testing data_create...")
create_result = await session.call_tool(
"data_create",
arguments={"address": test_address, "data_type": "uint"}
)
create_data = json.loads(create_result.content[0].text)
assert create_data.get("success", False), "data_create failed"
logger.info("data_create passed")
# Test data_rename
logger.info("Testing data_rename...")
test_name = f"MCP_TEST_{int(time.time())}"
rename_result = await session.call_tool(
"data_rename",
arguments={"address": test_address, "name": test_name}
)
rename_data = json.loads(rename_result.content[0].text)
assert rename_data.get("success", False), "data_rename failed"
logger.info("data_rename passed")
# Test data_set_type
logger.info("Testing data_set_type...")
set_type_result = await session.call_tool(
"data_set_type",
arguments={"address": test_address, "data_type": "byte"}
)
set_type_data = json.loads(set_type_result.content[0].text)
assert set_type_data.get("success", False), "data_set_type failed"
logger.info("data_set_type passed")
# Test memory_read on the data
logger.info("Testing memory_read...")
read_result = await session.call_tool(
"memory_read",
arguments={"address": test_address, "length": 4}
)
read_data = json.loads(read_result.content[0].text)
assert read_data.get("success", False), "memory_read failed"
assert "hexBytes" in read_data, "memory_read response missing hexBytes"
logger.info("memory_read passed")
# Test data_delete
logger.info("Testing data_delete...")
delete_result = await session.call_tool(
"data_delete",
arguments={"address": test_address}
)
delete_data = json.loads(delete_result.content[0].text)
assert delete_data.get("success", False), "data_delete failed"
logger.info("data_delete passed")
logger.info("All MCP data operations passed")
return True
except Exception as e:
logger.error(f"Error in MCP data operations test: {e}")
# Try to clean up
try:
await session.call_tool("data_delete", arguments={"address": test_address})
except:
pass
return False
def main():
"""Main entry point for data operations tests."""
all_passed = True
try:
# Run HTTP API tests
logger.info("===== Testing HTTP API Data Operations =====")
logger.info("----- Testing data creation -----")
create_result = test_http_data_create()
logger.info(f"Data creation test: {'PASSED' if create_result else 'FAILED'}")
all_passed = all_passed and create_result
logger.info("----- Testing data rename -----")
rename_result = test_http_data_rename()
logger.info(f"Data rename test: {'PASSED' if rename_result else 'FAILED'}")
all_passed = all_passed and rename_result
logger.info("----- Testing data type change -----")
type_result = test_http_data_type_change()
logger.info(f"Data type change test: {'PASSED' if type_result else 'FAILED'}")
all_passed = all_passed and type_result
logger.info("----- Testing data delete -----")
delete_result = test_http_data_delete()
logger.info(f"Data delete test: {'PASSED' if delete_result else 'FAILED'}")
all_passed = all_passed and delete_result
logger.info("----- Testing combined operations -----")
combined_result = test_http_combined_operations()
logger.info(f"Combined operations test: {'PASSED' if combined_result else 'FAILED'}")
all_passed = all_passed and combined_result
# Run MCP bridge tests
logger.info("===== Testing MCP Bridge Data Operations =====")
mcp_result = anyio.run(test_mcp_data_operations)
logger.info(f"MCP data operations test: {'PASSED' if mcp_result else 'FAILED'}")
all_passed = all_passed and mcp_result
logger.info(f"Overall data operations test: {'PASSED' if all_passed else 'FAILED'}")
if not all_passed:
sys.exit(1)
except Exception as e:
logger.error(f"Unexpected error in data tests: {e}")
sys.exit(1)
if __name__ == "__main__":
main()