Skip to content

Commit 85c65fc

Browse files
authored
Merge pull request #103 from JuliaDatabases/tan/tls
feat: add TLS support
2 parents fcd0353 + 10dd621 commit 85c65fc

File tree

22 files changed

+865
-474
lines changed

22 files changed

+865
-474
lines changed

.github/workflows/CI.yml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@ jobs:
88
test:
99
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
1010
runs-on: ubuntu-latest
11-
services:
12-
redis:
13-
image: redis:7.2.3-bookworm # https://hub.docker.com/_/redis
14-
ports:
15-
- 6379:6379
1611
strategy:
1712
fail-fast: false
1813
matrix:
@@ -41,9 +36,18 @@ jobs:
4136
${{ runner.os }}-test-
4237
${{ runner.os }}-
4338
- uses: julia-actions/julia-buildpkg@v1
39+
- name: Start redis server
40+
run: |
41+
echo "Starting redis server"
42+
pwd
43+
test/conf/redis.sh
44+
sleep 5
45+
echo "Redis started"
4446
- uses: julia-actions/julia-runtest@v1
4547
- uses: julia-actions/julia-processcoverage@v1
46-
- uses: codecov/codecov-action@v2
48+
- uses: codecov/codecov-action@v3
49+
id: codecov
50+
continue-on-error: true
4751
with:
4852
files: lcov.info
4953
fail_ci_if_error: false

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ version = "2.0.0"
66
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
77
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
88
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
9+
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
910

1011
[compat]
1112
julia = "^1"
1213
DataStructures = "^0.18"
14+
MbedTLS = "0.6.8, 0.7, 1"
1315

1416
[extras]
1517
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"

src/Redis.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module Redis
22
using Dates
33
using Sockets
4+
using MbedTLS
45

56
import Base.get, Base.keys, Base.time
67

@@ -59,6 +60,7 @@ export sentinel_masters, sentinel_master, sentinel_slaves, sentinel_getmasteradd
5960
export REDIS_PERSISTENT_KEY, REDIS_EXPIRED_KEY
6061

6162
include("exceptions.jl")
63+
include("transport/transport.jl")
6264
include("connection.jl")
6365
include("parser.jl")
6466
include("client.jl")

src/client.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ end
160160
function subscription_loop(conn::SubscriptionConnection, err_callback::Function)
161161
while is_connected(conn)
162162
try
163-
l = getline(conn.socket)
164-
reply = parseline(l, conn.socket)
163+
l = getline(conn.transport)
164+
reply = parseline(l, conn.transport)
165165
reply = convert_reply(reply)
166166
message = SubscriptionMessage(reply)
167167
if message.message_type == SubscriptionMessageType.Message

src/connection.jl

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import Sockets.connect, Sockets.TCPSocket, Base.StatusActive, Base.StatusOpen, Base.StatusPaused
2-
31
abstract type RedisConnectionBase end
42
abstract type SubscribableConnection<:RedisConnectionBase end
53

@@ -8,31 +6,31 @@ struct RedisConnection <: SubscribableConnection
86
port::Integer
97
password::AbstractString
108
db::Integer
11-
socket::TCPSocket
9+
transport::Transport.RedisTransport
1210
end
1311

1412
struct SentinelConnection <: SubscribableConnection
1513
host::AbstractString
1614
port::Integer
1715
password::AbstractString
1816
db::Integer
19-
socket::TCPSocket
17+
transport::Transport.RedisTransport
2018
end
2119

2220
struct TransactionConnection <: RedisConnectionBase
2321
host::AbstractString
2422
port::Integer
2523
password::AbstractString
2624
db::Integer
27-
socket::TCPSocket
25+
transport::Transport.RedisTransport
2826
end
2927

3028
mutable struct PipelineConnection <: RedisConnectionBase
3129
host::AbstractString
3230
port::Integer
3331
password::AbstractString
3432
db::Integer
35-
socket::TCPSocket
33+
transport::Transport.RedisTransport
3634
num_commands::Integer
3735
end
3836

