Skip to content

Commit 4aa4ae1

Browse files
authored
[EventHub] add amqp proxy recordings for tests (Azure#40778)
* add recorded_by_amqpproxy decorator for tests * more decorator changes * get logs working * add client_args to sync consumer tests + gitignore * add consumer_client sync recordings * workaround issue with async transport setting unexpected sockopts * remove log * fix add async consumer_client test recordings * fix env var * recordings for test receive.py * remove existing recordings and re-record * recordings should overwrite not add more files * update async test_send * sync send recordings * only create startup proxy log if recording * test_properties * test_negative * test_auth test_buffered_producer test_reconnect * move recordings into a separate pr * fix failing tests * fix test
1 parent ec469b2 commit 4aa4ae1

19 files changed

+487
-148
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,11 @@ def _get_tcp_socket_defaults(self, sock):
294294
return tcp_opts
295295

296296
def _set_socket_options(self, sock, socket_settings):
297-
tcp_opts = self._get_tcp_socket_defaults(sock)
298297
if socket_settings:
298+
tcp_opts = self._get_tcp_socket_defaults(sock)
299299
tcp_opts.update(socket_settings)
300-
for opt, val in tcp_opts.items():
301-
sock.setsockopt(SOL_TCP, opt, val)
300+
for opt, val in tcp_opts.items():
301+
sock.setsockopt(SOL_TCP, opt, val)
302302

303303
async def _read(
304304
self,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# amqpproxy
2+
**/amqpproxy_startup.log
3+
*.bin
4+
5+
# ignoring the key files so they don't get checked in.
6+
*-bin-*.txt
7+
*-tlskeys.txt

sdk/eventhub/azure-eventhub/tests/conftest.py

Lines changed: 149 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@
99
import logging
1010
import uuid
1111
import warnings
12-
import datetime
12+
import subprocess
13+
import time
14+
import signal
15+
import ssl
16+
import functools
17+
18+
from typing import Callable
19+
1320
from functools import partial
1421
from logging.handlers import RotatingFileHandler
1522
from azure.core.settings import settings
@@ -45,6 +52,37 @@
4552
EVENTHUB_DEFAULT_AUTH_RULE_NAME = "RootManageSharedAccessKey"
4653
LOCATION = get_region_override("westus")
4754

55+
# Set up the amqpproxy environment variables
56+
path = os.environ.get("AMQPPROXY_PATH")
57+
AMQPPROXY_PATH = os.path.abspath(path) if path else None
58+
RECORD_AMQP_PROXY = os.environ.get("RECORD_AMQP_PROXY") == 'true'
59+
AMQPPROXY_RECORDINGS_DIR = os.path.join(os.path.dirname(__file__), "amqpproxy_recordings")
60+
if RECORD_AMQP_PROXY:
61+
if not os.path.exists(AMQPPROXY_RECORDINGS_DIR):
62+
os.makedirs(AMQPPROXY_RECORDINGS_DIR)
63+
64+
# Create/overwrite the amqp proxy startup log file
65+
AMQPPROXY_STARTUP_LOG = os.path.join(AMQPPROXY_RECORDINGS_DIR, "amqpproxy_startup.log")
66+
# Create/overwrite the amqp proxy startup log file
67+
if os.path.exists(AMQPPROXY_STARTUP_LOG):
68+
with open(AMQPPROXY_STARTUP_LOG, "w") as log_file:
69+
log_file.write("") # Overwrite the file with an empty string
70+
else:
71+
open(AMQPPROXY_STARTUP_LOG, "w").close() # Create the file if it doesn't exist
72+
73+
AMQPPROXY_CUSTOM_ENDPOINT_ADDRESS = "sb://localhost:5671"
74+
AMQPPROXY_TRANSPORT_TYPE = TransportType.Amqp
75+
76+
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
77+
context.check_hostname = False
78+
context.verify_mode = ssl.CERT_NONE
79+
AMQPPROXY_SSL_CONTEXT = context
80+
AMQPPROXY_CLIENT_ARGS = {
81+
"custom_endpoint_address": AMQPPROXY_CUSTOM_ENDPOINT_ADDRESS,
82+
"ssl_context": AMQPPROXY_SSL_CONTEXT,
83+
"transport_type": AMQPPROXY_TRANSPORT_TYPE,
84+
}
85+
4886

4987
def pytest_addoption(parser):
5088
parser.addoption("--sleep", action="store", default="True", help="sleep on reconnect test: True or False")
@@ -245,6 +283,105 @@ def connection_str(live_eventhub):
245283
live_eventhub["hostname"], live_eventhub["key_name"], live_eventhub["access_key"], live_eventhub["event_hub"]
246284
)
247285

286+
@pytest.fixture()
287+
def skip_amqp_proxy(request):
288+
"""Helper method to determine if the AMQP proxy should be run for a test."""
289+
if not RECORD_AMQP_PROXY or not AMQPPROXY_PATH:
290+
return True
291+
if any(marker.name == "no_amqpproxy" for marker in request.node.own_markers):
292+
return True
293+
return False
294+
295+
@pytest.fixture()
296+
def client_args(skip_amqp_proxy):
297+
"""Fixture that adds the amqpproxy client args to the test context."""
298+
if skip_amqp_proxy:
299+
return {}
300+
# Add proxy args to test context
301+
return AMQPPROXY_CLIENT_ARGS
302+
303+
def remove_existing_recordings(path, file_name):
304+
"""Remove existing recordings for the test."""
305+
# Remove any existing recordings for the test
306+
for file in os.listdir(path):
307+
if file.startswith(file_name) and file.endswith(".json"):
308+
os.remove(os.path.join(path, file))
309+
print(f"Removed existing recording: {file}")
310+
311+
def stop_existing_amqpproxy(log_file):
312+
# Kill any existing process using the AMQP proxy port
313+
try:
314+
subprocess.run(
315+
["fuser", "-k", "5671/tcp"], check=True, stdout=log_file, stderr=log_file
316+
)
317+
log_file.write("Kill existing process on port 5671.\n")
318+
except subprocess.CalledProcessError:
319+
log_file.write("No existing process found on port 5671.\n")
320+
321+
@pytest.fixture(autouse=True)
322+
def amqpproxy(live_eventhub, skip_amqp_proxy, request):
323+
"""Fixture that redirects network requests to target the amqp proxy.
324+
Tests can opt out using @pytest.mark.no_amqpproxy or set the environment variable
325+
RECORD_AMQP_PROXY=False.
326+
"""
327+
# Skip if not recording or test opted out
328+
if skip_amqp_proxy:
329+
yield
330+
return
331+
332+
# Use test name as logfile
333+
test_name = request.node.name
334+
# Mirror relative path in AMQPPROXY_RECORDINGS_PATH for recording files
335+
relative_path = os.path.relpath(request.node.fspath, start=os.path.dirname(__file__))
336+
recording_dir_path = os.path.join(AMQPPROXY_RECORDINGS_DIR, os.path.dirname(relative_path))
337+
file_name = os.path.splitext(os.path.basename(request.node.fspath))[0]
338+
recording_file = f"{file_name}.{test_name}"
339+
if not os.path.exists(recording_dir_path):
340+
os.makedirs(recording_dir_path)
341+
else:
342+
# Remove any existing recordings with the same test name, so that we overwrite instead of add
343+
remove_existing_recordings(recording_dir_path, recording_file)
344+
345+
# Start amqpproxy process
346+
log_file = open(AMQPPROXY_STARTUP_LOG, "a")
347+
# Flush log after writing to ensure log line ordering is preserved
348+
log_file.write(f"####### Starting amqpproxy for test: {test_name}\n")
349+
log_file.flush()
350+
try:
351+
# Navigate to the amqpproxy directory and run the proxy
352+
os.chdir(AMQPPROXY_PATH)
353+
stop_existing_amqpproxy(log_file)
354+
355+
# Start the AMQP proxy process
356+
log_file.write("Starting amqpproxy process...\n")
357+
log_file.flush()
358+
proxy_process = subprocess.Popen(
359+
["go", "run", ".",
360+
"--host", live_eventhub["hostname"],
361+
"--logs", recording_dir_path,
362+
"--logfile", recording_file],
363+
stdout=log_file,
364+
stderr=log_file,
365+
preexec_fn=os.setsid
366+
)
367+
368+
if not proxy_process:
369+
log_file.write("Failed to start amqpproxy.\n")
370+
raise RuntimeError(f"Failed to start amqpproxy. Check for errors in {AMQPPROXY_STARTUP_LOG}")
371+
372+
try:
373+
time.sleep(1)
374+
# Add proxy args to test context
375+
request.node.user_properties.append(("client_args", AMQPPROXY_CLIENT_ARGS))
376+
yield
377+
finally:
378+
os.killpg(os.getpgid(proxy_process.pid), signal.SIGTERM)
379+
proxy_process.wait()
380+
finally:
381+
log_file.write(f"####### Stopping amqpproxy for test: {test_name}\n")
382+
log_file.flush()
383+
log_file.close()
384+
248385

249386
@pytest.fixture()
250387
def invalid_hostname(live_eventhub):
@@ -269,7 +406,7 @@ def invalid_policy(live_eventhub):
269406

270407

271408
@pytest.fixture()
272-
def connstr_receivers(live_eventhub, uamqp_transport):
409+
def connstr_receivers(live_eventhub, uamqp_transport, client_args):
273410
connection_str = live_eventhub["connection_str"]
274411
partitions = [str(i) for i in range(PARTITION_COUNT)]
275412
receivers = []
@@ -286,7 +423,7 @@ def connstr_receivers(live_eventhub, uamqp_transport):
286423
else:
287424
sas_auth = SASTokenAuth(uri, uri, live_eventhub["key_name"], live_eventhub["access_key"])
288425
receiver = ReceiveClient(
289-
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=0, link_credit=500
426+
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=0, link_credit=500, **client_args
290427
)
291428
receiver.open()
292429
receivers.append(receiver)
@@ -306,7 +443,7 @@ def auth_credentials_async(live_eventhub):
306443

307444

308445
@pytest.fixture()
309-
def auth_credential_receivers(live_eventhub, uamqp_transport):
446+
def auth_credential_receivers(live_eventhub, uamqp_transport, client_args):
310447
fully_qualified_namespace = live_eventhub["hostname"]
311448
eventhub_name = live_eventhub["event_hub"]
312449
partitions = [str(i) for i in range(PARTITION_COUNT)]
@@ -325,7 +462,7 @@ def auth_credential_receivers(live_eventhub, uamqp_transport):
325462
# TODO: TokenAuth should be fine?
326463
sas_auth = SASTokenAuth(uri, uri, live_eventhub["key_name"], live_eventhub["access_key"])
327464
receiver = ReceiveClient(
328-
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=30, link_credit=500
465+
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=30, link_credit=500, **client_args
329466
)
330467
receiver.open()
331468
receivers.append(receiver)
@@ -335,7 +472,7 @@ def auth_credential_receivers(live_eventhub, uamqp_transport):
335472

336473

337474
@pytest.fixture()
338-
def auth_credential_receivers_async(live_eventhub, uamqp_transport):
475+
def auth_credential_receivers_async(live_eventhub, uamqp_transport, client_args):
339476
fully_qualified_namespace = live_eventhub["hostname"]
340477
eventhub_name = live_eventhub["event_hub"]
341478
partitions = [str(i) for i in range(PARTITION_COUNT)]
@@ -354,7 +491,7 @@ def auth_credential_receivers_async(live_eventhub, uamqp_transport):
354491
# TODO: TokenAuth should be fine?
355492
sas_auth = SASTokenAuth(uri, uri, live_eventhub["key_name"], live_eventhub["access_key"])
356493
receiver = ReceiveClient(
357-
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=30, link_credit=500
494+
live_eventhub["hostname"], source, auth=sas_auth, network_trace=False, timeout=30, link_credit=500, **client_args
358495
)
359496
receiver.open()
360497
receivers.append(receiver)
@@ -364,14 +501,15 @@ def auth_credential_receivers_async(live_eventhub, uamqp_transport):
364501

365502

366503
@pytest.fixture()
367-
def auth_credential_senders(live_eventhub, uamqp_transport):
504+
def auth_credential_senders(live_eventhub, uamqp_transport, client_args):
368505
fully_qualified_namespace = live_eventhub["hostname"]
369506
eventhub_name = live_eventhub["event_hub"]
370507
client = EventHubProducerClient(
371508
fully_qualified_namespace=fully_qualified_namespace,
372509
eventhub_name=eventhub_name,
373510
credential=get_devtools_credential(),
374511
uamqp_transport=uamqp_transport,
512+
**client_args,
375513
)
376514
partitions = client.get_partition_ids()
377515

@@ -386,14 +524,15 @@ def auth_credential_senders(live_eventhub, uamqp_transport):
386524

387525

388526
@pytest.fixture()
389-
def auth_credential_senders_async(live_eventhub, uamqp_transport):
527+
def auth_credential_senders_async(live_eventhub, uamqp_transport, client_args):
390528
fully_qualified_namespace = live_eventhub["hostname"]
391529
eventhub_name = live_eventhub["event_hub"]
392530
client = EventHubProducerClient(
393531
fully_qualified_namespace=fully_qualified_namespace,
394532
eventhub_name=eventhub_name,
395533
credential=get_devtools_credential(),
396534
uamqp_transport=uamqp_transport,
535+
**client_args,
397536
)
398537
partitions = client.get_partition_ids()
399538

@@ -412,3 +551,4 @@ def auth_credential_senders_async(live_eventhub, uamqp_transport):
412551
def pytest_configure(config):
413552
# register an additional marker
414553
config.addinivalue_line("markers", "liveTest: mark test to be a live test only")
554+
config.addinivalue_line("markers", "no_amqpproxy: mark test to opt out of amqp proxy recording")

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_auth_async.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
@pytest.mark.liveTest
2525
@pytest.mark.asyncio
26-
async def test_client_token_credential_async(live_eventhub, get_credential_async, uamqp_transport):
26+
async def test_client_token_credential_async(live_eventhub, get_credential_async, uamqp_transport, client_args):
2727
credential = get_credential_async()
2828
producer_client = EventHubProducerClient(
2929
fully_qualified_namespace=live_eventhub["hostname"],
@@ -32,6 +32,7 @@ async def test_client_token_credential_async(live_eventhub, get_credential_async
3232
user_agent="customized information",
3333
auth_timeout=30,
3434
uamqp_transport=uamqp_transport,
35+
**client_args
3536
)
3637
consumer_client = EventHubConsumerClient(
3738
fully_qualified_namespace=live_eventhub["hostname"],
@@ -41,6 +42,7 @@ async def test_client_token_credential_async(live_eventhub, get_credential_async
4142
user_agent="customized information",
4243
auth_timeout=30,
4344
uamqp_transport=uamqp_transport,
45+
**client_args
4446
)
4547

4648
async with producer_client:
@@ -65,13 +67,14 @@ async def on_event(partition_context, event):
6567

6668
@pytest.mark.liveTest
6769
@pytest.mark.asyncio
68-
async def test_client_sas_credential_async(live_eventhub, uamqp_transport):
70+
async def test_client_sas_credential_async(live_eventhub, uamqp_transport, client_args):
6971
# This should "just work" to validate known-good.
7072
hostname = live_eventhub["hostname"]
7173
producer_client = EventHubProducerClient.from_connection_string(
7274
live_eventhub["connection_str"],
7375
eventhub_name=live_eventhub["event_hub"],
7476
uamqp_transport=uamqp_transport,
77+
**client_args
7578
)
7679

7780
async with producer_client:
@@ -88,6 +91,7 @@ async def test_client_sas_credential_async(live_eventhub, uamqp_transport):
8891
eventhub_name=live_eventhub["event_hub"],
8992
credential=EventHubSASTokenCredential(token, time.time() + 3000),
9093
uamqp_transport=uamqp_transport,
94+
**client_args
9195
)
9296

9397
async with producer_client:
@@ -101,6 +105,7 @@ async def test_client_sas_credential_async(live_eventhub, uamqp_transport):
101105
token_conn_str,
102106
eventhub_name=live_eventhub["event_hub"],
103107
uamqp_transport=uamqp_transport,
108+
**client_args
104109
)
105110

106111
async with conn_str_producer_client:
@@ -111,13 +116,14 @@ async def test_client_sas_credential_async(live_eventhub, uamqp_transport):
111116

112117
@pytest.mark.liveTest
113118
@pytest.mark.asyncio
114-
async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport):
119+
async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport, client_args):
115120
# This should "just work" to validate known-good.
116121
hostname = live_eventhub["hostname"]
117122
producer_client = EventHubProducerClient.from_connection_string(
118123
live_eventhub["connection_str"],
119124
eventhub_name=live_eventhub["event_hub"],
120125
uamqp_transport=uamqp_transport,
126+
**client_args
121127
)
122128

