Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

strategy:
matrix:
redis_version: ["7.2", "7.4"]
redis_version: ["7.2", "7.4", "unstable"]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

strategy:
matrix:
redis_version: ["7.2", "7.4"]
redis_version: ["7.2", "7.4", "unstable"]

steps:
- uses: actions/checkout@v4
Expand Down
7 changes: 1 addition & 6 deletions rust_api/libmr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,12 @@ impl Default for crate::libmr_c_raw::bindings::Record {

pub type RustMRError = String;

pub fn mr_init(ctx: &Context, num_threads: usize, username: Option<&str>, password: Option<&str>) {
let username = username.map(|v| CString::new(v).unwrap());
pub fn mr_init(ctx: &Context, num_threads: usize, password: Option<&str>) {
let password = password.map(|v| CString::new(v).unwrap());
unsafe {
MR_Init(
ctx.ctx as *mut RedisModuleCtx,
num_threads,
username
.as_ref()
.map(|v| v.as_ptr())
.unwrap_or(ptr::null_mut()) as *mut c_char,
password
.as_ref()
.map(|v| v.as_ptr())
Expand Down
94 changes: 34 additions & 60 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,6 @@
#define NETWORK_TEST_COMMAND xstr(MODULE_NAME)".NETWORKTEST"
#define FORCE_SHARDS_CONNECTION xstr(MODULE_NAME)".FORCESHARDSCONNECTION"

/**
* @brief Sets the ACL categories for the given command.
* @return true if the ACL categories were set successfully for the
* command was registered successfully, false otherwise.
*/
#define SetCommandAcls(ctx, cmd, acls) \
({ \
bool result = false; \
if (!RedisModule_GetCommand || !RedisModule_SetCommandACLCategories \
|| !RedisModule_AddACLCategory) { \
result = true; \
} \
if (!result) { \
struct RedisModuleCommand *command = RedisModule_GetCommand(ctx, cmd); \
if (command != NULL) { \
const char *categories = ((acls) == NULL || !strcmp(acls, "")) \
? LIBMR_ACL_COMMAND_CATEGORY_NAME \
: acls " " LIBMR_ACL_COMMAND_CATEGORY_NAME; \
if (RedisModule_SetCommandACLCategories(command, categories) == \
REDISMODULE_OK) { \
result = true; \
} \
} \
} \
result; \
})

/** @brief Register a new Redis command with the required ACLs.
* @see RedisModule_CreateCommand
* @return true if the command was registered successfully, false
Expand All @@ -71,7 +44,7 @@
static inline __attribute__((always_inline)) bool
RegisterRedisCommand(RedisModuleCtx *ctx, const char *name,
RedisModuleCmdFunc cmdfunc, const char *strflags,
int firstkey, int lastkey, int keystep, const bool setAcls) {
int firstkey, int lastkey, int keystep) {
const int ret = RedisModule_CreateCommand(ctx, name, cmdfunc, strflags,
firstkey, lastkey, keystep);

Expand All @@ -81,11 +54,7 @@ RegisterRedisCommand(RedisModuleCtx *ctx, const char *name,
return false;
}

if (!setAcls) {
return true;
}

return SetCommandAcls(ctx, name, "");
return true;
}

typedef enum NodeStatus{
Expand Down Expand Up @@ -118,7 +87,6 @@ typedef struct Node{
char* id;
char* ip;
unsigned short port;
char* username;
char* password;
char* unixSocket;
redisAsyncContext *c;
Expand Down Expand Up @@ -153,7 +121,6 @@ struct ClusterCtx {
char myId[REDISMODULE_NODE_ID_LEN + 1];
int isOss;
functionId networkTestMsgReciever;
char *username;
char *password;
}clusterCtx;

Expand Down Expand Up @@ -617,10 +584,17 @@ static void MR_OnConnectCallback(const struct redisAsyncContext* c, int status){

RedisModule_Log(mr_staticCtx, "notice", "connected : %s:%d, status = %d\r\n", c->c.tcp.host, c->c.tcp.port, status);

if (n->username && n->password) {
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "AUTH %s %s", n->username, n->password);
} else if (n->password){
if (n->password){
/* If password is provided to us we will use it (it means it was given to us with clusterset) */
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "AUTH %s", n->password);
} else if (RedisModule_GetInternalSecret && !MR_IsEnterpriseBuild()) {
/* OSS deployment that support internal secret, lets use it. */
RedisModule_ThreadSafeContextLock(mr_staticCtx);
size_t len;
const char *secret = RedisModule_GetInternalSecret(mr_staticCtx, &len);
RedisModule_Assert(secret);
redisAsyncCommand((redisAsyncContext*)c, NULL, NULL, "AUTH %s %b", "internal connection", secret, len);
RedisModule_ThreadSafeContextUnlock(mr_staticCtx);
}

if(n->sendClusterTopologyOnNextConnect && clusterCtx.CurrCluster->clusterSetCommand){
Expand Down Expand Up @@ -746,14 +720,13 @@ static Node* MR_GetNode(const char* id){
return n;
}

static Node* MR_CreateNode(const char* id, const char* ip, unsigned short port, const char* username, const char* password, const char* unixSocket, size_t minSlot, size_t maxSlot){
static Node* MR_CreateNode(const char* id, const char* ip, unsigned short port, const char* password, const char* unixSocket, size_t minSlot, size_t maxSlot){
RedisModule_Assert(!MR_GetNode(id));
Node* n = MR_ALLOC(sizeof(*n));
*n = (Node){
.id = MR_STRDUP(id),
.ip = MR_STRDUP(ip),
.port = port,
.username = username ? MR_STRDUP(username) : NULL,
.password = password ? MR_STRDUP(password) : NULL,
.unixSocket = unixSocket ? MR_STRDUP(unixSocket) : NULL,
.c = NULL,
Expand Down Expand Up @@ -847,7 +820,8 @@ static void MR_RefreshClusterData(){

Node* n = MR_GetNode(nodeId);
if(!n){
n = MR_CreateNode(nodeId, nodeIp, (unsigned short)port, clusterCtx.username, clusterCtx.password, NULL, minslot, maxslot);
/* If we have internal secret we will ignore the clusterCtx.password, we do not need it. */
n = MR_CreateNode(nodeId, nodeIp, (unsigned short)port, RedisModule_GetInternalSecret ? NULL : clusterCtx.password, NULL, minslot, maxslot);
}

if (n->isMe) {
Expand Down Expand Up @@ -964,7 +938,7 @@ static void MR_SetClusterData(RedisModuleString** argv, int argc){

Node* n = MR_GetNode(realId);
if(!n){
n = MR_CreateNode(realId, ip, port, NULL, password, NULL, minslot, maxslot);
n = MR_CreateNode(realId, ip, port, password, NULL, minslot, maxslot);
}
for(int i = minslot ; i <= maxslot ; ++i){
clusterCtx.CurrCluster->slots[i] = n;
Expand Down Expand Up @@ -1349,15 +1323,14 @@ static void MR_NetworkTest(RedisModuleCtx *ctx, const char *sender_id, uint8_t t
RedisModule_Log(ctx, "notice", "got a nextwork test msg");
}

int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password) {
int MR_ClusterInit(RedisModuleCtx* rctx, char *password) {
clusterCtx.CurrCluster = NULL;
clusterCtx.callbacks = array_new(MR_ClusterMessageReceiver, 10);
clusterCtx.nodesMsgIds = mr_dictCreate(&mr_dictTypeHeapStrings, NULL);
clusterCtx.minSlot = 0;
clusterCtx.maxSlot = 0;
clusterCtx.clusterSize = 1;
clusterCtx.isOss = true;
clusterCtx.username = username ? MR_STRDUP(username) : NULL;
clusterCtx.password = password ? MR_STRDUP(password) : NULL;
memset(clusterCtx.myId, '0', REDISMODULE_NODE_ID_LEN);

Expand All @@ -1373,56 +1346,57 @@ int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password) {
const char *command_flags = "readonly deny-script";
if (MR_IsEnterpriseBuild()) {
command_flags = "readonly deny-script _proxy-filtered";
} else {
if (RedisModule_GetInternalSecret) {
/* We run at a version that supports internal commands, let use it. */
command_flags = "readonly deny-script internal";
}
}

if (RedisModule_AddACLCategory) {
if (RedisModule_AddACLCategory(rctx, LIBMR_ACL_COMMAND_CATEGORY_NAME) != REDISMODULE_OK) {
RedisModule_Log(rctx, "error", "Failed to add ACL category");

if (!MR_IsEnterpriseBuild()) {
/* Refresh cluster is only relevant for oss, also notice that refresh cluster
* is not considered internal and should be performed by the user. */
if (!RegisterRedisCommand(rctx, CLUSTER_REFRESH_COMMAND, MR_ClusterRefresh,
"readonly deny-script", 0, 0, 0)) {
return REDISMODULE_ERR;
}
}

if (!RegisterRedisCommand(rctx, CLUSTER_REFRESH_COMMAND, MR_ClusterRefresh,
command_flags, 0, 0, 0, false)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_SET_COMMAND, MR_ClusterSet,
command_flags, 0, 0, -1, false)) {
command_flags, 0, 0, -1)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_SET_FROM_SHARD_COMMAND,
MR_ClusterSetFromShard, command_flags, 0, 0,
-1, true)) {
-1)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_HELLO_COMMAND, MR_ClusterHello,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_INNER_COMMUNICATION_COMMAND,
MR_ClusterInnerCommunicationMsg, command_flags, 0, 0,
0, true)) {
0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, NETWORK_TEST_COMMAND, MR_NetworkTestCommand,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, CLUSTER_INFO_COMMAND, MR_ClusterInfoCommand,
command_flags, 0, 0, 0, true)) {
command_flags, 0, 0, 0)) {
return REDISMODULE_ERR;
}

if (!RegisterRedisCommand(rctx, FORCE_SHARDS_CONNECTION,
MR_ForceShardsConnectionCommand, command_flags, 0, 0,
0, true)) {
0)) {
return REDISMODULE_ERR;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int MR_IsClusterInitialize();

size_t MR_ClusterGetSize();

int MR_ClusterInit(RedisModuleCtx* rctx, char *username, char *password);
int MR_ClusterInit(RedisModuleCtx* rctx, char *password);

size_t MR_ClusterGetSlotdByKey(const char* key, size_t len);

Expand Down
11 changes: 0 additions & 11 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@
#error "MODULE_NAME is not defined"
#endif

/** The name of the ACL category for the commands created by LibMR for
* its own operations.
*
* The user may redefine the category name by defining the macro
* LIBMR_ACL_COMMAND_CATEGORY_NAME before including this header.
*/
#ifndef LIBMR_ACL_COMMAND_CATEGORY_NAME
#define LIBMR_ACL_COMMAND_CATEGORY_NAME \
"_" xstr(MODULE_NAME) "_libmr_internal"
#endif

typedef struct MR_RedisVersion
{
int redisMajorVersion;
Expand Down
4 changes: 2 additions & 2 deletions src/mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1405,11 +1405,11 @@ static void MR_GetRedisVersion() {
RedisModule_FreeCallReply(reply);
}

int MR_Init(RedisModuleCtx* ctx, size_t numThreads, char *username, char *password) {
int MR_Init(RedisModuleCtx* ctx, size_t numThreads, char *password) {
mr_staticCtx = RedisModule_GetDetachedThreadSafeContext(ctx);
MR_GetRedisVersion();

if (MR_ClusterInit(ctx, username, password) != REDISMODULE_OK) {
if (MR_ClusterInit(ctx, password) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}

Expand Down
2 changes: 1 addition & 1 deletion src/mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ LIBMR_API void MR_Run(Execution* e);
LIBMR_API void MR_FreeExecution(Execution* e);

/* Initialize mr library */
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *username, char *password);
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *password);

/* Register a new object type */
LIBMR_API int MR_RegisterObject(MRObjectType* t);
Expand Down
2 changes: 2 additions & 0 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,7 @@ REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(con
REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_GetInternalSecret)(RedisModuleCtx *ctx, size_t *len) REDISMODULE_ATTR;

#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)

Expand Down Expand Up @@ -1694,6 +1695,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RdbStreamFree);
REDISMODULE_GET_API(RdbLoad);
REDISMODULE_GET_API(RdbSave);
REDISMODULE_GET_API(GetInternalSecret);


#ifdef REDISMODULE_RLEC_API_DEFS
Expand Down
19 changes: 17 additions & 2 deletions tests/mr_test_module/pytests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ def shardsConnections(env):

def verifyClusterInitialized(env):
for conn in shardsConnections(env):
try:
# try to promote to internal connection
conn.execute_command('debug', 'MARK-INTERNAL-CLIENT')
except Exception:
pass
allConnected = False
while not allConnected:
res = conn.execute_command('MRTESTS.INFOCLUSTER')
Expand All @@ -108,7 +113,13 @@ def initialiseCluster(env):
# able to execute commands on shards, on slow envs, run with valgrind,
# or mac, it is needed.
env.broadcast('CONFIG', 'set', 'cluster-node-timeout', '120000')
env.broadcast('MRTESTS.FORCESHARDSCONNECTION')
for conn in shardsConnections(env):
try:
conn.execute_command('debug', 'MARK-INTERNAL-CLIENT')
except Exception as e:
print(e)
pass
conn.execute_command('MRTESTS.FORCESHARDSCONNECTION')
with TimeLimit(2):
verifyClusterInitialized(env)

Expand Down Expand Up @@ -141,7 +152,11 @@ def test_func():
raise unittest.SkipTest()
if skipOnVersionLowerThan:
skip_if_redis_version_is_lower_than(skipOnVersionLowerThan)
envArgs['moduleArgs'] = moduleArgs or None
defaultModuleArgs = 'password'
if not is_redis_version_is_lower_than('8.0.0'):
# We provide password only if version < 8.0.0. If version is greater, we have internal command and we do not need the password.
defaultModuleArgs = None
envArgs['moduleArgs'] = moduleArgs or defaultModuleArgs
envArgs['redisConfigFile'] = create_config_file(redisConfigFileContent) if redisConfigFileContent else None
env = Env(**envArgs)
conn = getConnectionByEnv(env)
Expand Down
2 changes: 1 addition & 1 deletion tests/mr_test_module/pytests/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ else
fi


python3 -m RLTest --verbose-information-on-failure --module $MODULE_PATH --clear-logs "$@" --oss_password "password"
python3 -m RLTest --verbose-information-on-failure --module $MODULE_PATH --clear-logs "$@" --oss_password "password" --enable-debug-command
Loading