@@ -43,77 +41,100 @@ struct SubscriptionConnection <: RedisConnectionBase
4341
db::Integer
4442
callbacks::Dict{AbstractString, Function}
4543
pcallbacks::Dict{AbstractString, Function}
46-
socket::TCPSocket
44+
transport::Transport.RedisTransport
4745
end
4846

49-
function RedisConnection(; host="127.0.0.1", port=6379, password="", db=0)
47+
Transport.get_sslconfig(s::RedisConnectionBase) = Transport.get_sslconfig(s.transport)
48+
49+
function RedisConnection(; host="127.0.0.1", port=6379, password="", db=0, sslconfig=nothing)
5050
try
51-
socket = connect(host, port)
52-
connection = RedisConnection(host, port, password, db, socket)
51+
connection = RedisConnection(
52+
host,
53+
port,
54+
password,
55+
db,
56+
Transport.transport(host, port, sslconfig)
57+
)
5358
on_connect(connection)
5459
catch
5560
throw(ConnectionException("Failed to connect to Redis server"))
5661
end
5762
end
5863

59-
function SentinelConnection(; host="127.0.0.1", port=26379, password="", db=0)
64+
function SentinelConnection(; host="127.0.0.1", port=26379, password="", db=0, sslconfig=nothing)
6065
try
61-
socket = connect(host, port)
62-
sentinel_connection = SentinelConnection(host, port, password, db, socket)
66+
sentinel_connection = SentinelConnection(
67+
host,
68+
port,
69+
password,
70+
db,
71+
Transport.transport(host, port, sslconfig)
72+
)
6373
on_connect(sentinel_connection)
6474
catch
6575
throw(ConnectionException("Failed to connect to Redis sentinel"))
6676
end
6777
end
6878

69-
function TransactionConnection(parent::RedisConnection)
79+
function TransactionConnection(parent::RedisConnection; sslconfig=Transport.get_sslconfig(parent))
7080
try
71-
socket = connect(parent.host, parent.port)
72-
transaction_connection = TransactionConnection(parent.host,
73-
parent.port, parent.password, parent.db, socket)
81+
transaction_connection = TransactionConnection(
82+
parent.host,
83+
parent.port,
84+
parent.password,
85+
parent.db,
86+
Transport.transport(parent.host, parent.port, sslconfig)
87+
)
7488
on_connect(transaction_connection)
7589
catch
7690
throw(ConnectionException("Failed to create transaction"))
7791
end
7892
end
7993

80-
function PipelineConnection(parent::RedisConnection)
94+
function PipelineConnection(parent::RedisConnection; sslconfig=Transport.get_sslconfig(parent))
8195
try
82-
socket = connect(parent.host, parent.port)
83-
pipeline_connection = PipelineConnection(parent.host,
84-
parent.port, parent.password, parent.db, socket, 0)
96+
pipeline_connection = PipelineConnection(
97+
parent.host,
98+
parent.port,
99+
parent.password,
100+
parent.db,
101+
Transport.transport(parent.host, parent.port, sslconfig),
102+
0
103+
)
85104
on_connect(pipeline_connection)
86105
catch
87106
throw(ConnectionException("Failed to create pipeline"))
88107
end
89108
end
90109

91-
function SubscriptionConnection(parent::SubscribableConnection)
110+
function SubscriptionConnection(parent::SubscribableConnection; sslconfig=Transport.get_sslconfig(parent))
92111
try
93-
socket = connect(parent.host, parent.port)
94-
subscription_connection = SubscriptionConnection(parent.host,
95-
parent.port, parent.password, parent.db, Dict{AbstractString, Function}(),
96-
Dict{AbstractString, Function}(), socket)
112+
subscription_connection = SubscriptionConnection(
113+
parent.host,
114+
parent.port,
115+
parent.password,
116+
parent.db,
117+
Dict{AbstractString, Function}(),
118+
Dict{AbstractString, Function}(),
119+
Transport.transport(parent.host, parent.port, sslconfig)
120+
)
97121
on_connect(subscription_connection)
98122
catch
99123
throw(ConnectionException("Failed to create subscription"))
100124
end
101125
end
102126

