From bd9bbb8cdcbe3f15acb20109f5763abdded0e3c7 Mon Sep 17 00:00:00 2001 From: Alioune Gaye Date: Fri, 10 Oct 2025 12:39:45 +0200 Subject: [PATCH 1/3] test(dup): Extract Interface proprieties unset test from simple flow --- .../data_updater_test.exs | 38 ----- .../dup_unset_properties_test.exs | 157 ++++++++++++++++++ 2 files changed, 157 insertions(+), 38 deletions(-) create mode 100644 apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index ad2b5eab12..6de1f237f0 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -1290,44 +1290,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do value = Repo.one(value_query) assert value == 5 - - # Unset subtest - - # Delete it otherwise it gets raised - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - volatile_changed_trigger_id - ) == :ok - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/10/start", - <<>>, - gen_tracking_id(), - make_timestamp("2017-10-09T15:10:32+00:00") - ) - - DataUpdater.dump_state(realm, encoded_device_id) - - endpoint_id = - retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") - - value_query = - from ip in IndividualProperty, - prefix: ^keyspace_name, - where: - ip.device_id == ^device_id and - ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and - ip.endpoint_id == ^endpoint_id and - ip.path == "/weekSchedule/10/start", - select: ip.longinteger_value - - value = Repo.one(value_query) - - assert value == nil end test "empty introspection is updated correctly", %{realm: realm, helper_name: helper_name} do diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs new file mode 100644 index 0000000000..e3e577f8bf --- /dev/null +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs @@ -0,0 +1,157 @@ +# +# This file is part of Astarte. +# +# Copyright 2017 - 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defmodule Astate.DataUpdaterPlant.UnsetTest do + use ExUnit.Case, async: true + import Mox + + alias Astarte.DataUpdaterPlant.DatabaseTestHelper + alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.Core.Device + alias Astarte.DataUpdaterPlant.DataUpdater + alias Astarte.DataAccess.Realms.Realm + alias Astarte.DataAccess.Repo + alias Astarte.DataAccess.Realms.IndividualProperty + alias Astarte.DataAccess.Realms.Interface + alias Astarte.DataUpdaterPlant.DatabaseTestHelper + + alias Astarte.Core.CQLUtils + + import Ecto.Query + + setup :verify_on_exit! + + setup do + realm_string = "autotestrealm#{System.unique_integer([:positive])}" + {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) + + on_exit(fn -> + DatabaseTestHelper.destroy_local_test_keyspace(realm_string) + end) + + helper_name = String.to_atom("helper_#{realm_string}") + + consumer_name = String.to_atom("consumer_#{realm_string}") + + {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) + + {:ok, _consumer_pid} = + AMQPTestHelper.start_events_consumer( + name: consumer_name, + realm: realm_string, + helper_name: helper_name + ) + + {:ok, %{realm: realm_string, helper_name: helper_name}} + end + + test "Unset values from interface properties", %{ + realm: realm, + helper_name: helper_name + } do + AMQPTestHelper.clean_queue(helper_name) + + encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" + keyspace_name = Realm.keyspace_name(realm) + {:ok, device_id} = Device.decode_device_id(encoded_device_id) + volatile_changed_trigger_id = :crypto.strong_rand_bytes(16) + + received_msgs = 45000 + received_bytes = 4_500_000 + existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} + + insert_opts = [ + introspection: existing_introspection_map, + total_received_msgs: received_msgs, + total_received_bytes: received_bytes, + groups: ["group1"] + ] + + DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + volatile_changed_trigger_id + ) == :ok + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/10/start", + <<>>, + gen_tracking_id(), + make_timestamp("2017-10-09T15:10:32+00:00") + ) + + DataUpdater.dump_state(realm, encoded_device_id) + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/10/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + end + + defp retrieve_endpoint_id(realm_name, interface_name, interface_major, path) do + keyspace_name = Realm.keyspace_name(realm_name) + + query = + from i in Interface, + prefix: ^keyspace_name, + where: i.name == ^interface_name and i.major_version == ^interface_major, + select: %{ + automaton_transitions: i.automaton_transitions, + automaton_accepting_states: i.automaton_accepting_states + } + + interface_row = Repo.one!(query) + + automaton = + {:erlang.binary_to_term(interface_row[:automaton_transitions]), + :erlang.binary_to_term(interface_row[:automaton_accepting_states])} + + {:ok, endpoint_id} = Astarte.Core.Mapping.EndpointsAutomaton.resolve_path(path, automaton) + + endpoint_id + end + + defp make_timestamp(timestamp_string) do + {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) + + DateTime.to_unix(date_time, :millisecond) * 10000 + end + + defp gen_tracking_id() do + message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() + delivery_tag = {:injected_msg, make_ref()} + {message_id, delivery_tag} + end +end From c1516b14813322fbc03ad5f3f926a03346fa6e41 Mon Sep 17 00:00:00 2001 From: Alioune Gaye Date: Fri, 10 Oct 2025 18:18:13 +0200 Subject: [PATCH 2/3] dup(test): Extract properties producer from simple flow --- .../data_updater_test.exs | 109 -------- .../dup_producer_properties_test.exs | 255 ++++++++++++++++++ 2 files changed, 255 insertions(+), 109 deletions(-) create mode 100644 apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index 6de1f237f0..bacb0b42b0 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -1181,115 +1181,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do v_value: nil ] ] - - # Test /producer/properties control message - data = - <<0, 0, 0, 98>> <> - :zlib.compress("com.test.LCDMonitor/time/to;com.test.LCDMonitor/weekSchedule/10/start") - - timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) - - DataUpdater.handle_control( - realm, - encoded_device_id, - "/producer/properties", - data, - gen_tracking_id(), - timestamp_us_x_10 - ) - - DataUpdater.dump_state(realm, encoded_device_id) - {remove_event, remove_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert remove_headers["x_astarte_event_type"] == "path_removed_event" - assert remove_headers["x_astarte_device_id"] == encoded_device_id - assert remove_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(remove_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(remove_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.path_removed_trigger_id() - - assert SimpleEvent.decode(remove_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: - {:path_removed_event, - %PathRemovedEvent{interface: "com.test.LCDMonitor", path: "/time/from"}}, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.path_removed_trigger_id() - } - - endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") - - value_query = - from ip in IndividualProperty, - prefix: ^keyspace_name, - where: - ip.device_id == ^device_id and - ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and - ip.endpoint_id == ^endpoint_id and - ip.path == "/time/from", - select: ip.longinteger_value - - value = Repo.one(value_query) - - assert value == nil - - endpoint_id = - retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/9/start") - - value_query = - from ip in IndividualProperty, - prefix: ^keyspace_name, - where: - ip.device_id == ^device_id and - ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and - ip.endpoint_id == ^endpoint_id and - ip.path == "/weekSchedule/9/start", - select: ip.longinteger_value - - value = Repo.one(value_query) - - assert value == nil - - endpoint_id = - retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") - - value_query = - from ip in IndividualProperty, - prefix: ^keyspace_name, - where: - ip.device_id == ^device_id and - ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and - ip.endpoint_id == ^endpoint_id and - ip.path == "/weekSchedule/10/start", - select: ip.longinteger_value - - value = Repo.one(value_query) - - assert value == 10 - - endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") - - timestamp_ms = DateTime.from_unix!(1_507_557_632_000, :millisecond) - - value_query = - from id in IndividualDatastream, - prefix: ^keyspace_name, - where: - id.device_id == ^device_id and - id.interface_id == ^CQLUtils.interface_id("com.test.SimpleStreamTest", 1) and - id.endpoint_id == ^endpoint_id and - id.path == "/0/value" and - id.value_timestamp >= ^timestamp_ms, - select: id.integer_value - - value = Repo.one(value_query) - - assert value == 5 end test "empty introspection is updated correctly", %{realm: realm, helper_name: helper_name} do diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs new file mode 100644 index 0000000000..844720a2c2 --- /dev/null +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs @@ -0,0 +1,255 @@ +# +# This file is part of Astarte. +# +# Copyright 2017 - 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defmodule Astarte.DataUpdaterPlant.ProducerPropertiesTest do + use ExUnit.Case, async: true + import Mox + + alias Astarte.DataUpdaterPlant.DatabaseTestHelper + alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.Core.Device + alias Astarte.DataUpdaterPlant.DataUpdater + alias Astarte.DataAccess.Realms.Realm + alias Astarte.DataAccess.Repo + alias Astarte.DataAccess.Realms.IndividualProperty + alias Astarte.DataAccess.Realms.Interface + alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent + alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent + alias Astarte.Core.Device + alias Astarte.DataAccess.Realms.IndividualDatastream + + alias Astarte.Core.CQLUtils + + import Ecto.Query + + setup :verify_on_exit! + + setup do + realm_string = "autotestrealm#{System.unique_integer([:positive])}" + {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) + + on_exit(fn -> + DatabaseTestHelper.destroy_local_test_keyspace(realm_string) + end) + + helper_name = String.to_atom("helper_#{realm_string}") + + consumer_name = String.to_atom("consumer_#{realm_string}") + + {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) + + {:ok, _consumer_pid} = + AMQPTestHelper.start_events_consumer( + name: consumer_name, + realm: realm_string, + helper_name: helper_name + ) + + {:ok, %{realm: realm_string, helper_name: helper_name}} + end + + test "producer properties are correctly set", %{ + realm: realm, + helper_name: helper_name + } do + AMQPTestHelper.clean_queue(helper_name) + + keyspace_name = Realm.keyspace_name(realm) + encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" + {:ok, device_id} = Device.decode_device_id(encoded_device_id) + + received_msgs = 45000 + received_bytes = 4_500_000 + existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} + + insert_opts = [ + introspection: existing_introspection_map, + total_received_msgs: received_msgs, + total_received_bytes: received_bytes, + groups: ["group1"] + ] + + DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + + data = + <<0, 0, 0, 98>> <> + :zlib.compress("com.test.LCDMonitor/time/to;com.test.LCDMonitor/weekSchedule/10/start") + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + DataUpdater.handle_control( + realm, + encoded_device_id, + "/producer/properties", + data, + gen_tracking_id(), + timestamp_us_x_10 + ) + + datastream_timestamp = make_timestamp("2017-10-09T14:15:32+00:00") + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.SimpleStreamTest", + "/0/value", + Cyanide.encode!(%{"v" => 5}), + gen_tracking_id(), + datastream_timestamp + ) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/10/start", + Cyanide.encode!(%{"v" => 10}), + gen_tracking_id(), + datastream_timestamp + ) + + DataUpdater.dump_state(realm, encoded_device_id) + {remove_event, remove_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + + assert remove_headers["x_astarte_event_type"] == "path_removed_event" + assert remove_headers["x_astarte_device_id"] == encoded_device_id + assert remove_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(remove_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(remove_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.path_removed_trigger_id() + + assert SimpleEvent.decode(remove_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: + {:path_removed_event, + %PathRemovedEvent{interface: "com.test.LCDMonitor", path: "/time/from"}}, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.path_removed_trigger_id() + } + + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/time/from", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/9/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/9/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/10/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == 10 + + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") + + timestamp_ms = DateTime.from_unix!(1_507_557_632_000, :millisecond) + + value_query = + from id in IndividualDatastream, + prefix: ^keyspace_name, + where: + id.device_id == ^device_id and + id.interface_id == ^CQLUtils.interface_id("com.test.SimpleStreamTest", 1) and + id.endpoint_id == ^endpoint_id and + id.path == "/0/value" and + id.value_timestamp >= ^timestamp_ms, + select: id.integer_value + + value = Repo.one(value_query) + + assert value == 5 + end + + defp retrieve_endpoint_id(realm_name, interface_name, interface_major, path) do + keyspace_name = Realm.keyspace_name(realm_name) + + query = + from i in Interface, + prefix: ^keyspace_name, + where: i.name == ^interface_name and i.major_version == ^interface_major, + select: %{ + automaton_transitions: i.automaton_transitions, + automaton_accepting_states: i.automaton_accepting_states + } + + interface_row = Repo.one!(query) + + automaton = + {:erlang.binary_to_term(interface_row[:automaton_transitions]), + :erlang.binary_to_term(interface_row[:automaton_accepting_states])} + + {:ok, endpoint_id} = Astarte.Core.Mapping.EndpointsAutomaton.resolve_path(path, automaton) + + endpoint_id + end + + defp gen_tracking_id() do + message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() + delivery_tag = {:injected_msg, make_ref()} + {message_id, delivery_tag} + end + + defp make_timestamp(timestamp_string) do + {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) + + DateTime.to_unix(date_time, :millisecond) * 10000 + end +end From 01fdf9f8a8540f2568953c1b424a924264482b2b Mon Sep 17 00:00:00 2001 From: Alioune Gaye Date: Thu, 16 Oct 2025 10:33:11 +0200 Subject: [PATCH 3/3] refactor(dup): Isolate simple flow into a dedicated module --- .../data_updater/server.ex | 2 +- .../data_updater_test.exs | 1331 ++-------------- .../device_disconnection_test.exs | 72 +- .../dup_device_delete_test.exs | 284 ++-- .../dup_producer_properties_test.exs | 107 +- .../dup_simple_flow_test.exs | 1338 +++++++++++++++++ .../dup_unset_properties_test.exs | 99 +- .../test/support/amqp_events_consumer.ex | 32 +- .../test/support/amqp_test_helper.ex | 20 +- .../test/support/cases/amqp.ex | 42 + .../test/support/helpers/data_updater.ex | 6 +- 11 files changed, 1763 insertions(+), 1570 deletions(-) create mode 100644 apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_simple_flow_test.exs create mode 100644 apps/astarte_data_updater_plant/test/support/cases/amqp.ex diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/server.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/server.ex index 713707d590..0a8f6e7d1b 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/server.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/server.ex @@ -17,7 +17,7 @@ # defmodule Astarte.DataUpdaterPlant.DataUpdater.Server do - use GenServer + use GenServer, restart: :transient alias Astarte.DataUpdaterPlant.DataUpdater.Core alias Astarte.DataUpdaterPlant.Config alias Astarte.DataUpdaterPlant.DataUpdater.Impl diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index bacb0b42b0..0ca50b17c4 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -17,1187 +17,106 @@ # defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do - use ExUnit.Case, async: true - import Mox - - import Ecto.Query - - alias Astarte.Core.Device - alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent - alias Astarte.Core.Triggers.SimpleEvents.DeviceDisconnectedEvent - alias Astarte.Core.Triggers.SimpleEvents.IncomingDataEvent - alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent - alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent - alias Astarte.Core.Triggers.SimpleEvents.ValueChangeAppliedEvent - alias Astarte.Core.Triggers.SimpleEvents.IncomingIntrospectionEvent - alias Astarte.Core.Triggers.SimpleEvents.InterfaceAddedEvent - alias Astarte.Core.Triggers.SimpleEvents.InterfaceRemovedEvent - alias Astarte.Core.Triggers.SimpleEvents.InterfaceMinorUpdatedEvent - alias Astarte.Core.Triggers.SimpleEvents.InterfaceVersion - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DeviceTrigger - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer - alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer - alias Astarte.DataAccess.Devices.Device, as: DeviceSchema - alias Astarte.DataAccess.Realms.Realm - alias Astarte.DataAccess.Realms.IndividualDatastream - alias Astarte.DataAccess.Realms.IndividualProperty - alias Astarte.DataAccess.Realms.Interface - alias Astarte.DataUpdaterPlant.AMQPTestHelper - alias Astarte.DataUpdaterPlant.DatabaseTestHelper - alias Astarte.DataUpdaterPlant.DataUpdater - alias Astarte.DataAccess.Repo - alias Astarte.Core.CQLUtils - - setup :verify_on_exit! - - setup_all do - realm_string = "autotestrealm#{System.unique_integer([:positive])}" - {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) - - on_exit(fn -> - DatabaseTestHelper.destroy_local_test_keyspace(realm_string) - end) - - # Need to be an atom because it's the name we are starting our helper with - helper_name = String.to_atom("helper_#{realm_string}") - - consumer_name = String.to_atom("consumer_#{realm_string}") - - realm = String.to_atom(realm_string) - - {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm) - - {:ok, _consumer_pid} = - AMQPTestHelper.start_events_consumer( - name: consumer_name, - realm: realm_string, - helper_name: helper_name - ) - - {:ok, %{realm: realm_string, helper_name: helper_name}} - end - - test "simple flow", %{realm: realm, helper_name: helper_name} do - AMQPTestHelper.clean_queue(helper_name) - - keyspace_name = Realm.keyspace_name(realm) - encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" - {:ok, device_id} = Device.decode_device_id(encoded_device_id) - - received_msgs = 45000 - received_bytes = 4_500_000 - existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} - existing_introspection_string = "com.test.LCDMonitor:1:0;com.test.SimpleStreamTest:1:0" - - existing_introspection_proto_map = %{ - "com.test.LCDMonitor" => %InterfaceVersion{major: 1, minor: 0}, - "com.test.SimpleStreamTest" => %InterfaceVersion{major: 1, minor: 0} - } - - insert_opts = [ - introspection: existing_introspection_map, - total_received_msgs: received_msgs, - total_received_bytes: received_bytes, - groups: ["group1"] - ] - - DatabaseTestHelper.insert_device(realm, device_id, insert_opts) - - # Install a volatile device test trigger - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_event_type: :DEVICE_CONNECTED - } - } - } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - device_id, - 1, - volatile_trigger_parent_id, - volatile_trigger_id, - simple_trigger_data, - trigger_target_data - ) == :ok - - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - volatile_trigger_id - ) == :ok - - timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) - - DataUpdater.handle_connection( - realm, - encoded_device_id, - "10.0.0.1", - gen_tracking_id(), - timestamp_us_x_10 - ) - - DataUpdater.dump_state(realm, encoded_device_id) - {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) - assert conn_headers["x_astarte_event_type"] == "device_connected_event" - assert conn_headers["x_astarte_realm"] == realm - assert conn_headers["x_astarte_device_id"] == encoded_device_id - - assert :uuid.string_to_uuid(conn_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(conn_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.group1_device_connected_trigger_id() - - assert SimpleEvent.decode(conn_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :device_connected_event, - %DeviceConnectedEvent{ - device_ip_address: "10.0.0.1" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.group1_device_connected_trigger_id() - } - - {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) - assert conn_headers["x_astarte_event_type"] == "device_connected_event" - assert conn_headers["x_astarte_realm"] == realm - assert conn_headers["x_astarte_device_id"] == encoded_device_id - - assert :uuid.string_to_uuid(conn_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(conn_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.device_connected_trigger_id() - - assert SimpleEvent.decode(conn_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :device_connected_event, - %DeviceConnectedEvent{ - device_ip_address: "10.0.0.1" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.device_connected_trigger_id() - } - - device_query = - from d in DeviceSchema, - prefix: ^keyspace_name, - where: d.device_id == ^device_id, - select: %{ - connected: d.connected, - total_received_msgs: d.total_received_msgs, - total_received_bytes: d.total_received_bytes, - exchanged_msgs_by_interface: d.exchanged_msgs_by_interface, - exchanged_bytes_by_interface: d.exchanged_bytes_by_interface - } - - device_row = Repo.one(device_query) - - assert device_row == %{ - connected: true, - total_received_msgs: 45000, - total_received_bytes: 4_500_000, - exchanged_msgs_by_interface: %{}, - exchanged_bytes_by_interface: %{} - } - - # Introspection sub-test - device_introspection_query = - from d in DeviceSchema, - prefix: ^keyspace_name, - where: d.device_id == ^device_id, - select: d.introspection - - ^existing_introspection_map = Repo.one(device_introspection_query) - - # Install a volatile incoming introspection test trigger - incoming_introspection_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_id: encoded_device_id, - device_event_type: :INCOMING_INTROSPECTION - } - } - } - |> SimpleTriggerContainer.encode() - - incoming_introspection_trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - incoming_introspection_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - incoming_introspection_volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - incoming_introspection_volatile_trigger_parent_id, - incoming_introspection_volatile_trigger_id, - incoming_introspection_trigger_data, - incoming_introspection_trigger_target_data - ) == :ok - - DataUpdater.handle_introspection( - realm, - encoded_device_id, - existing_introspection_string, - gen_tracking_id(), - make_timestamp("2017-10-09T14:00:32+00:00") - ) - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "incoming_introspection_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - incoming_introspection_volatile_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - incoming_introspection_volatile_trigger_id - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :incoming_introspection_event, - %IncomingIntrospectionEvent{ - introspection_map: existing_introspection_proto_map - } - }, - timestamp: timestamp_ms, - parent_trigger_id: incoming_introspection_volatile_trigger_parent_id, - realm: realm, - simple_trigger_id: incoming_introspection_volatile_trigger_id - } - - # Remove the incoming introspection trigger, don't curse next tests - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - incoming_introspection_volatile_trigger_id - ) == :ok - - # Install a volatile interface added introspection test trigger - interface_added_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_id: encoded_device_id, - device_event_type: :INTERFACE_ADDED, - interface_name: "*" - } - } - } - |> SimpleTriggerContainer.encode() - - interface_added_trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - interface_added_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - interface_added_volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - interface_added_volatile_trigger_parent_id, - interface_added_volatile_trigger_id, - interface_added_trigger_data, - interface_added_trigger_target_data - ) == :ok - - new_introspection = existing_introspection_string <> ";com.test.YetAnother:1:0" - - DataUpdater.handle_introspection( - realm, - encoded_device_id, - new_introspection, - gen_tracking_id(), - make_timestamp("2017-10-09T14:00:32+00:00") - ) - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "interface_added_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - interface_added_volatile_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - interface_added_volatile_trigger_id - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :interface_added_event, - %InterfaceAddedEvent{ - interface: "com.test.YetAnother", - major_version: 1, - minor_version: 0 - } - }, - timestamp: timestamp_ms, - parent_trigger_id: interface_added_volatile_trigger_parent_id, - realm: realm, - simple_trigger_id: interface_added_volatile_trigger_id - } - - # Remove the interface added trigger, don't curse next tests - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - interface_added_volatile_trigger_id - ) == :ok - - # Install a volatile interface minor updated introspection test trigger - interface_minor_updated_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_id: encoded_device_id, - device_event_type: :INTERFACE_MINOR_UPDATED, - interface_name: "com.test.YetAnother", - interface_major: 1 - } - } - } - |> SimpleTriggerContainer.encode() - - interface_minor_updated_trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - interface_minor_updated_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - interface_minor_updated_volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - interface_minor_updated_volatile_trigger_parent_id, - interface_minor_updated_volatile_trigger_id, - interface_minor_updated_trigger_data, - interface_minor_updated_trigger_target_data - ) == :ok - - new_introspection = existing_introspection_string <> ";com.test.YetAnother:1:1" - - DataUpdater.handle_introspection( - realm, - encoded_device_id, - new_introspection, - gen_tracking_id(), - make_timestamp("2017-10-09T14:00:32+00:00") - ) - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "interface_minor_updated_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - interface_minor_updated_volatile_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - interface_minor_updated_volatile_trigger_id - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :interface_minor_updated_event, - %InterfaceMinorUpdatedEvent{ - interface: "com.test.YetAnother", - major_version: 1, - old_minor_version: 0, - new_minor_version: 1 - } - }, - timestamp: timestamp_ms, - parent_trigger_id: interface_minor_updated_volatile_trigger_parent_id, - realm: realm, - simple_trigger_id: interface_minor_updated_volatile_trigger_id - } - - # Remove the interface minor updated trigger, don't curse next tests - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - interface_minor_updated_volatile_trigger_id - ) == :ok - - # Install a volatile interface removed introspection test trigger - interface_removed_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :device_trigger, - %DeviceTrigger{ - device_id: encoded_device_id, - device_event_type: :INTERFACE_REMOVED, - interface_name: "*" - } - } - } - |> SimpleTriggerContainer.encode() - - interface_removed_trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - interface_removed_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - interface_removed_volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - interface_removed_volatile_trigger_parent_id, - interface_removed_volatile_trigger_id, - interface_removed_trigger_data, - interface_removed_trigger_target_data - ) == :ok - - DataUpdater.handle_introspection( - realm, - encoded_device_id, - existing_introspection_string, - gen_tracking_id(), - make_timestamp("2017-10-09T14:00:32+00:00") - ) - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "interface_removed_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - interface_removed_volatile_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - interface_removed_volatile_trigger_id - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :interface_removed_event, - %InterfaceRemovedEvent{ - interface: "com.test.YetAnother", - major_version: 1 - } - }, - timestamp: timestamp_ms, - parent_trigger_id: interface_removed_volatile_trigger_parent_id, - realm: realm, - simple_trigger_id: interface_removed_volatile_trigger_id - } - - # Remove the interface removed trigger, don't curse next tests - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - interface_removed_volatile_trigger_id - ) == :ok - - DataUpdater.dump_state(realm, encoded_device_id) - - device_introspection = Repo.one(device_introspection_query) - - assert existing_introspection_map == device_introspection - - # Install a volatile test trigger - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.test.SimpleStreamTest", - interface_major: 1, - data_trigger_type: :INCOMING_DATA, - match_path: "/0/value", - value_match_operator: :LESS_THAN, - known_value: Cyanide.encode!(%{v: 100}) - } - } - } - |> SimpleTriggerContainer.encode() - - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() - - volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - volatile_trigger_id = :crypto.strong_rand_bytes(16) - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - volatile_trigger_parent_id, - volatile_trigger_id, - simple_trigger_data, - trigger_target_data - ) == :ok - - # Install a volatile test trigger that won't match, to check that multiple triggers - # for a single interface/endpoint are correctly loaded - non_matching_simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.test.SimpleStreamTest", - interface_major: 1, - data_trigger_type: :INCOMING_DATA, - match_path: "/0/value", - value_match_operator: :GREATER_THAN, - known_value: Cyanide.encode!(%{v: 1000}) - } - } - } - |> SimpleTriggerContainer.encode() - - non_matching_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) - non_matching_volatile_trigger_id = :crypto.strong_rand_bytes(16) - - # Install the non-matching trigger twice to check that this installs 2 trigger_targets - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - non_matching_volatile_trigger_parent_id, - non_matching_volatile_trigger_id, - non_matching_simple_trigger_data, - trigger_target_data - ) == :ok - - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), - 2, - non_matching_volatile_trigger_parent_id, - non_matching_volatile_trigger_id, - non_matching_simple_trigger_data, - trigger_target_data - ) == :ok - - # Incoming data sub-test - timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:31+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/3/start", - Cyanide.encode!(%{"v" => 1}), - gen_tracking_id(), - timestamp_us_x_10 - ) + use Astarte.Cases.Data + use Astarte.Cases.AMQP - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.less_than_device_incoming_trigger_id() + import Mox + import Astarte.Helpers.DataUpdater - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :incoming_data_event, - %IncomingDataEvent{ - bson_value: Cyanide.encode!(%{"v" => 1}), - interface: "com.test.LCDMonitor", - path: "/weekSchedule/3/start" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.less_than_device_incoming_trigger_id() - } + import Ecto.Query - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/4/start", - Cyanide.encode!(%{"v" => 3}), - gen_tracking_id(), - timestamp_us_x_10 - ) + alias Astarte.Core.Device + alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent + alias Astarte.Core.Triggers.SimpleEvents.DeviceDisconnectedEvent + alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DeviceTrigger + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer + alias Astarte.DataAccess.Devices.Device, as: DeviceSchema + alias Astarte.DataAccess.Realms.Realm + alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.DataUpdaterPlant.DatabaseTestHelper + alias Astarte.DataUpdaterPlant.DataUpdater + alias Astarte.DataAccess.Repo - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm + setup :verify_on_exit! - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() + setup_all %{realm_name: realm_name} do + encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" + {:ok, device_id} = Device.decode_device_id(encoded_device_id) + received_msgs = 45000 + received_bytes = 4_500_000 + existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.equal_to_group_incoming_trigger_id() + insert_opts = [ + introspection: existing_introspection_map, + total_received_msgs: received_msgs, + total_received_bytes: received_bytes, + groups: ["group1"] + ] - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :incoming_data_event, - %IncomingDataEvent{ - bson_value: Cyanide.encode!(%{"v" => 3}), - interface: "com.test.LCDMonitor", - path: "/weekSchedule/4/start" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.equal_to_group_incoming_trigger_id() - } + DatabaseTestHelper.insert_device(realm_name, device_id, insert_opts) + test_process = self() - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/time/from", - Cyanide.encode!(%{"v" => 9000}), - gen_tracking_id(), - make_timestamp("2017-10-09T14:10:32+00:00") - ) + Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock + |> Mox.stub(:delete, fn %{realm_name: ^realm_name, device_id: ^encoded_device_id} -> + send(test_process, :data_updater_message_received) + :ok + end) - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/9/start", - Cyanide.encode!(%{"v" => 9}), - gen_tracking_id(), - make_timestamp("2017-10-09T14:10:32+00:00") - ) + setup_data_updater(realm_name, encoded_device_id) - # Install a volatile value change applied test trigger - simple_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :VALUE_CHANGE_APPLIED, - match_path: "/weekSchedule/10/start", - value_match_operator: :ANY - } - } - } - |> SimpleTriggerContainer.encode() + %{ + device_id: device_id, + encoded_device_id: encoded_device_id, + received_msgs: received_msgs, + received_bytes: received_bytes + } + end - trigger_target_data = - %TriggerTargetContainer{ - trigger_target: { - :amqp_trigger_target, - %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) - } - } - } - |> TriggerTargetContainer.encode() + test "empty introspection is updated correctly", %{ + realm: realm, + amqp_consumer: amqp_consumer, + test_id: test_id + } do + AMQPTestHelper.clean_queue(amqp_consumer) - volatile_changed_trigger_parent_id = :crypto.strong_rand_bytes(16) - volatile_changed_trigger_id = :crypto.strong_rand_bytes(16) + keyspace_name = Realm.keyspace_name(realm) - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), - 2, - volatile_changed_trigger_parent_id, - volatile_changed_trigger_id, - simple_trigger_data, - trigger_target_data - ) == :ok + encoded_device_id = + :crypto.strong_rand_bytes(16) + |> Base.url_encode64(padding: false) - bad_trigger_data = - %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.missing.Interface", - interface_major: 1, - data_trigger_type: :VALUE_CHANGE_APPLIED, - match_path: "/test", - value_match_operator: :ANY - } - } - } - |> SimpleTriggerContainer.encode() + {:ok, device_id} = Device.decode_device_id(encoded_device_id) + new_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} + new_introspection_string = "com.test.LCDMonitor:1:0;com.test.SimpleStreamTest:1:0" - bad_trigger_parent_id = :crypto.strong_rand_bytes(16) - bad_trigger_id = :crypto.strong_rand_bytes(16) + DatabaseTestHelper.insert_device(realm, device_id, groups: ["group2"]) + setup_data_updater(realm, encoded_device_id) - assert DataUpdater.handle_install_volatile_trigger( - realm, - encoded_device_id, - :uuid.string_to_uuid("badb93a5-842e-bbad-2e4d-d20306838051"), - 2, - bad_trigger_parent_id, - bad_trigger_id, - bad_trigger_data, - trigger_target_data - ) == {:error, :interface_not_found} + volatile_parent_id = DatabaseTestHelper.fake_parent_trigger_id() + volatile_trigger_id = DatabaseTestHelper.group2_device_connected_trigger_id() - bad_path_trigger_data = + volatile_trigger_data = %SimpleTriggerContainer{ - simple_trigger: { - :data_trigger, - %DataTrigger{ - interface_name: "com.test.LCDMonitor", - interface_major: 1, - data_trigger_type: :VALUE_CHANGE_APPLIED, - match_path: "/weekSchedule/10", - value_match_operator: :ANY - } - } + simple_trigger: {:device_trigger, %DeviceTrigger{device_event_type: :DEVICE_CONNECTED}} } |> SimpleTriggerContainer.encode() - bad_path_trigger_parent_id = :crypto.strong_rand_bytes(16) - bad_path_trigger_id = :crypto.strong_rand_bytes(16) + volatile_target_data = generate_trigger_target(test_id) assert DataUpdater.handle_install_volatile_trigger( realm, encoded_device_id, - :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), - 2, - bad_path_trigger_parent_id, - bad_path_trigger_id, - bad_path_trigger_data, - trigger_target_data - ) == {:error, :invalid_match_path} - - timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/10/start", - Cyanide.encode!(%{"v" => 10}), - gen_tracking_id(), - timestamp_us_x_10 - ) - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.greater_than_incoming_trigger_id() - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :incoming_data_event, - %IncomingDataEvent{ - bson_value: Cyanide.encode!(%{"v" => 10}), - interface: "com.test.LCDMonitor", - path: "/weekSchedule/10/start" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.greater_than_incoming_trigger_id() - } - - {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert incoming_headers["x_astarte_event_type"] == "value_change_applied_event" - assert incoming_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == - volatile_changed_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == - volatile_changed_trigger_id - - assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: { - :value_change_applied_event, - %ValueChangeAppliedEvent{ - old_bson_value: Cyanide.encode!(%{"v" => 42}), - new_bson_value: Cyanide.encode!(%{"v" => 10}), - interface: "com.test.LCDMonitor", - path: "/weekSchedule/10/start" - } - }, - timestamp: timestamp_ms, - parent_trigger_id: volatile_changed_trigger_parent_id, - realm: realm, - simple_trigger_id: volatile_changed_trigger_id - } - - timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) - - # This should trigger matching_simple_trigger - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.SimpleStreamTest", - "/0/value", - Cyanide.encode!(%{"v" => 5}), - gen_tracking_id(), - timestamp_us_x_10 - ) - - state = DataUpdater.dump_state(realm, encoded_device_id) - - {incoming_volatile_event, incoming_volatile_headers, _meta} = - AMQPTestHelper.wait_and_get_message(helper_name) - - assert incoming_volatile_headers["x_astarte_event_type"] == "incoming_data_event" - assert incoming_volatile_headers["x_astarte_device_id"] == encoded_device_id - assert incoming_volatile_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(incoming_volatile_headers["x_astarte_parent_trigger_id"]) == - volatile_trigger_parent_id - - assert :uuid.string_to_uuid(incoming_volatile_headers["x_astarte_simple_trigger_id"]) == - volatile_trigger_id - - assert SimpleEvent.decode(incoming_volatile_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: - {:incoming_data_event, - %IncomingDataEvent{ - bson_value: Cyanide.encode!(%{"v" => 5}), - interface: "com.test.SimpleStreamTest", - path: "/0/value" - }}, - timestamp: timestamp_ms, - parent_trigger_id: volatile_trigger_parent_id, - realm: realm, - simple_trigger_id: volatile_trigger_id - } - - # We check that all 3 on_incoming_data triggers were correctly installed - interface_id = CQLUtils.interface_id("com.test.SimpleStreamTest", 1) - endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") - trigger_key = {:on_incoming_data, interface_id, endpoint_id} - incoming_data_0_value_triggers = Map.get(state.data_triggers, trigger_key) - - # The length is 2 since greater-then triggers are merged into one because they are congruent - assert length(incoming_data_0_value_triggers) == 2 - # Extract greater-than trigger - assert [gt_trigger] = - Enum.filter(incoming_data_0_value_triggers, fn data_trigger -> - data_trigger.value_match_operator == :GREATER_THAN - end) - - # It should have 2 targets - assert length(gt_trigger.trigger_targets) == 2 - - endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") - - value_query = - from ip in IndividualProperty, - prefix: ^keyspace_name, - where: - ip.device_id == ^device_id and - ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and - ip.endpoint_id == ^endpoint_id and - ip.path == "/time/from", - select: ip.longinteger_value - - value = Repo.one(value_query) - - assert value == 9000 - - endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") - - timestamp_ms = DateTime.from_unix!(1_507_557_632_000, :millisecond) - - value_query = - from id in IndividualDatastream, - prefix: ^keyspace_name, - where: - id.device_id == ^device_id and - id.interface_id == ^CQLUtils.interface_id("com.test.SimpleStreamTest", 1) and - id.endpoint_id == ^endpoint_id and - id.path == "/0/value" and - id.value_timestamp >= ^timestamp_ms, - select: id.integer_value - - value = Repo.one(value_query) - - assert value == 5 - - assert DataUpdater.handle_delete_volatile_trigger( - realm, - encoded_device_id, - volatile_trigger_id + device_id, + 1, + volatile_parent_id, + volatile_trigger_id, + volatile_trigger_data, + volatile_target_data ) == :ok - timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") - - # Introspection change subtest - DataUpdater.handle_introspection( - realm, - encoded_device_id, - "com.test.LCDMonitor:1:0;com.example.TestObject:1:5;com.test.SimpleStreamTest:1:0", - gen_tracking_id(), - timestamp_us_x_10 - ) - - # Incoming object aggregation subtest - payload0 = Cyanide.encode!(%{"value" => 1.9, "string" => "Astarteです"}) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.example.TestObject", - "/", - payload0, - gen_tracking_id(), - make_timestamp("2017-10-26T08:48:49+00:00") - ) - - payload1 = Cyanide.encode!(%{"string" => "Hello World');"}) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.example.TestObject", - "/", - payload1, - gen_tracking_id(), - make_timestamp("2017-10-26T08:48:50+00:00") - ) - - payload2 = Cyanide.encode!(%{"v" => %{"value" => 0.0}}) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.example.TestObject", - "/", - payload2, - gen_tracking_id(), - make_timestamp("2017-10-26T08:48:51+00:00") - ) - - # we expect only /string to be updated here, we need this to check against accidental NULL insertions, that are bad for tombstones on cassandra. - payload3 = Cyanide.encode!(%{"string" => "zzz"}) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.example.TestObject", - "/", - payload3, - gen_tracking_id(), - make_timestamp("2017-09-30T07:13:00+00:00") - ) - - payload4 = Cyanide.encode!(%{}) - - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.example.TestObject", - "/", - payload4, - gen_tracking_id(), - make_timestamp("2017-10-30T07:13:00+00:00") - ) - - DataUpdater.dump_state(realm, encoded_device_id) - - objects_query = - from o in "com_example_testobject_v1", - prefix: ^realm, - where: o.device_id == ^device_id and o.path == "/", - select: [ - device_id: o.device_id, - path: o.path, - reception_timestamp: fragment("toUnixTimestamp(?)", o.reception_timestamp), - reception_timestamp_submillis: o.reception_timestamp_submillis, - v_string: o.v_string, - v_value: o.v_value - ] - - objects = Repo.all(objects_query) - - assert objects == [ - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_400_000, - reception_timestamp_submillis: 0, - v_string: "aaa", - v_value: 1.1 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_520_000, - reception_timestamp_submillis: 0, - v_string: "bbb", - v_value: 2.2 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_506_755_580_000, - reception_timestamp_submillis: 0, - v_string: "zzz", - v_value: 3.3 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_729_000, - reception_timestamp_submillis: 0, - v_string: "Astarteです", - v_value: 1.9 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_730_000, - reception_timestamp_submillis: 0, - v_string: "Hello World');", - v_value: nil - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_007_731_000, - reception_timestamp_submillis: 0, - v_string: nil, - v_value: 0.0 - ], - [ - device_id: device_id, - path: "/", - reception_timestamp: 1_509_347_580_000, - reception_timestamp_submillis: 0, - v_string: nil, - v_value: nil - ] - ] - end - - test "empty introspection is updated correctly", %{realm: realm, helper_name: helper_name} do - AMQPTestHelper.clean_queue(helper_name) - - keyspace_name = Realm.keyspace_name(realm) - - encoded_device_id = - :crypto.strong_rand_bytes(16) - |> Base.url_encode64(padding: false) - - {:ok, device_id} = Device.decode_device_id(encoded_device_id) - new_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} - new_introspection_string = "com.test.LCDMonitor:1:0;com.test.SimpleStreamTest:1:0" - - DatabaseTestHelper.insert_device(realm, device_id, groups: ["group2"]) - timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") timestamp_ms = div(timestamp_us_x_10, 10_000) @@ -1211,7 +130,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DataUpdater.dump_state(realm, encoded_device_id) - {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) + {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(amqp_consumer) assert conn_headers["x_astarte_event_type"] == "device_connected_event" assert conn_headers["x_astarte_realm"] == realm assert conn_headers["x_astarte_device_id"] == encoded_device_id @@ -1224,16 +143,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do assert SimpleEvent.decode(conn_event) == %SimpleEvent{ device_id: encoded_device_id, - event: { - :device_connected_event, - %DeviceConnectedEvent{ - device_ip_address: "10.0.0.1" - } - }, + event: + {:device_connected_event, %DeviceConnectedEvent{device_ip_address: "10.0.0.1"}}, timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + parent_trigger_id: volatile_parent_id, realm: realm, - simple_trigger_id: DatabaseTestHelper.group2_device_connected_trigger_id() + simple_trigger_id: volatile_trigger_id } device_introspection_query = @@ -1262,11 +177,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do assert new_device_introspection == new_introspection_map - assert AMQPTestHelper.awaiting_messages_count(helper_name) == 0 + assert AMQPTestHelper.awaiting_messages_count(amqp_consumer) == 0 end - test "test introspection with interface update", %{realm: realm, helper_name: helper_name} do - AMQPTestHelper.clean_queue(helper_name) + test "test introspection with interface update", %{ + realm: realm, + amqp_consumer: amqp_consumer + } do + AMQPTestHelper.clean_queue(amqp_consumer) encoded_device_id = :crypto.strong_rand_bytes(16) @@ -1275,6 +193,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, device_id} = Device.decode_device_id(encoded_device_id) DatabaseTestHelper.insert_device(realm, device_id) + setup_data_updater(realm, encoded_device_id) DataUpdater.handle_connection( realm, @@ -1331,9 +250,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do test "fails to install volatile trigger on missing device", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer, + test_id: test_id } do - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) # Install a volatile device test trigger simple_trigger_data = @@ -1352,7 +272,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do trigger_target: { :amqp_trigger_target, %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key() + routing_key: AMQPTestHelper.events_routing_key(test_id) } } } @@ -1378,9 +298,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do test "fails to delete volatile trigger on missing device", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer } do - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) volatile_trigger_id = :crypto.strong_rand_bytes(16) @@ -1396,11 +316,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do test "heartbeat message of type internal is correctly handled", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer } do alias Astarte.DataUpdaterPlant.DataUpdater.State - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) encoded_device_id = :crypto.strong_rand_bytes(16) @@ -1409,6 +329,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, device_id} = Device.decode_device_id(encoded_device_id) DatabaseTestHelper.insert_device(realm, device_id) + setup_data_updater(realm, encoded_device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") @@ -1439,11 +360,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do # TODO remove this when all heartbeats will be moved to internal test "heartbeat message of type heartbeat is correctly handled", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer } do alias Astarte.DataUpdaterPlant.DataUpdater.State - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) encoded_device_id = :crypto.strong_rand_bytes(16) @@ -1452,6 +373,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, device_id} = Device.decode_device_id(encoded_device_id) DatabaseTestHelper.insert_device(realm, device_id) + setup_data_updater(realm, encoded_device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") @@ -1481,9 +403,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do test "a disconnected device does not generate a disconnection trigger", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer, + test_id: test_id } do - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) encoded_device_id = :crypto.strong_rand_bytes(16) @@ -1492,6 +415,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, device_id} = Device.decode_device_id(encoded_device_id) DatabaseTestHelper.insert_device(realm, device_id) + setup_data_updater(realm, encoded_device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") timestamp_ms = div(timestamp_us_x_10, 10_000) @@ -1507,7 +431,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do volatile_trigger_parent_id, volatile_trigger_id, generate_disconnection_trigger_data(), - generate_trigger_target(realm) + generate_trigger_target(test_id) ) == :ok DataUpdater.handle_disconnection( @@ -1518,7 +442,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) # Receive the first disconnection trigger - {event, headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) + {event, headers, _metadata} = AMQPTestHelper.wait_and_get_message(amqp_consumer) assert headers["x_astarte_event_type"] == "device_disconnected_event" assert headers["x_astarte_realm"] == realm assert headers["x_astarte_device_id"] == encoded_device_id @@ -1548,7 +472,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) # The second disconnection trigger is not sent - assert AMQPTestHelper.awaiting_messages_count(helper_name) == 0 + assert AMQPTestHelper.awaiting_messages_count(amqp_consumer) == 0 end defp generate_disconnection_trigger_data() do @@ -1563,55 +487,20 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do |> SimpleTriggerContainer.encode() end - defp generate_trigger_target() do - generate_trigger_target(nil) - end + # defp generate_trigger_target() do + # generate_trigger_target(nil) + # end - defp generate_trigger_target(realm) do + defp generate_trigger_target(id) do %TriggerTargetContainer{ trigger_target: { :amqp_trigger_target, %AMQPTriggerTarget{ - routing_key: AMQPTestHelper.events_routing_key(realm), - exchange: AMQPTestHelper.events_exchange_name(realm) + routing_key: AMQPTestHelper.events_routing_key(id), + exchange: AMQPTestHelper.events_exchange_name(id) } } } |> TriggerTargetContainer.encode() end - - defp retrieve_endpoint_id(realm_name, interface_name, interface_major, path) do - keyspace_name = Realm.keyspace_name(realm_name) - - query = - from i in Interface, - prefix: ^keyspace_name, - where: i.name == ^interface_name and i.major_version == ^interface_major, - select: %{ - automaton_transitions: i.automaton_transitions, - automaton_accepting_states: i.automaton_accepting_states - } - - interface_row = Repo.one!(query) - - automaton = - {:erlang.binary_to_term(interface_row[:automaton_transitions]), - :erlang.binary_to_term(interface_row[:automaton_accepting_states])} - - {:ok, endpoint_id} = Astarte.Core.Mapping.EndpointsAutomaton.resolve_path(path, automaton) - - endpoint_id - end - - defp make_timestamp(timestamp_string) do - {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) - - DateTime.to_unix(date_time, :millisecond) * 10000 - end - - defp gen_tracking_id() do - message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() - delivery_tag = {:injected_msg, make_ref()} - {message_id, delivery_tag} - end end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/device_disconnection_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/device_disconnection_test.exs index 506425c872..ff2ef5372d 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/device_disconnection_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/device_disconnection_test.exs @@ -17,8 +17,11 @@ # defmodule Astarte.DataUpdaterPlant.DeviceDisconnectionTest do - use ExUnit.Case, async: true + use Astarte.Cases.Data, async: true + use Astarte.Cases.AMQP + import Mox + import Astarte.Helpers.DataUpdater alias Astarte.DataUpdaterPlant.DatabaseTestHelper alias Astarte.Core.Device @@ -32,37 +35,9 @@ defmodule Astarte.DataUpdaterPlant.DeviceDisconnectionTest do setup :verify_on_exit! - setup do - realm_string = "autotestrealm#{System.unique_integer([:positive])}" - {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) - - on_exit(fn -> - DatabaseTestHelper.destroy_local_test_keyspace(realm_string) - end) - - helper_name = String.to_atom("helper_#{realm_string}") - - consumer_name = String.to_atom("consumer_#{realm_string}") - - {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) - - {:ok, _consumer_pid} = - AMQPTestHelper.start_events_consumer( - name: consumer_name, - realm: realm_string, - helper_name: helper_name - ) - - {:ok, %{realm: realm_string, helper_name: helper_name}} - end - - test "device disconnection test", %{realm: realm, helper_name: helper_name} do - AMQPTestHelper.clean_queue(helper_name) - - keyspace_name = Realm.keyspace_name(realm) + setup_all %{realm_name: realm_name} do encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" {:ok, device_id} = Device.decode_device_id(encoded_device_id) - received_msgs = 45000 received_bytes = 4_500_000 existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} @@ -74,7 +49,28 @@ defmodule Astarte.DataUpdaterPlant.DeviceDisconnectionTest do groups: ["group1"] ] - DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + DatabaseTestHelper.insert_device(realm_name, device_id, insert_opts) + setup_data_updater(realm_name, encoded_device_id) + + %{ + device_id: device_id, + encoded_device_id: encoded_device_id, + received_msgs: received_msgs, + received_bytes: received_bytes + } + end + + test "device disconnection test", context do + %{ + realm: realm, + amqp_consumer: amqp_consumer, + device_id: device_id, + encoded_device_id: encoded_device_id, + received_msgs: received_msgs, + received_bytes: received_bytes + } = context + + keyspace_name = Realm.keyspace_name(realm) DataUpdater.handle_disconnection( realm, @@ -107,18 +103,6 @@ defmodule Astarte.DataUpdaterPlant.DeviceDisconnectionTest do exchanged_bytes_by_interface: %{} } - assert AMQPTestHelper.awaiting_messages_count(helper_name) == 0 - end - - defp gen_tracking_id() do - message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() - delivery_tag = {:injected_msg, make_ref()} - {message_id, delivery_tag} - end - - defp make_timestamp(timestamp_string) do - {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) - - DateTime.to_unix(date_time, :millisecond) * 10000 + assert AMQPTestHelper.awaiting_messages_count(amqp_consumer) == 0 end end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_device_delete_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_device_delete_test.exs index 7048a4cfd3..1c55b5ab11 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_device_delete_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_device_delete_test.exs @@ -17,189 +17,165 @@ # defmodule Astarte.DataUpdaterPlant.DeviceDeleteTest do - use ExUnit.Case - import Mox + use Astarte.Cases.Data, async: true + use Astarte.Cases.AMQP + + import Astarte.Helpers.DataUpdater + import Ecto.Query alias Astarte.Core.Device + alias Astarte.DataAccess.Devices.Device, as: DatabaseDevice alias Astarte.DataAccess.Realms.Realm - alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.DataAccess.Repo alias Astarte.DataUpdaterPlant.DatabaseTestHelper alias Astarte.DataUpdaterPlant.DataUpdater + alias Astarte.DataUpdaterPlant.DataUpdater.Queries + + describe "device deletion" do + setup %{realm: realm} do + device_id = Device.random_device_id() + encoded_device_id = Device.encode_device_id(device_id) + keyspace = Realm.keyspace_name(realm) + + on_exit(fn -> + DatabaseDevice + |> where([d], d.device_id == ^device_id) + |> Repo.safe_delete_all(prefix: keyspace) + end) - setup :verify_on_exit! - setup :set_mox_global - - setup do - realm_string = "autotestrealm#{System.unique_integer([:positive])}" - {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) - realm = String.to_atom(realm_string) - - on_exit(fn -> - DatabaseTestHelper.destroy_local_test_keyspace(realm_string) - end) - - helper_name = String.to_atom("helper_#{realm_string}") - - consumer_name = String.to_atom("consumer_#{realm_string}") - - {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) - - {:ok, _consumer_pid} = - AMQPTestHelper.start_events_consumer( - name: consumer_name, - realm: realm_string, - helper_name: helper_name - ) - - {:ok, %{realm: realm_string, helper_name: helper_name}} - end - - test "device deletion is acked and related DataUpdater process stops", %{ - realm: realm, - helper_name: helper_name - } do - AMQPTestHelper.clean_queue(helper_name) - - encoded_device_id = - :crypto.strong_rand_bytes(16) - |> Base.url_encode64(padding: false) - - {:ok, device_id} = Device.decode_device_id(encoded_device_id) + DatabaseTestHelper.insert_device(realm, device_id) + test_process = self() - # Register the device with some fake data - total_received_messages = 42 - total_received_bytes = 4242 + Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock + |> Mox.stub(:delete, fn %{realm_name: ^realm, device_id: ^encoded_device_id} -> + send(test_process, :device_deletion_message_received) + :ok + end) - insert_opts = [ - total_received_msgs: total_received_messages, - total_received_bytes: total_received_bytes - ] + setup_data_updater(realm, encoded_device_id) - DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + %{device_id: device_id, encoded_device_id: encoded_device_id} + end - # Set device deletion to in progress - deletion_in_progress_statement = """ - INSERT INTO #{Realm.keyspace_name(realm)}.deletion_in_progress (device_id) - VALUES (:device_id) - """ + test "device deletion is acked and related DataUpdater process stops", %{ + realm: realm, + device_id: device_id, + encoded_device_id: encoded_device_id + } do + %{ + total_received_bytes: total_received_bytes, + total_received_msgs: total_received_messages + } = Queries.retrieve_device_stats_and_introspection!(realm, device_id) - Xandra.Cluster.run(:xandra, fn conn -> - prepared = Xandra.prepare!(conn, deletion_in_progress_statement) + # Set device deletion to in progress + deletion_in_progress_statement = """ + INSERT INTO #{Realm.keyspace_name(realm)}.deletion_in_progress (device_id) + VALUES (:device_id) + """ - %Xandra.Void{} = - Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) - end) + Xandra.Cluster.run(:xandra, fn conn -> + prepared = Xandra.prepare!(conn, deletion_in_progress_statement) - timestamp_us_x_10 = make_timestamp("2017-10-09T15:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + %Xandra.Void{} = + Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) + end) - Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock - |> expect(:delete, fn %{realm_name: ^realm, device_id: ^encoded_device_id} -> - :ok - end) + timestamp_us_x_10 = make_timestamp("2017-10-09T15:00:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) - DataUpdater.start_device_deletion(realm, encoded_device_id, timestamp_ms) + DataUpdater.start_device_deletion(realm, encoded_device_id, timestamp_ms) - # Check DUP start ack in deleted_devices table - dup_start_ack_statement = """ - SELECT dup_start_ack - FROM #{Realm.keyspace_name(realm)}.deletion_in_progress - WHERE device_id = :device_id - """ + assert_receive :device_deletion_message_received - dup_start_ack_result = - Xandra.Cluster.run(:xandra, fn conn -> - prepared = Xandra.prepare!(conn, dup_start_ack_statement) + # Check DUP start ack in deleted_devices table + dup_start_ack_statement = """ + SELECT dup_start_ack + FROM #{Realm.keyspace_name(realm)}.deletion_in_progress + WHERE device_id = :device_id + """ - %Xandra.Page{} = - page = - Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) + dup_start_ack_result = + Xandra.Cluster.run(:xandra, fn conn -> + prepared = Xandra.prepare!(conn, dup_start_ack_statement) - Enum.to_list(page) - end) + %Xandra.Page{} = + page = + Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) - assert [%{"dup_start_ack" => true}] = dup_start_ack_result - - # Check that no data is being handled - DataUpdater.handle_data( - realm, - encoded_device_id, - "this.interface.does.not.Exist", - "/don/t/care", - :dontcare, - gen_tracking_id(), - make_timestamp("2017-10-09T14:30:15+00:00") - ) - - received_data_statement = """ - SELECT total_received_msgs, total_received_bytes - FROM #{Realm.keyspace_name(realm)}.devices WHERE device_id=:device_id; - """ - - received_data_result = - Xandra.Cluster.run(:xandra, fn conn -> - prepared = Xandra.prepare!(conn, received_data_statement) + Enum.to_list(page) + end) - %Xandra.Page{} = - page = - Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) + assert [%{"dup_start_ack" => true}] = dup_start_ack_result - Enum.to_list(page) - end) + # Check that no data is being handled + DataUpdater.handle_data( + realm, + encoded_device_id, + "this.interface.does.not.Exist", + "/don/t/care", + :dontcare, + gen_tracking_id(), + make_timestamp("2017-10-09T14:30:15+00:00") + ) - assert [ - %{ - "total_received_msgs" => ^total_received_messages, - "total_received_bytes" => ^total_received_bytes - } - ] = received_data_result - - # Now process the device's last message - DataUpdater.handle_internal( - realm, - encoded_device_id, - "/f", - :dontcare, - gen_tracking_id(), - timestamp_us_x_10 - ) - - # Let the process handle device's last message - Process.sleep(100) - - # Check DUP end ack in deleted_devices table - dup_end_ack_statement = """ - SELECT dup_end_ack - FROM #{Realm.keyspace_name(realm)}.deletion_in_progress - WHERE device_id = :device_id - """ - - dup_end_ack_result = - Xandra.Cluster.run(:xandra, fn conn -> - prepared = Xandra.prepare!(conn, dup_end_ack_statement) + received_data_statement = """ + SELECT total_received_msgs, total_received_bytes + FROM #{Realm.keyspace_name(realm)}.devices WHERE device_id=:device_id; + """ + + received_data_result = + Xandra.Cluster.run(:xandra, fn conn -> + prepared = Xandra.prepare!(conn, received_data_statement) + + %Xandra.Page{} = + page = + Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) + + Enum.to_list(page) + end) + + assert [ + %{ + "total_received_msgs" => ^total_received_messages, + "total_received_bytes" => ^total_received_bytes + } + ] = received_data_result + + # Now process the device's last message + DataUpdater.handle_internal( + realm, + encoded_device_id, + "/f", + :dontcare, + gen_tracking_id(), + timestamp_us_x_10 + ) - %Xandra.Page{} = - page = - Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) + # Let the process handle device's last message + Process.sleep(100) - Enum.to_list(page) - end) + # Check DUP end ack in deleted_devices table + dup_end_ack_statement = """ + SELECT dup_end_ack + FROM #{Realm.keyspace_name(realm)}.deletion_in_progress + WHERE device_id = :device_id + """ - assert [%{"dup_end_ack" => true}] = dup_end_ack_result + dup_end_ack_result = + Xandra.Cluster.run(:xandra, fn conn -> + prepared = Xandra.prepare!(conn, dup_end_ack_statement) - # Finally, check that the related DataUpdater process exists no more - assert [] = Horde.Registry.lookup(Registry.DataUpdater, {realm, device_id}) - end + %Xandra.Page{} = + page = + Xandra.execute!(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) - defp make_timestamp(timestamp_string) do - {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) + Enum.to_list(page) + end) - DateTime.to_unix(date_time, :millisecond) * 10000 - end + assert [%{"dup_end_ack" => true}] = dup_end_ack_result - defp gen_tracking_id() do - message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() - delivery_tag = {:injected_msg, make_ref()} - {message_id, delivery_tag} + # Finally, check that the related DataUpdater process exists no more + assert [] = Horde.Registry.lookup(Registry.DataUpdater, {realm, device_id}) + end end end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs index 844720a2c2..e44277cce5 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_producer_properties_test.exs @@ -17,62 +17,32 @@ # defmodule Astarte.DataUpdaterPlant.ProducerPropertiesTest do - use ExUnit.Case, async: true + use Astarte.Cases.Data, async: true + use Astarte.Cases.AMQP + import Mox + import Astarte.Helpers.DataUpdater alias Astarte.DataUpdaterPlant.DatabaseTestHelper - alias Astarte.DataUpdaterPlant.AMQPTestHelper alias Astarte.Core.Device alias Astarte.DataUpdaterPlant.DataUpdater alias Astarte.DataAccess.Realms.Realm alias Astarte.DataAccess.Repo alias Astarte.DataAccess.Realms.IndividualProperty alias Astarte.DataAccess.Realms.Interface - alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent - alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent alias Astarte.Core.Device alias Astarte.DataAccess.Realms.IndividualDatastream alias Astarte.Core.CQLUtils import Ecto.Query + require Logger setup :verify_on_exit! - setup do - realm_string = "autotestrealm#{System.unique_integer([:positive])}" - {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) - - on_exit(fn -> - DatabaseTestHelper.destroy_local_test_keyspace(realm_string) - end) - - helper_name = String.to_atom("helper_#{realm_string}") - - consumer_name = String.to_atom("consumer_#{realm_string}") - - {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) - - {:ok, _consumer_pid} = - AMQPTestHelper.start_events_consumer( - name: consumer_name, - realm: realm_string, - helper_name: helper_name - ) - - {:ok, %{realm: realm_string, helper_name: helper_name}} - end - - test "producer properties are correctly set", %{ - realm: realm, - helper_name: helper_name - } do - AMQPTestHelper.clean_queue(helper_name) - - keyspace_name = Realm.keyspace_name(realm) + setup_all %{realm_name: realm_name} do encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" {:ok, device_id} = Device.decode_device_id(encoded_device_id) - received_msgs = 45000 received_bytes = 4_500_000 existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} @@ -84,14 +54,37 @@ defmodule Astarte.DataUpdaterPlant.ProducerPropertiesTest do groups: ["group1"] ] - DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + DatabaseTestHelper.insert_device(realm_name, device_id, insert_opts) + test_process = self() + + Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock + |> Mox.stub(:delete, fn %{realm_name: ^realm_name, device_id: ^encoded_device_id} -> + send(test_process, :producer_properties_message_received) + :ok + end) + + setup_data_updater(realm_name, encoded_device_id) + + %{ + device_id: device_id, + encoded_device_id: encoded_device_id, + received_msgs: received_msgs, + received_bytes: received_bytes + } + end + + test "Unset values from interface properties", %{ + realm: realm, + device_id: device_id, + encoded_device_id: encoded_device_id + } do + keyspace_name = Realm.keyspace_name(realm) data = <<0, 0, 0, 98>> <> :zlib.compress("com.test.LCDMonitor/time/to;com.test.LCDMonitor/weekSchedule/10/start") timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) DataUpdater.handle_control( realm, @@ -125,28 +118,12 @@ defmodule Astarte.DataUpdaterPlant.ProducerPropertiesTest do ) DataUpdater.dump_state(realm, encoded_device_id) - {remove_event, remove_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) - assert remove_headers["x_astarte_event_type"] == "path_removed_event" - assert remove_headers["x_astarte_device_id"] == encoded_device_id - assert remove_headers["x_astarte_realm"] == realm - - assert :uuid.string_to_uuid(remove_headers["x_astarte_parent_trigger_id"]) == - DatabaseTestHelper.fake_parent_trigger_id() - - assert :uuid.string_to_uuid(remove_headers["x_astarte_simple_trigger_id"]) == - DatabaseTestHelper.path_removed_trigger_id() - - assert SimpleEvent.decode(remove_event) == %SimpleEvent{ - device_id: encoded_device_id, - event: - {:path_removed_event, - %PathRemovedEvent{interface: "com.test.LCDMonitor", path: "/time/from"}}, - timestamp: timestamp_ms, - parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), - realm: realm, - simple_trigger_id: DatabaseTestHelper.path_removed_trigger_id() - } + # {remove_event, remove_headers, _meta} = AMQPTestHelper.wait_and_get_message(amqp_consumer) + # + # assert remove_headers["x_astarte_event_type"] == "path_removed_event" + # assert remove_headers["x_astarte_device_id"] == encoded_device_id + # assert remove_headers["x_astarte_realm"] == realm endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") @@ -240,16 +217,4 @@ defmodule Astarte.DataUpdaterPlant.ProducerPropertiesTest do endpoint_id end - - defp gen_tracking_id() do - message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() - delivery_tag = {:injected_msg, make_ref()} - {message_id, delivery_tag} - end - - defp make_timestamp(timestamp_string) do - {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) - - DateTime.to_unix(date_time, :millisecond) * 10000 - end end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_simple_flow_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_simple_flow_test.exs new file mode 100644 index 0000000000..c3ceeabf1f --- /dev/null +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_simple_flow_test.exs @@ -0,0 +1,1338 @@ +defmodule Astarte.DataUpdaterPlant.DUPSimpleFlowTest do + use ExUnit.Case, async: true + + import Ecto.Query + + alias Astarte.Core.Device + alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent + alias Astarte.Core.Triggers.SimpleEvents.IncomingDataEvent + alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent + alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent + alias Astarte.Core.Triggers.SimpleEvents.ValueChangeAppliedEvent + alias Astarte.Core.Triggers.SimpleEvents.IncomingIntrospectionEvent + alias Astarte.Core.Triggers.SimpleEvents.InterfaceAddedEvent + alias Astarte.Core.Triggers.SimpleEvents.InterfaceRemovedEvent + alias Astarte.Core.Triggers.SimpleEvents.InterfaceMinorUpdatedEvent + alias Astarte.Core.Triggers.SimpleEvents.InterfaceVersion + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DataTrigger + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.DeviceTrigger + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.SimpleTriggerContainer + alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TriggerTargetContainer + alias Astarte.DataAccess.Devices.Device, as: DeviceSchema + alias Astarte.DataAccess.Realms.Realm + alias Astarte.DataAccess.Realms.IndividualDatastream + alias Astarte.DataAccess.Realms.IndividualProperty + alias Astarte.DataAccess.Realms.Interface + alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.DataUpdaterPlant.DatabaseTestHelper + alias Astarte.DataUpdaterPlant.DataUpdater + alias Astarte.DataAccess.Repo + alias Astarte.Core.CQLUtils + + setup_all do + realm_string = "autotestrealm#{System.unique_integer([:positive])}" + {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) + + on_exit(fn -> + DatabaseTestHelper.destroy_local_test_keyspace(realm_string) + end) + + # Need to be an atom because it's the name we are starting our helper with + helper_name = String.to_atom("helper_#{realm_string}") + + realm = String.to_atom(realm_string) + + {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm) + + {:ok, _consumer_pid} = + AMQPTestHelper.start_events_consumer( + consumer: helper_name, + realm: realm_string, + test_id: realm_string + ) + + {:ok, %{realm: realm_string, helper_name: helper_name}} + end + + test "simple flow", %{ + realm: realm, + helper_name: helper_name + } do + AMQPTestHelper.clean_queue(helper_name) + + keyspace_name = Realm.keyspace_name(realm) + encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" + {:ok, device_id} = Device.decode_device_id(encoded_device_id) + + received_msgs = 45000 + received_bytes = 4_500_000 + existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} + existing_introspection_string = "com.test.LCDMonitor:1:0;com.test.SimpleStreamTest:1:0" + + existing_introspection_proto_map = %{ + "com.test.LCDMonitor" => %InterfaceVersion{major: 1, minor: 0}, + "com.test.SimpleStreamTest" => %InterfaceVersion{major: 1, minor: 0} + } + + insert_opts = [ + introspection: existing_introspection_map, + total_received_msgs: received_msgs, + total_received_bytes: received_bytes, + groups: ["group1"] + ] + + DatabaseTestHelper.insert_device(realm, device_id, insert_opts) + + # Install a volatile device test trigger + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_event_type: :DEVICE_CONNECTED + } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + device_id, + 1, + volatile_trigger_parent_id, + volatile_trigger_id, + simple_trigger_data, + trigger_target_data + ) == :ok + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + volatile_trigger_id + ) == :ok + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + DataUpdater.handle_connection( + realm, + encoded_device_id, + "10.0.0.1", + gen_tracking_id(), + timestamp_us_x_10 + ) + + DataUpdater.dump_state(realm, encoded_device_id) + {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) + assert conn_headers["x_astarte_event_type"] == "device_connected_event" + assert conn_headers["x_astarte_realm"] == realm + assert conn_headers["x_astarte_device_id"] == encoded_device_id + + assert :uuid.string_to_uuid(conn_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(conn_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.group1_device_connected_trigger_id() + + assert SimpleEvent.decode(conn_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :device_connected_event, + %DeviceConnectedEvent{ + device_ip_address: "10.0.0.1" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.group1_device_connected_trigger_id() + } + + {conn_event, conn_headers, _metadata} = AMQPTestHelper.wait_and_get_message(helper_name) + assert conn_headers["x_astarte_event_type"] == "device_connected_event" + assert conn_headers["x_astarte_realm"] == realm + assert conn_headers["x_astarte_device_id"] == encoded_device_id + + assert :uuid.string_to_uuid(conn_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(conn_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.device_connected_trigger_id() + + assert SimpleEvent.decode(conn_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :device_connected_event, + %DeviceConnectedEvent{ + device_ip_address: "10.0.0.1" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.device_connected_trigger_id() + } + + device_query = + from d in DeviceSchema, + prefix: ^keyspace_name, + where: d.device_id == ^device_id, + select: %{ + connected: d.connected, + total_received_msgs: d.total_received_msgs, + total_received_bytes: d.total_received_bytes, + exchanged_msgs_by_interface: d.exchanged_msgs_by_interface, + exchanged_bytes_by_interface: d.exchanged_bytes_by_interface + } + + device_row = Repo.one(device_query) + + assert device_row == %{ + connected: true, + total_received_msgs: 45000, + total_received_bytes: 4_500_000, + exchanged_msgs_by_interface: %{}, + exchanged_bytes_by_interface: %{} + } + + # Introspection sub-test + device_introspection_query = + from d in DeviceSchema, + prefix: ^keyspace_name, + where: d.device_id == ^device_id, + select: d.introspection + + ^existing_introspection_map = Repo.one(device_introspection_query) + + # Install a volatile incoming introspection test trigger + incoming_introspection_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_id: encoded_device_id, + device_event_type: :INCOMING_INTROSPECTION + } + } + } + |> SimpleTriggerContainer.encode() + + incoming_introspection_trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + incoming_introspection_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + incoming_introspection_volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + incoming_introspection_volatile_trigger_parent_id, + incoming_introspection_volatile_trigger_id, + incoming_introspection_trigger_data, + incoming_introspection_trigger_target_data + ) == :ok + + DataUpdater.handle_introspection( + realm, + encoded_device_id, + existing_introspection_string, + gen_tracking_id(), + make_timestamp("2017-10-09T14:00:32+00:00") + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "incoming_introspection_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + incoming_introspection_volatile_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + incoming_introspection_volatile_trigger_id + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :incoming_introspection_event, + %IncomingIntrospectionEvent{ + introspection_map: existing_introspection_proto_map + } + }, + timestamp: timestamp_ms, + parent_trigger_id: incoming_introspection_volatile_trigger_parent_id, + realm: realm, + simple_trigger_id: incoming_introspection_volatile_trigger_id + } + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + incoming_introspection_volatile_trigger_id + ) == :ok + + interface_added_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_id: encoded_device_id, + device_event_type: :INTERFACE_ADDED, + interface_name: "*" + } + } + } + |> SimpleTriggerContainer.encode() + + interface_added_trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + interface_added_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + interface_added_volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + interface_added_volatile_trigger_parent_id, + interface_added_volatile_trigger_id, + interface_added_trigger_data, + interface_added_trigger_target_data + ) == :ok + + new_introspection = existing_introspection_string <> ";com.test.YetAnother:1:0" + + DataUpdater.handle_introspection( + realm, + encoded_device_id, + new_introspection, + gen_tracking_id(), + make_timestamp("2017-10-09T14:00:32+00:00") + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "interface_added_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + interface_added_volatile_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + interface_added_volatile_trigger_id + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :interface_added_event, + %InterfaceAddedEvent{ + interface: "com.test.YetAnother", + major_version: 1, + minor_version: 0 + } + }, + timestamp: timestamp_ms, + parent_trigger_id: interface_added_volatile_trigger_parent_id, + realm: realm, + simple_trigger_id: interface_added_volatile_trigger_id + } + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + interface_added_volatile_trigger_id + ) == :ok + + interface_minor_updated_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_id: encoded_device_id, + device_event_type: :INTERFACE_MINOR_UPDATED, + interface_name: "com.test.YetAnother", + interface_major: 1 + } + } + } + |> SimpleTriggerContainer.encode() + + interface_minor_updated_trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + interface_minor_updated_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + interface_minor_updated_volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + interface_minor_updated_volatile_trigger_parent_id, + interface_minor_updated_volatile_trigger_id, + interface_minor_updated_trigger_data, + interface_minor_updated_trigger_target_data + ) == :ok + + new_introspection = existing_introspection_string <> ";com.test.YetAnother:1:1" + + DataUpdater.handle_introspection( + realm, + encoded_device_id, + new_introspection, + gen_tracking_id(), + make_timestamp("2017-10-09T14:00:32+00:00") + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "interface_minor_updated_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + interface_minor_updated_volatile_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + interface_minor_updated_volatile_trigger_id + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :interface_minor_updated_event, + %InterfaceMinorUpdatedEvent{ + interface: "com.test.YetAnother", + major_version: 1, + old_minor_version: 0, + new_minor_version: 1 + } + }, + timestamp: timestamp_ms, + parent_trigger_id: interface_minor_updated_volatile_trigger_parent_id, + realm: realm, + simple_trigger_id: interface_minor_updated_volatile_trigger_id + } + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + interface_minor_updated_volatile_trigger_id + ) == :ok + + interface_removed_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :device_trigger, + %DeviceTrigger{ + device_id: encoded_device_id, + device_event_type: :INTERFACE_REMOVED, + interface_name: "*" + } + } + } + |> SimpleTriggerContainer.encode() + + interface_removed_trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + interface_removed_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + interface_removed_volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + interface_removed_volatile_trigger_parent_id, + interface_removed_volatile_trigger_id, + interface_removed_trigger_data, + interface_removed_trigger_target_data + ) == :ok + + DataUpdater.handle_introspection( + realm, + encoded_device_id, + existing_introspection_string, + gen_tracking_id(), + make_timestamp("2017-10-09T14:00:32+00:00") + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "interface_removed_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + interface_removed_volatile_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + interface_removed_volatile_trigger_id + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :interface_removed_event, + %InterfaceRemovedEvent{ + interface: "com.test.YetAnother", + major_version: 1 + } + }, + timestamp: timestamp_ms, + parent_trigger_id: interface_removed_volatile_trigger_parent_id, + realm: realm, + simple_trigger_id: interface_removed_volatile_trigger_id + } + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + interface_removed_volatile_trigger_id + ) == :ok + + DataUpdater.dump_state(realm, encoded_device_id) + + device_introspection = Repo.one(device_introspection_query) + + assert existing_introspection_map == device_introspection + + # Install a volatile test trigger + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.test.SimpleStreamTest", + interface_major: 1, + data_trigger_type: :INCOMING_DATA, + match_path: "/0/value", + value_match_operator: :LESS_THAN, + known_value: Cyanide.encode!(%{v: 100}) + } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + volatile_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + volatile_trigger_parent_id, + volatile_trigger_id, + simple_trigger_data, + trigger_target_data + ) == :ok + + # for a single interface/endpoint are correctly loaded + non_matching_simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.test.SimpleStreamTest", + interface_major: 1, + data_trigger_type: :INCOMING_DATA, + match_path: "/0/value", + value_match_operator: :GREATER_THAN, + known_value: Cyanide.encode!(%{v: 1000}) + } + } + } + |> SimpleTriggerContainer.encode() + + non_matching_volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) + non_matching_volatile_trigger_id = :crypto.strong_rand_bytes(16) + + # Install the non-matching trigger twice to check that this installs 2 trigger_targets + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + non_matching_volatile_trigger_parent_id, + non_matching_volatile_trigger_id, + non_matching_simple_trigger_data, + trigger_target_data + ) == :ok + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("0a0da77d-85b5-93d9-d4d2-bd26dd18c9af"), + 2, + non_matching_volatile_trigger_parent_id, + non_matching_volatile_trigger_id, + non_matching_simple_trigger_data, + trigger_target_data + ) == :ok + + # Incoming data sub-test + timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:31+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/3/start", + Cyanide.encode!(%{"v" => 1}), + gen_tracking_id(), + timestamp_us_x_10 + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.less_than_device_incoming_trigger_id() + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :incoming_data_event, + %IncomingDataEvent{ + bson_value: Cyanide.encode!(%{"v" => 1}), + interface: "com.test.LCDMonitor", + path: "/weekSchedule/3/start" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.less_than_device_incoming_trigger_id() + } + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/4/start", + Cyanide.encode!(%{"v" => 3}), + gen_tracking_id(), + timestamp_us_x_10 + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.equal_to_group_incoming_trigger_id() + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :incoming_data_event, + %IncomingDataEvent{ + bson_value: Cyanide.encode!(%{"v" => 3}), + interface: "com.test.LCDMonitor", + path: "/weekSchedule/4/start" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.equal_to_group_incoming_trigger_id() + } + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/time/from", + Cyanide.encode!(%{"v" => 9000}), + gen_tracking_id(), + make_timestamp("2017-10-09T14:10:32+00:00") + ) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/9/start", + Cyanide.encode!(%{"v" => 9}), + gen_tracking_id(), + make_timestamp("2017-10-09T14:10:32+00:00") + ) + + # Install a volatile value change applied test trigger + simple_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :VALUE_CHANGE_APPLIED, + match_path: "/weekSchedule/10/start", + value_match_operator: :ANY + } + } + } + |> SimpleTriggerContainer.encode() + + trigger_target_data = + %TriggerTargetContainer{ + trigger_target: { + :amqp_trigger_target, + %AMQPTriggerTarget{ + routing_key: AMQPTestHelper.events_routing_key(realm), + exchange: AMQPTestHelper.events_exchange_name(realm) + } + } + } + |> TriggerTargetContainer.encode() + + volatile_changed_trigger_parent_id = :crypto.strong_rand_bytes(16) + volatile_changed_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), + 2, + volatile_changed_trigger_parent_id, + volatile_changed_trigger_id, + simple_trigger_data, + trigger_target_data + ) == :ok + + bad_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.missing.Interface", + interface_major: 1, + data_trigger_type: :VALUE_CHANGE_APPLIED, + match_path: "/test", + value_match_operator: :ANY + } + } + } + |> SimpleTriggerContainer.encode() + + bad_trigger_parent_id = :crypto.strong_rand_bytes(16) + bad_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("badb93a5-842e-bbad-2e4d-d20306838051"), + 2, + bad_trigger_parent_id, + bad_trigger_id, + bad_trigger_data, + trigger_target_data + ) == {:error, :interface_not_found} + + bad_path_trigger_data = + %SimpleTriggerContainer{ + simple_trigger: { + :data_trigger, + %DataTrigger{ + interface_name: "com.test.LCDMonitor", + interface_major: 1, + data_trigger_type: :VALUE_CHANGE_APPLIED, + match_path: "/weekSchedule/10", + value_match_operator: :ANY + } + } + } + |> SimpleTriggerContainer.encode() + + bad_path_trigger_parent_id = :crypto.strong_rand_bytes(16) + bad_path_trigger_id = :crypto.strong_rand_bytes(16) + + assert DataUpdater.handle_install_volatile_trigger( + realm, + encoded_device_id, + :uuid.string_to_uuid("798b93a5-842e-bbad-2e4d-d20306838051"), + 2, + bad_path_trigger_parent_id, + bad_path_trigger_id, + bad_path_trigger_data, + trigger_target_data + ) == {:error, :invalid_match_path} + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/10/start", + Cyanide.encode!(%{"v" => 10}), + gen_tracking_id(), + timestamp_us_x_10 + ) + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "incoming_data_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.greater_than_incoming_trigger_id() + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :incoming_data_event, + %IncomingDataEvent{ + bson_value: Cyanide.encode!(%{"v" => 10}), + interface: "com.test.LCDMonitor", + path: "/weekSchedule/10/start" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.greater_than_incoming_trigger_id() + } + + {incoming_event, incoming_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert incoming_headers["x_astarte_event_type"] == "value_change_applied_event" + assert incoming_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_parent_trigger_id"]) == + volatile_changed_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_headers["x_astarte_simple_trigger_id"]) == + volatile_changed_trigger_id + + assert SimpleEvent.decode(incoming_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: { + :value_change_applied_event, + %ValueChangeAppliedEvent{ + old_bson_value: Cyanide.encode!(%{"v" => 42}), + new_bson_value: Cyanide.encode!(%{"v" => 10}), + interface: "com.test.LCDMonitor", + path: "/weekSchedule/10/start" + } + }, + timestamp: timestamp_ms, + parent_trigger_id: volatile_changed_trigger_parent_id, + realm: realm, + simple_trigger_id: volatile_changed_trigger_id + } + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + # This should trigger matching_simple_trigger + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.SimpleStreamTest", + "/0/value", + Cyanide.encode!(%{"v" => 5}), + gen_tracking_id(), + timestamp_us_x_10 + ) + + state = DataUpdater.dump_state(realm, encoded_device_id) + + {incoming_volatile_event, incoming_volatile_headers, _meta} = + AMQPTestHelper.wait_and_get_message(helper_name) + + assert incoming_volatile_headers["x_astarte_event_type"] == "incoming_data_event" + assert incoming_volatile_headers["x_astarte_device_id"] == encoded_device_id + assert incoming_volatile_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(incoming_volatile_headers["x_astarte_parent_trigger_id"]) == + volatile_trigger_parent_id + + assert :uuid.string_to_uuid(incoming_volatile_headers["x_astarte_simple_trigger_id"]) == + volatile_trigger_id + + assert SimpleEvent.decode(incoming_volatile_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: + {:incoming_data_event, + %IncomingDataEvent{ + bson_value: Cyanide.encode!(%{"v" => 5}), + interface: "com.test.SimpleStreamTest", + path: "/0/value" + }}, + timestamp: timestamp_ms, + parent_trigger_id: volatile_trigger_parent_id, + realm: realm, + simple_trigger_id: volatile_trigger_id + } + + # We check that all 3 on_incoming_data triggers were correctly installed + interface_id = CQLUtils.interface_id("com.test.SimpleStreamTest", 1) + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") + trigger_key = {:on_incoming_data, interface_id, endpoint_id} + incoming_data_0_value_triggers = Map.get(state.data_triggers, trigger_key) + + # The length is 2 since greater-then triggers are merged into one because they are congruent + assert length(incoming_data_0_value_triggers) == 2 + # Extract greater-than trigger + assert [gt_trigger] = + Enum.filter(incoming_data_0_value_triggers, fn data_trigger -> + data_trigger.value_match_operator == :GREATER_THAN + end) + + # It should have 2 targets + assert length(gt_trigger.trigger_targets) == 2 + + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/time/from", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == 9000 + + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") + + timestamp_ms = DateTime.from_unix!(1_507_557_632_000, :millisecond) + + value_query = + from id in IndividualDatastream, + prefix: ^keyspace_name, + where: + id.device_id == ^device_id and + id.interface_id == ^CQLUtils.interface_id("com.test.SimpleStreamTest", 1) and + id.endpoint_id == ^endpoint_id and + id.path == "/0/value" and + id.value_timestamp >= ^timestamp_ms, + select: id.integer_value + + value = Repo.one(value_query) + + assert value == 5 + + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + volatile_trigger_id + ) == :ok + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") + + # Introspection change subtest + DataUpdater.handle_introspection( + realm, + encoded_device_id, + "com.test.LCDMonitor:1:0;com.example.TestObject:1:5;com.test.SimpleStreamTest:1:0", + gen_tracking_id(), + timestamp_us_x_10 + ) + + # Incoming object aggregation subtest + payload0 = Cyanide.encode!(%{"value" => 1.9, "string" => "Astarteです"}) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.example.TestObject", + "/", + payload0, + gen_tracking_id(), + make_timestamp("2017-10-26T08:48:49+00:00") + ) + + payload1 = Cyanide.encode!(%{"string" => "Hello World');"}) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.example.TestObject", + "/", + payload1, + gen_tracking_id(), + make_timestamp("2017-10-26T08:48:50+00:00") + ) + + payload2 = Cyanide.encode!(%{"v" => %{"value" => 0.0}}) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.example.TestObject", + "/", + payload2, + gen_tracking_id(), + make_timestamp("2017-10-26T08:48:51+00:00") + ) + + # we expect only /string to be updated here, we need this to check against accidental NULL insertions, that are bad for tombstones on cassandra. + payload3 = Cyanide.encode!(%{"string" => "zzz"}) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.example.TestObject", + "/", + payload3, + gen_tracking_id(), + make_timestamp("2017-09-30T07:13:00+00:00") + ) + + payload4 = Cyanide.encode!(%{}) + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.example.TestObject", + "/", + payload4, + gen_tracking_id(), + make_timestamp("2017-10-30T07:13:00+00:00") + ) + + DataUpdater.dump_state(realm, encoded_device_id) + + objects_query = + from o in "com_example_testobject_v1", + prefix: ^realm, + where: o.device_id == ^device_id and o.path == "/", + select: [ + device_id: o.device_id, + path: o.path, + reception_timestamp: fragment("toUnixTimestamp(?)", o.reception_timestamp), + reception_timestamp_submillis: o.reception_timestamp_submillis, + v_string: o.v_string, + v_value: o.v_value + ] + + objects = Repo.all(objects_query) + + assert objects == [ + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_506_755_400_000, + reception_timestamp_submillis: 0, + v_string: "aaa", + v_value: 1.1 + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_506_755_520_000, + reception_timestamp_submillis: 0, + v_string: "bbb", + v_value: 2.2 + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_506_755_580_000, + reception_timestamp_submillis: 0, + v_string: "zzz", + v_value: 3.3 + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_509_007_729_000, + reception_timestamp_submillis: 0, + v_string: "Astarteです", + v_value: 1.9 + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_509_007_730_000, + reception_timestamp_submillis: 0, + v_string: "Hello World');", + v_value: nil + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_509_007_731_000, + reception_timestamp_submillis: 0, + v_string: nil, + v_value: 0.0 + ], + [ + device_id: device_id, + path: "/", + reception_timestamp: 1_509_347_580_000, + reception_timestamp_submillis: 0, + v_string: nil, + v_value: nil + ] + ] + + # Test /producer/properties control message + data = + <<0, 0, 0, 98>> <> + :zlib.compress("com.test.LCDMonitor/time/to;com.test.LCDMonitor/weekSchedule/10/start") + + timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") + timestamp_ms = div(timestamp_us_x_10, 10_000) + + DataUpdater.handle_control( + realm, + encoded_device_id, + "/producer/properties", + data, + gen_tracking_id(), + timestamp_us_x_10 + ) + + DataUpdater.dump_state(realm, encoded_device_id) + {remove_event, remove_headers, _meta} = AMQPTestHelper.wait_and_get_message(helper_name) + assert remove_headers["x_astarte_event_type"] == "path_removed_event" + assert remove_headers["x_astarte_device_id"] == encoded_device_id + assert remove_headers["x_astarte_realm"] == realm + + assert :uuid.string_to_uuid(remove_headers["x_astarte_parent_trigger_id"]) == + DatabaseTestHelper.fake_parent_trigger_id() + + assert :uuid.string_to_uuid(remove_headers["x_astarte_simple_trigger_id"]) == + DatabaseTestHelper.path_removed_trigger_id() + + assert SimpleEvent.decode(remove_event) == %SimpleEvent{ + device_id: encoded_device_id, + event: + {:path_removed_event, + %PathRemovedEvent{interface: "com.test.LCDMonitor", path: "/time/from"}}, + timestamp: timestamp_ms, + parent_trigger_id: DatabaseTestHelper.fake_parent_trigger_id(), + realm: realm, + simple_trigger_id: DatabaseTestHelper.path_removed_trigger_id() + } + + endpoint_id = retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/time/from") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/time/from", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/9/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/9/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/10/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == 10 + + endpoint_id = retrieve_endpoint_id(realm, "com.test.SimpleStreamTest", 1, "/0/value") + + timestamp_ms = DateTime.from_unix!(1_507_557_632_000, :millisecond) + + value_query = + from id in IndividualDatastream, + prefix: ^keyspace_name, + where: + id.device_id == ^device_id and + id.interface_id == ^CQLUtils.interface_id("com.test.SimpleStreamTest", 1) and + id.endpoint_id == ^endpoint_id and + id.path == "/0/value" and + id.value_timestamp >= ^timestamp_ms, + select: id.integer_value + + value = Repo.one(value_query) + + assert value == 5 + + # Unset subtest + + # Delete it otherwise it gets raised + assert DataUpdater.handle_delete_volatile_trigger( + realm, + encoded_device_id, + volatile_changed_trigger_id + ) == :ok + + DataUpdater.handle_data( + realm, + encoded_device_id, + "com.test.LCDMonitor", + "/weekSchedule/10/start", + <<>>, + gen_tracking_id(), + make_timestamp("2017-10-09T15:10:32+00:00") + ) + + DataUpdater.dump_state(realm, encoded_device_id) + + endpoint_id = + retrieve_endpoint_id(realm, "com.test.LCDMonitor", 1, "/weekSchedule/10/start") + + value_query = + from ip in IndividualProperty, + prefix: ^keyspace_name, + where: + ip.device_id == ^device_id and + ip.interface_id == ^CQLUtils.interface_id("com.test.LCDMonitor", 1) and + ip.endpoint_id == ^endpoint_id and + ip.path == "/weekSchedule/10/start", + select: ip.longinteger_value + + value = Repo.one(value_query) + + assert value == nil + end + + defp retrieve_endpoint_id(realm_name, interface_name, interface_major, path) do + keyspace_name = Realm.keyspace_name(realm_name) + + query = + from i in Interface, + prefix: ^keyspace_name, + where: i.name == ^interface_name and i.major_version == ^interface_major, + select: %{ + automaton_transitions: i.automaton_transitions, + automaton_accepting_states: i.automaton_accepting_states + } + + interface_row = Repo.one!(query) + + automaton = + {:erlang.binary_to_term(interface_row[:automaton_transitions]), + :erlang.binary_to_term(interface_row[:automaton_accepting_states])} + + {:ok, endpoint_id} = Astarte.Core.Mapping.EndpointsAutomaton.resolve_path(path, automaton) + + endpoint_id + end + + defp make_timestamp(timestamp_string) do + {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) + + DateTime.to_unix(date_time, :millisecond) * 10000 + end + + defp gen_tracking_id() do + message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() + delivery_tag = {:injected_msg, make_ref()} + {message_id, delivery_tag} + end +end diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs index e3e577f8bf..5870df63e5 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/dup_unset_properties_test.exs @@ -17,8 +17,11 @@ # defmodule Astate.DataUpdaterPlant.UnsetTest do - use ExUnit.Case, async: true + use Astarte.Cases.Data, async: true + use Astarte.Cases.AMQP + import Mox + import Astarte.Helpers.DataUpdater alias Astarte.DataUpdaterPlant.DatabaseTestHelper alias Astarte.DataUpdaterPlant.AMQPTestHelper @@ -36,69 +39,65 @@ defmodule Astate.DataUpdaterPlant.UnsetTest do setup :verify_on_exit! - setup do - realm_string = "autotestrealm#{System.unique_integer([:positive])}" - {:ok, _keyspace_name} = DatabaseTestHelper.create_test_keyspace(realm_string) - - on_exit(fn -> - DatabaseTestHelper.destroy_local_test_keyspace(realm_string) - end) + setup_all %{realm_name: realm_name} do + device_id = Device.random_device_id() + encoded_device_id = Device.encode_device_id(device_id) + received_msgs = 45000 + received_bytes = 4_500_000 + existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} - helper_name = String.to_atom("helper_#{realm_string}") + insert_opts = [ + introspection: existing_introspection_map, + total_received_msgs: received_msgs, + total_received_bytes: received_bytes, + groups: ["group1"] + ] - consumer_name = String.to_atom("consumer_#{realm_string}") + DatabaseTestHelper.insert_device(realm_name, device_id, insert_opts) + test_process = self() - {:ok, _pid} = AMQPTestHelper.start_link(name: helper_name, realm: realm_string) + Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock + |> Mox.stub(:delete, fn %{realm_name: ^realm_name, device_id: ^encoded_device_id} -> + send(test_process, :unset_properties_message_received) + :ok + end) - {:ok, _consumer_pid} = - AMQPTestHelper.start_events_consumer( - name: consumer_name, - realm: realm_string, - helper_name: helper_name - ) + setup_data_updater(realm_name, encoded_device_id) - {:ok, %{realm: realm_string, helper_name: helper_name}} + %{ + device_id: device_id, + encoded_device_id: encoded_device_id, + received_msgs: received_msgs, + received_bytes: received_bytes + } end test "Unset values from interface properties", %{ realm: realm, - helper_name: helper_name + amqp_consumer: amqp_consumer, + device_id: device_id, + encoded_device_id: encoded_device_id } do - AMQPTestHelper.clean_queue(helper_name) + AMQPTestHelper.clean_queue(amqp_consumer) - encoded_device_id = "f0VMRgIBAQAAAAAAAAAAAA" keyspace_name = Realm.keyspace_name(realm) - {:ok, device_id} = Device.decode_device_id(encoded_device_id) volatile_changed_trigger_id = :crypto.strong_rand_bytes(16) - received_msgs = 45000 - received_bytes = 4_500_000 - existing_introspection_map = %{"com.test.LCDMonitor" => 1, "com.test.SimpleStreamTest" => 1} - - insert_opts = [ - introspection: existing_introspection_map, - total_received_msgs: received_msgs, - total_received_bytes: received_bytes, - groups: ["group1"] - ] - - DatabaseTestHelper.insert_device(realm, device_id, insert_opts) - assert DataUpdater.handle_delete_volatile_trigger( realm, encoded_device_id, volatile_changed_trigger_id ) == :ok - DataUpdater.handle_data( - realm, - encoded_device_id, - "com.test.LCDMonitor", - "/weekSchedule/10/start", - <<>>, - gen_tracking_id(), - make_timestamp("2017-10-09T15:10:32+00:00") - ) + # DataUpdater.handle_data( + # realm, + # encoded_device_id, + # "com.test.LCDMonitor", + # "/weekSchedule/10/start", + # <<>>, + # gen_tracking_id(), + # make_timestamp("2017-10-09T15:10:32+00:00") + # ) DataUpdater.dump_state(realm, encoded_device_id) @@ -142,16 +141,4 @@ defmodule Astate.DataUpdaterPlant.UnsetTest do endpoint_id end - - defp make_timestamp(timestamp_string) do - {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) - - DateTime.to_unix(date_time, :millisecond) * 10000 - end - - defp gen_tracking_id() do - message_id = :erlang.unique_integer([:monotonic]) |> Integer.to_string() - delivery_tag = {:injected_msg, make_ref()} - {message_id, delivery_tag} - end end diff --git a/apps/astarte_data_updater_plant/test/support/amqp_events_consumer.ex b/apps/astarte_data_updater_plant/test/support/amqp_events_consumer.ex index 1de5fc8706..31c07cf9a2 100644 --- a/apps/astarte_data_updater_plant/test/support/amqp_events_consumer.ex +++ b/apps/astarte_data_updater_plant/test/support/amqp_events_consumer.ex @@ -31,8 +31,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestEventsConsumer do # API def start_link(args) do - name = Keyword.fetch!(args, :name) - GenServer.start_link(__MODULE__, args, name: name) + GenServer.start_link(__MODULE__, args) end def ack(delivery_tag) do @@ -43,9 +42,16 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestEventsConsumer do def init(args) do realm = Keyword.fetch!(args, :realm) - helper_name = Keyword.fetch!(args, :helper_name) + consumer = Keyword.fetch!(args, :consumer) + test_id = Keyword.fetch!(args, :test_id) - initial_state = %{realm: realm, helper_name: helper_name, conn: nil, chan: nil} + initial_state = %{ + realm: realm, + consumer: consumer, + conn: nil, + chan: nil, + test_id: test_id + } {:ok, state} = rabbitmq_connect(initial_state, false) @@ -88,7 +94,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestEventsConsumer do headers_map = amqp_headers_to_map(headers) # Pass the realm to find the correct helper - AMQPTestHelper.notify_deliver(state.helper_name, payload, headers_map, other_meta) + AMQPTestHelper.notify_deliver(state.consumer, payload, headers_map, other_meta) Basic.ack(state.chan, meta.delivery_tag) @@ -110,7 +116,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestEventsConsumer do # Now we can monitor the connection pid since `conn` is established _monitor_ref <- Process.monitor(conn.pid), {:ok, chan} <- Channel.open(conn), - :ok <- setup_amqp_resources(chan, state.realm) do + :ok <- setup_amqp_resources(chan, state.test_id) do # On success, put the connection and channel into the state {:ok, %{state | conn: conn, chan: chan}} else @@ -122,26 +128,26 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestEventsConsumer do end end - defp setup_amqp_resources(chan, realm) do + defp setup_amqp_resources(chan, test_id) do with :ok <- - Exchange.declare(chan, AMQPTestHelper.events_exchange_name(realm), :direct, + Exchange.declare(chan, AMQPTestHelper.events_exchange_name(test_id), :direct, durable: true ), {:ok, _queue} <- Queue.declare( chan, - AMQPTestHelper.events_queue_name(realm), + AMQPTestHelper.events_queue_name(test_id), durable: true, auto_delete: false ), :ok <- Queue.bind( chan, - AMQPTestHelper.events_queue_name(realm), - AMQPTestHelper.events_exchange_name(realm), - routing_key: AMQPTestHelper.events_routing_key(realm) + AMQPTestHelper.events_queue_name(test_id), + AMQPTestHelper.events_exchange_name(test_id), + routing_key: AMQPTestHelper.events_routing_key(test_id) ), - {:ok, _consumer_tag} <- Basic.consume(chan, AMQPTestHelper.events_queue_name(realm)) do + {:ok, _consumer_tag} <- Basic.consume(chan, AMQPTestHelper.events_queue_name(test_id)) do :ok end end diff --git a/apps/astarte_data_updater_plant/test/support/amqp_test_helper.ex b/apps/astarte_data_updater_plant/test/support/amqp_test_helper.ex index f3c42e3865..0d0ac77df7 100644 --- a/apps/astarte_data_updater_plant/test/support/amqp_test_helper.ex +++ b/apps/astarte_data_updater_plant/test/support/amqp_test_helper.ex @@ -21,7 +21,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestHelper do require Logger def start_link(args) do - GenServer.start_link(__MODULE__, args, name: Keyword.fetch!(args, :name)) + name = Keyword.get(args, :name) + GenServer.start_link(__MODULE__, args, name: name) end def start_events_consumer(args) do @@ -36,23 +37,22 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestHelper do Application.get_env(:astarte_data_updater_plant, :amqp_consumer_options, []) end - def events_exchange_name(realm) do - "astarte_events_#{realm}" + def events_exchange_name(id) do + "astarte_events_#{id}" end - def events_queue_name(realm) do - "test_events_#{realm}" + def events_queue_name(id) do + "test_events_#{id}" end - def events_routing_key(realm) do - "test_events_#{realm}" + def events_routing_key(id) do + "test_events_#{id}" end def events_routing_key() do "test_events" end - # The 'name' argument is the unique atom for the GenServer process. def notify_deliver(name, payload, headers_map, other_meta) do message = {payload, Enum.into(headers_map, %{}), other_meta} GenServer.call(name, {:notify_deliver, message}) @@ -74,6 +74,10 @@ defmodule Astarte.DataUpdaterPlant.AMQPTestHelper do GenServer.call(name, :clean_queue) end + def clean_queue() do + GenServer.call(AMQPTestHelper, :clean_queue) + end + def handle_call(:wait_and_get_message, from, state) do if Map.get(state, :messages) do [oldest_message | messages] = state[:messages] diff --git a/apps/astarte_data_updater_plant/test/support/cases/amqp.ex b/apps/astarte_data_updater_plant/test/support/cases/amqp.ex new file mode 100644 index 0000000000..1f5de211df --- /dev/null +++ b/apps/astarte_data_updater_plant/test/support/cases/amqp.ex @@ -0,0 +1,42 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defmodule Astarte.Cases.AMQP do + use ExUnit.CaseTemplate + require Logger + + alias Astarte.DataUpdaterPlant.AMQPTestHelper + alias Astarte.DataUpdaterPlant.AMQPTestEventsConsumer + + setup %{realm_name: realm} do + test_id = System.unique_integer() + amqp_consumer = start_link_supervised!({AMQPTestHelper, [test_id: test_id]}) + + events_consumer = + start_link_supervised!( + {AMQPTestEventsConsumer, + [ + realm: realm, + consumer: amqp_consumer, + test_id: test_id + ]} + ) + + %{test_id: test_id, amqp_consumer: amqp_consumer, events_consumer: events_consumer} + end +end diff --git a/apps/astarte_data_updater_plant/test/support/helpers/data_updater.ex b/apps/astarte_data_updater_plant/test/support/helpers/data_updater.ex index 109d67990a..660d16ed84 100644 --- a/apps/astarte_data_updater_plant/test/support/helpers/data_updater.ex +++ b/apps/astarte_data_updater_plant/test/support/helpers/data_updater.ex @@ -17,7 +17,6 @@ defmodule Astarte.Helpers.DataUpdater do alias Astarte.DataUpdaterPlant.DataUpdater - import Mimic def setup_data_updater(realm_name, encoded_device_id) do {:ok, message_tracker} = DataUpdater.fetch_message_tracker(realm_name, encoded_device_id) @@ -31,7 +30,10 @@ defmodule Astarte.Helpers.DataUpdater do ) Astarte.DataAccess.Config - |> allow(self(), data_updater) + |> Mimic.allow(self(), data_updater) + + Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock + |> Mox.allow(self(), data_updater) :ok = GenServer.call(data_updater, :start) end