Skip to content

Commit 978541b

Browse files
jjcarstensoestrich
authored andcommitted
Start support for AWS IoT setup
Initial support for an MQTT and message queue setup for device connections as an alternative to websockets
1 parent 93f8db7 commit 978541b

File tree

9 files changed

+462
-1
lines changed

9 files changed

+462
-1
lines changed

config/dev.exs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,38 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Local
108108
config :nerves_hub, NervesHub.RateLimit, limit: 10
109109

110110
config :sentry, environment_name: :development
111+
112+
broker_opts = [
113+
name: NervesHub.AWSIoT.PintBroker,
114+
rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}],
115+
on_connect: fn client_id ->
116+
payload = %{clientId: client_id, eventType: :connected}
117+
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
118+
end,
119+
on_disconnect: fn client_id ->
120+
payload = %{
121+
clientId: client_id,
122+
eventType: :disconnected,
123+
disconnectReason: "CONNECTION_LOST"
124+
}
125+
126+
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
127+
end
128+
]
129+
130+
config :nerves_hub, NervesHub.AWSIoT,
131+
# Run a PintBroker for local process and/or device connections
132+
local_broker: {PintBroker, broker_opts},
133+
queues: [
134+
[
135+
name: :nerves_hub_iot_messages,
136+
producer: [
137+
module: {Broadway.DummyProducer, []}
138+
# To test fetching from development queues registered with AWS, use the producer
139+
# below. You may need to set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY ¬
140+
# module: {BroadwaySQS.Producer, queue_url: "nerves-hub-iot-messages"}
141+
],
142+
processors: [default: []],
143+
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
144+
]
145+
]

config/runtime.exs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,23 @@ if config_env() == :prod do
140140
]
141141
]
142142
]
143+
144+
if System.get_env("AWS_IOT_ENABLED") in ["1", "true", "t"] do
145+
config :nerves_hub, NervesHub.AWSIoT,
146+
queues: [
147+
[
148+
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set
149+
name: :nerves_hub_iot_messages,
150+
producer: [
151+
module:
152+
{BroadwaySQS.Producer,
153+
queue_url: System.get_env("AWS_IOT_SQS_QUEUE", "nerves-hub-iot-messages")}
154+
],
155+
processors: [default: []],
156+
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
157+
]
158+
]
159+
end
143160
end
144161
end
145162

config/test.exs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,36 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Test
8282
config :nerves_hub, NervesHub.RateLimit, limit: 100
8383

8484
config :sentry, environment_name: :test
85+
86+
## AWS IoT
87+
broker_opts = [
88+
name: NervesHub.AWSIoT.PintBroker,
89+
rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}],
90+
on_connect: fn client_id ->
91+
payload = %{clientId: client_id, eventType: :connected}
92+
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
93+
end,
94+
on_disconnect: fn client_id ->
95+
payload = %{
96+
clientId: client_id,
97+
eventType: :disconnected,
98+
disconnectReason: "CONNECTION_LOST"
99+
}
100+
101+
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
102+
end
103+
]
104+
105+
config :nerves_hub, NervesHub.AWSIoT,
106+
# Use PintBroker for local device connections in tests
107+
local_broker: {PintBroker, broker_opts},
108+
queues: [
109+
[
110+
name: :nerves_hub_iot_messages,
111+
producer: [
112+
module: {Broadway.DummyProducer, []}
113+
],
114+
processors: [default: []],
115+
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
116+
]
117+
]

lib/nerves_hub/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ defmodule NervesHub.Application do
3030
{Phoenix.PubSub, name: NervesHub.PubSub},
3131
{Cluster.Supervisor, [topologies]},
3232
{Task.Supervisor, name: NervesHub.TaskSupervisor},
33-
{Oban, Application.fetch_env!(:nerves_hub, Oban)}
33+
{Oban, Application.fetch_env!(:nerves_hub, Oban)},
34+
NervesHub.AWSIoT
3435
] ++
3536
deployments_supervisor(deploy_env()) ++
3637
endpoints(deploy_env())