103127
function on_connect(conn::RedisConnectionBase)
104-
# disable nagle and enable quickack to speed up the usually small exchanges
105-
Sockets.nagle(conn.socket, false)
106-
Sockets.quickack(conn.socket, true)
107-
128+
Transport.set_props!(conn.transport)
108129
conn.password != "" && auth(conn, conn.password)
109130
conn.db != 0 && select(conn, conn.db)
110131
conn
111132
end
112133

113134
function disconnect(conn::RedisConnectionBase)
114-
close(conn.socket)
135+
Transport.close(conn.transport)
115136
end
116137

117138
function is_connected(conn::RedisConnectionBase)
118-
conn.socket.status == StatusActive || conn.socket.status == StatusOpen || conn.socket.status == StatusPaused
139+
Transport.is_connected(conn.transport)
119140
end

src/parser.jl

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
"""
22
Formatting of incoming Redis Replies
33
"""
4-
function getline(s::TCPSocket)
5-
l = chomp(readline(s))
4+
function getline(t::Transport.RedisTransport)
5+
l = chomp(Transport.read_line(t))
66
length(l) > 1 || throw(ProtocolException("Invalid response received: $l"))
77
return l
88
end
@@ -12,15 +12,15 @@ convert_reply(reply::Array) = [convert_reply(r) for r in reply]
1212
convert_reply(x) = x
1313

1414
function read_reply(conn::RedisConnectionBase)
15-
l = getline(conn.socket)
16-
reply = parseline(l, conn.socket)
15+
l = getline(conn.transport)
16+
reply = parseline(l, conn.transport)
1717
convert_reply(reply)
1818
end
1919

2020
parse_error(l::AbstractString) = throw(ServerException(l))
2121

