Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ common_libswsscommon_la_SOURCES = \
common/notificationproducer.cpp \
common/linkcache.cpp \
common/portmap.cpp \
common/publishereventtable.cpp \
common/pubsub.cpp \
common/tokenize.cpp \
common/exec.cpp \
common/saiaclschema.cpp \
common/subscribereventtable.cpp \
common/subscriberstatetable.cpp \
common/timestamp.cpp \
common/warm_restart.cpp \
Expand Down
88 changes: 88 additions & 0 deletions common/publishereventtable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include <nlohmann/json.hpp>
#include "publishereventtable.h"
#include "rediscommand.h"
#include "schema.h"
#include "table.h"

using namespace std;
using namespace swss;

string buildJsonWithKey(const FieldValueTuple &fvHead, const vector<FieldValueTuple> &fv)
{
nlohmann::json j = nlohmann::json::array();
j.push_back(fvField(fvHead));
j.push_back(fvValue(fvHead));

// we use array to save order
for (const auto &i : fv)
{
j.push_back(fvField(i));
j.push_back(fvValue(i));
}

return j.dump();
}

PublisherEventTable::PublisherEventTable(const DBConnector *db, const std::string &tableName)
: Table(db, tableName)
{
m_channel = getChannelName();
}

PublisherEventTable::PublisherEventTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered)
: Table(pipeline, tableName, buffered)
{
m_channel = getChannelName();
}

PublisherEventTable::~PublisherEventTable()
{
}

void PublisherEventTable::set(const string &key, const vector<FieldValueTuple> &values,
const string &op, const string &prefix)
{
if (values.size() == 0)
return;

RedisCommand cmd;

cmd.formatHSET(getKeyName(key), values.begin(), values.end());
m_pipe->push(cmd, REDIS_REPLY_INTEGER);


FieldValueTuple opdata(SET_COMMAND, key);
std::string msg = buildJsonWithKey(opdata, values);

SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str());

RedisCommand command;
command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str());
m_pipe->push(command, REDIS_REPLY_INTEGER);

if (!m_buffered)
{
m_pipe->flush();
}
}

void PublisherEventTable::del(const string &key, const string& op, const string& /*prefix*/)
{
RedisCommand del_key;
del_key.format("DEL %s", getKeyName(key).c_str());
m_pipe->push(del_key, REDIS_REPLY_INTEGER);

FieldValueTuple opdata(DEL_COMMAND, key);
std::string msg = buildJsonWithKey(opdata, {});

SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str());

RedisCommand command;
command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str());
m_pipe->push(command, REDIS_REPLY_INTEGER);

if (!m_buffered)
{
m_pipe->flush();
}
}
72 changes: 72 additions & 0 deletions common/publishereventtable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include <string>
#include "dbconnector.h"
#include "table.h"

namespace swss {

class PublisherEventTable : public Table { // public TableBase, public TableEntryEnumerable {
Copy link

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Outdated comment // public TableBase, public TableEntryEnumerable; update or remove it to reflect that this class now inherits directly from Table.

Copilot uses AI. Check for mistakes.
public:
PublisherEventTable(const DBConnector *db, const std::string &tableName);
PublisherEventTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
~PublisherEventTable() override;

/* Set an entry in the DB directly (op not in use) */
virtual void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX);

/* Set an entry in the DB directly and configure ttl for it (op not in use) */
// explicitly disable the virtual function
virtual void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op,
const std::string &prefix,
const int64_t &ttl)
{
throw std::runtime_error("set with ttl is not supported in PublisherEventTable");
}

/* Delete an entry in the table */
virtual void del(const std::string &key,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX);

// explicitly disable the virtual function
virtual void hdel(const std::string &key,
const std::string &field,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX)
{
throw std::runtime_error("hdel is not supported in PublisherEventTable");
}

virtual void hset(const std::string &key,
const std::string &field,
const std::string &value,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX)
{
throw std::runtime_error("hset is not supported in PublisherEventTable");
}

/* Read a value from the DB directly */
/* Returns false if the key doesn't exists */
// virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

// void getKeys(std::vector<std::string> &keys);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

// void setBuffered(bool buffered);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

// void flush();

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

// void dump(TableDump &tableDump);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

private:
std::string m_channel;
};

}

175 changes: 175 additions & 0 deletions common/subscribereventtable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include "json.h"
#include "dbconnector.h"
#include "rediscommand.h"
#include "table.h"
#include "selectable.h"
#include "redisselect.h"
#include "redisapi.h"
#include "tokenize.h"
#include "subscriberstatetable.h"
#include "subscribereventtable.h"

