Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 142 additions & 87 deletions lib/valkey.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# frozen_string_literal: true

require "ffi"
require "google/protobuf"
require "json"
require "cgi"

require "valkey/version"
require "valkey/request_type"
require "valkey/response_type"
require "valkey/request_error_type"
require "valkey/protobuf/command_request_pb"
require "valkey/protobuf/connection_request_pb"
require "valkey/protobuf/response_pb"
require "valkey/bindings"
require "valkey/utils"
require "valkey/commands"
Expand Down Expand Up @@ -323,36 +321,112 @@ def initialize(options = {})
# Extract connection parameters
host = options[:host] || "127.0.0.1"
port = options[:port] || 6379
database_id = options[:db] || 0

# Validate database ID
raise ArgumentError, "Database ID must be non-negative, got: #{database_id}" if database_id.negative?

nodes = options[:nodes] || [{ host: host, port: port }]

cluster_mode_enabled = options[:cluster_mode] || false
# Validate nodes array
raise ArgumentError, "Nodes array cannot be empty" if nodes.empty?

# Build URI string
# Use the first node for standalone mode, or first node for cluster discovery
first_node = nodes.first
raise ArgumentError, "First node cannot be nil" if first_node.nil?

uri_host = first_node[:host]
uri_port = first_node[:port]

# Validate host and port
raise ArgumentError, "Host cannot be nil" if uri_host.nil?
raise ArgumentError, "Port cannot be nil" if uri_port.nil?
raise ArgumentError, "Port must be a number" unless uri_port.is_a?(Integer)

# Determine scheme based on TLS/SSL
scheme = [true, "true"].include?(options[:ssl]) ? "rediss" : "redis"

# Build URI with authentication if provided
uri_parts = [scheme, "://"]

# Add authentication to URI
if options[:username] && options[:password]
uri_parts << CGI.escape(options[:username])
uri_parts << ":"
uri_parts << CGI.escape(options[:password])
uri_parts << "@"
elsif options[:password]
uri_parts << ":"
uri_parts << CGI.escape(options[:password])
uri_parts << "@"
end

uri_parts << uri_host
uri_parts << ":"
uri_parts << uri_port.to_s

# Add database ID to URI if specified
uri_parts << "/" << database_id.to_s if database_id.positive?

uri_str = uri_parts.join

# Build JSON options for additional configuration
json_options = {}

# Cluster mode
json_options["cluster_mode_enabled"] = true if options[:cluster_mode]

# Protocol
case options[:protocol]
when :resp3, "resp3", 3
json_options["protocol"] = "RESP3"
when :resp2, "resp2", 2
json_options["protocol"] = "RESP2"
end

# Protocol defaults to RESP2
protocol = case options[:protocol]
when :resp3, "resp3", 3
ConnectionRequest::ProtocolVersion::RESP3
else
ConnectionRequest::ProtocolVersion::RESP2
end
# Timeouts
request_timeout = options[:timeout] || 5.0

# Validate timeout types
unless request_timeout.is_a?(Numeric)
raise ArgumentError, "Timeout must be a number, got: #{request_timeout.class}"
end
raise ArgumentError, "Timeout must be positive, got: #{request_timeout}" if request_timeout <= 0

json_options["request_timeout"] = (request_timeout * 1000).to_i

if options[:connect_timeout]
connect_timeout = options[:connect_timeout]
unless connect_timeout.is_a?(Numeric)
raise ArgumentError, "Connect timeout must be a number, got: #{connect_timeout.class}"
end
raise ArgumentError, "Connect timeout must be positive, got: #{connect_timeout}" if connect_timeout <= 0

json_options["connection_timeout"] = (connect_timeout * 1000).to_i
end

# TLS/SSL support
tls_mode = if [true, "true"].include?(options[:ssl])
ConnectionRequest::TlsMode::SecureTls
else
ConnectionRequest::TlsMode::NoTls
end
# Client name
json_options["client_name"] = options[:client_name] if options[:client_name]

