diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 9c2368f..15c1da3 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -8,6 +8,7 @@ jobs: test: name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} runs-on: ubuntu-latest + continue-on-error: ${{ matrix.version == 'nightly' }} strategy: fail-fast: false matrix: @@ -20,22 +21,24 @@ jobs: arch: - x64 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: julia-actions/setup-julia@v1 with: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - - uses: actions/cache@v1 + - uses: actions/cache@v4 env: cache-name: cache-artifacts with: path: ~/.julia/artifacts - key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('Project.toml') }} restore-keys: | ${{ runner.os }}-test-${{ env.cache-name }}- ${{ runner.os }}-test- ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 + - name: Update /etc/hosts for TLS test + run: echo "127.0.0.1 redisjltest" | sudo tee -a /etc/hosts - name: Start redis server run: | echo "Starting redis server" @@ -43,6 +46,13 @@ jobs: test/conf/redis.sh sleep 5 echo "Redis started" + - name: Start redis cluster + run: | + echo "Starting redis cluster" + chmod +x test/conf/redis-cluster.sh + test/conf/redis-cluster.sh + sleep 10 + echo "Redis cluster started" - uses: julia-actions/julia-runtest@v1 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v3 diff --git a/src/Redis.jl b/src/Redis.jl index b294d18..d1fe10d 100644 --- a/src/Redis.jl +++ b/src/Redis.jl @@ -6,7 +6,7 @@ using MbedTLS import Base.get, Base.keys, Base.time export RedisException, ConnectionException, ServerException, ProtocolException, ClientException -export RedisConnection, SentinelConnection, TransactionConnection, SubscriptionConnection, +export RedisConnection, SentinelConnection, TransactionConnection, SubscriptionConnection, RedisClusterConnection, disconnect, is_connected, open_transaction, reset_transaction, open_subscription, open_pipeline, read_pipeline # Key commands @@ -65,5 +65,6 @@ include("connection.jl") include("parser.jl") include("client.jl") include("commands.jl") +include("cluster_commands.jl") end diff --git a/src/client.jl b/src/client.jl index 7b9cf76..13e68be 100644 --- a/src/client.jl +++ b/src/client.jl @@ -1,5 +1,38 @@ import DataStructures.OrderedSet +const CLUSTER_MULTI_KEY_COMMANDS = Set([ + # String commands - multi-key operations + "mget", "mset", "msetnx", + + # Key commands - require same slot or special handling + "del", "rename", "renamenx", "keys", "randomkey", + + # List commands - require same slot + "rpoplpush", "brpoplpush", + + # Set commands - multi-key operations + "smove", "sdiff", "sinter", "sunion", + "sdiffstore", "sinterstore", "sunionstore", + + # HyperLogLog commands - multi-key operations + "pfcount", "pfmerge", + + # Sorted Set commands - multi-key operations + "zinterstore", "zunionstore", + + # Bit commands - multi-key operations + "bitop", + + # Script commands - may involve multiple keys + "eval", "evalsha", + + # Server commands - need to broadcast to all nodes + "flushall", "flushdb", "_time", + + # Pub/Sub commands - need special handling + "publish" +]) + flatten(token) = string(token) flatten(token::Vector{UInt8}) = [token] flatten(token::String) = token @@ -171,11 +204,14 @@ end macro redisfunction(command::AbstractString, ret_type, args...) is_exec = Symbol(command) == :exec func_name = esc(Symbol(command)) - command = lstrip(command,'_') - command = split(command, '_') + command_str = lstrip(command, '_') + command = split(command_str, '_') + + # Check if command needs special cluster handling (multi-key commands) + needs_special_cluster_handling = command_str in CLUSTER_MULTI_KEY_COMMANDS if length(args) > 0 - return quote + base_functions = quote function $(func_name)(conn::RedisConnection, $(args...)) response = execute_command(conn, flatten_command($(command...), $(args...))) convert_response($ret_type, response) @@ -191,6 +227,17 @@ macro redisfunction(command::AbstractString, ret_type, args...) execute_command_without_reply(conn, flatten_command($(command...), $(args...))) end end + + if !needs_special_cluster_handling + push!(base_functions.args, :( + function $(func_name)(conn::RedisClusterConnection, $(args...)) + response = execute_command(conn, flatten_command($(command...), $(args...))) + convert_response($ret_type, response) + end + )) + end + return base_functions + else q1 = quote function $(func_name)(conn::RedisConnection) @@ -209,12 +256,24 @@ macro redisfunction(command::AbstractString, ret_type, args...) conn.num_commands += 1 end end + + exprs = [q1.args[2]] + if !needs_special_cluster_handling + q1_cluster = quote + function $(func_name)(conn::RedisClusterConnection) + response = execute_command(conn, flatten_command($(command...))) + convert_response($ret_type, response) + end + end + push!(exprs, q1_cluster.args[2]) + end + # To avoid redefining `function exec(conn::TransactionConnection)` - if is_exec - return Expr(:block, q1.args[2], q3.args[2]) - else - return Expr(:block, q1.args[2], q2.args[2], q3.args[2]) + if !is_exec + push!(exprs, q2.args[2]) end + push!(exprs, q3.args[2]) + return Expr(:block, exprs...) end end diff --git a/src/cluster_commands.jl b/src/cluster_commands.jl new file mode 100644 index 0000000..5a89aa9 --- /dev/null +++ b/src/cluster_commands.jl @@ -0,0 +1,823 @@ +# Special implementations for Redis Cluster multi-key commands +# These commands need special handling because they involve multiple keys +# that may span different slots + + +const CRC16_TABLE = begin + table = Vector{UInt16}(undef, 256) + const poly = 0x1021 + + for i = 0:255 + crc = UInt16(i << 8) + for _ = 1:8 + if (crc & 0x8000) != 0 + crc = (crc << 1) ⊻ poly + else + crc = crc << 1 + end + end + table[i+1] = crc + end + table +end + + +function crc16(data::Union{AbstractString,AbstractVector{UInt8}}, crc::UInt16 = 0x0000) + crc_val = crc + + bytes_view = data isa AbstractString ? codeunits(data) : data + + for byte in bytes_view + table_index = ((crc_val >> 8) ⊻ byte) & 0xFF + crc_val = ((crc_val << 8) ⊻ CRC16_TABLE[table_index+1]) & 0xFFFF + end + + return crc_val +end + +function calculate_slot(key::Union{AbstractString,AbstractVector{UInt8}}) + local key_for_crc + + # find hash tag '{' + start_bracket = findfirst(isequal(UInt8('{')), codeunits(key)) + + if isnothing(start_bracket) + key_for_crc = key # no '{', use entire key + else + # find in '{' 之后的 '}' + end_bracket = findnext(isequal(UInt8('}')), codeunits(key), start_bracket + 1) + + # must find both '{' and '}', and there must be content between them + if !isnothing(end_bracket) && end_bracket > start_bracket + 1 + # only calculate CRC for content between { and } + # use view or SubString to avoid allocation + if key isa AbstractString + key_for_crc = SubString(key, start_bracket + 1, end_bracket - 1) + else # Vector{UInt8} + key_for_crc = view(key, (start_bracket+1):(end_bracket-1)) + end + else + key_for_crc = key # found '{' but no '}' or {} is empty + end + end + + return crc16(key_for_crc) % 16384 +end + + +""" + keys_in_same_slot(keys...) + +Check if all keys are in the same slot. +""" +function keys_in_same_slot(keys...) + if isempty(keys) + return true + end + + first_slot = calculate_slot(keys[1]) + for key in keys[2:end] + if calculate_slot(key) != first_slot + return false + end + end + return true +end + +# ==================== String Commands ==================== + +""" + del(cluster::RedisClusterConnection, keys...) + +Delete one or more keys from the cluster. +""" +function del(cluster::RedisClusterConnection, keys...) + if isempty(keys) + return execute_command(cluster, ["DEL"]) + end + + slot_map = Dict{Int,Vector{Any}}() + for key in keys + slot = calculate_slot(key) + if !haskey(slot_map, slot) + slot_map[slot] = [] + end + push!(slot_map[slot], key) + end + + if length(slot_map) == 1 + return execute_command(cluster, flatten_command("DEL", keys...)) + end + + total_deleted = 0 + + for (slot, keys_in_slot) in slot_map + try + command = flatten_command("DEL", keys_in_slot...) + deleted_count::Integer = execute_command(cluster, command) + total_deleted += deleted_count + catch e + @warn "Failed to DEL keys in slot $slot: $e" + end + end + + return total_deleted +end + +""" + mget(cluster::RedisClusterConnection, key, keys...) + +Cluster version of MGET - get values of multiple keys. +If keys are in different slots, they are fetched separately and returned in original order. +""" +function mget(cluster::RedisClusterConnection, key, keys...) + all_keys = [key, keys...] + + # Check if all keys are in the same slot + if keys_in_same_slot(all_keys...) + response = execute_command(cluster, flatten_command("MGET", all_keys...)) + return convert_response(Array{Union{AbstractString,Nothing},1}, response) + end + + # Keys are in different slots, fetch separately + results = Vector{Union{String,Nothing}}(undef, length(all_keys)) + for (i, k) in enumerate(all_keys) + results[i] = get(cluster, k) + end + return results +end + +""" + mset(cluster::RedisClusterConnection, keyvalues) + +Cluster version of MSET - set multiple key-value pairs. +If keys are in different slots, they are set separately. +""" +function mset(cluster::RedisClusterConnection, keyvalues) + if isa(keyvalues, Dict) + keys_list = collect(keys(keyvalues)) + + # Check if all keys are in the same slot + if keys_in_same_slot(keys_list...) + response = execute_command(cluster, flatten_command("MSET", keyvalues)) + return convert_response(Bool, response) + end + + # Keys are in different slots, set separately + for (k, v) in keyvalues + set(cluster, k, v) + end + return true + else + # Assume array format [key1, val1, key2, val2, ...] + if length(keyvalues) % 2 != 0 + throw(ClientException("MSET requires an even number of arguments")) + end + + keys_list = [keyvalues[i] for i = 1:2:length(keyvalues)] + + if keys_in_same_slot(keys_list...) + response = execute_command(cluster, flatten_command("MSET", keyvalues...)) + return convert_response(Bool, response) + end + + # Keys are in different slots, set separately + for i = 1:2:length(keyvalues) + set(cluster, keyvalues[i], keyvalues[i+1]) + end + return true + end +end + +""" + msetnx(cluster::RedisClusterConnection, keyvalues) + +Cluster version of MSETNX - set multiple key-value pairs only if all keys don't exist. +Note: In cluster mode, if keys are in different slots, this operation is not atomic. +""" +function msetnx(cluster::RedisClusterConnection, keyvalues) + if isa(keyvalues, Dict) + keys_list = collect(keys(keyvalues)) + + # Check if all keys are in the same slot + if keys_in_same_slot(keys_list...) + response = execute_command(cluster, flatten_command("MSETNX", keyvalues)) + return convert_response(Bool, response) + end + + # Keys are in different slots - warn about non-atomicity + @warn "MSETNX with keys in different slots is not atomic in cluster mode" + + # First check if all keys exist + for k in keys_list + if exists(cluster, k) + return false + end + end + + # Set all keys + for (k, v) in keyvalues + set(cluster, k, v) + end + return true + else + # Array format + if length(keyvalues) % 2 != 0 + throw(ClientException("MSETNX requires an even number of arguments")) + end + + keys_list = [keyvalues[i] for i = 1:2:length(keyvalues)] + + if keys_in_same_slot(keys_list...) + response = execute_command(cluster, flatten_command("MSETNX", keyvalues...)) + return convert_response(Bool, response) + end + + @warn "MSETNX with keys in different slots is not atomic in cluster mode" + + for k in keys_list + if exists(cluster, k) + return false + end + end + + for i = 1:2:length(keyvalues) + set(cluster, keyvalues[i], keyvalues[i+1]) + end + return true + end +end + +# ==================== Key Commands ==================== + +""" + keys(cluster::RedisClusterConnection, pattern) + +Cluster version of KEYS - returns all keys matching the pattern. +Broadcasts the command to all master nodes and aggregates the results. +""" +function keys(cluster::RedisClusterConnection, pattern) + all_keys = Set{AbstractString}() + + # Broadcast to all master nodes + for (_, conn) in cluster.node_connections + try + node_keys = execute_command(conn, ["KEYS", pattern]) + if node_keys !== nothing + union!(all_keys, Set(node_keys)) + end + catch e + @warn "Failed to execute KEYS on one node: $e" + end + end + + return all_keys +end + +""" + randomkey(cluster::RedisClusterConnection) + +Cluster version of RANDOMKEY - returns a random key. +Gets a random key from a randomly selected master node. +""" +function randomkey(cluster::RedisClusterConnection) + if isempty(cluster.node_connections) + return nothing + end + + # Select a random master node + connections = collect(values(cluster.node_connections)) + random_conn = rand(connections) + + response = execute_command(random_conn, ["RANDOMKEY"]) + return convert_response(Union{AbstractString,Nothing}, response) +end + +""" + rename(cluster::RedisClusterConnection, key, newkey) + +Cluster version of RENAME. +Note: RENAME requires both keys to be in the same slot. +""" +function rename(cluster::RedisClusterConnection, key, newkey) + if !keys_in_same_slot(key, newkey) + throw( + ClientException( + "RENAME requires both keys to be in the same slot. Use hash tags like {user}:old and {user}:new", + ), + ) + end + + response = execute_command(cluster, flatten_command("RENAME", key, newkey)) + return convert_response(AbstractString, response) +end + +""" + renamenx(cluster::RedisClusterConnection, key, newkey) + +Cluster version of RENAMENX. +Note: RENAMENX requires both keys to be in the same slot. +""" +function renamenx(cluster::RedisClusterConnection, key, newkey) + if !keys_in_same_slot(key, newkey) + throw( + ClientException( + "RENAMENX requires both keys to be in the same slot. Use hash tags like {user}:old and {user}:new", + ), + ) + end + + response = execute_command(cluster, flatten_command("RENAMENX", key, newkey)) + return convert_response(Bool, response) +end + +# ==================== List Commands ==================== + +""" + rpoplpush(cluster::RedisClusterConnection, source, destination) + +Cluster version of RPOPLPUSH. +Note: Requires source and destination to be in the same slot. +""" +function rpoplpush(cluster::RedisClusterConnection, source, destination) + if !keys_in_same_slot(source, destination) + throw( + ClientException( + "RPOPLPUSH requires both keys to be in the same slot. Use hash tags like {list}:source and {list}:dest", + ), + ) + end + + response = execute_command(cluster, flatten_command("RPOPLPUSH", source, destination)) + return convert_response(Union{AbstractString,Nothing}, response) +end + +""" + brpoplpush(cluster::RedisClusterConnection, source, destination, timeout) + +Cluster version of BRPOPLPUSH. +Note: Requires source and destination to be in the same slot. +""" +function brpoplpush(cluster::RedisClusterConnection, source, destination, timeout) + if !keys_in_same_slot(source, destination) + throw( + ClientException( + "BRPOPLPUSH requires both keys to be in the same slot. Use hash tags like {list}:source and {list}:dest", + ), + ) + end + + response = execute_command( + cluster, + flatten_command("BRPOPLPUSH", source, destination, timeout), + ) + return convert_response(Union{AbstractString,Nothing}, response) +end + +# ==================== Set Commands ==================== + +""" + smove(cluster::RedisClusterConnection, source, destination, member) + +Cluster version of SMOVE. +Note: Requires source and destination to be in the same slot. +""" +function smove(cluster::RedisClusterConnection, source, destination, member) + if !keys_in_same_slot(source, destination) + throw( + ClientException( + "SMOVE requires both keys to be in the same slot. Use hash tags like {set}:source and {set}:dest", + ), + ) + end + + response = + execute_command(cluster, flatten_command("SMOVE", source, destination, member)) + return convert_response(Bool, response) +end + +""" + sdiff(cluster::RedisClusterConnection, key, keys...) + +Cluster version of SDIFF - returns the difference between the first set and other sets. +Note: Requires all keys to be in the same slot. +""" +function sdiff(cluster::RedisClusterConnection, key, keys...) + all_keys = [key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SDIFF requires all keys to be in the same slot. Use hash tags like {set}:key1 and {set}:key2", + ), + ) + end + + response = execute_command(cluster, flatten_command("SDIFF", all_keys...)) + return convert_response(Set{AbstractString}, response) +end + +""" + sinter(cluster::RedisClusterConnection, key, keys...) + +Cluster version of SINTER - returns the intersection of all sets. +Note: Requires all keys to be in the same slot. +""" +function sinter(cluster::RedisClusterConnection, key, keys...) + all_keys = [key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SINTER requires all keys to be in the same slot. Use hash tags like {set}:key1 and {set}:key2", + ), + ) + end + + response = execute_command(cluster, flatten_command("SINTER", all_keys...)) + return convert_response(Set{AbstractString}, response) +end + +""" + sunion(cluster::RedisClusterConnection, key, keys...) + +Cluster version of SUNION - returns the union of all sets. +Note: Requires all keys to be in the same slot. +""" +function sunion(cluster::RedisClusterConnection, key, keys...) + all_keys = [key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SUNION requires all keys to be in the same slot. Use hash tags like {set}:key1 and {set}:key2", + ), + ) + end + + response = execute_command(cluster, flatten_command("SUNION", all_keys...)) + return convert_response(Set{AbstractString}, response) +end + +""" + sdiffstore(cluster::RedisClusterConnection, destination, key, keys...) + +Cluster version of SDIFFSTORE. +Note: Requires all keys to be in the same slot. +""" +function sdiffstore(cluster::RedisClusterConnection, destination, key, keys...) + all_keys = [destination, key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SDIFFSTORE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = + execute_command(cluster, flatten_command("SDIFFSTORE", destination, key, keys...)) + return convert_response(Integer, response) +end + +""" + sinterstore(cluster::RedisClusterConnection, destination, key, keys...) + +Cluster version of SINTERSTORE. +Note: Requires all keys to be in the same slot. +""" +function sinterstore(cluster::RedisClusterConnection, destination, key, keys...) + all_keys = [destination, key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SINTERSTORE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = + execute_command(cluster, flatten_command("SINTERSTORE", destination, key, keys...)) + return convert_response(Integer, response) +end + +""" + sunionstore(cluster::RedisClusterConnection, destination, key, keys...) + +Cluster version of SUNIONSTORE. +Note: Requires all keys to be in the same slot. +""" +function sunionstore(cluster::RedisClusterConnection, destination, key, keys...) + all_keys = [destination, key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "SUNIONSTORE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = + execute_command(cluster, flatten_command("SUNIONSTORE", destination, key, keys...)) + return convert_response(Integer, response) +end + +# ==================== HyperLogLog Commands ==================== + +""" + pfcount(cluster::RedisClusterConnection, key, keys...) + +Cluster version of PFCOUNT. +Note: When using multiple keys, all keys must be in the same slot. +""" +function pfcount(cluster::RedisClusterConnection, key, keys...) + if isempty(keys) + # Single key case, execute directly + response = execute_command(cluster, flatten_command("PFCOUNT", key)) + return convert_response(Integer, response) + end + + # Multiple keys case + all_keys = [key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "PFCOUNT with multiple keys requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = execute_command(cluster, flatten_command("PFCOUNT", all_keys...)) + return convert_response(Integer, response) +end + +""" + pfmerge(cluster::RedisClusterConnection, destkey, sourcekey, sourcekeys...) + +Cluster version of PFMERGE. +Note: Requires all keys to be in the same slot. +""" +function pfmerge(cluster::RedisClusterConnection, destkey, sourcekey, sourcekeys...) + all_keys = [destkey, sourcekey, sourcekeys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "PFMERGE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = execute_command( + cluster, + flatten_command("PFMERGE", destkey, sourcekey, sourcekeys...), + ) + return convert_response(Bool, response) +end + +# ==================== Bit Commands ==================== + +""" + bitop(cluster::RedisClusterConnection, operation, destkey, key, keys...) + +Cluster version of BITOP. +Note: Requires all keys to be in the same slot. +""" +function bitop(cluster::RedisClusterConnection, operation, destkey, key, keys...) + all_keys = [destkey, key, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "BITOP requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + response = + execute_command(cluster, flatten_command("BITOP", operation, destkey, key, keys...)) + return convert_response(Integer, response) +end + +# ==================== Sorted Set Commands ==================== + +""" + zinterstore(cluster::RedisClusterConnection, destination, numkeys, keys, weights=[]; aggregate=Aggregate.NotSet) + +Cluster version of ZINTERSTORE. +Note: Requires all keys to be in the same slot. +""" +function zinterstore( + cluster::RedisClusterConnection, + destination, + numkeys, + keys::Array, + weights = []; + aggregate = Aggregate.NotSet, +) + + all_keys = [destination, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "ZINTERSTORE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + command = + _build_store_internal(destination, numkeys, keys, weights, aggregate, "zinterstore") + response = execute_command(cluster, command) + return convert_response(Integer, response) +end + +""" + zunionstore(cluster::RedisClusterConnection, destination, numkeys, keys, weights=[]; aggregate=Aggregate.NotSet) + +Cluster version of ZUNIONSTORE. +Note: Requires all keys to be in the same slot. +""" +function zunionstore( + cluster::RedisClusterConnection, + destination, + numkeys::Integer, + keys::Array, + weights = []; + aggregate = Aggregate.NotSet, +) + + all_keys = [destination, keys...] + + if !keys_in_same_slot(all_keys...) + throw( + ClientException( + "ZUNIONSTORE requires all keys to be in the same slot. Use hash tags", + ), + ) + end + + command = + _build_store_internal(destination, numkeys, keys, weights, aggregate, "zunionstore") + response = execute_command(cluster, command) + return convert_response(Integer, response) +end + +# ==================== Server Commands ==================== + +""" + flushall(cluster::RedisClusterConnection) + +Cluster version of FLUSHALL - broadcasts to all master nodes. +Removes all keys from all databases on all nodes in the cluster. +""" +function flushall(cluster::RedisClusterConnection) + # Broadcast FLUSHALL to all master nodes + for (_, conn) in cluster.node_connections + try + execute_command(conn, ["FLUSHALL"]) + catch e + @warn "Failed to execute FLUSHALL on node: $e" + end + end + return "OK" +end + +""" + flushdb(cluster::RedisClusterConnection, db::Integer) + +Cluster version of FLUSHDB - broadcasts to all master nodes. +Note: In cluster mode, typically only DB 0 is used. +""" +function flushdb(cluster::RedisClusterConnection, db::Integer) + # Broadcast FLUSHDB to all master nodes + for (_, conn) in cluster.node_connections + try + execute_command(conn, ["FLUSHDB", string(db)]) + catch e + @warn "Failed to execute FLUSHDB on node: $e" + end + end + return "OK" +end + +""" + _time(cluster::RedisClusterConnection) + +Cluster version of TIME - returns time from a random node. +Returns current Unix time from one of the cluster nodes. +""" +function _time(cluster::RedisClusterConnection) + if isempty(cluster.node_connections) + throw(ConnectionException("No active connections in cluster")) + end + + # Get time from a random master node + connections = collect(values(cluster.node_connections)) + random_conn = rand(connections) + + response = execute_command(random_conn, ["TIME"]) + return convert_response(Array{AbstractString,1}, response) +end + +""" + time(cluster::RedisClusterConnection) + +Cluster version of TIME - returns DateTime from a random node. +""" +function time(cluster::RedisClusterConnection) + t = _time(cluster) + s = parse(Int, t[1]) + ms = parse(Float64, t[2]) + s += (ms / 1e6) + return unix2datetime(s) +end + +# ==================== Scripting Commands ==================== + +""" + evalscript(cluster::RedisClusterConnection, script, numkeys::Integer, keys, args) + +Cluster version of EVAL (evalscript). +Routes the script to the node responsible for the first key. +All keys must be in the same slot. +""" +function evalscript(cluster::RedisClusterConnection, script, numkeys::Integer, keys, args) + # If there are keys, verify they're in the same slot + if numkeys > 0 && length(keys) > 0 + key_list = keys isa Array ? keys : [keys] + if length(key_list) > 1 && !keys_in_same_slot(key_list...) + throw( + ClientException( + "EVAL requires all keys to be in the same slot. Use hash tags", + ), + ) + end + # Route based on first key + first_key = key_list[1] + conn = get_connection_for_key(cluster, string(first_key)) + response = + execute_command(conn, flatten_command("EVAL", script, numkeys, keys, args)) + else + # No keys - execute on any node + if !isempty(cluster.node_connections) + conn = first(values(cluster.node_connections)) + response = + execute_command(conn, flatten_command("EVAL", script, numkeys, keys, args)) + else + throw(ConnectionException("No active connections in cluster")) + end + end + return response +end + +# ==================== Pub/Sub Commands ==================== + +""" + publish(cluster::RedisClusterConnection, channel::AbstractString, message) + +Cluster version of PUBLISH. +In Redis Cluster, PUBLISH is broadcast to all nodes in the cluster, +so we can send to any node and it will propagate. +""" +function publish(cluster::RedisClusterConnection, channel::AbstractString, message) + if isempty(cluster.node_connections) + throw(ConnectionException("No active connections in cluster")) + end + + # Publish to any node - it will broadcast to all nodes in the cluster + conn = first(values(cluster.node_connections)) + response = execute_command(conn, flatten_command("PUBLISH", channel, message)) + return convert_response(Integer, response) +end + +""" + open_subscription(cluster::RedisClusterConnection, err_callback=nothing) + +Cluster version of open_subscription. +Creates a subscription connection to one of the cluster nodes. +In Redis Cluster, Pub/Sub messages are automatically broadcast to all nodes, +so subscribing to any single node is sufficient. +""" +function open_subscription(cluster::RedisClusterConnection, err_callback = nothing) + if isempty(cluster.node_connections) + throw(ConnectionException("No active connections in cluster")) + end + + # Select any node for subscription (messages are broadcast across cluster) + conn = first(values(cluster.node_connections)) + + # Use default error callback if none provided + if err_callback === nothing + err_callback = err -> @debug err + end + + # Create subscription connection using the selected node + s = SubscriptionConnection(conn) + Threads.@spawn subscription_loop(s, err_callback) + s +end diff --git a/src/connection.jl b/src/connection.jl index 6ca94c1..3d94ec4 100644 --- a/src/connection.jl +++ b/src/connection.jl @@ -44,7 +44,18 @@ struct SubscriptionConnection <: RedisConnectionBase transport::Transport.RedisTransport end +mutable struct RedisClusterConnection <: RedisConnectionBase + slot_map::Dict{UInt16, RedisConnection} + startup_nodes::Vector{Tuple{String, Int}} + password::AbstractString + db::Integer + sslconfig::Union{MbedTLS.SSLConfig, Nothing} + # Node connection pool: (host, port) -> RedisConnection + node_connections::Dict{Tuple{String, Int}, RedisConnection} +end + Transport.get_sslconfig(s::RedisConnectionBase) = Transport.get_sslconfig(s.transport) +Transport.get_sslconfig(s::RedisClusterConnection) = s.sslconfig function RedisConnection(; host="127.0.0.1", port=6379, password="", db=0, sslconfig=nothing) try @@ -130,7 +141,6 @@ function on_connect(conn::RedisConnectionBase) conn.db != 0 && select(conn, conn.db) conn end - function disconnect(conn::RedisConnectionBase) Transport.close(conn.transport) end @@ -138,3 +148,363 @@ end function is_connected(conn::RedisConnectionBase) Transport.is_connected(conn.transport) end + + +# ==================== RedisClusterConnection Methods ==================== + +""" + get_node_connection(cluster::RedisClusterConnection, host::String, port::Int) -> RedisConnection + +Get or create a connection to the specified cluster node. + +This method implements connection pooling for cluster nodes. If a connection +to the specified node already exists and is active, it returns the existing +connection. Otherwise, it creates a new connection. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection object +- `host::String`: Hostname or IP address of the cluster node +- `port::Int`: Port number of the cluster node + +# Returns +- `RedisConnection`: Active connection to the specified node + +# Throws +- `ConnectionException`: If unable to establish connection to the node + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +conn = get_node_connection(cluster, "127.0.0.1", 7001) +``` +""" +function get_node_connection(cluster::RedisClusterConnection, host::String, port::Int) + node_key = (host, port) + + # If connection already exists and is active, return it directly + if haskey(cluster.node_connections, node_key) + conn = cluster.node_connections[node_key] + if is_connected(conn) + return conn + else + # Connection is broken, need to recreate + delete!(cluster.node_connections, node_key) + end + end + + # Create new connection + try + conn = RedisConnection( + host=host, + port=port, + password=cluster.password, + db=cluster.db, + sslconfig=cluster.sslconfig + ) + cluster.node_connections[node_key] = conn + return conn + catch e + throw(ConnectionException("Failed to connect to cluster node $host:$port: $e")) + end +end + +""" + refresh_slot_map!(cluster::RedisClusterConnection) + +Refresh the slot mapping information from cluster nodes. + +This method queries the Redis Cluster for the current slot distribution +across nodes using the `CLUSTER SLOTS` command. It updates the internal +slot-to-connection mapping to reflect the current cluster topology. + +The method attempts to connect to each startup node in sequence until +successful. If all startup nodes fail, it throws a ConnectionException. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection object + +# Throws +- `ConnectionException`: If unable to refresh slot map from any seed node + +# Notes +- This method is automatically called during cluster initialization +- It should be called after cluster topology changes (e.g., failover, resharding) +- The method clears existing mappings before building new ones + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +refresh_slot_map!(cluster) # Manually refresh if topology changed +``` +""" +function refresh_slot_map!(cluster::RedisClusterConnection) + # Try to get slot information from any available seed node + local slots_info + last_error = nothing + + for (host, port) in cluster.startup_nodes + try + conn = get_node_connection(cluster, host, port) + slots_info = execute_command(conn, ["CLUSTER", "SLOTS"]) + break + catch e + last_error = e + @warn "Failed to get cluster slots from $host:$port: $e" + continue + end + end + + if !@isdefined(slots_info) + throw(ConnectionException("Failed to refresh cluster slot map from any seed node. Last error: $last_error")) + end + + # Clear existing mappings + empty!(cluster.slot_map) + + # Parse slot information and build mappings + # CLUSTER SLOTS returns format: [[start_slot, end_slot, [host, port, node_id], ...], ...] + for slot_range in slots_info + start_slot = UInt16(slot_range[1]) + end_slot = UInt16(slot_range[2]) + + # Master node information is in the third element + if length(slot_range) >= 3 && length(slot_range[3]) >= 2 + master_info = slot_range[3] + host = String(master_info[1]) + port = Int(master_info[2]) + + # Get or create connection to this node + conn = get_node_connection(cluster, host, port) + + # Build mapping for all slots in this range + for slot in start_slot:end_slot + cluster.slot_map[slot] = conn + end + end + end + + @info "Refreshed cluster slot map: $(length(cluster.slot_map)) slots mapped to $(length(cluster.node_connections)) nodes" +end + +""" + get_connection_for_slot(cluster::RedisClusterConnection, slot::UInt16) -> RedisConnection + +Get the connection corresponding to the specified slot number. + +This method looks up which node is responsible for the given slot and +returns the connection to that node. If the slot is not mapped or the +connection is broken, it automatically refreshes the slot mapping. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection object +- `slot::UInt16`: Slot number (0-16383) + +# Returns +- `RedisConnection`: Connection to the node responsible for the slot + +# Throws +- `ConnectionException`: If unable to find connection for the slot after refresh + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +slot = UInt16(1234) +conn = get_connection_for_slot(cluster, slot) +``` +""" +function get_connection_for_slot(cluster::RedisClusterConnection, slot::UInt16) + if haskey(cluster.slot_map, slot) + conn = cluster.slot_map[slot] + if is_connected(conn) + return conn + end + end + + # Slot not mapped or connection broken, refresh slot mapping + refresh_slot_map!(cluster) + + if haskey(cluster.slot_map, slot) + return cluster.slot_map[slot] + else + throw(ConnectionException("Unable to find connection for slot $slot after refresh")) + end +end + +""" + get_connection_for_key(cluster::RedisClusterConnection, key::Union{AbstractString, AbstractVector{UInt8}}) -> RedisConnection + +Get the connection corresponding to the specified key (automatically calculates slot). + +This is a convenience method that calculates the hash slot for the given key +and returns the connection to the node responsible for that slot. It supports +hash tags (e.g., `{user:1000}:profile`) for controlling key placement. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection object +- `key::Union{AbstractString, AbstractVector{UInt8}}`: Redis key + +# Returns +- `RedisConnection`: Connection to the node responsible for the key + +# Throws +- `ConnectionException`: If unable to find connection for the key's slot + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +conn = get_connection_for_key(cluster, "user:1000") +conn = get_connection_for_key(cluster, "{user:1000}:profile") # Hash tag +``` +""" +function get_connection_for_key(cluster::RedisClusterConnection, key::Union{AbstractString,AbstractVector{UInt8}}) + slot = calculate_slot(key) + return get_connection_for_slot(cluster, UInt16(slot)) +end + +""" + RedisClusterConnection(; startup_nodes, password="", db=0, sslconfig=nothing) -> RedisClusterConnection + +Create a Redis Cluster connection. + +This constructor initializes a connection to a Redis Cluster by connecting to +one or more startup nodes and discovering the cluster topology. It automatically +builds an internal mapping of hash slots to cluster nodes. + +# Arguments +- `startup_nodes::Vector{Tuple{String, Int}}`: List of seed nodes as [(host, port), ...] + At least one node must be provided and reachable. +- `password::AbstractString=""`: Authentication password (optional) +- `db::Integer=0`: Database number, typically 0 for cluster mode (optional) +- `sslconfig::Union{MbedTLS.SSLConfig, Nothing}=nothing`: SSL configuration (optional) + +# Returns +- `RedisClusterConnection`: Initialized cluster connection object + +# Throws +- `ArgumentError`: If startup_nodes is empty +- `ConnectionException`: If unable to initialize cluster connection or refresh slot map + +# Notes +- The cluster automatically handles MOVED and ASK redirects +- Slot mapping is refreshed automatically when topology changes are detected +- All node connections share the same password, db, and sslconfig settings +- In cluster mode, only database 0 is typically available + +# Example +```julia +# Basic cluster connection +cluster = RedisClusterConnection( + startup_nodes=[("127.0.0.1", 7000), ("127.0.0.1", 7001), ("127.0.0.1", 7002)] +) + +# With authentication +cluster = RedisClusterConnection( + startup_nodes=[("127.0.0.1", 7000)], + password="mypassword" +) + +# With SSL +cluster = RedisClusterConnection( + startup_nodes=[("127.0.0.1", 7000)], + sslconfig=MbedTLS.SSLConfig() +) +``` +""" +function RedisClusterConnection(; + startup_nodes::Vector{Tuple{String,Int}}, + password::AbstractString="", + db::Integer=0, + sslconfig::Union{MbedTLS.SSLConfig,Nothing}=nothing +) + if isempty(startup_nodes) + throw(ArgumentError("startup_nodes cannot be empty")) + end + + # Initialize cluster connection object + cluster = RedisClusterConnection( + Dict{UInt16,RedisConnection}(), # slot_map + startup_nodes, + password, + db, + sslconfig, + Dict{Tuple{String,Int},RedisConnection}() # node_connections + ) + + # Initialize slot mapping + try + refresh_slot_map!(cluster) + catch e + # Clean up any created connections + for (_, conn) in cluster.node_connections + try + disconnect(conn) + catch + end + end + throw(ConnectionException("Failed to initialize cluster connection: $e")) + end + + return cluster +end + +""" + disconnect(cluster::RedisClusterConnection) + +Disconnect all node connections in the cluster and clean up resources. + +This method closes all active connections to cluster nodes and clears +the internal slot mapping and connection pool. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection to disconnect + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +# ... use cluster ... +disconnect(cluster) +``` +""" +function disconnect(cluster::RedisClusterConnection) + for (_, conn) in cluster.node_connections + try + disconnect(conn) + catch e + @warn "Failed to disconnect from node: $e" + end + end + empty!(cluster.node_connections) + empty!(cluster.slot_map) +end + +""" + is_connected(cluster::RedisClusterConnection) -> Bool + +Check if the cluster connection is active. + +Returns `true` if at least one node connection in the cluster is active, +`false` otherwise. + +# Arguments +- `cluster::RedisClusterConnection`: The cluster connection to check + +# Returns +- `Bool`: `true` if at least one node is connected, `false` otherwise + +# Example +```julia +cluster = RedisClusterConnection(startup_nodes=[("127.0.0.1", 7000)]) +if is_connected(cluster) + println("Cluster is active") +end +``` +""" +function is_connected(cluster::RedisClusterConnection) + # Consider cluster connected if at least one node connection is active + for (_, conn) in cluster.node_connections + if is_connected(conn) + return true + end + end + return false +end diff --git a/src/parser.jl b/src/parser.jl index 99b6200..a3fa2cf 100644 --- a/src/parser.jl +++ b/src/parser.jl @@ -1,6 +1,9 @@ """ Formatting of incoming Redis Replies """ + +include("connection.jl") + function getline(t::Transport.RedisTransport) l = chomp(Transport.read_line(t)) length(l) > 1 || throw(ProtocolException("Invalid response received: $l")) @@ -100,6 +103,114 @@ function execute_command(conn::RedisConnectionBase, command::Vector) read_reply(conn) end +# execute_command for RedisClusterConnection +function execute_command(cluster::RedisClusterConnection, command::Vector) + # For cluster connections, need to find the corresponding node based on the key in the command + # Most Redis commands have the key as the first argument + max_redirects = 5 + redirects = 0 + + while redirects < max_redirects + try + # Try to extract key from command and get corresponding connection + local target_conn + + if length(command) >= 2 + # Most command format: [CMD, KEY, ...] + # But some commands have different structure + cmd_name = uppercase(string(command[1])) + + # Commands where key is at position 3 instead of 2 + # BITOP operation destkey key [key ...] + key_index = if cmd_name == "BITOP" && length(command) >= 3 + 3 # destkey position + else + 2 # default key position + end + + key = command[key_index] + target_conn = get_connection_for_key(cluster, string(key)) + else + # For commands without keys (like PING), use any connection + if !isempty(cluster.node_connections) + target_conn = first(values(cluster.node_connections)) + else + throw(ConnectionException("No active connections in cluster")) + end + end + + # Execute command on target connection + execute_command_without_reply(target_conn, command) + return read_reply(target_conn) + + catch e + if isa(e, ServerException) + # Handle MOVED redirect + if occursin("MOVED", e.message) + redirects += 1 + @info "Cluster redirect: $(e.message) (attempt $redirects/$max_redirects)" + + # Parse MOVED response: "MOVED slot host:port" + parts = split(e.message, " ") + if length(parts) >= 3 + slot = parse(UInt16, parts[2]) + connect_info_string = parts[3] + connect_info = split(connect_info_string, ":") + + if length(connect_info) >= 2 + host = String(connect_info[1]) + port = parse(Int, connect_info[2]) + + # Get or create connection to new node and update slot mapping + new_conn = get_node_connection(cluster, host, port) + cluster.slot_map[slot] = new_conn + + # Retry command (via while loop) + continue + end + end + + # If parsing failed, refresh entire slot map + @warn "Failed to parse MOVED response, refreshing entire slot map" + refresh_slot_map!(cluster) + continue + + # Handle ASK redirect + elseif occursin("ASK", e.message) + redirects += 1 + @info "Cluster ASK redirect: $(e.message) (attempt $redirects/$max_redirects)" + + # Parse ASK response: "ASK slot host:port" + parts = split(e.message, " ") + if length(parts) >= 3 + connect_info_string = parts[3] + connect_info = split(connect_info_string, ":") + + if length(connect_info) >= 2 + host = String(connect_info[1]) + port = parse(Int, connect_info[2]) + + # ASK requires sending ASKING command first, then retry original command + ask_conn = get_node_connection(cluster, host, port) + execute_command(ask_conn, ["ASKING"]) + execute_command_without_reply(ask_conn, command) + return read_reply(ask_conn) + end + end + + @warn "Failed to parse ASK response: $(e.message)" + rethrow(e) + end + end + + # Rethrow other errors + rethrow(e) + end + end + + throw(ConnectionException("Too many cluster redirects ($max_redirects)")) +end + baremodule SubscriptionMessageType const Message = 0 const Pmessage = 1 diff --git a/test/conf/redis-cluster.sh b/test/conf/redis-cluster.sh new file mode 100755 index 0000000..6b43c12 --- /dev/null +++ b/test/conf/redis-cluster.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +# Script to start a Redis cluster with 3 master nodes for testing + +SCRIPT_PATH=$(cd "$(dirname "$0")"; pwd) +TEST_PATH=$(dirname "$SCRIPT_PATH") + +# Create directories for cluster nodes +mkdir -p "${TEST_PATH}/cluster-data/7000" +mkdir -p "${TEST_PATH}/cluster-data/7001" +mkdir -p "${TEST_PATH}/cluster-data/7002" + +# Create minimal cluster configuration for each node +for port in 7000 7001 7002; do + cat > "${TEST_PATH}/cluster-data/${port}/redis.conf" </dev/null || true + +for port in 7000 7001 7002; do + docker run -d --name redis-${port} \ + --hostname redis-${port} \ + --network redis-cluster \ + -p ${port}:${port} \ + -v "${TEST_PATH}/cluster-data/${port}":/data \ + redis:7.2.3-bookworm redis-server /data/redis.conf +done + +# Wait for nodes to start +echo "Waiting for Redis nodes to start..." +sleep 5 + +# Create the cluster +echo "Creating Redis cluster..." +docker run -i --rm --network redis-cluster redis:7.2.3-bookworm redis-cli \ + --cluster create \ + redis-7000:7000 \ + redis-7001:7001 \ + redis-7002:7002 \ + --cluster-replicas 0 \ + --cluster-yes + +echo "Redis cluster created successfully!" + +# Verify cluster status +docker run -i --rm --network redis-cluster redis:7.2.3-bookworm redis-cli \ + -h redis-7000 -p 7000 cluster info diff --git a/test/redis_tests.jl b/test/redis_tests.jl index 826c14d..7af464c 100644 --- a/test/redis_tests.jl +++ b/test/redis_tests.jl @@ -3,12 +3,14 @@ const REDIS_PERSISTENT_KEY = -1 const REDIS_EXPIRED_KEY = -2 +const TAG = "{Redis_Test_$(randstring(6))}" + # some random key names -testkey = "Redis_Test_"*randstring() -testkey2 = "Redis_Test_"*randstring() -testkey3 = "Redis_Test_"*randstring() -testkey4 = "Redis_Test_"*randstring() -testhash = "Redis_Test_"*randstring() +testkey = TAG * randstring() +testkey2 = TAG * randstring() +testkey3 = TAG * randstring() +testkey4 = TAG * randstring() +testhash = TAG * randstring() # some random strings s1 = randstring(); s2 = randstring(); s3 = randstring() @@ -16,6 +18,8 @@ s4 = randstring(); s5 = randstring(); s6 = randstring() s7 = randstring(); s8 = randstring(); s9 = randstring() function redis_tests(conn = RedisConnection()) + is_cluster = conn isa RedisClusterConnection + flushall(conn) @testset "Strings" begin @@ -32,7 +36,8 @@ function redis_tests(conn = RedisConnection()) set(conn, testkey2, s2) set(conn, testkey3, s3) # RANDOMKEY can return 'NIL', so it returns Union{Nothing, T}. KEYS * always returns empty Set when Redis is empty - @test randomkey(conn) in keys(conn, "*") + key = randomkey(conn) + @test isnothing(key) || (key in keys(conn, "$(TAG)*")) @test getrange(conn, testkey, 0, 3) == s1[1:4] @test set(conn, testkey, 2) @@ -105,6 +110,7 @@ function redis_tests(conn = RedisConnection()) =# end + if !is_cluster @testset "Migrate" begin # TODO: test of `migrate` requires 2 server instances in Travis set(conn, testkey, s1) @@ -115,6 +121,9 @@ function redis_tests(conn = RedisConnection()) del(conn, testkey) Redis.select(conn, 0) end + else + @info "Skipping 'Migrate' testset for RedisClusterConnection (unsupported)." + end @testset "Expiry" begin set(conn, testkey, s1) @@ -343,11 +352,11 @@ function redis_tests(conn = RedisConnection()) @testset "Scripting" begin script = "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}" - keys = ["key1", "key2"] + keys = [testkey, testkey2] args = ["first", "second"] resp = evalscript(conn, script, 2, keys, args) @test resp == vcat(keys, args) - del(conn, "key1") + del(conn, testkey, testkey2) script = "return redis.call('set', KEYS[1], 'bar')" ky = "foo" @@ -381,6 +390,7 @@ function redis_tests(conn = RedisConnection()) #@test evalscript(conn, "return {1, 2, 3.3333, 'foo', nil, 'bar'}", 0, []) == [1, 2, 3, "foo"] end + if !is_cluster @testset "Transactions" begin trans = open_transaction(conn) @test set(trans, testkey, "foobar") == "QUEUED" @@ -392,7 +402,11 @@ function redis_tests(conn = RedisConnection()) @test exec(trans) == [nothing] disconnect(trans) end + else + @info "Skipping 'Transactions' testset for RedisClusterConnection (unsupported)." + end + if !is_cluster @testset "Pipelines" begin pipe = open_pipeline(conn) set(pipe, testkey3, "anything") @@ -414,6 +428,9 @@ function redis_tests(conn = RedisConnection()) @test result == [] disconnect(pipe) end + else + @info "Skipping 'Pipelines' testset for RedisClusterConnection (unsupported)." + end @testset "Pub/Sub" begin function handleException(ex) diff --git a/test/runtests.jl b/test/runtests.jl index 00c93f0..9b2eb8b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -16,4 +16,10 @@ client_tests() redis_tests(RedisConnection()) # TLS connection -redis_tests(RedisConnection(; port=16379, sslconfig=client_tls_config(joinpath(@__DIR__, "certs", "ca.crt")))) \ No newline at end of file +redis_tests(RedisConnection(;host="redisjltest", port=16379, sslconfig=client_tls_config(joinpath(@__DIR__, "certs", "ca.crt")))) + +# Cluster connection +cluster = RedisClusterConnection( + startup_nodes=[("127.0.0.1", 7000), ("127.0.0.1", 7001), ("127.0.0.1", 7002)] +) +redis_tests(cluster)