Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

strategy:
matrix:
redis_version: ["7.2", "7.4"]
redis_version: ["7.2", "7.4", "unstable"]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

strategy:
matrix:
redis_version: ["7.2", "7.4"]
redis_version: ["7.2", "7.4", "unstable"]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "LicenseRef-RSALv2 OR SSPL-1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
redis-module = { git = "https://github.com/RedisLabsModules/redismodule-rs", branch = "master", default-features = false }
redis-module = { git = "https://github.com/RedisLabsModules/redismodule-rs", branch = "add_GetInternalSecret_redismodule_api", default-features = false }
serde_json = "1"
serde = "1"
serde_derive = "1"
Expand Down
7 changes: 1 addition & 6 deletions rust_api/libmr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,12 @@ impl Default for crate::libmr_c_raw::bindings::Record {

pub type RustMRError = String;

pub fn mr_init(ctx: &Context, num_threads: usize, username: Option<&str>, password: Option<&str>) {
let username = username.map(|v| CString::new(v).unwrap());
pub fn mr_init(ctx: &Context, num_threads: usize, password: Option<&str>) {
let password = password.map(|v| CString::new(v).unwrap());
unsafe {
MR_Init(
ctx.ctx as *mut RedisModuleCtx,
num_threads,
username
.as_ref()
.map(|v| v.as_ptr())
.unwrap_or(ptr::null_mut()) as *mut c_char,
password
.as_ref()
.map(|v| v.as_ptr())
Expand Down
94 changes: 34 additions & 60 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,6 @@
#define NETWORK_TEST_COMMAND xstr(MODULE_NAME)".NETWORKTEST"
#define FORCE_SHARDS_CONNECTION xstr(MODULE_NAME)".FORCESHARDSCONNECTION"

/**
* @brief Sets the ACL categories for the given command.
* @return true if the ACL categories were set successfully for the
* command was registered successfully, false otherwise.
*/
#define SetCommandAcls(ctx, cmd, acls) \
({ \
bool result = false; \
if (!RedisModule_GetCommand || !RedisModule_SetCommandACLCategories \
|| !RedisModule_AddACLCategory) { \
result = true; \
} \
if (!result) { \
struct RedisModuleCommand *command = RedisModule_GetCommand(ctx, cmd); \
if (command != NULL) { \
const char *categories = ((acls) == NULL || !strcmp(acls, "")) \
? LIBMR_ACL_COMMAND_CATEGORY_NAME \
: acls " " LIBMR_ACL_COMMAND_CATEGORY_NAME; \
if (RedisModule_SetCommandACLCategories(command, categories) == \
REDISMODULE_OK) { \
result = true; \
} \
} \
} \
result; \
})

/** @brief Register a new Redis command with the required ACLs.
* @see RedisModule_CreateCommand
* @return true if the command was registered successfully, false
Expand All @@ -71,7 +44,7 @@
static inline __attribute__((always_inline)) bool
RegisterRedisCommand(RedisModuleCtx *ctx, const char *name,
RedisModuleCmdFunc cmdfunc, const char *strflags,
int firstkey, int lastkey, int keystep, const bool setAcls) {
int firstkey, int lastkey, int keystep) {
const int ret = RedisModule_CreateCommand(ctx, name, cmdfunc, strflags,
firstkey, lastkey, keystep);

Expand All @@ -81,11 +54,7 @@ RegisterRedisCommand(RedisModuleCtx *ctx, const char *name,
return false;
}

if (!setAcls) {
return true;
}

return SetCommandAcls(ctx, name, "");
return true;
}

typedef enum NodeStatus{
Expand Down Expand Up @@ -118,7 +87,6 @@ typedef struct Node{
char* id;
char* ip;
unsigned short port;
char* username;
char* password;
char* unixSocket;
redisAsyncContext *c;
Expand Down Expand Up @@ -153,7 +121,6 @@ struct ClusterCtx {
char myId[REDISMODULE_NODE_ID_LEN + 1];
int isOss;
functionId networkTestMsgReciever;
char *username;
char *password;
}clusterCtx;

Expand Down Expand Up @@ -617,10 +584,17 @@ static void MR_OnConnectCallback(const struct redisAsyncContext* c, int status){

RedisModule_Log(mr_staticCtx, "notice", "connected : %s:%d, status = %d\r\n", c->c.tcp.host, c->c.tcp.port, status);

if (n->username && n->password) {
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "AUTH %s %s", n->username, n->password);
} else if (n->password){
if (n->password){
/* If password is provided to us we will use it (it means it was given to us with clusterset) */
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "AUTH %s", n->password);
} else if (RedisModule_GetInternalSecret && !MR_IsEnterpriseBuild()) {
/* OSS deployment that support internal secret, lets use it. */
RedisModule_ThreadSafeContextLock(mr_staticCtx);
size_t len;
const char *secret = RedisModule_GetInternalSecret(mr_staticCtx, &len);
RedisModule_Assert(secret);
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "INTERNALAUTH %b", secret, len);
RedisModule_ThreadSafeContextUnlock(mr_staticCtx);
}

if(n->sendClusterTopologyOnNextConnect && clusterCtx.CurrCluster->clusterSetCommand){
Expand Down Expand Up @@ -746,14 +720,13 @@ static Node* MR_GetNode(const char* id){
return n;
}

static Node* MR_CreateNode(const char* id, const char* ip, unsigned short port, const char* username, const char* password, const char* unixSocket, size_t minSlot, size_t maxSlot){
static Node* MR_CreateNode(const char* id, const char* ip, unsigned short port, const char* password, const char* unixSocket, size_t minSlot, size_t maxSlot){
RedisModule_Assert(!MR_GetNode(id));
Node* n = MR_ALLOC(sizeof(*n));
*n = (Node){
.id = MR_STRDUP(id),
.ip = MR_STRDUP(ip),
.port = port,
.username = username ? MR_STRDUP(username) : NULL,
.password = password ? MR_STRDUP(password) : NULL,
.unixSocket = unixSocket ? MR_STRDUP(unixSocket) : NULL,
.c = NULL,
Expand Down Expand Up @@ -847,7 +820,8 @@ static void MR_RefreshClusterData(){

Node* n = MR_GetNode(nodeId);
if(!n){
n = MR_CreateNode(nodeId, nodeIp, (unsigned short)port, clusterCtx.username, clusterCtx.password, NULL, minslot, maxslot);
/* If we have internal secret we will ignore the clusterCtx.password, we do not need it. */
n = MR_CreateNode(nodeId, nodeIp, (unsigned short)port, RedisModule_GetInternalSecret ? NULL : clusterCtx.password, NULL, minslot, maxslot);
}

if (n->isMe) {
Expand Down Expand Up @@ -964,7 +938,7 @@ static void MR_SetClusterData(RedisModuleString** argv, int argc){

Node* n = MR_GetNode(realId);
if(!n){
n = MR_CreateNode(realId, ip, port, NULL, password, NULL, minslot, maxslot);
n = MR_CreateNode(realId, ip, port, password, NULL, minslot, maxslot);
}
for(int i = minslot ; i <= maxslot ; ++i){
clusterCtx.CurrCluster->slots[i] = n;
Expand Down Expand Up @@ -1349,15 +1323,14 @@ static void MR_NetworkTest(RedisModuleCtx *ctx, const char *sender_id, uint8_t t
RedisModule_Log(ctx, "notice", "got a nextwork test msg");
}

int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password) {
int MR_ClusterInit(RedisModuleCtx* rctx, char *password) {
clusterCtx.CurrCluster = NULL;
clusterCtx.callbacks = array_new(MR_ClusterMessageReceiver, 10);
clusterCtx.nodesMsgIds = mr_dictCreate(&mr_dictTypeHeapStrings, NULL);
clusterCtx.minSlot = 0;
clusterCtx.maxSlot = 0;
clusterCtx.clusterSize = 1;
clusterCtx.isOss = true;
clusterCtx.username = username ? MR_STRDUP(username) : NULL;
clusterCtx.password = password ? MR_STRDUP(password) : NULL;
memset(clusterCtx.myId, '0', REDISMODULE_NODE_ID_LEN);

Expand All @@ -1373,56 +1346,57 @@ int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password) {
const char *command_flags = "readonly deny-script";
if (MR_IsEnterpriseBuild()) {
command_flags = "readonly deny-script _proxy-filtered";
} else {
if (RedisModule_GetInternalSecret) {
/* We run at a version that supports internal commands, let use it. */
command_flags = "readonly deny-script internal";
}
}

if (RedisModule_AddACLCategory) {
if (RedisModule_AddACLCategory(rctx, LIBMR_ACL_COMMAND_CATEGORY_NAME) != REDISMODULE_OK) {
RedisModule_Log(rctx, "error", "Failed to add ACL category");

if (!MR_IsEnterpriseBuild()) {
/* Refresh cluster is only relevant for oss, also notice that refresh cluster
* is not considered internal and should be performed by the user. */
if (!RegisterRedisCommand(rctx, CLUSTER_REFRESH_COMMAND, MR_ClusterRefresh,
"readonly deny-script", 0, 0, 0)) {
return REDISMODULE_ERR;
}
}

if (!RegisterRedisCommand(rctx, CLUSTER_REFRESH_COMMAND, MR_ClusterRefresh,
command_flags, 0, 0, 0, false)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_SET_COMMAND, MR_ClusterSet,
command_flags, 0, 0, -1, false)) {
command_flags, 0, 0, -1)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_SET_FROM_SHARD_COMMAND,
MR_ClusterSetFromShard, command_flags, 0, 0,
-1, true)) {
-1)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_HELLO_COMMAND, MR_ClusterHello,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_INNER_COMMUNICATION_COMMAND,
MR_ClusterInnerCommunicationMsg, command_flags, 0, 0,
0, true)) {
0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, NETWORK_TEST_COMMAND, MR_NetworkTestCommand,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_INFO_COMMAND, MR_ClusterInfoCommand,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, FORCE_SHARDS_CONNECTION,
MR_ForceShardsConnectionCommand, command_flags, 0, 0,
0, true)) {
0)) {
return REDISMODULE_ERR;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int MR_IsClusterInitialize();

size_t MR_ClusterGetSize();

int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password);
int MR_ClusterInit(RedisModuleCtx* rctx, char *password);

size_t MR_ClusterGetSlotdByKey(const char* key, size_t len);

Expand Down
11 changes: 0 additions & 11 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@
#error "MODULE_NAME is not defined"
#endif

/** The name of the ACL category for the commands created by LibMR for
* its own operations.
*
* The user may redefine the category name by defining the macro
* LIBMR_ACL_COMMAND_CATEGORY_NAME before including this header.
*/
#ifndef LIBMR_ACL_COMMAND_CATEGORY_NAME
#define LIBMR_ACL_COMMAND_CATEGORY_NAME \
"_" xstr(MODULE_NAME) "_libmr_internal"
#endif

typedef struct MR_RedisVersion
{
int redisMajorVersion;
Expand Down
4 changes: 2 additions & 2 deletions src/mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1405,11 +1405,11 @@ static void MR_GetRedisVersion() {
RedisModule_FreeCallReply(reply);
}

int MR_Init(RedisModuleCtx* ctx, size_t numThreads, char *username, char *password) {
int MR_Init(RedisModuleCtx* ctx, size_t numThreads, char *password) {
mr_staticCtx = RedisModule_GetDetachedThreadSafeContext(ctx);
MR_GetRedisVersion();

if (MR_ClusterInit(ctx, username, password) != REDISMODULE_OK) {
if (MR_ClusterInit(ctx, password) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}

Expand Down
2 changes: 1 addition & 1 deletion src/mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ LIBMR_API void MR_Run(Execution* e);
LIBMR_API void MR_FreeExecution(Execution* e);

/* Initialize mr library */
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *username, char *password);
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *password);

/* Register a new object type */
LIBMR_API int MR_RegisterObject(MRObjectType* t);
Expand Down
2 changes: 2 additions & 0 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,7 @@ REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(con
REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_GetInternalSecret)(RedisModuleCtx *ctx, size_t *len) REDISMODULE_ATTR;

#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)

Expand Down Expand Up @@ -1694,6 +1695,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RdbStreamFree);
REDISMODULE_GET_API(RdbLoad);
REDISMODULE_GET_API(RdbSave);
REDISMODULE_GET_API(GetInternalSecret);


#ifdef REDISMODULE_RLEC_API_DEFS
Expand Down
18 changes: 16 additions & 2 deletions tests/mr_test_module/pytests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ def shardsConnections(env):

def verifyClusterInitialized(env):
for conn in shardsConnections(env):
try:
# try to promote to internal connection
conn.execute_command('debug', 'PROMOTE-CONN')
except Exception:
pass
allConnected = False
while not allConnected:
res = conn.execute_command('MRTESTS.INFOCLUSTER')
Expand All @@ -108,7 +113,12 @@ def initialiseCluster(env):
# able to execute commands on shards, on slow envs, run with valgrind,
# or mac, it is needed.
env.broadcast('CONFIG', 'set', 'cluster-node-timeout', '120000')
env.broadcast('MRTESTS.FORCESHARDSCONNECTION')
for conn in shardsConnections(env):
try:
conn.execute_command('debug', 'PROMOTE-CONN')
except Exception:
pass
conn.execute_command('MRTESTS.FORCESHARDSCONNECTION')
with TimeLimit(2):
verifyClusterInitialized(env)

Expand Down Expand Up @@ -141,7 +151,11 @@ def test_func():
raise unittest.SkipTest()
if skipOnVersionLowerThan:
skip_if_redis_version_is_lower_than(skipOnVersionLowerThan)
envArgs['moduleArgs'] = moduleArgs or None
defaultModuleArgs = 'password'
if not is_redis_version_is_lower_than('8.0.0'):
# We provide password only if version < 8.0.0. If version is greater, we have internal command and we do not need the password.
defaultModuleArgs = None
envArgs['moduleArgs'] = moduleArgs or defaultModuleArgs
envArgs['redisConfigFile'] = create_config_file(redisConfigFileContent) if redisConfigFileContent else None
env = Env(**envArgs)
conn = getConnectionByEnv(env)
Expand Down
2 changes: 1 addition & 1 deletion tests/mr_test_module/pytests/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ else
fi


python3 -m RLTest --verbose-information-on-failure --module $MODULE_PATH --clear-logs "$@" --oss_password "password"
python3 -m RLTest --verbose-information-on-failure --module $MODULE_PATH --clear-logs "$@" --oss_password "password" --enable-debug-command
Loading