Skip to content

Commit 0b7cca8

Browse files
authored
fix(zebra): replace Task.async with Wormhole to fix Cachex issue (#612)
## 📝 Description Replaced `Task.async/await` with the `Wormhole` library in `Zebra.Workers.Scheduler.Org` to resolve Cachex blocking issues. When using `Task.async/await` inside `Cachex.fetch/3`, if the fallback function crashed or timed out, subsequent calls would block indefinitely, as described in [the linked Cachex issue](whitfin/cachex#220). By switching to `Wormhole.capture/4`, the call is executed in a supervised process with timeout handling and proper cleanup, preventing Cachex lockups. Added tests for error handling, timeouts, and caching behavior. More details in [this task](renderedtext/tasks#7605). ## ✅ Checklist - [x] I have tested this change - [x] ~This change requires documentation update~ N/A
1 parent 082c32b commit 0b7cca8

File tree

2 files changed

+108
-14
lines changed

2 files changed

+108
-14
lines changed

zebra/lib/zebra/workers/scheduler/org.ex

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,27 @@ defmodule Zebra.Workers.Scheduler.Org do
1818
"""
1919
def load(org_id) do
2020
Zebra.Cache.fetch!("quotas-#{org_id}-v3", @cache_timeout, fn ->
21-
cold_load(org_id)
21+
result =
22+
Wormhole.capture(__MODULE__, :fetch_org, [org_id],
23+
timeout: 10_500,
24+
stacktrace: true,
25+
skip_log: true
26+
)
27+
28+
case result do
29+
{:ok, {:ok, org}} ->
30+
{:commit, {:ok, new(org, org_id)}}
31+
32+
{:ok, error} ->
33+
{:ignore, error}
34+
35+
error ->
36+
{:ignore, error}
37+
end
2238
end)
2339
end
2440

25-
defp cold_load(org_id) do
26-
find_org = Task.async(fn -> fetch_org(org_id) end)
27-
28-
case Task.await(find_org) do
29-
{:ok, org} ->
30-
{:ok, new(org, org_id)}
31-
32-
e ->
33-
e
34-
end
35-
end
36-
37-
defp fetch_org(org_id) do
41+
def fetch_org(org_id) do
3842
alias InternalApi.Organization.DescribeRequest, as: Request
3943
alias InternalApi.Organization.OrganizationService.Stub
4044

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
defmodule Zebra.Workers.Scheduler.OrgTest do
2+
use Zebra.DataCase
3+
alias Zebra.Workers.Scheduler.Org
4+
5+
describe "load with error handling" do
6+
@org_id Ecto.UUID.generate()
7+
8+
setup do
9+
# Reset the cache before each test
10+
Cachex.clear(:zebra_cache)
11+
:ok
12+
end
13+
14+
test "returns data without caching when error is returned" do
15+
# First set up a failure
16+
alias Support.FakeServers.OrganizationApi, as: OrgApi
17+
18+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
19+
{:error, "Connection error"}
20+
end)
21+
22+
# Load should return error and not cache it
23+
assert {:error, _} = Org.load(@org_id)
24+
25+
# Now set up a success response
26+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
27+
InternalApi.Organization.DescribeResponse.new(
28+
status:
29+
InternalApi.ResponseStatus.new(code: InternalApi.ResponseStatus.Code.value(:OK)),
30+
organization: InternalApi.Organization.Organization.new(org_username: "testing-org")
31+
)
32+
end)
33+
34+
# It should now succeed because the error wasn't cached
35+
assert {:ok, _} = Org.load(@org_id)
36+
end
37+
38+
test "properly handles timeouts and doesn't cache them" do
39+
# First set up a timeout scenario using Wormhole
40+
alias Support.FakeServers.OrganizationApi, as: OrgApi
41+
42+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
43+
# Sleep to simulate timeout longer than Wormhole's timeout
44+
Process.sleep(15_000)
45+
{:ok, "This shouldn't be returned"}
46+
end)
47+
48+
# Load should return error due to timeout and not cache it
49+
assert {:error, _} = Org.load(@org_id)
50+
51+
# Now set up a success response
52+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
53+
InternalApi.Organization.DescribeResponse.new(
54+
status:
55+
InternalApi.ResponseStatus.new(code: InternalApi.ResponseStatus.Code.value(:OK)),
56+
organization: InternalApi.Organization.Organization.new(org_username: "testing-org")
57+
)
58+
end)
59+
60+
# It should now succeed because the timeout wasn't cached
61+
assert {:ok, _} = Org.load(@org_id)
62+
end
63+
64+
test "caches successful responses" do
65+
# Set up a success response
66+
alias Support.FakeServers.OrganizationApi, as: OrgApi
67+
68+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
69+
InternalApi.Organization.DescribeResponse.new(
70+
status:
71+
InternalApi.ResponseStatus.new(code: InternalApi.ResponseStatus.Code.value(:OK)),
72+
organization: InternalApi.Organization.Organization.new(org_username: "testing-org")
73+
)
74+
end)
75+
76+
# First call should succeed
77+
assert {:ok, org} = Org.load(@org_id)
78+
assert org.username == "testing-org"
79+
80+
# Now set up a failure, which shouldn't be used because we'll use the cached value
81+
GrpcMock.stub(OrgApi, :describe, fn _, _ ->
82+
{:error, "Connection error"}
83+
end)
84+
85+
# It should still succeed because we're using the cached value
86+
assert {:ok, org} = Org.load(@org_id)
87+
assert org.username == "testing-org"
88+
end
89+
end
90+
end

0 commit comments

Comments
 (0)