Skip to content

Commit c334b6a

Browse files
committed
prevent duplicate concurrent tasks
1 parent 427c47f commit c334b6a

File tree

1 file changed

+42
-36
lines changed

1 file changed

+42
-36
lines changed

lib/algora_web/live/admin/seed_live.ex

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ defmodule AlgoraWeb.Admin.SeedLive do
171171
</.link>
172172
<% end %>
173173
</div>
174-
<p class="text-muted-foreground line-clamp-1 max-w-[50ch] font-medium text-sm">
175-
{@value.bio}
174+
<p class="text-muted-foreground line-clamp-1 font-medium text-sm">
175+
{@value.domain}
176176
</p>
177177
</div>
178178
</div>
@@ -234,11 +234,14 @@ defmodule AlgoraWeb.Admin.SeedLive do
234234

235235
rows =
236236
rows
237-
|> Stream.map(fn row -> Enum.zip_reduce(cols, row, Map.new(), fn col, val, acc -> Map.put(acc, col, val) end) end)
238-
|> Stream.map(&process_row/1)
239-
|> Task.async_stream(&Map.put(&1, "org", get_user(&1)), max_concurrency: 10, timeout: :infinity)
240-
|> Stream.map(fn {:ok, row} -> row end)
241-
|> Enum.to_list()
237+
|> Enum.map(fn row -> Enum.zip_reduce(cols, row, Map.new(), fn col, val, acc -> Map.put(acc, col, val) end) end)
238+
|> Enum.map(&process_row/1)
239+
240+
rows
241+
|> Enum.uniq_by(&lookup_key/1)
242+
|> Task.async_stream(&get_user/1, max_concurrency: 10, timeout: :infinity)
243+
244+
rows = Enum.map(rows, &Map.put(&1, "org", get_user(&1)))
242245

243246
socket
244247
|> assign(:csv_data, rows)
@@ -255,46 +258,49 @@ defmodule AlgoraWeb.Admin.SeedLive do
255258
)
256259
end
257260

258-
defp get_user(%{"org_handle" => handle} = _row) when is_binary(handle) and handle != "" do
259-
case :ets.lookup(@user_cache_table, handle) do
261+
defp lookup_key(%{"org_handle" => handle} = _row) when is_binary(handle) and handle != "" do
262+
handle
263+
end
264+
265+
defp lookup_key(%{"company_url" => url} = _row) when is_binary(url) and url != "" do
266+
to_domain(url)
267+
end
268+
269+
defp run_cached(key, fun) do
270+
case :ets.lookup(@user_cache_table, key) do
260271
[{_, user}] ->
261272
user
262273

263274
_ ->
264-
with {:ok, user} <- Workspace.ensure_user(Algora.Admin.token(), handle),
265-
{:ok, user} <- Repo.fetch(User, user.id) do
266-
:ets.insert(@user_cache_table, {handle, user})
267-
user
268-
else
269-
_ ->
270-
:ets.insert(@user_cache_table, {handle, nil})
275+
case fun.() do
276+
{:ok, user} ->
277+
:ets.insert(@user_cache_table, {key, user})
278+
user
279+
280+
error ->
281+
Logger.error("Failed to fetch user #{key}: #{inspect(error)}")
282+
:ets.insert(@user_cache_table, {key, nil})
271283
nil
272284
end
273285
end
274286
end
275287

288+
defp get_user(%{"org_handle" => handle} = _row) when is_binary(handle) and handle != "" do
289+
run_cached(handle, fn ->
290+
with {:ok, user} <- Workspace.ensure_user(Algora.Admin.token(), handle) do
291+
Repo.fetch(User, user.id)
292+
end
293+
end)
294+
end
295+
276296
defp get_user(%{"company_url" => url} = row) when is_binary(url) and url != "" do
277-
case :ets.lookup(@user_cache_table, url) do
278-
[{_, user}] ->
279-
user
297+
domain = to_domain(url)
280298

281-
_ ->
282-
domain =
283-
url
284-
|> String.trim_leading("https://")
285-
|> String.trim_leading("http://")
286-
|> String.trim_leading("www.")
287-
288-
with {:ok, user} <- fetch_or_create_user(domain, %{hiring: true, tech_stack: row["tech_stack"]}),
289-
{:ok, user} <- Repo.fetch(User, user.id) do
290-
:ets.insert(@user_cache_table, {url, user})
291-
user
292-
else
293-
_ ->
294-
:ets.insert(@user_cache_table, {url, nil})
295-
nil
296-
end
297-
end
299+
run_cached(domain, fn ->
300+
with {:ok, user} <- fetch_or_create_user(domain, %{hiring: true, tech_stack: row["tech_stack"]}) do
301+
Repo.fetch(User, user.id)
302+
end
303+
end)
298304
end
299305

300306
defp get_user(_row), do: nil

0 commit comments

Comments
 (0)