@@ -274,3 +274,171 @@ def test_validate_server_properties_with_additional_properties(self):
274274
275275 # Should not raise any exception despite additional properties
276276 self .connection ._validate_server_properties ()
277+
278+
279+ class TestServerVersionGte420 :
280+ """Test cases for the _is_server_version_gte_4_2_0 method."""
281+
282+ def setup_method (self ):
283+ """Set up test fixtures."""
284+ self .connection = Connection .__new__ (Connection )
285+ # Initialize required attributes that would normally be set in __init__
286+ self .connection ._conn = None
287+ self .connection ._addr = "amqp://localhost:5672"
288+ self .connection ._addrs = None
289+ self .connection ._conf_ssl_context = None
290+ self .connection ._managements = []
291+ self .connection ._recovery_configuration = None
292+ self .connection ._ssl_domain = None
293+ self .connection ._connections = []
294+ self .connection ._index = - 1
295+ self .connection ._publishers = []
296+ self .connection ._consumers = []
297+ self .connection ._oauth2_options = None
298+
299+ def test_is_server_version_gte_4_2_0_exact_version (self ):
300+ """Test with exact version 4.2.0."""
301+ mock_blocking_conn = Mock ()
302+ mock_proton_conn = Mock ()
303+ mock_remote_props = {"product" : "RabbitMQ" , "version" : "4.2.0" }
304+
305+ mock_proton_conn .remote_properties = mock_remote_props
306+ mock_blocking_conn .conn = mock_proton_conn
307+ self .connection ._conn = mock_blocking_conn
308+
309+ result = self .connection ._is_server_version_gte_4_2_0 ()
310+ assert result is True
311+
312+ def test_is_server_version_gte_4_2_0_higher_versions (self ):
313+ """Test with versions higher than 4.2.0."""
314+ higher_versions = ["4.2.1" , "4.3.0" , "4.10.5" , "5.0.0" , "6.1.2" ]
315+
316+ for version_str in higher_versions :
317+ mock_blocking_conn = Mock ()
318+ mock_proton_conn = Mock ()
319+ mock_remote_props = {"product" : "RabbitMQ" , "version" : version_str }
320+
321+ mock_proton_conn .remote_properties = mock_remote_props
322+ mock_blocking_conn .conn = mock_proton_conn
323+ self .connection ._conn = mock_blocking_conn
324+
325+ result = self .connection ._is_server_version_gte_4_2_0 ()
326+ assert result is True , f"Version { version_str } should return True"
327+
328+ def test_is_server_version_gte_4_2_0_lower_versions (self ):
329+ """Test with versions lower than 4.2.0."""
330+ lower_versions = ["4.1.9" , "4.1.0" , "4.0.0" , "3.12.0" , "3.9.5" ]
331+
332+ for version_str in lower_versions :
333+ mock_blocking_conn = Mock ()
334+ mock_proton_conn = Mock ()
335+ mock_remote_props = {"product" : "RabbitMQ" , "version" : version_str }
336+
337+ mock_proton_conn .remote_properties = mock_remote_props
338+ mock_blocking_conn .conn = mock_proton_conn
339+ self .connection ._conn = mock_blocking_conn
340+
341+ result = self .connection ._is_server_version_gte_4_2_0 ()
342+ assert result is False , f"Version { version_str } should return False"
343+
344+ def test_is_server_version_gte_4_2_0_no_connection (self ):
345+ """Test when connection is None."""
346+ self .connection ._conn = None
347+
348+ with pytest .raises (ValidationCodeException ) as exc_info :
349+ self .connection ._is_server_version_gte_4_2_0 ()
350+
351+ assert "Connection not established" in str (exc_info .value )
352+
353+ def test_is_server_version_gte_4_2_0_no_proton_connection (self ):
354+ """Test when proton connection is None."""
355+ mock_blocking_conn = Mock ()
356+ mock_blocking_conn .conn = None
357+ self .connection ._conn = mock_blocking_conn
358+
359+ with pytest .raises (ValidationCodeException ) as exc_info :
360+ self .connection ._is_server_version_gte_4_2_0 ()
361+
362+ assert "Connection not established" in str (exc_info .value )
363+
364+ def test_is_server_version_gte_4_2_0_no_remote_properties (self ):
365+ """Test when remote_properties is None."""
366+ mock_blocking_conn = Mock ()
367+ mock_proton_conn = Mock ()
368+ mock_proton_conn .remote_properties = None
369+ mock_blocking_conn .conn = mock_proton_conn
370+ self .connection ._conn = mock_blocking_conn
371+
372+ with pytest .raises (ValidationCodeException ) as exc_info :
373+ self .connection ._is_server_version_gte_4_2_0 ()
374+
375+ assert "No remote properties received from server" in str (exc_info .value )
376+
377+ def test_is_server_version_gte_4_2_0_missing_version (self ):
378+ """Test when version property is missing."""
379+ mock_blocking_conn = Mock ()
380+ mock_proton_conn = Mock ()
381+ mock_remote_props = {
382+ "product" : "RabbitMQ"
383+ # Missing "version" key
384+ }
385+
386+ mock_proton_conn .remote_properties = mock_remote_props
387+ mock_blocking_conn .conn = mock_proton_conn
388+ self .connection ._conn = mock_blocking_conn
389+
390+ with pytest .raises (ValidationCodeException ) as exc_info :
391+ self .connection ._is_server_version_gte_4_2_0 ()
392+
393+ assert "Server version not provided" in str (exc_info .value )
394+
395+ def test_is_server_version_gte_4_2_0_invalid_version_format (self ):
396+ """Test with invalid version formats."""
397+ invalid_versions = ["invalid-version" , "" , "not-a-version" ]
398+
399+ for version_str in invalid_versions :
400+ mock_blocking_conn = Mock ()
401+ mock_proton_conn = Mock ()
402+ mock_remote_props = {"product" : "RabbitMQ" , "version" : version_str }
403+
404+ mock_proton_conn .remote_properties = mock_remote_props
405+ mock_blocking_conn .conn = mock_proton_conn
406+ self .connection ._conn = mock_blocking_conn
407+
408+ with pytest .raises (ValidationCodeException ) as exc_info :
409+ self .connection ._is_server_version_gte_4_2_0 ()
410+
411+ error_msg = str (exc_info .value )
412+ assert "Failed to parse server version" in error_msg
413+
414+ def test_is_server_version_gte_4_2_0_edge_cases (self ):
415+ """Test with edge case versions."""
416+ # Test cases around the boundary
417+ test_cases = [
418+ ("4.1.99" , False ), # Just below
419+ ("4.2.0" , True ), # Exact match
420+ ("4.2.0.0" , True ), # With extra zeroes
421+ ("v4.2.0" , True ), # With v prefix
422+ ("4.2.0-rc1" , False ), # Pre-release should be less than 4.2.0
423+ ]
424+
425+ for version_str , expected in test_cases :
426+ mock_blocking_conn = Mock ()
427+ mock_proton_conn = Mock ()
428+ mock_remote_props = {"product" : "RabbitMQ" , "version" : version_str }
429+
430+ mock_proton_conn .remote_properties = mock_remote_props
431+ mock_blocking_conn .conn = mock_proton_conn
432+ self .connection ._conn = mock_blocking_conn
433+
434+ if version_str == "4.2.0-rc1" :
435+ # Pre-release versions should be handled correctly
436+ result = self .connection ._is_server_version_gte_4_2_0 ()
437+ assert (
438+ result == expected
439+ ), f"Version { version_str } should return { expected } "
440+ else :
441+ result = self .connection ._is_server_version_gte_4_2_0 ()
442+ assert (
443+ result == expected
444+ ), f"Version { version_str } should return { expected } "
0 commit comments