Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/sw/redis++/async_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ AsyncConnection::AsyncContextUPtr AsyncConnection::_connect(const ConnectionOpti
redis_opts.type = REDIS_CONN_TCP;
redis_opts.endpoint.tcp.ip = opts.host.c_str();
redis_opts.endpoint.tcp.port = opts.port;
if (!opts.srcip.empty())
{
redis_opts.endpoint.tcp.source_addr = opts.srcip.c_str();
}
break;

case ConnectionType::UNIX:
Expand Down
2 changes: 2 additions & 0 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {

void update_node_info(const std::string &host, int port);



void set_subscriber_mode() {
_subscriber_impl = std::unique_ptr<AsyncSubscriberImpl>(new AsyncSubscriberImpl);
}
Expand Down
5 changes: 5 additions & 0 deletions src/sw/redis++/async_connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class AsyncConnectionPool : public std::enable_shared_from_this<AsyncConnectionP
void update_node_info(AsyncConnectionSPtr &connection,
std::exception_ptr err);

void update_conn_opt(ConnectionOptions _new_opts) {
std::lock_guard<std::mutex> lock(_mutex);
_opts = _new_opts;
}

private:
// NOT thread-safe
AsyncConnectionSPtr _create();
Expand Down
10 changes: 10 additions & 0 deletions src/sw/redis++/async_redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role, cluster_opts);
}

bool AsyncRedisCluster::getSlotState()
{
return _pool->getSlotState();
}

void AsyncRedisCluster::update_conn_opt(std::map<std::string, std::string> _new_opts)
{
_pool->update_conn_opt(_new_opts);
}

AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

Expand Down
23 changes: 23 additions & 0 deletions src/sw/redis++/async_redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,33 @@ class AsyncRedisCluster {

AsyncRedis redis(const StringView &hash_tag, bool new_connection = true);

bool getSlotState();

void update_conn_opt(std::map<std::string, std::string> _new_opts);

AsyncSubscriber subscriber();

AsyncSubscriber subscriber(const StringView &hash_tag);

template <typename Result, typename Input, typename Callback>
auto command(const StringView &key, Input first, Input last, Callback &&cb)
-> typename std::enable_if<IsIter<Input>::value, void>::type {
if (first == last || std::next(first) == last) {
throw Error("command: invalid range");
}

auto formatter = [](Input start, Input stop) {
CmdArgs cmd_args;
while (start != stop) {
cmd_args.append(*start);
++start;
}
return fmt::format_cmd(cmd_args);
};

_callback_generic_command<Result>(std::forward<Callback>(cb), formatter, key, first, last);
}

template <typename Result, typename ...Args>
auto command(const StringView &cmd_name, const StringView &key, Args &&...args)
-> typename std::enable_if<!IsInvocable<typename LastType<Args...>::type,
Expand Down
101 changes: 100 additions & 1 deletion src/sw/redis++/async_shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void AsyncShardsPool::_run() {
}

// Failed to update shards, retry later.
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(_pool_opts.solt_node_error_recover_time);

update();
}
Expand All @@ -223,6 +223,101 @@ auto AsyncShardsPool::_fetch_events() -> std::queue<RedeliverEvent> {
return events;
}

