Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,37 @@ See the [docs](https://hexdocs.pm/phoenix_pubsub_redis/) for more information.

## Usage

To use Redis as your PubSub adapter, simply add it to your deps and Application's Supervisor tree:
To use Redis as your PubSub adapter, simply add it to your deps and application supervision tree:

```elixir
# mix.exs
defp deps do
[{:phoenix_pubsub_redis, "~> 3.0"}],
[
{:phoenix_pubsub_redis, "~> 3.0"}
]
end

# application.ex
children = [
# ...,
{Phoenix.PubSub,
adapter: Phoenix.PubSub.Redis,
host: "192.168.1.100",
redis_opts: "redis://localhost:6379",
node_name: System.get_env("NODE")}

# or with keyword options:
{Phoenix.PubSub,
adapter: Phoenix.PubSub.Redis,
redis_opts: [host: "example.com", port: 6379],
node_name: System.get_env("NODE")}
```

Config Options

Option | Description | Default |
:-----------------------| :------------------------------------------------------------------------ | :------------- |
`:name` | The required name to register the PubSub processes, ie: `MyApp.PubSub` | |
`:node_name` | The required and unique name of the node, ie: `System.get_env("NODE")` | |
`:url` | The redis-server URL, ie: `redis://username:password@host:port` | |
`:host` | The redis-server host IP | `"127.0.0.1"` |
`:port` | The redis-server port | `6379` |
`:password` | The redis-server password | `""` |
`:compression_level` | Compression level applied to serialized terms (`0` - none, `9` - highest) | `0` |
`:socket_opts` | The redis-server network layer options | `[]` |
Option | Description | Default |
:-----------------------| :----------------------------------------------------------------------------------------- | :------------- |
`:name` | The required name to register the PubSub processes, e.g. `MyApp.PubSub`. | |
`:node_name` | The name of the node. Must be unique. | `node()` |
`:compression_level` | Compression level applied to serialized terms - `0` (none) to `9` (highest). | `0` |
`:redis_pool_size` | The size of the Redis connection pool. | `5` |
`:redis_opts` | Redix connection options - either a Redis URL string or a keyword list. See [Redix docs](https://hexdocs.pm/redix/Redix.html#start_link/1-redis-options) for details. | |
149 changes: 107 additions & 42 deletions lib/phoenix_pubsub_redis/redis.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,59 @@ defmodule Phoenix.PubSub.Redis do

{Phoenix.PubSub,
adapter: Phoenix.PubSub.Redis,
host: "192.168.1.100",
redis_opts: "redis://localhost:6379",
node_name: System.get_env("NODE")}

## Options

* `:url` - The url to the redis server ie: `redis://username:password@host:port`
* `:name` - The required name to register the PubSub processes, ie: `MyApp.PubSub`
* `:node_name` - The required name of the node, defaults to Erlang --sname flag. It must be unique.
* `:host` - The redis-server host IP, defaults `"127.0.0.1"`
* `:port` - The redis-server port, defaults `6379`
* `:username` - The redis-server username
* `:password` - The redis-server password, defaults `""`
* `:ssl` - The redis-server ssl option, defaults `false`
* `:redis_pool_size` - The size of the redis connection pool. Defaults `5`
* `:compression_level` - Compression level applied to serialized terms - from `0` (no compression), to `9` (highest). Defaults `0`
* `:socket_opts` - List of options that are passed to the network layer when connecting to the Redis server. Default `[]`
* `:sentinel` - Redix sentinel configuration. Default to `nil`
* `:name` - The required name to register the PubSub processes, e.g. `MyApp.PubSub`.
* `:node_name` - The name of the node, used to filter out messages broadcast by the same node. Defaults to `node()`. Must be unique.
* `:redis_pool_size` - The size of the Redis connection pool. Defaults to `5`.
* `:compression_level` - Compression level applied to serialized terms - `0` (none) to `9` (highest). Defaults to `0`.
* `:redis_opts` - Redix connection options - either a Redis URL string or a keyword list. See `Redix.start_link/1` for more information.

"""

use Supervisor

@behaviour Phoenix.PubSub.Adapter
@redis_pool_size 5
@redis_opts [:host, :port, :username, :password, :database, :ssl, :socket_opts, :sentinel]
@defaults [host: "127.0.0.1", port: 6379]

@schema NimbleOptions.new!(
node_name: [
type: :atom,
doc: "The name of the node. Defaults to `node()`. Must be unique."
],
redis_pool_size: [
type: :pos_integer,
default: 5,
doc: "The size of the Redis connection pool."
],
compression_level: [
type: {:in, 0..9},
default: 0,
doc: "Compression level applied to serialized terms - `0` (none) to `9` (highest)."
],
redis_opts: [
type: {:or, [:string, :keyword_list]},
default: [],
doc:
"Redix connection options - either a Redis URL string or a keyword list. " <>
"See `Redix.start_link/1` for more information."
]
)

# Using top-level configuration keys for Redis configuration is deprecated
@redis_top_level_keys [
:host,
:port,
:username,
:password,
:database,
:ssl,
:socket_opts,
:sentinel,
:url
]

## Adapter callbacks

Expand All @@ -58,15 +85,14 @@ defmodule Phoenix.PubSub.Redis do

@impl true
def init(opts) do
opts = build_opts(opts)
pubsub_name = Keyword.fetch!(opts, :name)
adapter_name = Keyword.fetch!(opts, :adapter_name)
compression_level = Keyword.get(opts, :compression_level, 0)

opts = handle_url_opts(opts)
opts = Keyword.merge(@defaults, opts)
redis_opts = Keyword.take(opts, @redis_opts)
compression_level = Keyword.fetch!(opts, :compression_level)
redis_opts = Keyword.fetch!(opts, :redis_opts)
node_name = Keyword.fetch!(opts, :node_name)
redis_pool_size = Keyword.fetch!(opts, :redis_pool_size)

node_name = opts[:node_name] || node()
validate_node_name!(node_name)

:ets.new(adapter_name, [:public, :named_table, read_concurrency: true])
Expand All @@ -76,7 +102,7 @@ defmodule Phoenix.PubSub.Redis do
pool_opts = [
name: {:local, adapter_name},
worker_module: Redix,
size: opts[:redis_pool_size] || @redis_pool_size,
size: redis_pool_size,
max_overflow: 0
]

Expand All @@ -88,28 +114,67 @@ defmodule Phoenix.PubSub.Redis do
Supervisor.init(children, strategy: :rest_for_one)
end

defp handle_url_opts(opts) do
if opts[:url] do
merge_url_opts(opts)
else
opts
end
end
@doc false
def build_opts(opts) do
{internal, user_opts} =
Keyword.split(opts, [:name, :adapter_name, :adapter, :pool_size, :registry_size])

defp merge_url_opts(opts) do
info = URI.parse(opts[:url])
{top_level_redis, user_opts} = Keyword.split(user_opts, @redis_top_level_keys)

user_opts =
case String.split(info.userinfo || "", ":") do
[""] -> []
[username] -> [username: username]
["", password] -> [password: password]
[username, password] -> [username: username, password: password]
end
validated =
user_opts
|> Keyword.put_new(:node_name, node())
|> NimbleOptions.validate!(@schema)

opts
|> Keyword.merge(user_opts)
|> Keyword.merge(host: info.host, port: info.port || @defaults[:port])
redis_opts = build_redis_opts(top_level_redis, validated[:redis_opts])

internal
|> Keyword.put(:node_name, validated[:node_name])
|> Keyword.put(:compression_level, validated[:compression_level])
|> Keyword.put(:redis_pool_size, validated[:redis_pool_size])
|> Keyword.put(:redis_opts, redis_opts)
end

defp build_redis_opts(top_level, redis_opts) do
case {top_level, redis_opts} do
# no options provided, use defaults
{[], []} ->
[]

{[_ | _], []} ->
keys = top_level |> Keyword.keys() |> Enum.map_join(", ", &inspect/1)

IO.warn(
"Passing Redis connection keys at the top level is deprecated. " <>
"Move #{keys} inside the :redis_opts option instead.",
[]
)

case Keyword.pop(top_level, :url) do
{nil, opts} ->
opts

{url, []} ->
url

{url, _other} ->
IO.warn(
"Passing :url with other top-level Redis keys is not supported. " <>
"Only the :url value will be used. Use redis_opts: \"#{url}\" instead.",
[]
)

url
end

{[], redis_opts} when redis_opts != [] ->
redis_opts

# deprecated and new options both provided
_multiple ->
raise ArgumentError,
"only one of :redis_opts or top-level Redis keys may be provided, not both"
end
end

defp validate_node_name!(node_name) do
Expand Down
12 changes: 10 additions & 2 deletions lib/phoenix_pubsub_redis/redis_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Phoenix.PubSub.RedisServer do
node_name: node_name,
redix_pid: nil,
reconnect_timer: nil,
redis_opts: [sync_connect: true] ++ redis_opts
redis_opts: redis_opts
}

{:ok, establish_conn(state)}
Expand Down Expand Up @@ -130,7 +130,7 @@ defmodule Phoenix.PubSub.RedisServer do
end

defp establish_conn(%{redis_opts: redis_opts} = state) do
case Redix.PubSub.start_link(redis_opts) do
case start_redix_pubsub(redis_opts) do
{:ok, redix_pid} ->
establish_success(%{state | redix_pid: redix_pid})

Expand All @@ -139,5 +139,13 @@ defmodule Phoenix.PubSub.RedisServer do
end
end

defp start_redix_pubsub(url) when is_binary(url) do
Redix.PubSub.start_link(url, sync_connect: true)
end

defp start_redix_pubsub(opts) when is_list(opts) do
Redix.PubSub.start_link(opts ++ [sync_connect: true])
end

defp redis_namespace(adapter_name), do: "phx:#{adapter_name}"
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule PhoenixPubsubRedis.Mixfile do
[
phoenix_pubsub(),
{:redix, "~> 1.0"},
{:nimble_options, "~> 1.0"},
{:ex_doc, ">= 0.0.0", only: :docs},
{:poolboy, "~> 1.5.1 or ~> 1.6"}
]
Expand Down
Loading
Loading