lib/nerves_hub/aws_iot.ex

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
defmodule NervesHub.AWSIoT do
2+
@moduledoc """
3+
Support for common AWS IOT infrastructure including MQTT and SQS
4+
5+
Requires `:queues` to be defined in the application config or
6+
the supervisor is simply ignored
7+
8+
See docs.nerves-hub.org for a general overview of the architecture
9+
"""
10+
use Supervisor
11+
12+
alias NervesHub.Tracker
13+
14+
@type opt :: {:queues, [keyword()]}
15+
@spec start_link([opt]) :: Supervisor.on_start()
16+
def start_link(opts) do
17+
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
18+
end
19+
20+
@impl Supervisor
21+
def init(opts) do
22+
opts =
23+
Application.get_env(:nerves_hub, __MODULE__, [])
24+
|> Keyword.merge(opts)
25+
26+
case opts[:queues] do
27+
queues when is_list(queues) and length(queues) > 0 ->
28+
children =
29+
Enum.map(queues, &{__MODULE__.SQS, &1})
30+
|> maybe_add_local_broker(opts)
31+
32+
Supervisor.init(children, strategy: :one_for_one)
33+
34+
_ ->
35+
:ignore
36+
end
37+
end
38+
39+
defp maybe_add_local_broker(children, opts) do
40+
if broker_spec = opts[:local_broker] do
41+
[broker_spec | children]
42+
else
43+
children
44+
end
45+
end
46+
47+
if Application.compile_env(:nerves_hub, [__MODULE__, :local_broker], false) do
48+
def publish(serial, event, payload) do
49+
data = Jason.encode!(%{event: event, payload: payload})
50+
PintBroker.publish(__MODULE__.PintBroker, "nh/#{serial}", data)
51+
end
52+
else
53+
def publish(serial, event, payload) do
54+
# TODO: Topic and data may change soon
55+
# Stubbing out initial idea here for now
56+
data = %{event: event, payload: payload}
57+
topic = "/topics/nh/#{serial}"
58+
59+
ExAws.Operation.JSON.new(:iot_data, %{path: topic, data: data})
60+
|> ExAws.request()
61+
end
62+
end
63+
64+
defmodule SQS do
65+
@moduledoc """
66+
Consumer for AWS SQS messages
67+
68+
This is the ingestion point of devices coming from the MQTT
69+
broker. A message from a device must include the `"identifier"`
70+
key either in the payload or pulled from the topic via the
71+
AWS IoT rule that forwards to the queue.
72+
73+
The system must also be setup with a rule to forward [AWS Lifecycle
74+
events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html)
75+
to a queue for tracking device online/offline presence
76+
77+
Right now, all configured queues are handled by this module.
78+
In the future, we may want to separate handling for each
79+
queue in it's own module.
80+
"""
81+
use Broadway
82+
83+
alias Broadway.Message
84+
alias NervesHub.Devices
85+
86+
require Logger
87+
88+
def start_link(opts), do: Broadway.start_link(__MODULE__, opts)
89+
90+
@impl Broadway
91+
def handle_message(_processor, %{data: raw} = msg, _context) do
92+
case Jason.decode(raw) do
93+
{:ok, data} ->
94+
Message.put_data(msg, data)
95+
|> process_message()
96+
97+
_ ->
98+
Message.failed(msg, :malformed)
99+
end
100+
end
101+
102+
@impl Broadway
103+
def handle_batch(_batcher, messages, batch_info, _context) do
104+
Logger.debug("[SQS] Handled #{inspect(batch_info.size)}")
105+
messages
106+
end
107+
108+
defp process_message(%{data: %{"eventType" => "connected"} = data} = msg) do
109+
# TODO: Maybe use more info from the connection?
110+
# Example payload of AWS lifecycle connected event
111+
# principalIdentifier is a SHA256 fingerprint of the certificate that
112+
# is Base16 encoded
113+
# {
114+
# "clientId": "186b5",
115+
# "timestamp": 1573002230757,
116+
# "eventType": "connected",
117+
# "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967",
118+
# "principalIdentifier": "12345678901234567890123456789012",
119+
# "ipAddress": "192.0.2.0",
120+
# "versionNumber": 0
121+
# }
122+
123+
with {:ok, device} <- Devices.get_by_identifier(data["clientId"]) do
124+
Logger.debug("[AWS IoT] device #{device.identifier} connected")
125+
126+
Tracker.online(device)
127+
128+
msg
129+
else
130+
_ ->
131+
Message.failed(msg, :unknown_device)
132+
end
133+
end
134+
135+
defp process_message(%{data: %{"eventType" => "disconnected"} = data} = msg) do
136+
# TODO: Maybe use more of the disconnect data?
137+
# Example payload of AWS lifecyle disconnect event
138+
# {
139+
# "clientId": "186b5",
140+
# "timestamp": 1573002340451,
141+
# "eventType": "disconnected",
142+
# "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967",
143+
# "principalIdentifier": "12345678901234567890123456789012",
144+
# "clientInitiatedDisconnect": true,
145+
# "disconnectReason": "CLIENT_INITIATED_DISCONNECT",
146+
# "versionNumber": 0
147+
# }
148+
with {:ok, device} <- Devices.get_by_identifier(data["clientId"]) do
149+
Logger.debug(
150+
"[AWS IoT] device #{device.identifier} disconnected: #{data["disconnectReason"]}"
151+
)
152+
153+
Tracker.offline(device)
154+
end
155+
156+
msg
157+
end
158+
159+
defp process_message(msg) do
160+
# TODO: Track unhandled msg
161+
msg
162+
end
163+
end
164+
end

mix.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ defmodule NervesHub.MixProject do
5656
{:bcrypt_elixir, "~> 3.0"},
5757
{:castore, "~> 1.0"},
5858
{:circular_buffer, "~> 0.4.1"},
59+
{:broadway_sqs, "~> 0.7"},
5960
{:comeonin, "~> 5.3"},
6061
{:crontab, "~> 1.1"},
6162
{:decorator, "~> 1.2"},
@@ -84,6 +85,7 @@ defmodule NervesHub.MixProject do
8485
{:phoenix_pubsub, "~> 2.0"},
8586
{:phoenix_swoosh, "~> 1.0"},
8687
{:phoenix_view, "~> 2.0"},
88+
{:pint_broker, "~> 1.0", only: [:dev, :test]},
8789
{:plug, "~> 1.7"},
8890
{:postgrex, "~> 0.14"},
8991
{:scrivener_ecto, "~> 2.7"},

0 commit comments

Comments
 (0)