Skip to content

Commit 294c284

Browse files
authored
feat: use feature provider invalidator worker (#72)
Update applications to refresh cached feature flags / machines when they are updated. This was already the case in zebra and guard, and now we add it to all the other applications consuming feature flags too. Ref: renderedtext/project-tasks#2202
1 parent 4977fb1 commit 294c284

File tree

14 files changed

+286
-72
lines changed

14 files changed

+286
-72
lines changed

ee/audit/lib/audit/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ defmodule Audit.Application do
2525
{{Audit.Streamer.Scheduler, []}, enabled?("START_STREAMER")},
2626
{{Cachex, [name: Audit.Cache]}, true},
2727
{provider, System.get_env("FEATURE_YAML_PATH") != nil},
28+
{{Audit.FeatureProviderInvalidatorWorker, []}, true},
2829
{Supervisor.child_spec({Cachex, :feature_provider_cache}, id: :feature_provider_cache),
2930
true}
3031
]
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule Audit.FeatureProviderInvalidatorWorker do
2+
require Logger
3+
4+
@doc """
5+
This module consumes RabbitMQ feature and machine state change events
6+
and invalidates features and machines caches.
7+
"""
8+
9+
use Tackle.Multiconsumer,
10+
url: Application.get_env(:audit, :amqp_url),
11+
service: "audit",
12+
routes: [
13+
{"feature_exchange", "features_changed", :features_changed},
14+
{"feature_exchange", "organization_features_changed", :organization_features_changed}
15+
],
16+
# This queue is used to consume events from the feature exchange.
17+
# It is declared as non-durable, auto-delete and exclusive.
18+
# This means that the queue will be deleted when the consumer disconnects.
19+
# This is the desired behavior, because these events are used to invalidate pod-level caches.
20+
queue: :dynamic,
21+
queue_opts: [
22+
durable: false,
23+
auto_delete: true,
24+
exclusive: true
25+
],
26+
connection_id: Audit.FeatureProviderInvalidatorWorker
27+
28+
def features_changed(_message) do
29+
log("invalidating features")
30+
{:ok, _} = FeatureProvider.list_features(reload: true)
31+
:ok
32+
end
33+
34+
def organization_features_changed(message) do
35+
event = InternalApi.Feature.OrganizationFeaturesChanged.decode(message)
36+
log("invalidating features for org #{event.org_id}")
37+
{:ok, _} = FeatureProvider.list_features(reload: true, param: event.org_id)
38+
:ok
39+
end
40+
41+
defp log(message) do
42+
Logger.info("[FEATURE PROVIDER INVALIDATOR WORKER] #{message}")
43+
end
44+
end

ee/audit/lib/internal_api/feature.pb.ex

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,48 @@ defmodule InternalApi.Feature.Availability do
221221
field(:quantity, 2, type: :uint32)
222222
end
223223

224+
defmodule InternalApi.Feature.MachinesChanged do
225+
@moduledoc false
226+
use Protobuf, syntax: :proto3
227+
@type t :: %__MODULE__{}
228+
229+
defstruct []
230+
end
231+
232+
defmodule InternalApi.Feature.OrganizationMachinesChanged do
233+
@moduledoc false
234+
use Protobuf, syntax: :proto3
235+
236+
@type t :: %__MODULE__{
237+
org_id: String.t()
238+
}
239+
240+
defstruct [:org_id]
241+
242+
field(:org_id, 1, type: :string)
243+
end
244+
245+
defmodule InternalApi.Feature.FeaturesChanged do
246+
@moduledoc false
247+
use Protobuf, syntax: :proto3
248+
@type t :: %__MODULE__{}
249+
250+
defstruct []
251+
end
252+
253+
defmodule InternalApi.Feature.OrganizationFeaturesChanged do
254+
@moduledoc false
255+
use Protobuf, syntax: :proto3
256+
257+
@type t :: %__MODULE__{
258+
org_id: String.t()
259+
}
260+
261+
defstruct [:org_id]
262+
263+
field(:org_id, 1, type: :string)
264+
end
265+
224266
defmodule InternalApi.Feature.FeatureService.Service do
225267
@moduledoc false
226268
use GRPC.Service, name: "InternalApi.Feature.FeatureService"

front/config/config.exs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ config :front,
2929

3030
config :front, default_user_name: "Semaphore User"
3131

32-
config :front, :cache_settings, organization_features_ttl: :timer.minutes(15)
33-
3432
config :feature_provider, provider: {Front.FeatureHubProvider, []}
3533
config :front, :superjerry_client, {Support.FakeClients.Superjerry, []}
3634
config :front, :scouter_client, {Front.Clients.Scouter, []}

