Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func NewConfigDBConnector(a ...interface{}) *ConfigDBConnector {
if namespace is None:
namespace = ''
super(ConfigDBConnector, self).__init__(use_unix_socket_path = use_unix_socket_path, namespace = namespace)
# Initialize pubsub lazily to avoid accessing unconnected database
self.pubsub = None

# Trick: to achieve static/instance method "overload", we must use initize the function in ctor
# ref: https://stackoverflow.com/a/28766809/2514803
Expand Down Expand Up @@ -97,9 +99,6 @@ func NewConfigDBConnector(a ...interface{}) *ConfigDBConnector {
## Note: callback is difficult to implement by SWIG C++, so keep in python
def listen(self, init_data_handler=None):
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))

# Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}

Expand Down Expand Up @@ -208,7 +207,13 @@ func NewConfigDBConnector(a ...interface{}) *ConfigDBConnector {
handler = self.handlers[table]
handler(table, key, data)

def _ensure_pubsub(self):
if self.pubsub is None:
self.pubsub = self.get_redis_client(self.db_name).pubsub()

def subscribe(self, table, handler, fire_init_data=False):
self._ensure_pubsub()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ensure_pubsub

There is one pitfall in this solution. The redis connections are not thread safe, so each connection should be used inside each thread only. If multiple thread calling subscribe, they will share the same pubsub, which will not work.

you may consider thread local variables if you really want to achieve the optimization.

self.pubsub.psubscribe("__keyspace@{}__:{}*".format(self.get_dbid(self.db_name), table))
self.handlers[table] = handler
self.fire_init_data[table] = fire_init_data

Expand Down
17 changes: 11 additions & 6 deletions common/pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,34 @@
#include "dbconnector.h"
#include "logger.h"
#include "redisreply.h"
#include <iostream>

using namespace std;
using namespace swss;

PubSub::PubSub(DBConnector *parent)
: m_parentConnector(parent)
: RedisSelect(parent, 0), m_parentConnector(parent)
{
}

void PubSub::psubscribe(const std::string &pattern)
{
if (m_subscribe)
RedisSelect::psubscribe(pattern);
if (p_subscribe_count == 0)
{
m_select.removeSelectable(this);
m_select.addSelectable(this);
}
RedisSelect::psubscribe(m_parentConnector, pattern);
m_select.addSelectable(this);
p_subscribe_count++;
}

void PubSub::punsubscribe(const std::string &pattern)
{
RedisSelect::punsubscribe(pattern);
m_select.removeSelectable(this);
p_subscribe_count--;
if (p_subscribe_count == 0)
{
m_select.removeSelectable(this);
}
}

uint64_t PubSub::readData()
Expand Down
1 change: 1 addition & 0 deletions common/pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class PubSub : protected RedisSelect
/* Pop keyspace event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();
MessageResultPair get_message_internal(double timeout = 0.0, bool interrupt_on_signal = false);
unsigned int p_subscribe_count = 0;

DBConnector *m_parentConnector;
Select m_select;
Expand Down
25 changes: 23 additions & 2 deletions common/redisselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@

namespace swss {

constexpr unsigned int RedisSelect::SUBSCRIBE_TIMEOUT;

RedisSelect::RedisSelect(int pri) : Selectable(pri), m_queueLength(-1)
{
}

RedisSelect::RedisSelect(DBConnector* parent, int pri) : Selectable(pri), m_queueLength(-1)
{
m_subscribe.reset(parent->newConnector(SUBSCRIBE_TIMEOUT));
}

int RedisSelect::getFd()
{
return m_subscribe->getContext()->fd;
Expand All @@ -21,7 +28,6 @@ const DBConnector* RedisSelect::getDbConnector() const
{
return m_subscribe.get();
}

uint64_t RedisSelect::readData()
{
redisReply *reply = nullptr;
Expand Down Expand Up @@ -81,6 +87,12 @@ void RedisSelect::subscribe(DBConnector* db, const std::string &channelName)
m_subscribe->subscribe(channelName);
}

void RedisSelect::subscribe(const std::string &channelName)
{
/* Send SUBSCRIBE #channel command */
m_subscribe->subscribe(channelName);
}

/* PSUBSCRIBE */
void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName)
{
Expand All @@ -92,6 +104,15 @@ void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName)
m_subscribe->psubscribe(channelName);
}

void RedisSelect::psubscribe(const std::string &channelName)
{
/*
* Send PSUBSCRIBE #channel command on the
* non-blocking subscriber DBConnector
*/
m_subscribe->psubscribe(channelName);
}

/* PUNSUBSCRIBE */
void RedisSelect::punsubscribe(const std::string &channelName)
{
Expand All @@ -101,7 +122,7 @@ void RedisSelect::punsubscribe(const std::string &channelName)
*/
if (m_subscribe)
{
m_subscribe->psubscribe(channelName);
m_subscribe->punsubscribe(channelName);
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/redisselect.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class RedisSelect : public Selectable
static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000;

RedisSelect(int pri = 0);
RedisSelect(DBConnector* parent, int pri = 0);

int getFd() override;
uint64_t readData() override;
Expand All @@ -25,9 +26,12 @@ class RedisSelect : public Selectable

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void subscribe(DBConnector* db, const std::string &channelName);
void subscribe(const std::string &channelName);

/* PSUBSCRIBE */
void psubscribe(DBConnector* db, const std::string &channelName);
void psubscribe(const std::string &channelName);

/* PUNSUBSCRIBE */
void punsubscribe(const std::string &channelName);

Expand Down
1 change: 1 addition & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tests_tests_SOURCES = tests/redis_ut.cpp \
tests/converter_ut.cpp \
tests/exec_ut.cpp \
tests/redis_subscriber_state_ut.cpp \
tests/redis_multichannel_ut.cpp \
tests/selectable_priority.cpp \
tests/warm_restart_ut.cpp \
tests/redis_multi_db_ut.cpp \
Expand Down
Loading
Loading