# SSL parameters - map ssl_params to protobuf root_certs
# TLS/SSL certificates
root_certs = []
if options[:ssl_params].is_a?(Hash)
# ca_file - read CA certificate file (PEM or DER format)
root_certs << File.binread(options[:ssl_params][:ca_file]) if options[:ssl_params][:ca_file]
if options[:ssl_params][:ca_file]
ca_file = options[:ssl_params][:ca_file]
raise ArgumentError, "CA file does not exist: #{ca_file}" unless File.exist?(ca_file)
raise ArgumentError, "CA file is not readable: #{ca_file}" unless File.readable?(ca_file)
root_certs << File.binread(ca_file)
end

# cert - client certificate (file path or OpenSSL::X509::Certificate)
if options[:ssl_params][:cert]
cert_data = if options[:ssl_params][:cert].is_a?(String)
File.binread(options[:ssl_params][:cert])
cert_file = options[:ssl_params][:cert]
raise ArgumentError, "Cert file does not exist: #{cert_file}" unless File.exist?(cert_file)
raise ArgumentError, "Cert file is not readable: #{cert_file}" unless File.readable?(cert_file)
File.binread(cert_file)
elsif options[:ssl_params][:cert].respond_to?(:to_pem)
options[:ssl_params][:cert].to_pem
elsif options[:ssl_params][:cert].respond_to?(:to_der)
Expand All @@ -366,7 +440,10 @@ def initialize(options = {})
# key - client key (file path or OpenSSL::PKey)
if options[:ssl_params][:key]
key_data = if options[:ssl_params][:key].is_a?(String)
File.binread(options[:ssl_params][:key])
key_file = options[:ssl_params][:key]
raise ArgumentError, "Key file does not exist: #{key_file}" unless File.exist?(key_file)
raise ArgumentError, "Key file is not readable: #{key_file}" unless File.readable?(key_file)
File.binread(key_file)
elsif options[:ssl_params][:key].respond_to?(:to_pem)
options[:ssl_params][:key].to_pem
elsif options[:ssl_params][:key].respond_to?(:to_der)
Expand All @@ -379,54 +456,48 @@ def initialize(options = {})

# Additional root certificates from ca_path
if options[:ssl_params][:ca_path]
Dir.glob(File.join(options[:ssl_params][:ca_path], "*.crt")).each do |cert_file|
root_certs << File.binread(cert_file)
ca_path = options[:ssl_params][:ca_path]
raise ArgumentError, "CA path does not exist: #{ca_path}" unless Dir.exist?(ca_path)

Dir.glob(File.join(ca_path, "*.crt")).each do |cert_file|
root_certs << File.binread(cert_file) if File.readable?(cert_file)
end
Dir.glob(File.join(options[:ssl_params][:ca_path], "*.pem")).each do |cert_file|
root_certs << File.binread(cert_file)
Dir.glob(File.join(ca_path, "*.pem")).each do |cert_file|
root_certs << File.binread(cert_file) if File.readable?(cert_file)
end
end

# Direct root_certs array support
root_certs.concat(options[:ssl_params][:root_certs]) if options[:ssl_params][:root_certs].is_a?(Array)
end

# Authentication support
authentication_info = nil
if options[:password] || options[:username]
authentication_info = ConnectionRequest::AuthenticationInfo.new(
password: options[:password] || "",
username: options[:username] || ""
)
end

# Database selection
database_id = options[:db] || 0

# Client name
client_name = options[:client_name] || ""

# Timeout handling
# :timeout sets the request timeout (for command execution)
# :connect_timeout sets the connection establishment timeout
# Default request timeout is 5.0 seconds
request_timeout = options[:timeout] || 5.0

# Connection timeout (milliseconds) - defaults to 0 (uses system default)
connection_timeout_ms = if options[:connect_timeout]
(options[:connect_timeout] * 1000).to_i
else
0
end
json_options["root_certs"] = root_certs unless root_certs.empty?

# Connection retry strategy
connection_retry_strategy = nil
if options[:reconnect_attempts] || options[:reconnect_delay] || options[:reconnect_delay_max]
number_of_retries = options[:reconnect_attempts] || 1
base_delay = options[:reconnect_delay] || 0.5
max_delay = options[:reconnect_delay_max]