front/config/test.exs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ config :wallaby, max_wait_time: 10_000
6464

6565
config :joken, current_time_adapter: Support.TimeMock
6666

67-
config :front, :cache_settings, organization_features_ttl: 0
6867
config :front, guard_grpc_timeout: 1_000
6968
config :front, permission_patrol_timeout: 1_000
7069

front/lib/front/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ defmodule Front.Application do
2121
{Phoenix.PubSub, [name: Front.PubSub, adapter: Phoenix.PubSub.PG2]},
2222
FrontWeb.Endpoint,
2323
{Task.Supervisor, [name: Front.TaskSupervisor]},
24-
Front.Tracing.Store
24+
Front.Tracing.Store,
25+
Front.FeatureProviderInvalidatorWorker
2526
] ++ reactor() ++ cache() ++ telemetry() ++ feature_provider(provider)
2627

2728
opts = [strategy: :one_for_one, name: Front.Supervisor]

front/lib/front/feature_hub_provider.ex

Lines changed: 3 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -11,75 +11,21 @@ defmodule Front.FeatureHubProvider do
1111

1212
import Front.Utils
1313

14-
@cache_version :crypto.hash(:md5, File.read(__ENV__.file) |> elem(1)) |> Base.encode64()
15-
16-
defp cache_key(org_id, operation),
17-
do: "feature_hub/#{@cache_version}/#{org_id}/#{operation}"
18-
19-
@impl FeatureProvider.Provider
20-
def provide_features(org_id, opts \\ []) do
21-
use_cache? = Keyword.get(opts, :use_cache, true)
22-
23-
if use_cache? do
24-
cache_fetch(org_id, "list_organization_features", fn ->
25-
do_list_features(org_id, update_cache: true)
26-
end)
27-
else
28-
do_list_features(org_id)
29-
end
30-
end
31-
3214
@impl FeatureProvider.Provider
33-
def provide_machines(org_id, opts \\ []) do
34-
use_cache? = Keyword.get(opts, :use_cache, true)
35-
36-
if use_cache? do
37-
cache_fetch(org_id, "list_organization_machines", fn ->
38-
do_list_machines(org_id, update_cache: true)
39-
end)
40-
else
41-
do_list_machines(org_id)
42-
end
43-
end
44-
45-
defp cache_fetch(org_id, operation, callback) do
46-
cache_key(org_id, operation)
47-
|> Front.Cache.get()
48-
|> case do
49-
{:ok, results} ->
50-
Watchman.increment({"feature_hub.#{operation}.cache_hit", [org_id]})
51-
ok(Front.Cache.decode(results))
52-
53-
{:not_cached, _} ->
54-
Watchman.increment({"feature_hub.#{operation}.cache_miss", [org_id]})
55-
callback.()
56-
end
57-
end
58-
59-
defp do_list_features(org_id, opts \\ []) do
60-
update_cache = Keyword.get(opts, :update_cache, false)
61-
15+
def provide_features(org_id, _opts \\ []) do
6216
FeatureClient.list_organization_features(%{org_id: org_id})
6317
|> unwrap(fn response ->
6418
features =
6519
response.organization_features
6620
|> Enum.map(&feature_from_grpc/1)
6721
|> Enum.filter(&FeatureProvider.Feature.visible?/1)
6822

69-
if update_cache do
70-
Front.Async.run(fn ->
71-
cache_key(org_id, "list_organization_features")
72-
|> Front.Cache.set(Front.Cache.encode(features), cache_ttl())
73-
end)
74-
end
75-
7623
ok(features)
7724
end)
7825
end
7926

