Skip to content

Commit 0a3241b

Browse files
authored
Merge branch 'master' into dev-reset-local-users-password
2 parents fa8f09f + 6bac82b commit 0a3241b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2001
-85
lines changed

common/Makefile.am

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,18 @@ common_libswsscommon_la_SOURCES = \
6868
common/zmqclient.cpp \
6969
common/zmqserver.cpp \
7070
common/asyncdbupdater.cpp \
71-
common/redis_table_waiter.cpp
71+
common/redis_table_waiter.cpp \
72+
common/interface.h \
73+
common/c-api/util.cpp \
74+
common/c-api/dbconnector.cpp \
75+
common/c-api/consumerstatetable.cpp \
76+
common/c-api/producerstatetable.cpp \
77+
common/c-api/subscriberstatetable.cpp \
78+
common/c-api/zmqclient.cpp \
79+
common/c-api/zmqserver.cpp \
80+
common/c-api/zmqconsumerstatetable.cpp \
81+
common/c-api/zmqproducerstatetable.cpp \
82+
common/performancetimer.cpp
7283

7384
common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
7485
common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)

common/asyncdbupdater.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ AsyncDBUpdater::~AsyncDBUpdater()
3030
// notify db update thread exit
3131
m_dbUpdateDataNotifyCv.notify_all();
3232
m_dbUpdateThread->join();
33+
SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str());
3334
}
3435

3536
void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
@@ -61,16 +62,30 @@ void AsyncDBUpdater::dbUpdateThread()
6162
std::mutex cvMutex;
6263
std::unique_lock<std::mutex> cvLock(cvMutex);
6364

64-
while (m_runThread)
65+
while (true)
6566
{
6667
size_t count;
6768
count = queueSize();
6869
if (count == 0)
6970
{
71+
// Check if there still data in queue before exit
72+
if (!m_runThread)
73+
{
74+
SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str());
75+
break;
76+
}
77+
7078
// when queue is empty, wait notification, when data come, continue to check queue size again
7179
m_dbUpdateDataNotifyCv.wait(cvLock);
7280
continue;
7381
}
82+
else
83+
{
84+
if (!m_runThread)
85+
{
86+
SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count);
87+
}
88+
}
7489