# Validate reconnection parameters
unless number_of_retries.is_a?(Integer)
raise ArgumentError, "Reconnect attempts must be an integer, got: #{number_of_retries.class}"
end
raise ArgumentError, "Reconnect attempts must be non-negative, got: #{number_of_retries}" if number_of_retries.negative?

unless base_delay.is_a?(Numeric)
raise ArgumentError, "Reconnect delay must be a number, got: #{base_delay.class}"
end
raise ArgumentError, "Reconnect delay must be positive, got: #{base_delay}" unless base_delay.positive?

if max_delay
unless max_delay.is_a?(Numeric)
raise ArgumentError, "Reconnect delay max must be a number, got: #{max_delay.class}"
end
raise ArgumentError, "Reconnect delay max must be positive, got: #{max_delay}" unless max_delay.positive?
end

exponent_base = 2
jitter_percent = 0

if max_delay && base_delay.positive? && number_of_retries.positive?
calculated_base = (max_delay / base_delay)**(1.0 / number_of_retries.to_f)
Expand All @@ -435,45 +506,29 @@ def initialize(options = {})

factor_ms = (base_delay * 1000).to_i

connection_retry_strategy = ConnectionRequest::ConnectionRetryStrategy.new(
number_of_retries: number_of_retries,
factor: factor_ms,
exponent_base: exponent_base,
jitter_percent: jitter_percent
)
json_options["connection_retry_strategy"] = {
"number_of_retries" => number_of_retries,
"factor" => factor_ms,
"exponent_base" => exponent_base,
"jitter_percent" => 0
}
end

# Build connection request
request_params = {
cluster_mode_enabled: cluster_mode_enabled,
request_timeout: request_timeout,
protocol: protocol,
tls_mode: tls_mode,
addresses: nodes.map { |node| ConnectionRequest::NodeAddress.new(host: node[:host], port: node[:port]) }
}

# Add optional fields only if they have values
request_params[:connection_timeout] = connection_timeout_ms if connection_timeout_ms.positive?
request_params[:database_id] = database_id if database_id.positive?
request_params[:client_name] = client_name unless client_name.empty?
request_params[:authentication_info] = authentication_info if authentication_info
request_params[:root_certs] = root_certs unless root_certs.empty?
request_params[:connection_retry_strategy] = connection_retry_strategy if connection_retry_strategy
# Multi-node addresses for cluster mode
if nodes.size > 1
json_options["addresses"] = nodes.map { |node| { "host" => node[:host], "port" => node[:port] } }
end

request = ConnectionRequest::ConnectionRequest.new(request_params)
# Convert JSON options to string (pass nil if empty)
json_str = json_options.empty? ? nil : JSON.generate(json_options)

# Create client using URI-based FFI function
client_type = Bindings::ClientType.new
client_type[:tag] = 1 # SyncClient

request_str = ConnectionRequest::ConnectionRequest.encode(request)
request_buf = FFI::MemoryPointer.new(:char, request_str.bytesize)
request_buf.put_bytes(0, request_str)

request_len = request_str.bytesize

response_ptr = Bindings.create_client(
request_buf,
request_len,
response_ptr = Bindings.create_client_from_uri(
uri_str,
json_str,
client_type,
method(:pubsub_callback)
)
Expand Down
7 changes: 7 additions & 0 deletions lib/valkey/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ class CommandResult < FFI::Struct
:pubsub_callback # callback
], :pointer # *const ConnectionResponse

attach_function :create_client_from_uri, [
:string, # *const c_char (uri_str)
:string, # *const c_char (extra_options_json)
ClientType.by_ref, # *const ClientType
:pubsub_callback # callback
], :pointer # *const ConnectionResponse

attach_function :close_client, [
:pointer # client_adapter_ptr
], :void
Expand Down
Binary file modified lib/valkey/libglide_ffi.dylib
Binary file not shown.
51 changes: 0 additions & 51 deletions lib/valkey/protobuf/command_request_pb.rb

This file was deleted.

Loading
Loading