123129
async with producer_client:
@@ -134,6 +140,7 @@ async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport)
134140
auth_timeout=30,
135141
credential=AzureSasCredential(token),
136142
uamqp_transport=uamqp_transport,
143+
**client_args
137144
)
138145

139146
async with producer_client:
@@ -146,7 +153,7 @@ async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport)
146153

147154
@pytest.mark.liveTest
148155
@pytest.mark.asyncio
149-
async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_transport):
156+
async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_transport, client_args):
150157

151158
credential = AzureNamedKeyCredential(live_eventhub["key_name"], live_eventhub["access_key"])
152159
consumer_client = EventHubConsumerClient(
@@ -157,6 +164,7 @@ async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_tran
157164
auth_timeout=30,
158165
user_agent="customized information",
159166
uamqp_transport=uamqp_transport,
167+
**client_args
160168
)
161169

162170
assert (await consumer_client.get_eventhub_properties()) is not None
@@ -173,6 +181,7 @@ async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_tran
173181
# New feature only for Pure Python AMQP, not uamqp.
174182
@pytest.mark.liveTest
175183
@pytest.mark.asyncio
184+
@pytest.mark.no_amqpproxy # testing ssl_context
176185
async def test_client_with_ssl_context_async(auth_credentials_async, socket_transport):
177186
fully_qualified_namespace, eventhub_name, credential = auth_credentials_async
178187

0 commit comments

Comments
 (0)