Skip to content

Commit a7f9937

Browse files
authored
Merge pull request #7 from scripbox/add-queue-length-api
Add Flume.API.job_counts/1
2 parents 0533ce2 + 1a55b63 commit a7f9937

File tree

7 files changed

+80
-13
lines changed

7 files changed

+80
-13
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Changelog
2+
3+
## v0.2.0
4+
* Add `Flume.API.job_counts/1` to get count of jobs in the pipeline which are yet to be processed.

lib/flume/api.ex

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,25 @@ defmodule Flume.API do
7070
end
7171

7272
def worker_context, do: Pipeline.Context.get()
73+
74+
@doc """
75+
Returns count of jobs in the pipeline which are yet to be processed.
76+
77+
## Examples
78+
```
79+
Flume.API.job_counts(["queue-1", "queue-2"])
80+
{:ok, [2, 3]}
81+
82+
Flume.API.job_counts(["queue-1", "not-a-queue-name"])
83+
{:ok, [2, 0]}
84+
```
85+
"""
86+
@spec job_counts(nonempty_list(binary)) ::
87+
{:ok, nonempty_list(Redix.Protocol.redis_value())}
88+
| {:error, atom | Redix.Error.t() | Redix.ConnectionError.t()}
89+
def job_counts(queues), do: Manager.job_counts(namespace(), queues)
90+
91+
defp namespace, do: Config.namespace()
7392
end
7493
end
7594
end

lib/flume/queue/manager.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Flume.Queue.Manager do
22
require Flume.Logger
33

4-
alias Flume.{Config, Event, Logger, Instrumentation, Utils}
4+
alias Flume.{Config, Event, Logger, Instrumentation, Utils, Redis}
55
alias Flume.Redis.Job
66
alias Flume.Queue.Backoff
77
alias Flume.Support.Time, as: TimeExtension
@@ -66,6 +66,20 @@ defmodule Flume.Queue.Manager do
6666
schedule_job_at(queue_name, time_in_seconds, job)
6767
end
6868

69+
def job_counts(namespace, [_queue | _] = queues) do
70+
queues
71+
|> Enum.map(&(fully_qualified_queue_name(namespace, &1) |> Redis.Command.llen()))
72+
|> Redis.Client.pipeline()
73+
|> case do
74+
{:ok, counts} ->
75+
{:ok, Enum.zip(queues, counts)}
76+
77+
{:error, reason} ->
78+
Logger.error("Error in getting job counts #{inspect(reason)}")
79+
{:error, reason}
80+
end
81+
end
82+
6983
def fetch_jobs(
7084
namespace,
7185
queue,
@@ -263,6 +277,8 @@ defmodule Flume.Queue.Manager do
263277
[scheduled_key(namespace), retry_key(namespace)]
264278
end
265279

280+
defp fully_qualified_queue_name(namespace, queue_name), do: "#{namespace}:queue:#{queue_name}"
281+
266282
def processing_key(namespace, queue), do: full_key(namespace, "queue:processing:#{queue}")
267283

268284
def rate_limit_key(namespace, queue, nil), do: full_key(namespace, "queue:limit:#{queue}")

lib/flume/redis/client.ex

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Flume.Redis.Client do
1919
@ltrim "LTRIM"
2020
@rpush "RPUSH"
2121
@lrange "LRANGE"
22-
@llen "LLEN"
2322
@lrem "LREM"
2423
@rpoplpush "RPOPLPUSH"
2524
@sadd "SADD"
@@ -195,13 +194,9 @@ defmodule Flume.Redis.Client do
195194
iex> Flume.Redis.Client.llen!("flume:test:stack")
196195
0
197196
"""
198-
def llen!(list_name) do
199-
query!([@llen, list_name])
200-
end
197+
def llen!(list_name), do: Command.llen(list_name) |> query!()
201198

202-
def llen(list_name) do
203-
query([@llen, list_name])
204-
end
199+
def llen(list_name), do: Command.llen(list_name) |> query()
205200

206201
@doc """
207202
Removes given values from the list.
@@ -224,9 +219,9 @@ defmodule Flume.Redis.Client do
224219
{:error, reason} ->
225220
{:error, reason}
226221

227-
{:ok, reponses} ->
222+
{:ok, responses} ->
228223
success_responses =
229-
reponses
224+
responses
230225
|> Enum.map(fn response ->
231226
case response do
232227
value when value in [:undefined, nil] ->
@@ -269,9 +264,9 @@ defmodule Flume.Redis.Client do
269264
{:error, reason} ->
270265
{:error, reason}
271266

272-
{:ok, reponses} ->
267+
{:ok, responses} ->
273268
success_responses =
274-
reponses
269+
responses
275270
|> Enum.map(fn response ->
276271
case response do
277272
value when value in [:undefined, nil] ->

lib/flume/redis/command.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule Flume.Redis.Command do
66
@hmget "HMGET"
77
@hscan "HSCAN"
88
@hset "HSET"
9+
@llen "LLEN"
910

1011
@doc """
1112
Prepares HDEL commands for list of {hash, key} pairs
@@ -60,4 +61,13 @@ defmodule Flume.Redis.Command do
6061
def hscan(hash, cursor, pattern), do: [@hscan, hash, cursor, @match, pattern]
6162

6263
def hset(hash, key, value), do: [@hset, hash, key, value]
64+
65+
@doc """
66+
Returns command for getting the length of a list.
67+
68+
## Examples
69+
iex> Flume.Redis.Command.llen("flume:test:stack")
70+
["LLEN", "flume:test:stack"]
71+
"""
72+
def llen(key), do: [@llen, key]
6373
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Flume.Mixfile do
44
def project do
55
[
66
app: :flume,
7-
version: "0.1.3",
7+
version: "0.2.0",
88
elixir: "~> 1.6",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/flume/queue/manager_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,4 +243,27 @@ defmodule Flume.Queue.ManagerTest do
243243
)
244244
end
245245
end
246+
247+
describe "job_counts/2" do
248+
test "returns counts of the requested queues" do
249+
jobs = JobFactory.generate_jobs("Elixir.Worker", 10)
250+
251+
[
252+
queue_1,
253+
queue_2,
254+
_
255+
] = Enum.map(1..3, &"test_#{&1}")
256+
257+
Enum.map(1..2, &Job.bulk_enqueue("#{@namespace}:queue:test_#{&1}", jobs))
258+
259+
assert {
260+
:ok,
261+
[
262+
{queue_1, 10},
263+
{queue_2, 10},
264+
{"unknown-queue", 0}
265+
]
266+
} == Manager.job_counts(@namespace, [queue_1, queue_2, "unknown-queue"])
267+
end
268+
end
246269
end

0 commit comments

Comments
 (0)