Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ RQ (Redis Queue) is a simple Clojure package for queueing jobs and processing th

We distribute the library via [Clojars](https://clojars.org/com.moclojer/rq).


[![Clojars Project](https://img.shields.io/clojars/v/com.moclojer/rq.svg)](https://clojars.org/com.moclojer/rq)

```edn
Expand Down Expand Up @@ -166,7 +165,35 @@ The `clj-rq` library provides a set of pub/sub functions that facilitate message
```clojure
(rq-pubsub/unarquive-channel! client "my-channel")
Copy link

Copilot AI Nov 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'unarquive' to 'unarchive'.

Copilot uses AI. Check for mistakes.
```


### pool configuration

`rq/create-client` applies a tuned `GenericObjectPoolConfig` by default (`maxTotal` 128, `minIdle` 16, health checks enabled, etc.).
You can override these settings by passing an options map as the second argument.

```clojure
(def *redis-pool*
(rq/create-client "redis://localhost:6379/0"
{:pool-config {:max-total 64
:max-wait-ms 2_000
:test-on-borrow false}}))
Comment on lines +177 to +179
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the numeric literal example.

Clojure’s reader still rejects digit separators like 2_000, so copying this snippet into the REPL will fail. Please swap the literal for a form that actually reads (e.g. 2000). (ask.clojure.org)

🤖 Prompt for AI Agents
In README.md around lines 177 to 179, the numeric literal example uses a digit
separator (`2_000`) which the Clojure reader rejects; replace the literal with a
valid one (for example `2000`) so the snippet reads and evaluates in the REPL;
update the pool-config example accordingly and ensure no other numeric literals
in the surrounding example use underscore separators.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, send update commit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avelino, here's the corrected code for the pool configuration example:

(def *redis-pool*
  (rq/create-client "redis://localhost:6379/0"
                    {:pool-config {:max-total 64
                                   :max-wait-ms 2000
                                   :test-on-borrow false}}))

The numeric literal has been changed from 2_000 to 2000 so it will properly read and evaluate in the REPL. You can now commit this change.


🧠 Learnings used
Learnt from: J0sueTM
Repo: moclojer/clj-rq PR: 0
File: :0-0
Timestamp: 2024-09-05T10:25:29.194Z
Learning: The `src/com/moclojer/rq/pubsub.clj` file contains the following functions: `publish!`, `group-handlers-by-channel`, `create-listener`, `unarquive-channel!`, `pack-workers-channels`, and `subscribe!`.

Learnt from: J0sueTM
Repo: moclojer/clj-rq PR: 0
File: :0-0
Timestamp: 2024-10-18T16:07:52.609Z
Learning: The `src/com/moclojer/rq/pubsub.clj` file contains the following functions: `publish!`, `group-handlers-by-channel`, `create-listener`, `unarquive-channel!`, `pack-workers-channels`, and `subscribe!`.

Learnt from: J0sueTM
Repo: moclojer/clj-rq PR: 6
File: src/com/moclojer/rq/queue.clj:10-13
Timestamp: 2024-10-08T19:16:22.185Z
Learning: In the `push!` function of `src/com/moclojer/rq/queue.clj`, options are passed directly as keyword arguments, not within a map.

Learnt from: J0sueTM
Repo: moclojer/clj-rq PR: 6
File: src/com/moclojer/rq/queue.clj:10-13
Timestamp: 2024-07-05T20:37:20.365Z
Learning: In the `push!` function of `src/com/moclojer/rq/queue.clj`, options are passed directly as keyword arguments, not within a map.

```

Supported keys inside `:pool-config` include:

- `:max-total`, `:max-idle`, `:min-idle`
- `:max-wait-ms`, `:time-between-eviction-runs-ms`, `:min-evictable-idle-ms`, `:soft-min-evictable-idle-ms`
- `:test-on-borrow`, `:test-on-return`, `:test-on-create`, `:test-while-idle`
- `:num-tests-per-eviction-run`, `:block-when-exhausted`, `:fairness`, `:lifo`
- `:jmx-enabled`, `:jmx-name-base`, `:jmx-name-prefix`, `:eviction-policy-class-name`, `:evictor-shutdown-timeout-ms`

To skip the tuned defaults and rely on Jedis' own configuration, use `:pool-config :skip`.
If you want to set values without inheriting the defaults, add `{:inherit-defaults? false ...}` to the `:pool-config` map.

```clojure
(rq/create-client "redis://localhost:6379/0" {:pool-config :skip})
```

## complete example

```clojure
Expand Down Expand Up @@ -234,7 +261,7 @@ sequenceDiagram
User->>Client: close-client client
Client-->>Logger: log closing client
Client-->>User: confirm client closure
```
```

---

Expand Down
133 changes: 127 additions & 6 deletions src/com/moclojer/rq.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(ns com.moclojer.rq
(:import
[java.net URI URISyntaxException]
[org.apache.commons.pool2.impl GenericObjectPoolConfig]
[redis.clients.jedis JedisPooled]))

(def version "0.2.2")
Expand All @@ -9,14 +11,133 @@
^{:private true :dynamic true}
*redis-pool* (ref nil))

(def ^:private default-pool-settings
{:max-total 128
:max-idle 128
:min-idle 16
:max-wait-ms 1000
:test-on-borrow true
:test-on-return true
:test-on-create true
:test-while-idle true
:min-evictable-idle-ms 60000
:soft-min-evictable-idle-ms 60000
:time-between-eviction-runs-ms 30000
:num-tests-per-eviction-run 3
:block-when-exhausted true})

(def ^:private pool-config-setters
{:max-total (fn [^GenericObjectPoolConfig cfg v]
(.setMaxTotal cfg (int v)))
:max-idle (fn [^GenericObjectPoolConfig cfg v]
(.setMaxIdle cfg (int v)))
:min-idle (fn [^GenericObjectPoolConfig cfg v]
(.setMinIdle cfg (int v)))
:max-wait-ms (fn [^GenericObjectPoolConfig cfg v]
(.setMaxWaitMillis cfg (long v)))
:test-on-borrow (fn [^GenericObjectPoolConfig cfg v]
(.setTestOnBorrow cfg (boolean v)))
:test-on-return (fn [^GenericObjectPoolConfig cfg v]
(.setTestOnReturn cfg (boolean v)))
:test-on-create (fn [^GenericObjectPoolConfig cfg v]
(.setTestOnCreate cfg (boolean v)))
:test-while-idle (fn [^GenericObjectPoolConfig cfg v]
(.setTestWhileIdle cfg (boolean v)))
:min-evictable-idle-ms (fn [^GenericObjectPoolConfig cfg v]
(.setMinEvictableIdleTimeMillis cfg (long v)))
:soft-min-evictable-idle-ms (fn [^GenericObjectPoolConfig cfg v]
(.setSoftMinEvictableIdleTimeMillis cfg (long v)))
:time-between-eviction-runs-ms (fn [^GenericObjectPoolConfig cfg v]
(.setTimeBetweenEvictionRunsMillis cfg (long v)))
:num-tests-per-eviction-run (fn [^GenericObjectPoolConfig cfg v]
(.setNumTestsPerEvictionRun cfg (int v)))
:eviction-policy-class-name (fn [^GenericObjectPoolConfig cfg v]
(.setEvictionPolicyClassName cfg (str v)))
:block-when-exhausted (fn [^GenericObjectPoolConfig cfg v]
(.setBlockWhenExhausted cfg (boolean v)))
:fairness (fn [^GenericObjectPoolConfig cfg v]
(.setFairness cfg (boolean v)))
:lifo (fn [^GenericObjectPoolConfig cfg v]
(.setLifo cfg (boolean v)))
:jmx-enabled (fn [^GenericObjectPoolConfig cfg v]
(.setJmxEnabled cfg (boolean v)))
:jmx-name-base (fn [^GenericObjectPoolConfig cfg v]
(.setJmxNameBase cfg (str v)))
:jmx-name-prefix (fn [^GenericObjectPoolConfig cfg v]
(.setJmxNamePrefix cfg (str v)))
:evictor-shutdown-timeout-ms (fn [^GenericObjectPoolConfig cfg v]
(.setEvictorShutdownTimeoutMillis cfg (long v)))})

(defn- build-pool-config
"Return a GenericObjectPoolConfig instance configured with the supplied settings map.
Throws when unknown keys are passed."
[settings]
(when (seq settings)
(let [cfg (GenericObjectPoolConfig.)]
(doseq [[k v] settings]
(when (some? v)
(if-let [setter (get pool-config-setters k)]
(setter cfg v)
(throw (ex-info (str "Unsupported pool configuration key: " k)
{:cause :invalid-pool-config-key
:key k
:allowed (set (keys pool-config-setters))})))))
cfg)))

(defn- resolve-pool-config
"Accepts nil, a map of settings, :skip, or an existing GenericObjectPoolConfig instance."
[pool-config]
(cond
(instance? GenericObjectPoolConfig pool-config) pool-config
(= :skip pool-config) nil
(map? pool-config) (let [{:keys [inherit-defaults?] :or {inherit-defaults? true}} pool-config
settings (dissoc pool-config :inherit-defaults?)]
(build-pool-config
(if inherit-defaults?
(merge default-pool-settings settings)
settings)))
(nil? pool-config) (build-pool-config default-pool-settings)
:else (throw (ex-info "Pool configuration must be nil, :skip, a map or GenericObjectPoolConfig instance."
{:cause :invalid-pool-config
:value pool-config}))))

(defn- create-pool
[url pool-config]
(let [cfg (resolve-pool-config pool-config)]
(try
(if cfg
(JedisPooled. cfg (URI. url))
(JedisPooled. url))
(catch URISyntaxException e
(throw (ex-info (str "Invalid Redis URL: " url)
{:cause :invalid-redis-url
:url url}
e))))))

(defn create-client
"Connect to redis client. If `ref?` is true, will save the created instance
in the global var `*redis-pool*. Just returns the created instance otherwise."
"Connect to redis client.
If `ref?` (or `:ref?` in the options map) is true, store the created instance
in the global var `*redis-pool*`. Otherwise, returns an atom holding the created instance.
Pool configuration can be provided through `pool-config` or `{:pool-config ...}`."
([url]
(create-client url false))
([url ref?]
(let [pool (JedisPooled. url)]
(if (and ref? (not @*redis-pool*))
(create-client url false nil))
([url ref-or-opts]
(if (map? ref-or-opts)
(let [allowed #{:ref? :pool-config}
unknown (seq (remove allowed (keys ref-or-opts)))]
(when unknown
(throw (ex-info "Unsupported option keys provided to create-client."
{:cause :invalid-create-client-options
:unknown (set unknown)
:allowed allowed})))
(create-client url
(boolean (:ref? ref-or-opts))
(:pool-config ref-or-opts)))
(create-client url ref-or-opts nil)))
([url ref? pool-config]
(let [pool (create-pool url pool-config)
store? (boolean ref?)]
(if (and store? (not @*redis-pool*))
(dosync
(ref-set *redis-pool* pool)
*redis-pool*)
Expand Down
51 changes: 49 additions & 2 deletions test/com/moclojer/rq_test.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,60 @@
(ns com.moclojer.rq-test
(:require
[clojure.test :as t]
[com.moclojer.rq :as rq]))
[com.moclojer.rq :as rq])
(:import
[org.apache.commons.pool2.impl GenericObjectPoolConfig]))

;; WARNING: redis needs to be runing.
;; WARNING: redis needs to be running.
(t/deftest create-client-test
(t/testing "redis-client being created"
(let [client (rq/create-client "redis://localhost:6379")]
(t/is (.. @client getPool getResource))
(rq/close-client client))))

(t/deftest pool-config-resolution-test
(t/testing "default settings are applied"
(let [^GenericObjectPoolConfig cfg (#'rq/resolve-pool-config nil)]
(t/is (= 128 (.getMaxTotal cfg)))
(t/is (= 128 (.getMaxIdle cfg)))
(t/is (= 16 (.getMinIdle cfg)))
(t/is (= 1000 (.getMaxWaitMillis cfg)))
(t/is (.getTestOnBorrow cfg))
(t/is (.getTestOnReturn cfg))
(t/is (.getTestOnCreate cfg))
(t/is (.getTestWhileIdle cfg))
(t/is (= 60000 (.getMinEvictableIdleTimeMillis cfg)))
(t/is (= 60000 (.getSoftMinEvictableIdleTimeMillis cfg)))
(t/is (= 30000 (.getTimeBetweenEvictionRunsMillis cfg)))
(t/is (= 3 (.getNumTestsPerEvictionRun cfg)))
(t/is (.getBlockWhenExhausted cfg))))

(t/testing "custom overrides keep defaults"
(let [^GenericObjectPoolConfig cfg (#'rq/resolve-pool-config {:max-total 64
:max-wait-ms 2500})]
(t/is (= 64 (.getMaxTotal cfg)))
(t/is (= 2500 (.getMaxWaitMillis cfg)))
(t/is (= 16 (.getMinIdle cfg)))))

(t/testing "custom overrides without defaults"
(let [^GenericObjectPoolConfig cfg (#'rq/resolve-pool-config {:inherit-defaults? false
:max-total 10})]
(t/is (= 10 (.getMaxTotal cfg)))
(t/is (not (.getTestOnBorrow cfg)))))

(t/testing "returning an instance as-is"
(let [cfg (doto (GenericObjectPoolConfig.)
(.setMaxTotal 5))]
(t/is (identical? cfg (#'rq/resolve-pool-config cfg)))))

(t/testing "skip configuration"
(t/is (nil? (#'rq/resolve-pool-config :skip)))))

(t/deftest create-client-with-options-test
(t/testing "create-client accepts option map"
(let [client (rq/create-client "redis://localhost:6379"
{:pool-config :skip})]
(t/is (instance? clojure.lang.Atom client))
(rq/close-client client))))


Loading