using namespace std;

namespace swss {

// TODO: reuse
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)
Copy link

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in macro name REDIS_PUBLISH_MESSAGE_ELEMNTS; consider renaming to REDIS_PUBLISH_MESSAGE_ELEMENTS.

Suggested change
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)
#define REDIS_PUBLISH_MESSAGE_ELEMENTS (3)

Copilot uses AI. Check for mistakes.
#define REDIS_PUBLISH_MESSAGE_INDEX (2)
string processReply(redisReply *reply)
{
SWSS_LOG_ENTER();

if (reply->type != REDIS_REPLY_ARRAY)
{
// SWSS_LOG_ERROR("expected ARRAY redis reply on channel %s, got: %d", m_channel.c_str(), reply->type);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

throw std::runtime_error("getRedisReply operation failed");
}

if (reply->elements != REDIS_PUBLISH_MESSAGE_ELEMNTS)
{
// SWSS_LOG_ERROR("expected %d elements in redis reply on channel %s, got: %zu",
// REDIS_PUBLISH_MESSAGE_ELEMNTS,
// m_channel.c_str(),
// reply->elements);

throw std::runtime_error("getRedisReply operation failed");
}

std::string msg = std::string(reply->element[REDIS_PUBLISH_MESSAGE_INDEX]->str);

SWSS_LOG_DEBUG("got message: %s", msg.c_str());

return msg;
}

SubscriberEventTable::SubscriberEventTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
m_channel = getChannelName();

subscribe(m_db, m_channel);

vector<string> keys;
m_table.getKeys(keys);

for (const auto &key: keys)
{
KeyOpFieldsValuesTuple kco;

kfvKey(kco) = key;
kfvOp(kco) = SET_COMMAND;

if (!m_table.get(key, kfvFieldsValues(kco)))
{
continue;
}

m_buffer.push_back(kco);
}
}

uint64_t SubscriberEventTable::readData()
{
redisReply *reply = nullptr;

/* Read data from redis. This call is non blocking. This method
* is called from Select framework when data is available in socket.
* NOTE: All data should be stored in event buffer. It won't be possible to
* read them second time. */
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}

m_event_buffer.emplace_back(make_shared<RedisReply>(reply));

/* Try to read data from redis cacher.
* If data exists put it to event buffer.
* NOTE: channel event is not persistent and it won't
* be possible to read it second time. If it is not stored in
* the buffer it will be lost. */

reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
m_event_buffer.emplace_back(make_shared<RedisReply>(reply));
}
}
while(reply != nullptr && status == REDIS_OK);

if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
return 0;
}

bool SubscriberEventTable::hasData()
{
return m_buffer.size() > 0 || m_event_buffer.size() > 0;
}

bool SubscriberEventTable::hasCachedData()
{
return m_buffer.size() + m_event_buffer.size() > 1;
}

void SubscriberEventTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
vkco.clear();

if (!m_buffer.empty())
{
vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end());
m_buffer.clear();
return;
}

while (auto event = popEventBuffer())
{

KeyOpFieldsValuesTuple kco;
auto &values = kfvFieldsValues(kco);
string msg = processReply(event.get()->getContext());

JSon::readJson(msg, values);

FieldValueTuple fvHead = values.at(0);

kfvOp(kco) = fvField(fvHead);
kfvKey(kco) = fvValue(fvHead);

values.erase(values.begin());

vkco.push_back(kco);
}

m_event_buffer.clear();

return;
}

shared_ptr<RedisReply> SubscriberEventTable::popEventBuffer()
{
if (m_event_buffer.empty())
{
return NULL;
}

auto reply = m_event_buffer.front();
m_event_buffer.pop_front();

return reply;
}

}
34 changes: 34 additions & 0 deletions common/subscribereventtable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "subscriberstatetable.h"

namespace swss {

class SubscriberEventTable : public ConsumerTableBase
{
public:
SubscriberEventTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

/* Get all available events from pub/sub channel */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

/* Read event from redis channel*/
uint64_t readData() override;
bool hasData() override;
bool hasCachedData() override;
bool initializedWithData() override
{
return !m_buffer.empty();
}

private:
/* Pop event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();

std::string m_channel;

std::deque<std::shared_ptr<RedisReply>> m_event_buffer;
Table m_table;
};

}
Loading
Loading