Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
`booleanarray`, `longintegerarray`, `stringarray`, `datetimearray`, `binaryblobarray`.
- [astarte_export] Added a new command for exporting by device_id.
`mix astarte.export $REALM $FILE_XML $DEVICE_ID`
- [astarte_data_updater_plant] Automatically shard queues between DUP instances,
allowing to autoscale the service. This is done via Erlang clustering, so
the `DATA_UPDATER_PLANT_CLUSTERING_STRATEGY` env var (default `none`) has
been introduced to set clustering strategy.
- [astarte_data_updater_plant] allow to configure Erlang clustering when using
the `kubernetes` option via `DATA_UPDATER_PLANT_CLUSTERING_KUBERNETES_SELECTOR`
and `DATA_UPDATER_PLANT_CLUSTERING_KUBERNETES_NAMESPACE`.

## [1.2.1] - Unreleased
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ defmodule Astarte.DataUpdaterPlant.Application do
dup_xandra_opts = Keyword.put(xandra_options, :name, :xandra)

children = [
{Cluster.Supervisor,
[Config.cluster_topologies!(), [name: Astarte.DataUpdaterPlant.ClusterSupervisor]]},
Astarte.DataUpdaterPlantWeb.Telemetry,
{Xandra.Cluster, dup_xandra_opts},
{Astarte.DataAccess, data_access_opts},
Expand All @@ -68,8 +70,6 @@ defmodule Astarte.DataUpdaterPlant.Application do
queues: [
events_exchange_name: Config.events_exchange_name!(),
prefix: Config.data_queue_prefix!(),
range_start: Config.data_queue_range_start!(),
range_end: Config.data_queue_range_end!(),
total_count: Config.data_queue_total_count!()
],
message_handler: Impl
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# This file is part of Astarte.
#
# Copyright 2024 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

defmodule Astarte.DataUpdaterPlant.ClusteringStrategy do
use Skogsra.Type

@allowed_strategies ~w(none kubernetes)

@impl Skogsra.Type
def cast(value) when value in @allowed_strategies do
{:ok, value}
end

@impl Skogsra.Type
def cast(_) do
:error
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,30 @@ defmodule Astarte.DataUpdaterPlant.Config do
type: :integer,
default: 10

@envdoc "The Erlang cluster strategy to use. One of `none`, `kubernetes`. Defaults to `none`."
app_env :clustering_strategy,
:astarte_data_updater_plant,
:clustering_strategy,
os_env: "DATA_UPDATER_PLANT_CLUSTERING_STRATEGY",
type: Astarte.DataUpdaterPlant.ClusteringStrategy,
default: "none"

@envdoc "The Kubernetes selector to use when `kubernetes` Erlang clustering strategy is used. Defaults to `app=astarte-data-updater-plant`."
app_env :clustering_kubernetes_selector,
:astarte_data_updater_plant,
:clustering_kubernetes_selector,
os_env: "DATA_UPDATER_PLANT_CLUSTERING_KUBERNETES_SELECTOR",
type: :binary,
default: "app=astarte-data-updater-plant"

@envdoc "The Kubernetes namespace to use when `kubernetes` Erlang clustering strategy is used. Defaults to `astarte`."
app_env :clustering_kubernetes_namespace,
:astarte_data_updater_plant,
:clustering_kubernetes_namespace,
os_env: "DATA_UPDATER_PLANT_CLUSTERING_KUBERNETES_NAMESPACE",
type: :binary,
default: "astarte"

# Since we have one channel per queue, this is not configurable
def amqp_consumer_channels_per_connection_number!() do
ceil(data_queue_total_count!() / amqp_consumer_connection_number!())
Expand Down Expand Up @@ -429,6 +453,27 @@ defmodule Astarte.DataUpdaterPlant.Config do
Application.get_env(:astarte_data_updater_plant, :amqp_adapter)
end

def cluster_topologies!() do
case clustering_strategy!() do
"none" ->
[]

"kubernetes" ->
[
data_updater_plant_k8s: [
strategy: Elixir.Cluster.Strategy.Kubernetes,
config: [
mode: :ip,
kubernetes_node_basename: "astarte_data_updater_plant",
kubernetes_selector: clustering_kubernetes_selector!(),
kubernetes_namespace: clustering_kubernetes_namespace!(),
polling_interval: 10_000
]
]
]
end
end

@doc """
Returns Cassandra nodes formatted in the Xandra format.
"""
Expand Down
6 changes: 2 additions & 4 deletions apps/astarte_data_updater_plant/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,9 @@ defmodule Astarte.DataUpdaterPlant.Mixfile do
{:castore, "~> 1.0.0"},
{:cyanide, "~> 2.0"},
{:excoveralls, "~> 0.15", only: :test},
{:mississippi, github: "secomind/mississippi"},
{:mississippi, github: "annopaolo/mississippi", branch: "dynamic-queue-sharding"},
{:mox, "~> 1.0", only: :test},
# hex.pm package and esl/ex_rabbit_pool do not support amqp version 2.1.
# This fork is supporting amqp ~> 2.0 and also ~> 3.0.
{:ex_rabbit_pool, github: "simplebet/ex_rabbit_pool", ref: "latest-amqp"},
{:libcluster, "~> 3.3"},
{:pretty_log, "~> 0.1"},
{:plug_cowboy, "~> 2.1"},
{:telemetry_metrics_prometheus_core, "~> 0.4"},
Expand Down
9 changes: 7 additions & 2 deletions apps/astarte_data_updater_plant/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,34 @@
"cqerl": {:hex, :cqerl, "2.0.1", "c92929e885adc50cda1f11b73eb0121363e8418533312f8a44defb5f14853445", [:rebar3], [{:lz4, "~>0.2.4", [hex: :lz4_erl, repo: "hexpm", optional: false]}, {:re2, "1.9.5", [hex: :re2, repo: "hexpm", optional: false]}, {:semver, "~>0.0.1", [hex: :semver_erl, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}, {:uuid, "~>2.0.0", [hex: :uuid_erl, repo: "hexpm", optional: false]}], "hexpm", "96e9ee407830508187a5edff9fc49983a7122b5c4127c640320a226b59ae12fe"},
"cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"current_rabbit_pool": {:hex, :current_rabbit_pool, "1.1.1", "3a9594f62fdeef845ebd75833659dd821174b51bd013a53c269bf43e67ba8df6", [:mix], [{:amqp, "~> 3.1", [hex: :amqp, repo: "hexpm", optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "69791022a2e818e55a6535ddea623b1240d44c9640ac5ddc99e07262ee711b2b"},
"cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"},
"db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"},
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
"delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []},
"ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"},
"ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"},
"efx": {:hex, :efx, "0.2.6", "ec7c42b05073e6fdc61d971cc02d366f73f40d8093272a5326d16861003036b0", [:mix], [{:process_tree, "0.1.2", [hex: :process_tree, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "7648dcfd05f9ac39b257d4a9aa7c5b243823fee31933e725b2d346371b6d6bc4"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_rabbit_pool": {:git, "https://github.com/simplebet/ex_rabbit_pool.git", "2f7ba634dab78f3e8d5cc6c1a60030baf92791b9", [ref: "latest-amqp"]},
"excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"horde": {:git, "https://github.com/derekkraan/horde.git", "f9ef5c4c9d1ad6f24a619a2252b5f25ec6602493", []},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"jsx": {:hex, :jsx, "2.11.0", "08154624050333919b4ac1b789667d5f4db166dc50e190c4d778d1587f102ee0", [:rebar3], [], "hexpm", "eed26a0d04d217f9eecefffb89714452556cf90eb38f290a27a4d45b9988f8c0"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
"logfmt": {:hex, :logfmt, "3.3.2", "c432765cff9c26cf4ba78cf66ece183e56562dfeba6e2d9f077804cc4c756677", [:mix], [], "hexpm", "8dfc07bf11d362d1ffb11fa34647f4e78dba47247589cc94fd8c9155889c8fcb"},
"lz4": {:hex, :lz4_erl, "0.2.4", "fafc1fa39ed1d034893316852daebedd82f37df478446224ac096490be3b4a4d", [:rebar3], [], "hexpm", "e3eb9e2b5c1e4dab39db8fe0421e6fa10f9bf5843f20dab43518f8ab8e812517"},
"merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mississippi": {:git, "https://github.com/secomind/mississippi.git", "179f2e54dc7b118a972afb47cc603a1fc7a0444e", []},
"mississippi": {:git, "https://github.com/annopaolo/mississippi.git", "89dd4526336421412b3fdc6ef0081e5f1f7ee473", [branch: "dynamic-queue-sharding"]},
"mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"},
Expand Down
8 changes: 8 additions & 0 deletions apps/astarte_data_updater_plant/rel/env.bat.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@echo off
rem Set the release to load code on demand (interactive) instead of preloading (embedded).
rem set RELEASE_MODE=interactive

rem Set the release to work across nodes.
rem RELEASE_DISTRIBUTION must be "sname" (local), "name" (distributed) or "none".
rem set RELEASE_DISTRIBUTION=name
rem set RELEASE_NODE=<%= @release.name %>
27 changes: 27 additions & 0 deletions apps/astarte_data_updater_plant/rel/env.sh.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/sh

# # Sets and enables heart (recommended only in daemon mode)
# case $RELEASE_COMMAND in
# daemon*)
# HEART_COMMAND="$RELEASE_ROOT/bin/$RELEASE_NAME $RELEASE_COMMAND"
# export HEART_COMMAND
# export ELIXIR_ERL_OPTIONS="-heart"
# ;;
# *)
# ;;
# esac

# # Set the release to load code on demand (interactive) instead of preloading (embedded).
# export RELEASE_MODE=interactive

# # Set the release to work across nodes.
# # RELEASE_DISTRIBUTION must be "sname" (local), "name" (distributed) or "none".

if [ -z "$MY_POD_IP" ]
then
export RELEASE_DISTRIBUTION=sname
export RELEASE_NODE=<%= @release.name %>
else
export RELEASE_DISTRIBUTION=name
export RELEASE_NODE=<%= @release.name %>@${MY_POD_IP}
fi
8 changes: 8 additions & 0 deletions apps/astarte_data_updater_plant/rel/remote.vm.args.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Customize flags given to the VM: https://www.erlang.org/doc/man/erl.html
## -mode/-name/-sname/-setcookie are configured via env vars, do not set them here

## Increase number of concurrent ports/sockets
##+Q 65536

## Tweak GC to run more often
##-env ERL_FULLSWEEP_AFTER 10
8 changes: 8 additions & 0 deletions apps/astarte_data_updater_plant/rel/vm.args.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Customize flags given to the VM: https://www.erlang.org/doc/man/erl.html
## -mode/-name/-sname/-setcookie are configured via env vars, do not set them here

## Increase number of concurrent ports/sockets
##+Q 65536

## Tweak GC to run more often
##-env ERL_FULLSWEEP_AFTER 10
Loading