7590
for (size_t ie = 0; ie < count; ie++)
7691
{

common/binaryserializer.h

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#define __BINARY_SERIALIZER__
33

44
#include "common/armhelper.h"
5+
#include "common/rediscommand.h"
6+
#include "common/table.h"
57

68
#include <string>
79

@@ -11,6 +13,26 @@ namespace swss {
1113

1214
class BinarySerializer {
1315
public:
16+
static size_t serializedSize(const string &dbName, const string &tableName,
17+
const vector<KeyOpFieldsValuesTuple> &kcos) {
18+
size_t n = 0;
19+
n += dbName.size() + sizeof(size_t);
20+
n += tableName.size() + sizeof(size_t);
21+
22+
for (const KeyOpFieldsValuesTuple &kco : kcos) {
23+
const vector<FieldValueTuple> &fvs = kfvFieldsValues(kco);
24+
n += kfvKey(kco).size() + sizeof(size_t);
25+
n += to_string(fvs.size()).size() + sizeof(size_t);
26+
27+
for (const FieldValueTuple &fv : fvs) {
28+
n += fvField(fv).size() + sizeof(size_t);
29+
n += fvValue(fv).size() + sizeof(size_t);
30+
}
31+
}
32+
33+
return n + sizeof(size_t);
34+
}
35+
1436
static size_t serializeBuffer(
1537
const char* buffer,
1638
const size_t size,
@@ -192,8 +214,8 @@ class BinarySerializer {
192214
{
193215
if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size)
194216
{
195-
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\
196-
key count: %zu, data length %zu, buffer size: %zu",
217+
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\n"
218+
" key count: %zu, data length %zu, buffer size: %zu",
197219
m_kvp_count,
198220
datalen,
199221
m_buffer_size);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include <boost/numeric/conversion/cast.hpp>
2+
#include <cstdlib>
3+
#include <cstring>
4+
#include <deque>
5+
6+
#include "../consumerstatetable.h"
7+
#include "../dbconnector.h"
8+
#include "../table.h"
9+
#include "consumerstatetable.h"
10+
#include "util.h"
11+
12+
using namespace swss;
13+
using namespace std;
14+
using boost::numeric_cast;
15+
16+
SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
17+
const int32_t *p_popBatchSize,
18+
const int32_t *p_pri) {
19+
int popBatchSize = p_popBatchSize ? numeric_cast<int>(*p_popBatchSize)
20+
: TableConsumable::DEFAULT_POP_BATCH_SIZE;
21+
int pri = p_pri ? numeric_cast<int>(*p_pri) : 0;
22+
SWSSTry(return (SWSSConsumerStateTable) new ConsumerStateTable(
23+
(DBConnector *)db, string(tableName), popBatchSize, pri));
24+
}
25+
26+
void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl) {
27+
SWSSTry(delete (ConsumerStateTable *)tbl);
28+
}
29+
30+
SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl) {
31+
SWSSTry({
32+
deque<KeyOpFieldsValuesTuple> vkco;
33+
((ConsumerStateTable *)tbl)->pops(vkco);
34+
return makeKeyOpFieldValuesArray(vkco);
35+
});
36+
}
37+
38+
uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl) {
39+
SWSSTry(return numeric_cast<uint32_t>(((ConsumerStateTable *)tbl)->getFd()));
40+
}
41+
42+
SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
43+
uint8_t interrupt_on_signal) {
44+
SWSSTry(return selectOne((ConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal));
45+
}

common/c-api/consumerstatetable.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#ifndef SWSS_COMMON_C_API_CONSUMERSTATETABLE_H
2+
#define SWSS_COMMON_C_API_CONSUMERSTATETABLE_H
3+
4+
#include "dbconnector.h"
5+
#include "util.h"
6+
7+
#ifdef __cplusplus
8+
extern "C" {
9+
#endif
10+
11+
#include <stdint.h>
12+
13+
typedef struct SWSSConsumerStateTableOpaque *SWSSConsumerStateTable;
14+
15+
// Pass NULL for popBatchSize and/or pri to use the default values
16+
SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
17+
const int32_t *popBatchSize, const int32_t *pri);
18+
19+
void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl);
20+
21+
// Result array and all of its members must be freed using free()
22+
SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl);
23+
24+
// Return the underlying fd for polling/selecting on.
25+
// Callers must NOT read/write on fd, it may only be used for epoll or similar.
26+
// After the fd becomes readable, SWSSConsumerStateTable_readData must be used to
27+
// reset the fd and read data into internal data structures.
28+
uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl);
29+
30+
// Block until data is available to read or until a timeout elapses.
31+
// A timeout of 0 means the call will return immediately.
32+
SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
33+
uint8_t interrupt_on_signal);
34+
35+
#ifdef __cplusplus
36+
}
37+
#endif
38+
39+
#endif

