@@ -3,6 +3,7 @@ defmodule Sequin.Redis do
33 alias __MODULE__
44 alias Sequin.Error
55 alias Sequin.Error.ServiceError
6+ alias Sequin.Redis.RedisClient
67 alias Sequin.Statsd
78
89 require Logger
@@ -12,37 +13,61 @@ defmodule Sequin.Redis do
1213 @ type pipeline_return_value :: redis_value ( ) | ServiceError . t ( )
1314 @ type command_opt :: { :query_name , String . t ( ) }
1415
16+ defmodule ClusterClient do
17+ @ moduledoc false
18+ def connect ( index , % { host: host , port: port } = opts ) do
19+ opts =
20+ opts
21+ |> Map . drop ( [ :host , :port , :database ] )
22+ |> Keyword . new ( )
23+
24+ cluster_nodes = [ { host , port } ]
25+ :ok = :eredis_cluster . connect ( Sequin.Redis . connection ( index ) , cluster_nodes , opts )
26+ end
27+
28+ def q ( connection , command ) do
29+ :eredis_cluster . q ( connection , command )
30+ end
31+
32+ def qp ( connection , commands ) do
33+ :eredis_cluster . q ( connection , commands )
34+ end
35+ end
36+
37+ defmodule Client do
38+ @ moduledoc false
39+ def connect ( index , opts ) do
40+ opts =
41+ opts
42+ |> Keyword . new ( )
43+ |> Keyword . put ( :name , { :local , Sequin.Redis . connection ( index ) } )
44+ |> Keyword . delete ( :pool_size )
45+
46+ { :ok , _pid } = :eredis . start_link ( opts )
47+ :ok
48+ end
49+
50+ def q ( connection , command ) do
51+ :eredis . q ( connection , command )
52+ end
53+
54+ def qp ( connection , commands ) do
55+ :eredis . qp ( connection , commands )
56+ end
57+ end
58+
1559 @ doc """
1660 :eredis_cluster_sup_sup has already been started elsewhere. To start nodes underneath it,
1761 we need to call the connect/3 function.
1862 connect/3 calls :eredis_cluster_sup_sup.start_child
1963 """
2064 def connect_cluster do
21- { url , opts } = Keyword . pop! ( config ( ) , :url )
22- % { host: host , port: port , userinfo: userinfo } = URI . parse ( url )
23- cluster_nodes = [ { to_charlist ( host ) , port } ]
24-
25- # Parse username and password from userinfo
26- opts =
27- case userinfo do
28- nil ->
29- opts
30-
31- info ->
32- { username , password } =
33- case String . split ( info , ":" ) do
34- [ user , pass ] -> { user , pass }
35- [ pass ] -> { nil , pass }
36- end
37-
38- opts
39- |> Keyword . put ( :username , username )
40- |> Keyword . put ( :password , password )
41- end
65+ :ok = set_redis_client ( )
66+ opts = parse_redis_connection_opts ( )
4267
4368 # Start connections for each pool member
4469 for index <- 0 .. ( pool_size ( ) - 1 ) do
45- :ok = :eredis_cluster . connect ( connection ( index ) , cluster_nodes , opts )
70+ redis_client ( ) . connect ( index , opts )
4671 end
4772 rescue
4873 error ->
@@ -55,7 +80,7 @@ defmodule Sequin.Redis do
5580 maybe_time ( command , opts [ :query_name ] , fn ->
5681 res =
5782 connection ( )
58- |> :eredis_cluster . q ( command )
83+ |> redis_client ( ) . q ( command )
5984 |> parse_result ( )
6085
6186 case res do
@@ -79,7 +104,7 @@ defmodule Sequin.Redis do
79104 when opt: command_opt ( )
80105 def command! ( command , opts \\ [ ] ) do
81106 maybe_time ( command , opts [ :query_name ] , fn ->
82- res = connection ( ) |> :eredis_cluster . q ( command ) |> parse_result ( )
107+ res = connection ( ) |> redis_client ( ) . q ( command ) |> parse_result ( )
83108
84109 case res do
85110 { :ok , result } -> result
@@ -92,7 +117,7 @@ defmodule Sequin.Redis do
92117 when opt: command_opt ( )
93118 def pipeline ( commands , opts \\ [ ] ) do
94119 maybe_time ( commands , opts [ :query_name ] , fn ->
95- case :eredis_cluster . q ( connection ( ) , commands ) do
120+ case redis_client ( ) . qp ( connection ( ) , commands ) do
96121 results when is_list ( results ) ->
97122 # Convert eredis results to Redix-style results
98123 { :ok ,
@@ -131,7 +156,7 @@ defmodule Sequin.Redis do
131156 |> Sequin.Keyword . reject_nils ( )
132157 end
133158
134- defp connection ( index \\ random_index ( ) ) do
159+ def connection ( index \\ random_index ( ) ) do
135160 :"#{ Redis } _#{ index } "
136161 end
137162
@@ -155,4 +180,48 @@ defmodule Sequin.Redis do
155180
156181 result
157182 end
183+
184+ defp parse_redis_connection_opts do
185+ { url , opts } = Keyword . pop! ( config ( ) , :url )
186+ opts = Map . new ( opts )
187+
188+ % { host: host , port: port , userinfo: userinfo , path: path } = URI . parse ( url )
189+ opts = Map . merge ( opts , % { host: to_charlist ( host ) , port: port } )
190+
191+ opts =
192+ case path do
193+ "/" <> database -> Map . put ( opts , :database , String . to_integer ( database ) )
194+ _ -> Map . put ( opts , :database , 0 )
195+ end
196+
197+ # Parse username and password from userinfo
198+ case userinfo do
199+ nil ->
200+ opts
201+
202+ info ->
203+ { username , password } =
204+ case String . split ( info , ":" ) do
205+ [ user , pass ] -> { user , pass }
206+ [ pass ] -> { nil , pass }
207+ end
208+
209+ opts
210+ |> Map . put ( :username , username )
211+ |> Map . put ( :password , password )
212+ end
213+ end
214+
215+ defp set_redis_client do
216+ case parse_redis_connection_opts ( ) do
217+ % { database: 0 } -> Application . put_env ( :sequin , RedisClient , ClusterClient )
218+ % { database: database } when database > 0 and database < 16 -> Application . put_env ( :sequin , RedisClient , Client )
219+ end
220+
221+ :ok
222+ end
223+
224+ defp redis_client do
225+ Application . get_env ( :sequin , RedisClient )
226+ end
158227end
0 commit comments