Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions extra/lib/plausible/consolidated_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ defmodule Plausible.ConsolidatedView do
:ok
end

@spec site_ids(Team.t()) :: [pos_integer()] | {:error, :not_found}
def site_ids(%Team{} = team) do
case get(team) do
@spec site_ids(Team.t() | String.t()) :: {:ok, [pos_integer()]} | {:error, :not_found}
def site_ids(consolidated_view_id) when is_binary(consolidated_view_id) do
case get(consolidated_view_id) do
nil -> {:error, :not_found}
_found -> {:ok, Teams.owned_sites_ids(team)}
view -> {:ok, Teams.owned_sites_ids(view.team)}
end
end

def site_ids(%Team{} = team) do
site_ids(team.identifier)
end

@spec get(Team.t() | String.t()) :: Site.t() | nil
def get(team_or_id)

Expand All @@ -51,7 +55,9 @@ defmodule Plausible.ConsolidatedView do
end

def get(id) when is_binary(id) do
Repo.one(from s in sites(), where: s.domain == ^id)
Repo.one(
from s in sites(), inner_join: assoc(s, :team), where: s.domain == ^id, preload: [:team]
)
end

defp do_enable(%Team{} = team) do
Expand All @@ -61,8 +67,8 @@ defmodule Plausible.ConsolidatedView do
|> Site.new_for_team(%{consolidated: true, domain: make_id(team)})
|> Repo.insert()

cv ->
{:ok, cv}
consolidated_view ->
{:ok, consolidated_view}
end
end

Expand Down
82 changes: 82 additions & 0 deletions extra/lib/plausible/consolidated_view/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule Plausible.ConsolidatedView.Cache do
@moduledoc """
Caching layer for consolidated views.

Because of how they're modelled (on top of "sites" table currently),
we have to refresh the cache whenever any regular site changes within,
as well as when the consolidating site is updated itself.
"""
alias Plausible.ConsolidatedView
import Ecto.Query

use Plausible.Cache

@cache_name :consolidated_views

@impl true
def name(), do: @cache_name

@impl true
def child_id(), do: :cache_consolidated_views

@impl true
def count_all() do
Plausible.Repo.aggregate(
from(s in ConsolidatedView.sites()),
:count
)
end

@impl true
def base_db_query() do
from sc in ConsolidatedView.sites(),
inner_join: sr in ^Plausible.Site.regular(),
on: sr.team_id == sc.team_id,
group_by: sc.id,
order_by: [desc: sc.id],
select: %{
consolidated_view_id: sc.domain,
site_ids: fragment("array_agg(?.id)", sr)
}
end

@spec refresh_updated_recently(Keyword.t()) :: :ok
def refresh_updated_recently(opts) do
recently_updated_site_ids =
from sc in ConsolidatedView.sites(),
join: sr in ^Plausible.Site.regular(),
on: sc.team_id == sr.team_id,
where: sr.updated_at > ago(^15, "minute") or sc.updated_at > ago(^15, "minute"),
select: sc.id

query =
from sc in ConsolidatedView.sites(),
join: sr in ^Plausible.Site.regular(),
on: sr.team_id == sc.team_id,
where: sc.id in subquery(recently_updated_site_ids),
group_by: sc.id,
order_by: [desc: sc.id],
select: %{consolidated_view_id: sc.domain, site_ids: fragment("array_agg(?)", sr.id)}

refresh(
:updated_recently,
query,
Keyword.put(opts, :delete_stale_items?, false)
)
end

@impl true
def get_from_source(consolidated_view_id) do
case ConsolidatedView.site_ids(consolidated_view_id) do
{:ok, some} -> some
{:error, :not_found} -> nil
end
end

