From 978541b0fe6d7f3c9831ac936b6d1eac097c9a8e Mon Sep 17 00:00:00 2001 From: Jon Carstens Date: Sun, 19 Nov 2023 15:11:50 -0700 Subject: [PATCH] Start support for AWS IoT setup Initial support for an MQTT and message queue setup for device connections as an alternative to websockets --- config/dev.exs | 35 +++++++ config/runtime.exs | 17 ++++ config/test.exs | 33 +++++++ lib/nerves_hub/application.ex | 3 +- lib/nerves_hub/aws_iot.ex | 164 +++++++++++++++++++++++++++++++ mix.exs | 2 + mix.lock | 8 ++ test/nerves_hub/aws_iot_test.exs | 153 ++++++++++++++++++++++++++++ test/support/mqtt_client.ex | 48 +++++++++ 9 files changed, 462 insertions(+), 1 deletion(-) create mode 100644 lib/nerves_hub/aws_iot.ex create mode 100644 test/nerves_hub/aws_iot_test.exs create mode 100644 test/support/mqtt_client.ex diff --git a/config/dev.exs b/config/dev.exs index e447ed784..9b650b6f9 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -108,3 +108,38 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Local config :nerves_hub, NervesHub.RateLimit, limit: 10 config :sentry, environment_name: :development + +broker_opts = [ + name: NervesHub.AWSIoT.PintBroker, + rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}], + on_connect: fn client_id -> + payload = %{clientId: client_id, eventType: :connected} + Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload)) + end, + on_disconnect: fn client_id -> + payload = %{ + clientId: client_id, + eventType: :disconnected, + disconnectReason: "CONNECTION_LOST" + } + + Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload)) + end +] + +config :nerves_hub, NervesHub.AWSIoT, + # Run a PintBroker for local process and/or device connections + local_broker: {PintBroker, broker_opts}, + queues: [ + [ + name: :nerves_hub_iot_messages, + producer: [ + module: {Broadway.DummyProducer, []} + # To test fetching from development queues registered with AWS, use the producer + # below. You may need to set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY ¬ + # module: {BroadwaySQS.Producer, queue_url: "nerves-hub-iot-messages"} + ], + processors: [default: []], + batchers: [default: [batch_size: 10, batch_timeout: 2000]] + ] + ] diff --git a/config/runtime.exs b/config/runtime.exs index 7200909fc..42b7daff7 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -140,6 +140,23 @@ if config_env() == :prod do ] ] ] + + if System.get_env("AWS_IOT_ENABLED") in ["1", "true", "t"] do + config :nerves_hub, NervesHub.AWSIoT, + queues: [ + [ + # AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set + name: :nerves_hub_iot_messages, + producer: [ + module: + {BroadwaySQS.Producer, + queue_url: System.get_env("AWS_IOT_SQS_QUEUE", "nerves-hub-iot-messages")} + ], + processors: [default: []], + batchers: [default: [batch_size: 10, batch_timeout: 2000]] + ] + ] + end end end diff --git a/config/test.exs b/config/test.exs index bcf495d6f..6097c03e3 100644 --- a/config/test.exs +++ b/config/test.exs @@ -82,3 +82,36 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Test config :nerves_hub, NervesHub.RateLimit, limit: 100 config :sentry, environment_name: :test + +## AWS IoT +broker_opts = [ + name: NervesHub.AWSIoT.PintBroker, + rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}], + on_connect: fn client_id -> + payload = %{clientId: client_id, eventType: :connected} + Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload)) + end, + on_disconnect: fn client_id -> + payload = %{ + clientId: client_id, + eventType: :disconnected, + disconnectReason: "CONNECTION_LOST" + } + + Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload)) + end +] + +config :nerves_hub, NervesHub.AWSIoT, + # Use PintBroker for local device connections in tests + local_broker: {PintBroker, broker_opts}, + queues: [ + [ + name: :nerves_hub_iot_messages, + producer: [ + module: {Broadway.DummyProducer, []} + ], + processors: [default: []], + batchers: [default: [batch_size: 10, batch_timeout: 2000]] + ] + ] diff --git a/lib/nerves_hub/application.ex b/lib/nerves_hub/application.ex index a17750d77..f0cd5b619 100644 --- a/lib/nerves_hub/application.ex +++ b/lib/nerves_hub/application.ex @@ -30,7 +30,8 @@ defmodule NervesHub.Application do {Phoenix.PubSub, name: NervesHub.PubSub}, {Cluster.Supervisor, [topologies]}, {Task.Supervisor, name: NervesHub.TaskSupervisor}, - {Oban, Application.fetch_env!(:nerves_hub, Oban)} + {Oban, Application.fetch_env!(:nerves_hub, Oban)}, + NervesHub.AWSIoT ] ++ deployments_supervisor(deploy_env()) ++ endpoints(deploy_env()) diff --git a/lib/nerves_hub/aws_iot.ex b/lib/nerves_hub/aws_iot.ex new file mode 100644 index 000000000..72ca97a03 --- /dev/null +++ b/lib/nerves_hub/aws_iot.ex @@ -0,0 +1,164 @@ +defmodule NervesHub.AWSIoT do + @moduledoc """ + Support for common AWS IOT infrastructure including MQTT and SQS + + Requires `:queues` to be defined in the application config or + the supervisor is simply ignored + + See docs.nerves-hub.org for a general overview of the architecture + """ + use Supervisor + + alias NervesHub.Tracker + + @type opt :: {:queues, [keyword()]} + @spec start_link([opt]) :: Supervisor.on_start() + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl Supervisor + def init(opts) do + opts = + Application.get_env(:nerves_hub, __MODULE__, []) + |> Keyword.merge(opts) + + case opts[:queues] do + queues when is_list(queues) and length(queues) > 0 -> + children = + Enum.map(queues, &{__MODULE__.SQS, &1}) + |> maybe_add_local_broker(opts) + + Supervisor.init(children, strategy: :one_for_one) + + _ -> + :ignore + end + end + + defp maybe_add_local_broker(children, opts) do + if broker_spec = opts[:local_broker] do + [broker_spec | children] + else + children + end + end + + if Application.compile_env(:nerves_hub, [__MODULE__, :local_broker], false) do + def publish(serial, event, payload) do + data = Jason.encode!(%{event: event, payload: payload}) + PintBroker.publish(__MODULE__.PintBroker, "nh/#{serial}", data) + end + else + def publish(serial, event, payload) do + # TODO: Topic and data may change soon + # Stubbing out initial idea here for now + data = %{event: event, payload: payload} + topic = "/topics/nh/#{serial}" + + ExAws.Operation.JSON.new(:iot_data, %{path: topic, data: data}) + |> ExAws.request() + end + end + + defmodule SQS do + @moduledoc """ + Consumer for AWS SQS messages + + This is the ingestion point of devices coming from the MQTT + broker. A message from a device must include the `"identifier"` + key either in the payload or pulled from the topic via the + AWS IoT rule that forwards to the queue. + + The system must also be setup with a rule to forward [AWS Lifecycle + events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html) + to a queue for tracking device online/offline presence + + Right now, all configured queues are handled by this module. + In the future, we may want to separate handling for each + queue in it's own module. + """ + use Broadway + + alias Broadway.Message + alias NervesHub.Devices + + require Logger + + def start_link(opts), do: Broadway.start_link(__MODULE__, opts) + + @impl Broadway + def handle_message(_processor, %{data: raw} = msg, _context) do + case Jason.decode(raw) do + {:ok, data} -> + Message.put_data(msg, data) + |> process_message() + + _ -> + Message.failed(msg, :malformed) + end + end + + @impl Broadway + def handle_batch(_batcher, messages, batch_info, _context) do + Logger.debug("[SQS] Handled #{inspect(batch_info.size)}") + messages + end + + defp process_message(%{data: %{"eventType" => "connected"} = data} = msg) do + # TODO: Maybe use more info from the connection? + # Example payload of AWS lifecycle connected event + # principalIdentifier is a SHA256 fingerprint of the certificate that + # is Base16 encoded + # { + # "clientId": "186b5", + # "timestamp": 1573002230757, + # "eventType": "connected", + # "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967", + # "principalIdentifier": "12345678901234567890123456789012", + # "ipAddress": "192.0.2.0", + # "versionNumber": 0 + # } + + with {:ok, device} <- Devices.get_by_identifier(data["clientId"]) do + Logger.debug("[AWS IoT] device #{device.identifier} connected") + + Tracker.online(device) + + msg + else + _ -> + Message.failed(msg, :unknown_device) + end + end + + defp process_message(%{data: %{"eventType" => "disconnected"} = data} = msg) do + # TODO: Maybe use more of the disconnect data? + # Example payload of AWS lifecyle disconnect event + # { + # "clientId": "186b5", + # "timestamp": 1573002340451, + # "eventType": "disconnected", + # "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967", + # "principalIdentifier": "12345678901234567890123456789012", + # "clientInitiatedDisconnect": true, + # "disconnectReason": "CLIENT_INITIATED_DISCONNECT", + # "versionNumber": 0 + # } + with {:ok, device} <- Devices.get_by_identifier(data["clientId"]) do + Logger.debug( + "[AWS IoT] device #{device.identifier} disconnected: #{data["disconnectReason"]}" + ) + + Tracker.offline(device) + end + + msg + end + + defp process_message(msg) do + # TODO: Track unhandled msg + msg + end + end +end diff --git a/mix.exs b/mix.exs index b902b1fd7..a23645590 100644 --- a/mix.exs +++ b/mix.exs @@ -56,6 +56,7 @@ defmodule NervesHub.MixProject do {:bcrypt_elixir, "~> 3.0"}, {:castore, "~> 1.0"}, {:circular_buffer, "~> 0.4.1"}, + {:broadway_sqs, "~> 0.7"}, {:comeonin, "~> 5.3"}, {:crontab, "~> 1.1"}, {:decorator, "~> 1.2"}, @@ -84,6 +85,7 @@ defmodule NervesHub.MixProject do {:phoenix_pubsub, "~> 2.0"}, {:phoenix_swoosh, "~> 1.0"}, {:phoenix_view, "~> 2.0"}, + {:pint_broker, "~> 1.0", only: [:dev, :test]}, {:plug, "~> 1.7"}, {:postgrex, "~> 0.14"}, {:scrivener_ecto, "~> 2.7"}, diff --git a/mix.lock b/mix.lock index db4c3c50e..53f3edce4 100644 --- a/mix.lock +++ b/mix.lock @@ -2,6 +2,8 @@ "bandit": {:hex, :bandit, "1.2.2", "569fe5d0efb107c9af37a1e37e25ce2ceec293101a2d4bc512876fc3207192b5", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "2f89adb7281c78d4e75733e0a9e1b24f46f84d2993963d6fa57d0eafadec5f03"}, "base62": {:hex, :base62, "1.2.2", "85c6627eb609317b70f555294045895ffaaeb1758666ab9ef9ca38865b11e629", [:mix], [{:custom_base, "~> 0.2.1", [hex: :custom_base, repo: "hexpm", optional: false]}], "hexpm", "d41336bda8eaa5be197f1e4592400513ee60518e5b9f4dcf38f4b4dae6f377bb"}, "bcrypt_elixir": {:hex, :bcrypt_elixir, "3.1.0", "0b110a9a6c619b19a7f73fa3004aa11d6e719a67e672d1633dc36b6b2290a0f7", [:make, :mix], [{:comeonin, "~> 5.3", [hex: :comeonin, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "2ad2acb5a8bc049e8d5aa267802631912bb80d5f4110a178ae7999e69dca1bf7"}, + "broadway": {:hex, :broadway, "1.0.7", "7808f9e3eb6f53ca6d060f0f9d61012dd8feb0d7a82e62d087dd517b9b66fa53", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e76cfb0a7d64176c387b8b1ddbfb023e2ee8a63e92f43664d78e6d5d0b1177c6"}, + "broadway_sqs": {:hex, :broadway_sqs, "0.7.3", "b7b99cf4d21e9d87a64853c4c502690ece01897a3a08bfc6df01ad8999e19da3", [:mix], [{:broadway, "~> 1.0", [hex: :broadway, repo: "hexpm", optional: false]}, {:ex_aws_sqs, "~> 3.2.1 or ~> 3.3", [hex: :ex_aws_sqs, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:saxy, "~> 1.1", [hex: :saxy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0a9f02d4a32ba65feebb0cd247c466342d4eb1803ee7db993f2886810dfc1d3a"}, "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "circular_buffer": {:hex, :circular_buffer, "0.4.1", "477f370fd8cfe1787b0a1bade6208bbd274b34f1610e41f1180ba756a7679839", [:mix], [], "hexpm", "633ef2e059dde0d7b89bbab13b1da9d04c6685e80e68fbdf41282d4fae746b72"}, @@ -18,11 +20,14 @@ "elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"}, "ex_aws": {:hex, :ex_aws, "2.5.1", "7418917974ea42e9e84b25e88b9f3d21a861d5f953ad453e212f48e593d8d39f", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1b95431f70c446fa1871f0eb9b183043c5a625f75f9948a42d25f43ae2eff12b"}, "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.3", "422468e5c3e1a4da5298e66c3468b465cfd354b842e512cb1f6fbbe4e2f5bdaf", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "4f09dd372cc386550e484808c5ac5027766c8d0cd8271ccc578b82ee6ef4f3b8"}, + "ex_aws_sqs": {:hex, :ex_aws_sqs, "3.4.0", "f7c4d0177c1c954776363d3dc05e5dfd37ddf0e2c65ec3f047e5c9c7dd1b71ac", [:mix], [{:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:saxy, "~> 1.1", [hex: :saxy, repo: "hexpm", optional: true]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "b504482206ccaf767b714888e9d41a1cfcdcb241577985517114191c812f155a"}, "expo": {:hex, :expo, "0.5.1", "249e826a897cac48f591deba863b26c16682b43711dd15ee86b92f25eafd96d9", [:mix], [], "hexpm", "68a4233b0658a3d12ee00d27d37d856b1ba48607e7ce20fd376958d0ba6ce92b"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "finch": {:hex, :finch, "0.17.0", "17d06e1d44d891d20dbd437335eebe844e2426a0cd7e3a3e220b461127c73f70", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8d014a661bb6a437263d4b5abf0bcbd3cf0deb26b1e8596f2a271d22e48934c7"}, "floki": {:hex, :floki, "0.35.4", "cc947b446024732c07274ac656600c5c4dc014caa1f8fb2dfff93d275b83890d", [:mix], [], "hexpm", "27fa185d3469bd8fc5947ef0f8d5c4e47f0af02eb6b070b63c868f69e3af0204"}, "gen_smtp": {:hex, :gen_smtp, "1.2.0", "9cfc75c72a8821588b9b9fe947ae5ab2aed95a052b81237e0928633a13276fd3", [:rebar3], [{:ranch, ">= 1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "5ee0375680bca8f20c4d85f58c2894441443a743355430ff33a783fe03296779"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, + "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "gettext": {:hex, :gettext, "0.24.0", "6f4d90ac5f3111673cbefc4ebee96fe5f37a114861ab8c7b7d5b30a1108ce6d8", [:mix], [{:expo, "~> 0.5.1", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "bdf75cdfcbe9e4622dd18e034b227d77dd17f0f133853a1c73b97b3d6c770e8b"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~>2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "hlclock": {:hex, :hlclock, "1.0.0", "7a72fc7a20a9382499216227edf97a8b118e21fc3fcad0e81b8d10c616ce1431", [:mix], [], "hexpm", "d3f994336a7fcbc68bf08b14b2101b61e57bef82c032a6e05c1cdc753612c941"}, @@ -55,11 +60,13 @@ "phoenix_swoosh": {:hex, :phoenix_swoosh, "1.2.1", "b74ccaa8046fbc388a62134360ee7d9742d5a8ae74063f34eb050279de7a99e1", [:mix], [{:finch, "~> 0.8", [hex: :finch, repo: "hexpm", optional: true]}, {:hackney, "~> 1.10", [hex: :hackney, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_view, "~> 1.0 or ~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:swoosh, "~> 1.5", [hex: :swoosh, repo: "hexpm", optional: false]}], "hexpm", "4000eeba3f9d7d1a6bf56d2bd56733d5cadf41a7f0d8ffe5bb67e7d667e204a2"}, "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"}, "phoenix_view": {:hex, :phoenix_view, "2.0.3", "4d32c4817fce933693741deeb99ef1392619f942633dde834a5163124813aad3", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}], "hexpm", "cd34049af41be2c627df99cd4eaa71fc52a328c0c3d8e7d4aa28f880c30e7f64"}, + "pint_broker": {:hex, :pint_broker, "1.0.1", "da6613278d2c66e6304eeb583d00adc2004c476c33234036bb9776a594604456", [:mix], [{:tortoise311, "~> 0.11", [hex: :tortoise311, repo: "hexpm", optional: false]}], "hexpm", "ce2b9c61338db19d98b04665f536887670978972e075951e18de0e0a05b57b46"}, "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "ranch": {:hex, :ranch, "2.1.0", "2261f9ed9574dcfcc444106b9f6da155e6e540b2f82ba3d42b339b93673b72a3", [:make, :rebar3], [], "hexpm", "244ee3fa2a6175270d8e1fc59024fd9dbc76294a321057de8f803b1479e76916"}, "recon": {:hex, :recon, "2.5.4", "05dd52a119ee4059fa9daa1ab7ce81bc7a8161a2f12e9d42e9d551ffd2ba901c", [:mix, :rebar3], [], "hexpm", "e9ab01ac7fc8572e41eb59385efeb3fb0ff5bf02103816535bacaedf327d0263"}, + "saxy": {:hex, :saxy, "1.5.0", "0141127f2d042856f135fb2d94e0beecda7a2306f47546dbc6411fc5b07e28bf", [:mix], [], "hexpm", "ea7bb6328fbd1f2aceffa3ec6090bfb18c85aadf0f8e5030905e84235861cf89"}, "scrivener": {:hex, :scrivener, "2.7.2", "1d913c965ec352650a7f864ad7fd8d80462f76a32f33d57d1e48bc5e9d40aba2", [:mix], [], "hexpm", "7866a0ec4d40274efbee1db8bead13a995ea4926ecd8203345af8f90d2b620d9"}, "scrivener_ecto": {:hex, :scrivener_ecto, "2.7.0", "cf64b8cb8a96cd131cdbcecf64e7fd395e21aaa1cb0236c42a7c2e34b0dca580", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:scrivener, "~> 2.4", [hex: :scrivener, repo: "hexpm", optional: false]}], "hexpm", "e809f171687806b0031129034352f5ae44849720c48dd839200adeaf0ac3e260"}, "scrivener_html": {:git, "https://github.com/nerves-hub/scrivener_html", "e7d2ffcf241cebc8490da44b5d09ac297b57a91b", [branch: "phx-1.5"]}, @@ -74,6 +81,7 @@ "telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"}, "thousand_island": {:hex, :thousand_island, "1.3.3", "d193a0e268f44a2d09efc6b2d8b39cf37fc65ce99fc421deca6260759d6eb204", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "adc328b15f10012769dcc3aeb14d4855790cd5ea34e4c6a701f334516084e750"}, "timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"}, + "tortoise311": {:hex, :tortoise311, "0.11.7", "29e78e6ffa153a2c0728344277e587342cf7035ee62a38dc5656d970007920f0", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0ef7cf940b50ab47c433323294c9a3bb2883a03e0c7cb37de3e860c0b9ba5bef"}, "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, diff --git a/test/nerves_hub/aws_iot_test.exs b/test/nerves_hub/aws_iot_test.exs new file mode 100644 index 000000000..f2dcbc1b4 --- /dev/null +++ b/test/nerves_hub/aws_iot_test.exs @@ -0,0 +1,153 @@ +defmodule NervesHub.AWSIoTTest do + use NervesHub.DataCase + + alias NervesHub.AWSIoT + alias NervesHub.Fixtures + alias NervesHub.Support.MQTTClient + + test "can publish to connected devices" do + %{device: device} = create_device() + {:ok, conn} = MQTTClient.start_link(device) + topic = "nh/#{device.identifier}" + + assert MQTTClient.connected?(conn) + assert MQTTClient.subscribed?(conn, topic) + + msg = %{event: "testing", payload: %{howdy: :partner}} + + AWSIoT.publish(device.identifier, msg.event, msg.payload) + + MQTTClient.assert_message(conn, topic, Jason.encode!(msg)) + end + + test "known device connection from lifecycle event" do + %{device: device, device_certificate: cert} = create_device() + + identifier = device.identifier + + lifecyle_event = + %{ + clientId: device.identifier, + timestamp: DateTime.to_unix(DateTime.utc_now()), + eventType: :connected, + sessionIdentifier: Base.encode16(:crypto.strong_rand_bytes(16), case: :lower), + principalIdentifier: Base.encode16(:crypto.hash(:sha256, cert.der), case: :lower), + ipAddress: "127.0.0.2", + versionNumber: 0 + } + + Phoenix.PubSub.subscribe(NervesHub.PubSub, "device:#{identifier}:internal") + + # Publish message to the rule that forwards to the queue + # This is expected to be configured in AWS to come from the broker + PintBroker.publish(AWSIoT.PintBroker, "nh/device_messages", Jason.encode!(lifecyle_event)) + + # Piggy back on internal messaging to know when status is updated to online + assert_receive %{ + event: "connection_change", + payload: %{device_id: ^identifier, status: "online"} + } + end + + test "unknown device connection from lifecyle event" do + lifecyle_event = %{ + clientId: "im-an-imposter!", + timestamp: DateTime.to_unix(DateTime.utc_now()), + eventType: :connected, + sessionIdentifier: Base.encode16(:crypto.strong_rand_bytes(16), case: :lower), + principalIdentifier: "123429083423094lskdf2304msd234", + ipAddress: "127.0.0.2", + versionNumber: 0 + } + + msg = %Broadway.Message{acknowledger: :stub, data: Jason.encode!(lifecyle_event)} + + # Assert on the broadway message sruct here to mostly test that things + # don't crash in this case since there is no after effect + assert %{status: {:failed, :unknown_device}} = + AWSIoT.SQS.handle_message(:nerves_hub_iot_messages, msg, :no_context) + end + + test "known device disconnect from lifecyle event" do + %{device: device, device_certificate: cert} = create_device() + identifier = device.identifier + + lifecyle_event = + %{ + clientId: device.identifier, + timestamp: DateTime.to_unix(DateTime.utc_now()), + eventType: :disconnected, + sessionIdentifier: Base.encode16(:crypto.strong_rand_bytes(16), case: :lower), + principalIdentifier: Base.encode16(:crypto.hash(:sha256, cert.der), case: :lower), + clientInitiatedDisconnect: true, + disconnectReason: "CLIENT_INITIATED_DISCONNECT", + versionNumber: 0 + } + + Phoenix.PubSub.subscribe(NervesHub.PubSub, "device:#{identifier}:internal") + + # Publish message to the rule that forwards to the queue + # This is expected to be configured in AWS to come from the broker + PintBroker.publish(AWSIoT.PintBroker, "nh/device_messages", Jason.encode!(lifecyle_event)) + + # Piggy back on internal messaging to know when status is updated to online + assert_receive %{ + event: "connection_change", + payload: %{device_id: ^identifier, status: "offline"} + } + end + + test "unknown device disconnect from lifecyle event" do + data = %{ + clientId: "im-an-imposter!", + timestamp: DateTime.to_unix(DateTime.utc_now()), + eventType: :disconnected, + sessionIdentifier: Base.encode16(:crypto.strong_rand_bytes(16), case: :lower), + principalIdentifier: "123429083423094lskdf2304msd234", + clientInitiatedDisconnect: true, + disconnectReason: "CLIENT_INITIATED_DISCONNECT", + versionNumber: 0 + } + + msg = %Broadway.Message{acknowledger: :stub, data: Jason.encode!(data)} + + # Assert on the broadway message sruct here to mostly test that things + # don't crash in this case since there is no after effect + assert %{status: :ok} = AWSIoT.SQS.handle_message(:nerves_hub_iot_messages, msg, :no_context) + end + + test "malformed messages from queue" do + msg = %Broadway.Message{acknowledger: :stub, data: "i am invalid JSoN?!!*"} + + assert %{status: {:failed, :malformed}} = + AWSIoT.SQS.handle_message(:nerves_hub_iot_messages, msg, :no_context) + end + + test "unknown messages are ignored" do + data = Jason.encode!(%{this: :is, an: :unhandled, message: "!"}) + msg = %Broadway.Message{acknowledger: :stub, data: data} + assert %{status: :ok} = AWSIoT.SQS.handle_message(:nerves_hub_iot_messages, msg, :no_context) + end + + defp create_device() do + user = Fixtures.user_fixture() + org = Fixtures.org_fixture(user) + product = Fixtures.product_fixture(user, org) + org_key = Fixtures.org_key_fixture(org) + firmware = Fixtures.firmware_fixture(org_key, product) + + device = Fixtures.device_fixture(org, product, firmware) + + %{db_cert: cert} = + Fixtures.device_certificate_fixture(device, X509.PrivateKey.new_ec(:secp256r1)) + + %{ + device: %{device | device_certificates: [cert]}, + device_certificate: cert, + firmware: firmware, + org: org, + product: product, + user: user + } + end +end diff --git a/test/support/mqtt_client.ex b/test/support/mqtt_client.ex new file mode 100644 index 000000000..1aac5af19 --- /dev/null +++ b/test/support/mqtt_client.ex @@ -0,0 +1,48 @@ +defmodule NervesHub.Support.MQTTClient do + @moduledoc """ + Sample MQTT Client for testing device connections + """ + use Tortoise311.Handler + + import ExUnit.Assertions + + def assert_message(conn, topic, msg) do + assert Process.alive?(conn) + assert_receive {:message, ^topic, ^msg} + end + + def connected?(conn) do + Process.alive?(conn) and :sys.get_state(conn).status == :up + end + + def subscribed?(conn, topic) do + Process.alive?(conn) and + Enum.find_value(GenServer.call(conn, :subscriptions), fn + {^topic, _qos} -> true + _ -> false + end) + end + + def start_link(device) do + opts = [ + client_id: device.identifier, + server: {Tortoise311.Transport.Tcp, host: "localhost", port: 1883}, + handler: {__MODULE__, [{device, self()}]}, + subscriptions: [{"nh/#{device.identifier}", 0}] + # will: something? + ] + + Tortoise311.Connection.start_link(opts) + end + + @impl Tortoise311.Handler + def init([{device, test_pid}]) do + {:ok, %{device: device, test_pid: test_pid}} + end + + @impl Tortoise311.Handler + def handle_message(topic_levels, payload, state) do + send(state.test_pid, {:message, Path.join(topic_levels), payload}) + {:ok, state} + end +end