99import logging
1010import uuid
1111import warnings
12- import datetime
12+ import subprocess
13+ import time
14+ import signal
15+ import ssl
16+ import functools
17+
18+ from typing import Callable
19+
1320from functools import partial
1421from logging .handlers import RotatingFileHandler
1522from azure .core .settings import settings
4552EVENTHUB_DEFAULT_AUTH_RULE_NAME = "RootManageSharedAccessKey"
4653LOCATION = get_region_override ("westus" )
4754
55+ # Set up the amqpproxy environment variables
56+ path = os .environ .get ("AMQPPROXY_PATH" )
57+ AMQPPROXY_PATH = os .path .abspath (path ) if path else None
58+ RECORD_AMQP_PROXY = os .environ .get ("RECORD_AMQP_PROXY" ) == 'true'
59+ AMQPPROXY_RECORDINGS_DIR = os .path .join (os .path .dirname (__file__ ), "amqpproxy_recordings" )
60+ if RECORD_AMQP_PROXY :
61+ if not os .path .exists (AMQPPROXY_RECORDINGS_DIR ):
62+ os .makedirs (AMQPPROXY_RECORDINGS_DIR )
63+
64+ # Create/overwrite the amqp proxy startup log file
65+ AMQPPROXY_STARTUP_LOG = os .path .join (AMQPPROXY_RECORDINGS_DIR , "amqpproxy_startup.log" )
66+ # Create/overwrite the amqp proxy startup log file
67+ if os .path .exists (AMQPPROXY_STARTUP_LOG ):
68+ with open (AMQPPROXY_STARTUP_LOG , "w" ) as log_file :
69+ log_file .write ("" ) # Overwrite the file with an empty string
70+ else :
71+ open (AMQPPROXY_STARTUP_LOG , "w" ).close () # Create the file if it doesn't exist
72+
73+ AMQPPROXY_CUSTOM_ENDPOINT_ADDRESS = "sb://localhost:5671"
74+ AMQPPROXY_TRANSPORT_TYPE = TransportType .Amqp
75+
76+ context = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
77+ context .check_hostname = False
78+ context .verify_mode = ssl .CERT_NONE
79+ AMQPPROXY_SSL_CONTEXT = context
80+ AMQPPROXY_CLIENT_ARGS = {
81+ "custom_endpoint_address" : AMQPPROXY_CUSTOM_ENDPOINT_ADDRESS ,
82+ "ssl_context" : AMQPPROXY_SSL_CONTEXT ,
83+ "transport_type" : AMQPPROXY_TRANSPORT_TYPE ,
84+ }
85+
4886
4987def pytest_addoption (parser ):
5088 parser .addoption ("--sleep" , action = "store" , default = "True" , help = "sleep on reconnect test: True or False" )
@@ -245,6 +283,105 @@ def connection_str(live_eventhub):
245283 live_eventhub ["hostname" ], live_eventhub ["key_name" ], live_eventhub ["access_key" ], live_eventhub ["event_hub" ]
246284 )
247285
286+ @pytest .fixture ()
287+ def skip_amqp_proxy (request ):
288+ """Helper method to determine if the AMQP proxy should be run for a test."""
289+ if not RECORD_AMQP_PROXY or not AMQPPROXY_PATH :
290+ return True
291+ if any (marker .name == "no_amqpproxy" for marker in request .node .own_markers ):
292+ return True
293+ return False
294+
295+ @pytest .fixture ()
296+ def client_args (skip_amqp_proxy ):
297+ """Fixture that adds the amqpproxy client args to the test context."""
298+ if skip_amqp_proxy :
299+ return {}
300+ # Add proxy args to test context
301+ return AMQPPROXY_CLIENT_ARGS
302+
303+ def remove_existing_recordings (path , file_name ):
304+ """Remove existing recordings for the test."""
305+ # Remove any existing recordings for the test
306+ for file in os .listdir (path ):
307+ if file .startswith (file_name ) and file .endswith (".json" ):
308+ os .remove (os .path .join (path , file ))
309+ print (f"Removed existing recording: { file } " )
310+
311+ def stop_existing_amqpproxy (log_file ):
312+ # Kill any existing process using the AMQP proxy port
313+ try :
314+ subprocess .run (
315+ ["fuser" , "-k" , "5671/tcp" ], check = True , stdout = log_file , stderr = log_file
316+ )
317+ log_file .write ("Kill existing process on port 5671.\n " )
318+ except subprocess .CalledProcessError :
319+ log_file .write ("No existing process found on port 5671.\n " )
320+
321+ @pytest .fixture (autouse = True )
322+ def amqpproxy (live_eventhub , skip_amqp_proxy , request ):
323+ """Fixture that redirects network requests to target the amqp proxy.
324+ Tests can opt out using @pytest.mark.no_amqpproxy or set the environment variable
325+ RECORD_AMQP_PROXY=False.
326+ """
327+ # Skip if not recording or test opted out
328+ if skip_amqp_proxy :
329+ yield
330+ return
331+
332+ # Use test name as logfile
333+ test_name = request .node .name
334+ # Mirror relative path in AMQPPROXY_RECORDINGS_PATH for recording files
335+ relative_path = os .path .relpath (request .node .fspath , start = os .path .dirname (__file__ ))
336+ recording_dir_path = os .path .join (AMQPPROXY_RECORDINGS_DIR , os .path .dirname (relative_path ))
337+ file_name = os .path .splitext (os .path .basename (request .node .fspath ))[0 ]
338+ recording_file = f"{ file_name } .{ test_name } "
339+ if not os .path .exists (recording_dir_path ):
340+ os .makedirs (recording_dir_path )
341+ else :
342+ # Remove any existing recordings with the same test name, so that we overwrite instead of add
343+ remove_existing_recordings (recording_dir_path , recording_file )
344+
345+ # Start amqpproxy process
346+ log_file = open (AMQPPROXY_STARTUP_LOG , "a" )
347+ # Flush log after writing to ensure log line ordering is preserved
348+ log_file .write (f"####### Starting amqpproxy for test: { test_name } \n " )
349+ log_file .flush ()
350+ try :
351+ # Navigate to the amqpproxy directory and run the proxy
352+ os .chdir (AMQPPROXY_PATH )
353+ stop_existing_amqpproxy (log_file )
354+
355+ # Start the AMQP proxy process
356+ log_file .write ("Starting amqpproxy process...\n " )
357+ log_file .flush ()
358+ proxy_process = subprocess .Popen (
359+ ["go" , "run" , "." ,
360+ "--host" , live_eventhub ["hostname" ],
361+ "--logs" , recording_dir_path ,
362+ "--logfile" , recording_file ],
363+ stdout = log_file ,
364+ stderr = log_file ,
365+ preexec_fn = os .setsid
366+ )
367+
368+ if not proxy_process :
369+ log_file .write ("Failed to start amqpproxy.\n " )
370+ raise RuntimeError (f"Failed to start amqpproxy. Check for errors in { AMQPPROXY_STARTUP_LOG } " )
371+
372+ try :
373+ time .sleep (1 )
374+ # Add proxy args to test context
375+ request .node .user_properties .append (("client_args" , AMQPPROXY_CLIENT_ARGS ))
376+ yield
377+ finally :
378+ os .killpg (os .getpgid (proxy_process .pid ), signal .SIGTERM )
379+ proxy_process .wait ()
380+ finally :
381+ log_file .write (f"####### Stopping amqpproxy for test: { test_name } \n " )
382+ log_file .flush ()
383+ log_file .close ()
384+
248385
249386@pytest .fixture ()
250387def invalid_hostname (live_eventhub ):
@@ -269,7 +406,7 @@ def invalid_policy(live_eventhub):
269406
270407
271408@pytest .fixture ()
272- def connstr_receivers (live_eventhub , uamqp_transport ):
409+ def connstr_receivers (live_eventhub , uamqp_transport , client_args ):
273410 connection_str = live_eventhub ["connection_str" ]
274411 partitions = [str (i ) for i in range (PARTITION_COUNT )]
275412 receivers = []
@@ -286,7 +423,7 @@ def connstr_receivers(live_eventhub, uamqp_transport):
286423 else :
287424 sas_auth = SASTokenAuth (uri , uri , live_eventhub ["key_name" ], live_eventhub ["access_key" ])
288425 receiver = ReceiveClient (
289- live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 0 , link_credit = 500
426+ live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 0 , link_credit = 500 , ** client_args
290427 )
291428 receiver .open ()
292429 receivers .append (receiver )
@@ -306,7 +443,7 @@ def auth_credentials_async(live_eventhub):
306443
307444
308445@pytest .fixture ()
309- def auth_credential_receivers (live_eventhub , uamqp_transport ):
446+ def auth_credential_receivers (live_eventhub , uamqp_transport , client_args ):
310447 fully_qualified_namespace = live_eventhub ["hostname" ]
311448 eventhub_name = live_eventhub ["event_hub" ]
312449 partitions = [str (i ) for i in range (PARTITION_COUNT )]
@@ -325,7 +462,7 @@ def auth_credential_receivers(live_eventhub, uamqp_transport):
325462 # TODO: TokenAuth should be fine?
326463 sas_auth = SASTokenAuth (uri , uri , live_eventhub ["key_name" ], live_eventhub ["access_key" ])
327464 receiver = ReceiveClient (
328- live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 30 , link_credit = 500
465+ live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 30 , link_credit = 500 , ** client_args
329466 )
330467 receiver .open ()
331468 receivers .append (receiver )
@@ -335,7 +472,7 @@ def auth_credential_receivers(live_eventhub, uamqp_transport):
335472
336473
337474@pytest .fixture ()
338- def auth_credential_receivers_async (live_eventhub , uamqp_transport ):
475+ def auth_credential_receivers_async (live_eventhub , uamqp_transport , client_args ):
339476 fully_qualified_namespace = live_eventhub ["hostname" ]
340477 eventhub_name = live_eventhub ["event_hub" ]
341478 partitions = [str (i ) for i in range (PARTITION_COUNT )]
@@ -354,7 +491,7 @@ def auth_credential_receivers_async(live_eventhub, uamqp_transport):
354491 # TODO: TokenAuth should be fine?
355492 sas_auth = SASTokenAuth (uri , uri , live_eventhub ["key_name" ], live_eventhub ["access_key" ])
356493 receiver = ReceiveClient (
357- live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 30 , link_credit = 500
494+ live_eventhub ["hostname" ], source , auth = sas_auth , network_trace = False , timeout = 30 , link_credit = 500 , ** client_args
358495 )
359496 receiver .open ()
360497 receivers .append (receiver )
@@ -364,14 +501,15 @@ def auth_credential_receivers_async(live_eventhub, uamqp_transport):
364501
365502
366503@pytest .fixture ()
367- def auth_credential_senders (live_eventhub , uamqp_transport ):
504+ def auth_credential_senders (live_eventhub , uamqp_transport , client_args ):
368505 fully_qualified_namespace = live_eventhub ["hostname" ]
369506 eventhub_name = live_eventhub ["event_hub" ]
370507 client = EventHubProducerClient (
371508 fully_qualified_namespace = fully_qualified_namespace ,
372509 eventhub_name = eventhub_name ,
373510 credential = get_devtools_credential (),
374511 uamqp_transport = uamqp_transport ,
512+ ** client_args ,
375513 )
376514 partitions = client .get_partition_ids ()
377515
@@ -386,14 +524,15 @@ def auth_credential_senders(live_eventhub, uamqp_transport):
386524
387525
388526@pytest .fixture ()
389- def auth_credential_senders_async (live_eventhub , uamqp_transport ):
527+ def auth_credential_senders_async (live_eventhub , uamqp_transport , client_args ):
390528 fully_qualified_namespace = live_eventhub ["hostname" ]
391529 eventhub_name = live_eventhub ["event_hub" ]
392530 client = EventHubProducerClient (
393531 fully_qualified_namespace = fully_qualified_namespace ,
394532 eventhub_name = eventhub_name ,
395533 credential = get_devtools_credential (),
396534 uamqp_transport = uamqp_transport ,
535+ ** client_args ,
397536 )
398537 partitions = client .get_partition_ids ()
399538
@@ -412,3 +551,4 @@ def auth_credential_senders_async(live_eventhub, uamqp_transport):
412551def pytest_configure (config ):
413552 # register an additional marker
414553 config .addinivalue_line ("markers" , "liveTest: mark test to be a live test only" )
554+ config .addinivalue_line ("markers" , "no_amqpproxy: mark test to opt out of amqp proxy recording" )
0 commit comments