common/c-api/dbconnector.cpp

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#include <cstring>
2+
#include <string>
3+
#include <utility>
4+
5+
#include "../dbconnector.h"
6+
#include "dbconnector.h"
7+
#include "util.h"
8+
9+
using namespace swss;
10+
using namespace std;
11+
12+
void SWSSSonicDBConfig_initialize(const char *path) {
13+
SWSSTry(SonicDBConfig::initialize(path));
14+
}
15+
16+
void SWSSSonicDBConfig_initializeGlobalConfig(const char *path) {
17+
SWSSTry(SonicDBConfig::initializeGlobalConfig(path));
18+
}
19+
20+
SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
21+
uint32_t timeout) {
22+
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(hostname), port, timeout));
23+
}
24+
25+
SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout) {
26+
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(sock_path), timeout));
27+
}
28+
29+
SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn) {
30+
SWSSTry(return (SWSSDBConnector) new DBConnector(string(dbName), timeout_ms, isTcpConn));
31+
}
32+
33+
void SWSSDBConnector_free(SWSSDBConnector db) {
34+
delete (DBConnector *)db;
35+
}
36+
37+
int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) {
38+
SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0);
39+
}
40+
41+
void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value) {
42+
SWSSTry(((DBConnector *)db)->set(string(key), takeStrRef(value)));
43+
}
44+
45+
SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key) {
46+
SWSSTry({
47+
shared_ptr<string> s = ((DBConnector *)db)->get(string(key));
48+
return s ? makeString(move(*s)) : nullptr;
49+
});
50+
}
51+
52+
int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key) {
53+
SWSSTry(return ((DBConnector *)db)->exists(string(key)) ? 1 : 0);
54+
}
55+
56+
int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field) {
57+
SWSSTry(return ((DBConnector *)db)->hdel(string(key), string(field)) ? 1 : 0);
58+
}
59+
60+
void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
61+
SWSSStrRef value) {
62+
SWSSTry(((DBConnector *)db)->hset(string(key), string(field), takeStrRef(value)));
63+
}
64+
65+
SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) {
66+
SWSSTry({
67+
shared_ptr<string> s = ((DBConnector *)db)->hget(string(key), string(field));
68+
return s ? makeString(move(*s)) : nullptr;
69+
});
70+
}
71+
72+
SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) {
73+
SWSSTry({
74+
auto map = ((DBConnector *)db)->hgetall(string(key));
75+
76+
// We can't move keys out of the map, we have to copy them, until C++17 map::extract so we
77+
// copy them here into a vector to avoid needing an overload on makeFieldValueArray
78+
vector<pair<string, string>> pairs;
79+
pairs.reserve(map.size());
80+
for (auto &pair : map)
81+
pairs.push_back(make_pair(pair.first, move(pair.second)));
82+
83+
return makeFieldValueArray(std::move(pairs));
84+
});
85+
}
86+
87+
int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field) {
88+
SWSSTry(return ((DBConnector *)db)->hexists(string(key), string(field)) ? 1 : 0);
89+
}
90+
91+
int8_t SWSSDBConnector_flushdb(SWSSDBConnector db) {
92+
SWSSTry(return ((DBConnector *)db)->flushdb() ? 1 : 0);
93+
}

common/c-api/dbconnector.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#ifndef SWSS_COMMON_C_API_DBCONNECTOR_H
2+
#define SWSS_COMMON_C_API_DBCONNECTOR_H
3+
4+
#include "util.h"
5+
#ifdef __cplusplus
6+
extern "C" {
7+
#endif
8+
9+
#include <stdint.h>
10+
11+
void SWSSSonicDBConfig_initialize(const char *path);
12+
13+
void SWSSSonicDBConfig_initializeGlobalConfig(const char *path);
14+
15+
typedef struct SWSSDBConnectorOpaque *SWSSDBConnector;
16+
17+
// Pass 0 to timeout for infinity
18+
SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
19+
uint32_t timeout_ms);
20+
21+
// Pass 0 to timeout for infinity
22+
SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout_ms);
23+
24+
// Pass 0 to timeout for infinity
25+
SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn);
26+
27+
void SWSSDBConnector_free(SWSSDBConnector db);
28+
29+
// Returns 0 when key doesn't exist, 1 when key was deleted
30+
int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key);
31+
32+
void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value);
33+
34+
// Returns NULL if key doesn't exist
35+
// Result must be freed using SWSSString_free()
36+
SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key);
37+
38+
// Returns 0 for false, 1 for true
39+
int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key);
40+
41+
// Returns 0 when key or field doesn't exist, 1 when field was deleted
42+
int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field);
43+
44+
void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, SWSSStrRef value);
45+
46+
// Returns NULL if key or field doesn't exist
47+
// Result must be freed using SWSSString_free()
48+
SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field);
49+
50+
// Returns an empty map when the key doesn't exist
51+
// Result array and all of its elements must be freed using appropriate free functions
52+
SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key);
53+
54+
// Returns 0 when key or field doesn't exist, 1 when field exists
55+
int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field);
56+
57+
// Returns 1 on success, 0 on failure
58+
int8_t SWSSDBConnector_flushdb(SWSSDBConnector db);
59+
60+
#ifdef __cplusplus
61+
}
62+
#endif
63+
64+
#endif

0 commit comments

Comments
 (0)