22-
function parse_bulk_string(s::TCPSocket, slen::Int)
23-
b = read(s, slen+2) # add crlf
22+
function parse_bulk_string(t::Transport.RedisTransport, slen::Int)
23+
b = Transport.read_nbytes(t, slen+2) # add crlf
2424
if length(b) != slen + 2
2525
throw(ProtocolException(
2626
"Bulk string read error: expected $slen bytes; received $(length(b))"
@@ -30,17 +30,17 @@ function parse_bulk_string(s::TCPSocket, slen::Int)
3030
end
3131
end
3232

33-
function parse_array(s::TCPSocket, slen::Int)
33+
function parse_array(t::Transport.RedisTransport, slen::Int)
3434
a = Array{Any, 1}(undef, slen)
3535
for i = 1:slen
36-
l = getline(s)
37-
r = parseline(l, s)
36+
l = getline(t)
37+
r = parseline(l, t)
3838
a[i] = r
3939
end
4040
return a
4141
end
4242

43-
function parseline(l::AbstractString, s::TCPSocket)
43+
function parseline(l::AbstractString, t::Transport.RedisTransport)
4444
reply_type = l[1]
4545
reply_token = l[2:end]
4646
if reply_type == '+'
@@ -52,14 +52,14 @@ function parseline(l::AbstractString, s::TCPSocket)
5252
if slen == -1
5353
nothing
5454
else
55-
parse_bulk_string(s, slen)
55+
parse_bulk_string(t, slen)
5656
end
5757
elseif reply_type == '*'
5858
slen = parse(Int, reply_token)
5959
if slen == -1
6060
nothing
6161
else
62-
parse_array(s, slen)
62+
parse_array(t, slen)
6363
end
6464
elseif reply_type == '-'
6565
parse_error(reply_token)
@@ -90,8 +90,8 @@ function execute_command_without_reply(conn::RedisConnectionBase, command)
9090
is_connected(conn) || throw(ConnectionException("Socket is disconnected"))
9191
iob = IOBuffer()
9292
pack_command(iob, command)
93-
lock(conn.socket.lock) do
94-
write(conn.socket, take!(iob))
93+
Transport.io_lock(conn.transport) do
94+
Transport.write_bytes(conn.transport, take!(iob))
9595
end
9696
end
9797

src/transport/tcp.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
struct TCPTransport <: RedisTransport
2+
sock::TCPSocket
3+
end
4+
5+
read_line(t::TCPTransport) = readline(t.sock)
6+
read_nbytes(t::TCPTransport, m::Int) = read(t.sock, m)
7+
write_bytes(t::TCPTransport, b::Vector{UInt8}) = write(t.sock, b)
8+
Base.close(t::TCPTransport) = close(t.sock)
9+
function set_props!(t::TCPTransport)
10+
# disable nagle and enable quickack to speed up the usually small exchanges
11+
Sockets.nagle(t.sock, false)
12+
Sockets.quickack(t.sock, true)
13+
end
14+
get_sslconfig(::TCPTransport) = nothing
15+
io_lock(f, t::TCPTransport) = lock(f, t.sock.lock)
16+
function is_connected(t::TCPTransport)
17+
status = t.sock.status
18+
status == StatusActive || status == StatusOpen || status == StatusPaused
19+
end

src/transport/tls.jl

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
struct TLSTransport <: RedisTransport
2+
sock::TCPSocket
3+
ctx::MbedTLS.SSLContext
4+
sslconfig::MbedTLS.SSLConfig
5+
buff::IOBuffer
6+
7+
function TLSTransport(sock::TCPSocket, sslconfig::MbedTLS.SSLConfig)
8+
ctx = MbedTLS.SSLContext()
9+
MbedTLS.setup!(ctx, sslconfig)
10+
MbedTLS.associate!(ctx, sock)
11+
MbedTLS.handshake(ctx)
12+
13+
return new(sock, ctx, sslconfig, PipeBuffer())
14+
end
15+
end
16+
17+
function read_into_buffer_until(cond::Function, t::TLSTransport)
18+
cond(t) && return
19+
20+
buff = Vector{UInt8}(undef, MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN)
21+
pbuff = pointer(buff)
22+
23+
while !cond(t) && !eof(t.ctx)
24+
nread = readbytes!(t.ctx, buff; all=false)
25+
if nread > 0
26+
unsafe_write(t.buff, pbuff, nread)
27+
end
28+
end
29+
end
30+
31+
function read_line(t::TLSTransport)
32+
read_into_buffer_until(t) do t
33+
iob = t.buff
34+
(bytesavailable(t.buff) > 0) && (UInt8('\n') in view(iob.data, iob.ptr:iob.size))
35+
end
36+
return readline(t.buff)
37+
end
38+
function read_nbytes(t::TLSTransport, m::Int)
39+
read_into_buffer_until(t) do t
40+
bytesavailable(t.buff) >= m
41+
end
42+
return read(t.buff, m)
43+
end
44+
write_bytes(t::TLSTransport, b::Vector{UInt8}) = write(t.ctx, b)
45+
Base.close(t::TLSTransport) = close(t.ctx)
46+
function set_props!(s::TLSTransport)
47+
# disable nagle and enable quickack to speed up the usually small exchanges
48+
Sockets.nagle(s.sock, false)
49+
Sockets.quickack(s.sock, true)
50+
end
51+
get_sslconfig(t::TLSTransport) = t.sslconfig
52+
io_lock(f, t::TLSTransport) = lock(f, t.sock.lock)
53+
function is_connected(t::TLSTransport)
54+
status = t.sock.status
55+
status == StatusActive || status == StatusOpen || status == StatusPaused
56+
end

0 commit comments

Comments
 (0)