void AsyncShardsPool::update_conn_opt(std::map<std::string, std::string> _new_opts)
{
std::lock_guard<std::mutex> lock(_mutex);

//Auth related
if (_new_opts.count("passwd") != 0)
{
_connection_opts.password = _new_opts["passwd"];
}

if (_new_opts.count("user") != 0)
{
_connection_opts.user = _new_opts["user"];
}
//TLS related
if (_new_opts.count("enable_tls") != 0)
{
if (_new_opts["enable_tls"] == "True") {
_connection_opts.tls.enabled = true;
} else {
_connection_opts.tls.enabled = false;
}
}

if (_new_opts.count("trust_ca_file") != 0)
{
_connection_opts.tls.cacert = _new_opts["trust_ca_file"];
}
if (_new_opts.count("trust_ca_path") != 0)
{
_connection_opts.tls.cacertdir = _new_opts["trust_ca_path"];
}
if (_new_opts.count("client_cert_file") != 0)
{
_connection_opts.tls.cert = _new_opts["client_cert_file"];
}
if (_new_opts.count("client_cert_key") != 0)
{
_connection_opts.tls.key = _new_opts["client_cert_key"];
}
if (_new_opts.count("server_name") != 0)
{
_connection_opts.tls.sni = _new_opts["server_name"];
}
if (_new_opts.count("tls_protocol") != 0)
{
_connection_opts.tls.tls_protocol = _new_opts["tls_protocol"];
}
if (_new_opts.count("tls_ciphers") != 0)
{
_connection_opts.tls.ciphers = _new_opts["tls_ciphers"];
}
//Timers
int interval;
if (_new_opts.count("conn_timeout") != 0)
{
try {
interval = std::stoi(_new_opts["conn_timeout"]);
_connection_opts.connect_timeout = std::chrono::milliseconds(interval);
} catch (std::exception &e) {
//ingore
}
}
if (_new_opts.count("command_timeout") != 0)
{
try {
interval = std::stoi(_new_opts["command_timeout"]);
_connection_opts.socket_timeout = std::chrono::milliseconds(interval);
} catch (std::exception &e) {
//ingore
}
}
if (_new_opts.count("slot_error_retry") != 0)
{
try {
interval = std::stoi(_new_opts["slot_error_retry"]);
_connection_opts.socket_timeout = std::chrono::seconds(interval);
} catch (std::exception &e) {
// ingore
}
}

//Above new prop update, only apply into new connections.

for (const auto &entry : _pools)
{
const AsyncConnectionPoolSPtr &pool = entry.second;
if (pool != nullptr)
{
pool->update_conn_opt(_connection_opts);
}
}
}


std::size_t AsyncShardsPool::_random(std::size_t min, std::size_t max) const {
static thread_local std::default_random_engine engine;

Expand Down Expand Up @@ -294,6 +389,10 @@ void AsyncShardsPool::_update_shards() {
}
}

if (!_slot_ready) {
_slot_ready = true; //inital ready, mark it
}

// Update successfully.
return;
} catch (const Error &) {
Expand Down
6 changes: 6 additions & 0 deletions src/sw/redis++/async_shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ class AsyncShardsPool {

void update();

bool getSlotState() {return _slot_ready;}

ConnectionOptions connection_options(const StringView &key);

ConnectionOptions connection_options();

void update_conn_opt(std::map<std::string, std::string> _new_opts);

private:
struct RedeliverEvent {
std::string key;
Expand Down Expand Up @@ -119,6 +123,8 @@ class AsyncShardsPool {
std::queue<RedeliverEvent> _events;

static const std::size_t SHARDS = 16383;

bool _slot_ready = false;
};

using AsyncShardsPoolSPtr = std::shared_ptr<AsyncShardsPool>;
Expand Down
2 changes: 2 additions & 0 deletions src/sw/redis++/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct ConnectionOptions {

int port = 6379;

std::string srcip;

std::string path;

std::string user = "default";
Expand Down
3 changes: 3 additions & 0 deletions src/sw/redis++/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct ConnectionPoolOptions {

// Max idle time of a connection. 0ms means we never expire the connection.
std::chrono::milliseconds connection_idle_time{0};

// Max interval for retry solt node detection at error case, 5s as the default value.
std::chrono::seconds solt_node_error_recover_time{5};
};

class ConnectionPool {
Expand Down
4 changes: 4 additions & 0 deletions src/sw/redis++/reply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ std::string parse(ParseTag<std::string>, redisReply &reply) {
return std::string(reply.str, reply.len);
}

redisReply* parse(ParseTag<redisReply*>, redisReply &reply) {
return &reply;
}

long long parse(ParseTag<long long>, redisReply &reply) {
if (!reply::is_integer(reply)) {
throw ParseError("INTEGER", reply);
Expand Down
2 changes: 2 additions & 0 deletions src/sw/redis++/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ inline T parse(redisReply &reply) {
template <typename T>
T parse_leniently(redisReply &reply);

redisReply* parse(ParseTag<redisReply*>, redisReply &reply);

void parse(ParseTag<void>, redisReply &reply);

std::string parse(ParseTag<std::string>, redisReply &reply);
Expand Down
Loading