Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,32 @@ def interface_has_mirror_config(ctx, mirror_table, dst_port, src_port, direction

return False


def is_port_mirror_capability_supported(direction, namespace=None):
""" Check if port mirror capability is supported for the given direction """
state_db = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
state_db.connect(state_db.STATE_DB, False)
entry_name = "SWITCH_CAPABILITY|switch"

# If no direction is specified, check both ingress and egress capabilities
if not direction:
ingress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_INGRESS_MIRROR_CAPABLE")
egress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_EGRESS_MIRROR_CAPABLE")
return ingress_supported == "true" and egress_supported == "true"

if direction in ['rx', 'both']:
ingress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_INGRESS_MIRROR_CAPABLE")
if ingress_supported != "true":
return False

if direction in ['tx', 'both']:
egress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_EGRESS_MIRROR_CAPABLE")
if egress_supported != "true":
return False

return True


def validate_mirror_session_config(config_db, session_name, dst_port, src_port, direction):
""" Check if SPAN mirror-session config is valid """
ctx = click.get_current_context()
Expand All @@ -1139,7 +1165,6 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port,
if interface_is_in_vlan(vlan_member_table, dst_port):
ctx.fail("Error: Destination Interface {} has vlan config".format(dst_port))


if interface_is_in_portchannel(portchannel_member_table, dst_port):
ctx.fail("Error: Destination Interface {} has portchannel config".format(dst_port))

Expand All @@ -1160,6 +1185,12 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port,
if direction not in ['rx', 'tx', 'both']:
ctx.fail("Error: Direction {} is invalid".format(direction))

# Check port mirror capability before allowing configuration
# If direction is provided, check the specific direction
if not is_port_mirror_capability_supported(direction):
ctx.fail("Error: Port mirror direction '{}' is not supported by the ASIC".format(
direction if direction else 'both'))

return True

def cli_sroute_to_config(ctx, command_str, strict_nh = True):
Expand Down
124 changes: 121 additions & 3 deletions tests/config_mirror_session_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_mirror_session_add():
["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "0", "0"])

mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 0, 0, None)

result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "100.1.1.1", "2.2.2.2", "8", "63"])
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_mirror_session_erspan_add():
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"])
assert result.exit_code != 0
assert ERR_MSG_GRE_TYPE_FAILURE in result.stdout

result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "abcd", "100"])
Expand Down Expand Up @@ -295,7 +295,7 @@ def test_mirror_session_span_add():
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet8", "Ethernet4", "tx", "100"])

mocked.assert_called_with("test_session", "Ethernet8", "Ethernet4", "tx", 100, None)

result = runner.invoke(
Expand Down Expand Up @@ -355,3 +355,121 @@ def test_mirror_session_remove_invalid_yang_validation():
["mrr_sample"])
print(result.output)
assert "Invalid ConfigDB. Error" in result.output


def test_mirror_session_capability_checking():
"""Test mirror session capability checking functionality"""
config.ADHOC_VALIDATION = True
runner = CliRunner()

# Test 1: Check that capability checking fails when direction is not supported
with mock.patch('config.main.is_port_mirror_capability_supported') as mock_capability:
mock_capability.return_value = False

# Test with rx direction - should fail
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet20", "Ethernet24", "rx", "100"])

assert result.exit_code != 0
assert "Error: Port mirror direction 'rx' is not supported by the ASIC" in result.output

# Test with tx direction - should fail
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet20", "Ethernet24", "tx", "100"])

assert result.exit_code != 0
assert "Error: Port mirror direction 'tx' is not supported by the ASIC" in result.output

# Test with both direction - should fail
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet20", "Ethernet24", "both", "100"])

assert result.exit_code != 0
assert "Error: Port mirror direction 'both' is not supported by the ASIC" in result.output


def test_mirror_session_capability_function():
"""Test the is_port_mirror_capability_supported function directly"""

# Test 1: Test with valid STATE_DB responses
with mock.patch('config.main.SonicV2Connector') as mock_connector:
mock_instance = mock.Mock()
mock_connector.return_value = mock_instance

# Mock successful connection
mock_instance.connect.return_value = None

# Test ingress capability check
mock_instance.get.side_effect = lambda db, entry, field: {
("SWITCH_CAPABILITY|switch", "PORT_INGRESS_MIRROR_CAPABLE"): "true",
("SWITCH_CAPABILITY|switch", "PORT_EGRESS_MIRROR_CAPABLE"): "true"
}.get((entry, field), "false")

# Test rx direction
result = config.is_port_mirror_capability_supported("rx")
assert result is True

# Test tx direction
result = config.is_port_mirror_capability_supported("tx")
assert result is True

# Test both direction
result = config.is_port_mirror_capability_supported("both")
assert result is True

# Test no direction (should check both)
result = config.is_port_mirror_capability_supported(None)
assert result is True

# Test 2: Test with partial capability support
with mock.patch('config.main.SonicV2Connector') as mock_connector:
mock_instance = mock.Mock()
mock_connector.return_value = mock_instance

# Mock successful connection
mock_instance.connect.return_value = None

# Mock only ingress supported
mock_instance.get.side_effect = lambda db, entry, field: {
("SWITCH_CAPABILITY|switch", "PORT_INGRESS_MIRROR_CAPABLE"): "true",
("SWITCH_CAPABILITY|switch", "PORT_EGRESS_MIRROR_CAPABLE"): "false"
}.get((entry, field), "false")

# Test rx direction (should pass)
result = config.is_port_mirror_capability_supported("rx")
assert result is True

# Test tx direction (should fail)
result = config.is_port_mirror_capability_supported("tx")
assert result is False

# Test both direction (should fail)
result = config.is_port_mirror_capability_supported("both")
assert result is False

# Test no direction (should fail)
result = config.is_port_mirror_capability_supported(None)
assert result is False

# Test 3: Test with no capability support
with mock.patch('config.main.SonicV2Connector') as mock_connector:
mock_instance = mock.Mock()
mock_connector.return_value = mock_instance

# Mock successful connection
mock_instance.connect.return_value = None

# Mock no capabilities supported
mock_instance.get.side_effect = lambda db, entry, field: {
("SWITCH_CAPABILITY|switch", "PORT_INGRESS_MIRROR_CAPABLE"): "false",
("SWITCH_CAPABILITY|switch", "PORT_EGRESS_MIRROR_CAPABLE"): "false"
}.get((entry, field), "false")

# All directions should fail
assert config.is_port_mirror_capability_supported("rx") is False
assert config.is_port_mirror_capability_supported("tx") is False
assert config.is_port_mirror_capability_supported("both") is False
assert config.is_port_mirror_capability_supported(None) is False
Loading