Skip to content

Commit bc2f174

Browse files
Orchagent changes to enable zmq mode with syncd for DPUs (#3547)
* Enable syncd zmq mode for DPUs What I did Moved setting of redis communication mode into initSaiRedis() as otherwise, INIT_VIEW will still use redis channel which will not work when syncd uses zmq mode Forward port_state_change notifications from on_port_state_change handler to redis channel that portsorch polls for when zmq communication mode is enabled, otherwise port state notifications will never get processed by orchagent. Why I did it To enable zmq commnication mode for smartswitch DPUs to achieve better performance
1 parent ae6e138 commit bc2f174

File tree

8 files changed

+225
-25
lines changed

8 files changed

+225
-25
lines changed

orchagent/main.cpp

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -571,21 +571,6 @@ int main(int argc, char **argv)
571571
attrs.push_back(attr);
572572
}
573573

574-
// SAI_REDIS_SWITCH_ATTR_SYNC_MODE attribute only setBuffer and g_syncMode to true
575-
// since it is not using ASIC_DB, we can execute it before create_switch
576-
// when g_syncMode is set to true here, create_switch will wait the response from syncd
577-
if (gSyncMode)
578-
{
579-
SWSS_LOG_WARN("sync mode is depreacated, use -z param");
580-
581-
gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_SYNC;
582-
}
583-
584-
attr.id = SAI_REDIS_SWITCH_ATTR_REDIS_COMMUNICATION_MODE;
585-
attr.value.s32 = gRedisCommunicationMode;
586-
587-
sai_switch_api->set_switch_attribute(gSwitchId, &attr);
588-
589574
if (!gAsicInstance.empty())
590575
{
591576
attr.id = SAI_SWITCH_ATTR_SWITCH_HARDWARE_INFO;

orchagent/notifications.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ extern "C" {
77
#include "switchorch.h"
88

99
extern SwitchOrch *gSwitchOrch;
10+
extern sai_redis_communication_mode_t gRedisCommunicationMode;
1011

1112
#ifdef ASAN_ENABLED
1213
#include <sanitizer/lsan_interface.h>
@@ -18,10 +19,25 @@ void on_fdb_event(uint32_t count, sai_fdb_event_notification_data_t *data)
1819
// which causes concurrency access to the DB
1920
}
2021

22+
/*
23+
* Don't perform DB operations within this event handler, because it runs by
24+
* libsairedis in a separate thread which causes concurrency issues.
25+
* For platforms which use zmq between orchagent and syncd, it is an acceptable
26+
* workaround to forward the notifications from the callback handler to the
27+
* redis notifications channel processed by portsorch.
28+
*/
2129
void on_port_state_change(uint32_t count, sai_port_oper_status_notification_t *data)
2230
{
23-
// don't use this event handler, because it runs by libsairedis in a separate thread
24-
// which causes concurrency access to the DB
31+
if (gRedisCommunicationMode == SAI_REDIS_COMMUNICATION_MODE_ZMQ_SYNC)
32+
{
33+
swss::DBConnector db("ASIC_DB", 0);
34+
swss::NotificationProducer port_state_change(&db, "NOTIFICATIONS");
35+
std::string sdata = sai_serialize_port_oper_status_ntf(count, data);
36+
std::vector<swss::FieldValueTuple> values;
37+
38+
// Forward port_state_change notification to be handled in portsorch doTask()
39+
port_state_change.send("port_state_change", sdata, values);
40+
}
2541
}
2642

2743
void on_bfd_session_state_change(uint32_t count, sai_bfd_session_state_notification_t *data)

orchagent/p4orch/tests/test_main.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ size_t gMaxBulkSize = DEFAULT_MAX_BULK_SIZE;
4545
bool gSyncMode = false;
4646
bool gIsNatSupported = false;
4747
bool gTraditionalFlexCounter = false;
48+
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
4849

4950
PortsOrch *gPortsOrch;
5051
CrmOrch *gCrmOrch;

orchagent/saihelper.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ sai_stp_api_t* sai_stp_api;
9191

9292
extern sai_object_id_t gSwitchId;
9393
extern bool gTraditionalFlexCounter;
94+
extern bool gSyncMode;
95+
extern sai_redis_communication_mode_t gRedisCommunicationMode;
9496

9597
vector<sai_object_id_t> gGearboxOids;
9698

@@ -298,6 +300,16 @@ void initFlexCounterTables()
298300

299301
void initSaiRedis()
300302
{
303+
// SAI_REDIS_SWITCH_ATTR_SYNC_MODE attribute only setBuffer and g_syncMode to true
304+
// since it is not using ASIC_DB, we can execute it before create_switch
305+
// when g_syncMode is set to true here, create_switch will wait the response from syncd
306+
if (gSyncMode)
307+
{
308+
SWSS_LOG_WARN("sync mode is depreacated, use -z param");
309+
310+
gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_SYNC;
311+
}
312+
301313
/**
302314
* NOTE: Notice that all Redis attributes here are using SAI_NULL_OBJECT_ID
303315
* as the switch ID, because those operations don't require actual switch
@@ -307,6 +319,16 @@ void initSaiRedis()
307319
sai_attribute_t attr;
308320
sai_status_t status;
309321

322+
attr.id = SAI_REDIS_SWITCH_ATTR_REDIS_COMMUNICATION_MODE;
323+
attr.value.s32 = gRedisCommunicationMode;
324+
325+
status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
326+
if (status != SAI_STATUS_SUCCESS)
327+
{
328+
SWSS_LOG_ERROR("Failed to set communication mode, rv:%d", status);
329+
exit(EXIT_FAILURE);
330+
}
331+
310332
auto record_filename = Recorder::Instance().sairedis.getFile();
311333
auto record_location = Recorder::Instance().sairedis.getLoc();
312334

@@ -351,16 +373,19 @@ void initSaiRedis()
351373
exit(EXIT_FAILURE);
352374
}
353375

354-
attr.id = SAI_REDIS_SWITCH_ATTR_USE_PIPELINE;
355-
attr.value.booldata = true;
356-
357-
status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
358-
if (status != SAI_STATUS_SUCCESS)
376+
if (gRedisCommunicationMode == SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC)
359377
{
360-
SWSS_LOG_ERROR("Failed to enable redis pipeline, rv:%d", status);
361-
exit(EXIT_FAILURE);
378+
SWSS_LOG_NOTICE("Enable redis pipeline");
379+
attr.id = SAI_REDIS_SWITCH_ATTR_USE_PIPELINE;
380+
attr.value.booldata = true;
381+
382+
status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
383+
if (status != SAI_STATUS_SUCCESS)
384+
{
385+
SWSS_LOG_ERROR("Failed to enable redis pipeline, rv:%d", status);
386+
exit(EXIT_FAILURE);
387+
}
362388
}
363-
SWSS_LOG_NOTICE("Enable redis pipeline");
364389

365390
char *platform = getenv("platform");
366391
if (platform && (strstr(platform, MLNX_PLATFORM_SUBSTRING) || strstr(platform, XS_PLATFORM_SUBSTRING)))

tests/mock_tests/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ tests_SOURCES = aclorch_ut.cpp \
7272
mock_orch_test.cpp \
7373
mock_dash_orch_test.cpp \
7474
zmq_orch_ut.cpp \
75+
mock_saihelper.cpp \
7576
$(top_srcdir)/warmrestart/warmRestartHelper.cpp \
7677
$(top_srcdir)/lib/gearboxutils.cpp \
7778
$(top_srcdir)/lib/subintf.cpp \

tests/mock_tests/mock_orchagent_main.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ int32_t gVoqMySwitchId = 0;
1818
string gMyHostName = "Linecard1";
1919
string gMyAsicName = "Asic0";
2020
bool gTraditionalFlexCounter = false;
21+
bool gSyncMode = false;
22+
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
2123

2224
VRFOrch *gVrfOrch;
2325

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#define private public // make Directory::m_values available to clean it.
2+
#include "directory.h"
3+
#undef private
4+
#define protected public
5+
#include "orch.h"
6+
#undef protected
7+
#include "ut_helper.h"
8+
#include "mock_orchagent_main.h"
9+
#include "mock_table.h"
10+
#include "mock_response_publisher.h"
11+
#include "saihelper.h"
12+
13+
namespace saihelper_test
14+
{
15+
using namespace std;
16+
17+
sai_switch_api_t ut_sai_switch_api;
18+
sai_switch_api_t *old_sai_switch_api;
19+
20+
shared_ptr<swss::DBConnector> m_app_db;
21+
shared_ptr<swss::DBConnector> m_config_db;
22+
shared_ptr<swss::DBConnector> m_state_db;
23+
24+
bool set_comm_mode_not_supported;
25+
bool use_pipeline_not_supported;
26+
27+
sai_status_t _ut_stub_sai_set_switch_attribute(
28+
_In_ sai_object_id_t switch_id,
29+
_In_ const sai_attribute_t *attr)
30+
{
31+
switch (attr[0].id)
32+
{
33+
case SAI_REDIS_SWITCH_ATTR_REDIS_COMMUNICATION_MODE:
34+
if (set_comm_mode_not_supported)
35+
{
36+
return SAI_STATUS_NOT_SUPPORTED;
37+
}
38+
else
39+
{
40+
return SAI_STATUS_SUCCESS;
41+
}
42+
break;
43+
case SAI_REDIS_SWITCH_ATTR_USE_PIPELINE:
44+
if (use_pipeline_not_supported)
45+
{
46+
return SAI_STATUS_NOT_SUPPORTED;
47+
}
48+
else
49+
{
50+
return SAI_STATUS_SUCCESS;
51+
}
52+
break;
53+
default:
54+
break;
55+
}
56+
return SAI_STATUS_SUCCESS;
57+
}
58+
59+
void _hook_sai_apis()
60+
{
61+
ut_sai_switch_api = *sai_switch_api;
62+
old_sai_switch_api = sai_switch_api;
63+
ut_sai_switch_api.set_switch_attribute = _ut_stub_sai_set_switch_attribute;
64+
sai_switch_api = &ut_sai_switch_api;
65+
}
66+
67+
void _unhook_sai_apis()
68+
{
69+
sai_switch_api = old_sai_switch_api;
70+
}
71+
72+
class SaihelperTest : public ::testing::Test
73+
{
74+
public:
75+
76+
SaihelperTest()
77+
{
78+
};
79+
80+
~SaihelperTest()
81+
{
82+
};
83+
84+
void SetUp() override
85+
{
86+
// Init switch and create dependencies
87+
m_app_db = make_shared<swss::DBConnector>("APPL_DB", 0);
88+
m_config_db = make_shared<swss::DBConnector>("CONFIG_DB", 0);
89+
m_state_db = make_shared<swss::DBConnector>("STATE_DB", 0);
90+
91+
set_comm_mode_not_supported = false;
92+
use_pipeline_not_supported = false;
93+
94+
map<string, string> profile = {
95+
{ "SAI_VS_SWITCH_TYPE", "SAI_VS_SWITCH_TYPE_BCM56850" },
96+
{ "KV_DEVICE_MAC_ADDRESS", "20:03:04:05:06:00" }
97+
};
98+
99+
ut_helper::initSaiApi(profile);
100+
101+
sai_attribute_t attr;
102+
103+
attr.id = SAI_SWITCH_ATTR_INIT_SWITCH;
104+
attr.value.booldata = true;
105+
106+
auto status = sai_switch_api->create_switch(&gSwitchId, 1, &attr);
107+
ASSERT_EQ(status, SAI_STATUS_SUCCESS);
108+
}
109+
110+
void initSwitchOrch()
111+
{
112+
TableConnector stateDbSwitchTable(m_state_db.get(), "SWITCH_CAPABILITY");
113+
TableConnector conf_asic_sensors(m_config_db.get(), CFG_ASIC_SENSORS_TABLE_NAME);
114+
TableConnector app_switch_table(m_app_db.get(), APP_SWITCH_TABLE_NAME);
115+
TableConnector conf_suppress_asic_sdk_health_categories(m_config_db.get(), CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME);
116+
117+
vector<TableConnector> switch_tables = {
118+
conf_asic_sensors,
119+
conf_suppress_asic_sdk_health_categories,
120+
app_switch_table
121+
};
122+
123+
ASSERT_EQ(gSwitchOrch, nullptr);
124+
gSwitchOrch = new SwitchOrch(m_app_db.get(), switch_tables, stateDbSwitchTable);
125+
}
126+
127+
void TearDown() override
128+
{
129+
::testing_db::reset();
130+
131+
gDirectory.m_values.clear();
132+
133+
delete gSwitchOrch;
134+
gSwitchOrch = nullptr;
135+
136+
ut_helper::uninitSaiApi();
137+
}
138+
};
139+
140+
TEST_F(SaihelperTest, TestSetCommunicationModeFailure) {
141+
set_comm_mode_not_supported = true;
142+
_hook_sai_apis();
143+
initSwitchOrch();
144+
145+
// Assert that the program terminates after initSaiRedis() call
146+
ASSERT_DEATH({initSaiRedis();}, "");
147+
set_comm_mode_not_supported = false;
148+
_unhook_sai_apis();
149+
}
150+
151+
TEST_F(SaihelperTest, TestSetRedisPipelineFailure) {
152+
use_pipeline_not_supported = true;
153+
_hook_sai_apis();
154+
initSwitchOrch();
155+
156+
// Assert that the program terminates after initSaiRedis() call
157+
ASSERT_DEATH({initSaiRedis();}, "");
158+
use_pipeline_not_supported = false;
159+
_unhook_sai_apis();
160+
}
161+
}
162+

tests/mock_tests/portsorch_ut.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#define private public
1212
#include "pfcactionhandler.h"
1313
#include "switchorch.h"
14+
#include "notifications.h"
1415
#include <sys/mman.h>
1516
#undef private
1617
#define private public
@@ -20,6 +21,7 @@
2021
#include <sstream>
2122

2223
extern redisReply *mockReply;
24+
extern sai_redis_communication_mode_t gRedisCommunicationMode;
2325
using ::testing::_;
2426
using ::testing::StrictMock;
2527

@@ -689,6 +691,12 @@ namespace portsorch_test
689691
gPortsOrch->doTask(*consumer);
690692
mockReply = nullptr;
691693

694+
// Call the orchagent port state change callback method with zmq mode
695+
sai_redis_communication_mode_t oldRedisCommunicationMode = gRedisCommunicationMode;
696+
gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_ZMQ_SYNC;
697+
on_port_state_change(1, &port_oper_status);
698+
gRedisCommunicationMode = oldRedisCommunicationMode;
699+
692700
gPortsOrch->getPort("Ethernet0", port);
693701
ASSERT_TRUE(port.m_oper_status == oper_status);
694702
ASSERT_TRUE(port.m_flap_count == count+1);

0 commit comments

Comments
 (0)