@impl true
def unwrap_cache_keys(items) do
Enum.reduce(items, [], fn row, acc ->
[{row.consolidated_view_id, row.site_ids} | acc]
end)
end
end
16 changes: 16 additions & 0 deletions lib/plausible/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ defmodule Plausible.Application do
{Plausible.Site.Cache.RecentlyUpdated, interval: :timer.seconds(30)}
]
),
on_ee do
warmed_cache(Plausible.ConsolidatedView.Cache,
adapter_opts: [
n_lock_partitions: 1,
ttl_check_interval: false,
ets_options: [read_concurrency: true]
],
warmers: [
refresh_all:
{Plausible.ConsolidatedView.Cache.All,
interval: :timer.minutes(20) + Enum.random(1..:timer.seconds(10))},
refresh_updated_recently:
{Plausible.ConsolidatedView.Cache.RecentlyUpdated, interval: :timer.minutes(1)}
]
)
end,
warmed_cache(Plausible.Shield.IPRuleCache,
adapter_opts: [
n_lock_partitions: 1,
Expand Down
6 changes: 5 additions & 1 deletion lib/plausible/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ defmodule Plausible.Cache do
end

@spec refresh_updated_recently(Keyword.t()) :: :ok
def refresh_updated_recently(opts \\ []) do
def refresh_updated_recently(opts \\ [])

def refresh_updated_recently(opts) do
recently_updated_query =
from([s, ...] in base_db_query(),
order_by: [asc: s.updated_at],
Expand All @@ -121,6 +123,8 @@ defmodule Plausible.Cache do
)
end

defoverridable refresh_updated_recently: 1

@spec merge_items(new_items :: [any()], opts :: Keyword.t()) :: :ok
def merge_items(new_items, opts \\ [])
def merge_items([], _), do: :ok
Expand Down
103 changes: 103 additions & 0 deletions test/plausible/consolidated_view/cache_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
defmodule Plausible.CondolidatedView.CacheTest do
use Plausible.DataCase, async: true
use Plausible.Teams.Test

on_ee do
alias Plausible.ConsolidatedView
alias Plausible.ConsolidatedView.Cache

test "refresh_all stores site_ids per consolidated view id", %{test: test} do
{owner1, owner2} = {new_user(), new_user()}
{s1, s2} = {new_site(owner: owner1), new_site(owner: owner1)}
team1 = team_of(owner1)

{s3, s4, s5} = {new_site(owner: owner2), new_site(owner: owner2), new_site(owner: owner2)}
team2 = team_of(owner2)

{:ok, consolidated_view1} = ConsolidatedView.enable(team1)
{:ok, consolidated_view2} = ConsolidatedView.enable(team2)

start_test_cache(test)

:ok = Cache.refresh_all(cache_name: test)

assert Cache.size(test) == 2
assert site_ids1 = Cache.get(consolidated_view1.domain, cache_name: test, force?: true)
assert site_ids2 = Cache.get(consolidated_view2.domain, cache_name: test, force?: true)

assert s1.id in site_ids1
assert s2.id in site_ids1
assert length(site_ids1) == 2

assert s3.id in site_ids2
assert s4.id in site_ids2
assert s5.id in site_ids2
assert length(site_ids2) == 3

{:ok, ids_from_db1} = ConsolidatedView.site_ids(team1)
assert Enum.sort(ids_from_db1) == Enum.sort(site_ids1)

{:ok, ids_from_db2} = ConsolidatedView.site_ids(team2)
assert Enum.sort(ids_from_db2) == Enum.sort(site_ids2)
end

test "small refresh adds a site to existing consolidation", %{test: test} do
start_test_cache(test)

owner = new_user()
new_site(owner: owner, updated_at: yesterday())
consolidated_view = new_site(owner: owner, updated_at: yesterday(), consolidated: true)

:ok = Cache.refresh_all(cache_name: test)

assert [_] = Cache.get(consolidated_view.domain, cache_name: test, force?: true)

new_site(owner: owner)

:ok = Cache.refresh_updated_recently(cache_name: test)
assert [_, _] = Cache.get(consolidated_view.domain, cache_name: test, force?: true)
end

test "small refresh re-consolidates", %{test: test} do
start_test_cache(test)

owner = new_user()
new_site(owner: owner, updated_at: yesterday())

team = team_of(owner)

{:ok, consolidated_view} = ConsolidatedView.enable(team)

:ok = Cache.refresh_updated_recently(cache_name: test)

assert [_] = Cache.get(consolidated_view.domain, cache_name: test, force?: true)
end

test "get_from_source/1", %{test: test} do
user = new_user()
new_site(owner: user)
new_site(owner: user)
team = team_of(user)
{:ok, consolidated_view} = ConsolidatedView.enable(team)

start_test_cache(test)
:ok = Cache.refresh_all(cache_name: test)

result = Cache.get(consolidated_view.domain, cache_name: test, force?: true)
assert ^result = Cache.get(consolidated_view.domain)
assert ^result = Cache.get_from_source(consolidated_view.domain)
end

defp start_test_cache(cache_name) do
%{start: {m, f, a}} = Cache.child_spec(cache_name: cache_name)
apply(m, f, a)
end

defp yesterday() do
DateTime.shift(
DateTime.utc_now(),
day: -1
)
end
end
end
1 change: 1 addition & 0 deletions test/support/data_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Plausible.DataCase do
quote do
use Plausible.Repo
use Plausible.TestUtils
use Plausible

import Ecto.Changeset
import Plausible.DataCase
Expand Down