80-
defp do_list_machines(org_id, opts \\ []) do
81-
update_cache = Keyword.get(opts, :update_cache, false)
82-
27+
@impl FeatureProvider.Provider
28+
def provide_machines(org_id, _opts \\ []) do
8329
FeatureClient.list_organization_machines(%{org_id: org_id})
8430
|> unwrap(fn response ->
8531
machines =
@@ -88,13 +34,6 @@ defmodule Front.FeatureHubProvider do
8834
|> Enum.filter(&FeatureProvider.Machine.enabled?/1)
8935
|> order_by_machine_type()
9036

91-
if update_cache do
92-
Front.Async.run(fn ->
93-
cache_key(org_id, "list_organization_machines")
94-
|> Front.Cache.set(Front.Cache.encode(machines), cache_ttl())
95-
end)
96-
end
97-
9837
ok(machines)
9938
end)
10039
end
@@ -145,8 +84,4 @@ defmodule Front.FeatureHubProvider do
14584
|> String.split("-", parts: 2)
14685
end)
14786
end
148-
149-
defp cache_ttl do
150-
Application.get_env(:front, :cache_settings)[:features_ttl]
151-
end
15287
end
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
defmodule Front.FeatureProviderInvalidatorWorker do
2+
require Logger
3+
4+
@doc """
5+
This module consumes RabbitMQ feature and machine state change events
6+
and invalidates features and machines caches.
7+
"""
8+
9+
use Tackle.Multiconsumer,
10+
url: Application.get_env(:front, :amqp_url),
11+
service: "front",
12+
routes: [
13+
{"feature_exchange", "machines_changed", :machines_changed},
14+
{"feature_exchange", "organization_machines_changed", :organization_machines_changed},
15+
{"feature_exchange", "features_changed", :features_changed},
16+
{"feature_exchange", "organization_features_changed", :organization_features_changed}
17+
],
18+
# This queue is used to consume events from the feature exchange.
19+
# It is declared as non-durable, auto-delete and exclusive.
20+
# This means that the queue will be deleted when the consumer disconnects.
21+
# This is the desired behavior, because these events are used to invalidate pod-level caches.
22+
queue: :dynamic,
23+
queue_opts: [
24+
durable: false,
25+
auto_delete: true,
26+
exclusive: true
27+
],
28+
connection_id: Front.FeatureProviderInvalidatorWorker
29+
30+
def machines_changed(_message) do
31+
log("invalidating machines")
32+
{:ok, _} = FeatureProvider.list_machines(reload: true)
33+
:ok
34+
end
35+
36+
def organization_machines_changed(message) do
37+
event = InternalApi.Feature.OrganizationMachinesChanged.decode(message)
38+
log("invalidating machines for org #{event.org_id}")
39+
{:ok, _} = FeatureProvider.list_machines(reload: true, param: event.org_id)
40+
:ok
41+
end
42+
43+
def features_changed(_message) do
44+
log("invalidating features")
45+
{:ok, _} = FeatureProvider.list_features(reload: true)
46+
:ok
47+
end
48+
49+
def organization_features_changed(message) do
50+
event = InternalApi.Feature.OrganizationFeaturesChanged.decode(message)
51+
log("invalidating features for org #{event.org_id}")
52+
{:ok, _} = FeatureProvider.list_features(reload: true, param: event.org_id)
53+
:ok
54+
end
55+
56+
defp log(message) do
57+
Logger.info("[FEATURE PROVIDER INVALIDATOR WORKER] #{message}")
58+
end
59+
end

github_notifier/lib/github_notifier/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ defmodule GithubNotifier.Application do
2222
{{Services.PipelineStartedNotifier, []}, enabled?("START_CONSUMERS")},
2323
{{Services.PipelineFinishedNotifier, []}, enabled?("START_CONSUMERS")},
2424
{{Services.PipelineSummaryAvailableNotifier, []}, enabled?("START_CONSUMERS")},
25+
{{GithubNotifier.FeatureProviderInvalidatorWorker, []}, true},
2526
feature_provider()
2627
])
2728

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule GithubNotifier.FeatureProviderInvalidatorWorker do
2+
require Logger
3+
4+
@doc """
5+
This module consumes RabbitMQ feature and machine state change events
6+
and invalidates features and machines caches.
7+
"""
8+
9+
use Tackle.Multiconsumer,
10+
url: Application.get_env(:github_notifier, :amqp_url),
11+
service: "github_notifier",
12+
routes: [
13+
{"feature_exchange", "features_changed", :features_changed},
14+
{"feature_exchange", "organization_features_changed", :organization_features_changed}
15+
],
16+
# This queue is used to consume events from the feature exchange.
17+
# It is declared as non-durable, auto-delete and exclusive.
18+
# This means that the queue will be deleted when the consumer disconnects.
19+
# This is the desired behavior, because these events are used to invalidate pod-level caches.
20+
queue: :dynamic,
21+
queue_opts: [
22+
durable: false,
23+
auto_delete: true,
24+
exclusive: true
25+
],
26+
connection_id: GithubNotifier.FeatureProviderInvalidatorWorker
27+
28+
def features_changed(_message) do
29+
log("invalidating features")
30+
{:ok, _} = FeatureProvider.list_features(reload: true)
31+
:ok
32+
end
33+
34+
def organization_features_changed(message) do
35+
event = InternalApi.Feature.OrganizationFeaturesChanged.decode(message)
36+
log("invalidating features for org #{event.org_id}")
37+
{:ok, _} = FeatureProvider.list_features(reload: true, param: event.org_id)
38+
:ok
39+
end
40+
41+
defp log(message) do
42+
Logger.info("[FEATURE PROVIDER INVALIDATOR WORKER] #{message}")
43+
end
44+
end

0 commit comments

Comments
 (0)