Skip to content

Commit 21fb21d

Browse files
authored
Merge pull request quarkusio#50124 from Ladicek/redis-improvements
Redis: improvements
2 parents 9fe91fe + 58d7c5f commit 21fb21d

File tree

13 files changed

+1044
-150
lines changed

13 files changed

+1044
-150
lines changed

docs/src/main/asciidoc/redis-reference.adoc

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ There are other methods that can be useful to manipulate counters, such as:
643643
- `set` - to set an initial value if needed
644644
- `decr` and `decrby` - allows decrementing the stored value
645645

646-
==== Communicate with pub/sub
646+
=== Communicate with pub/sub
647647

648648
Redis allows sending _messages_ to channels and listening for these messages.
649649
These features are available from the `pubsub` group.
@@ -710,7 +710,7 @@ public static class MyCache {
710710
}
711711
----
712712

713-
==== Use Redis transactions
713+
=== Use Redis transactions
714714

715715
Redis transactions are slightly different from relational database transactions.
716716
Redis transactions are a batch of commands executed altogether.
@@ -741,7 +741,7 @@ TransactionResult result = ds.withTransaction(tx -> {
741741
});
742742
----
743743

744-
The received `tx` object can also be used to _discard_ the transaction, using: `tx.discard();`.
744+
The received `tx` object can also be used to _discard_ the transaction, using `tx.discard()`.
745745
The returned `TransactionResult` lets you retrieve the result of each command.
746746

747747
When using the reactive variant of the data source, the passed callback is a `Function<ReactiveTransactionalRedisDataSource, Uni<Void>>`:
@@ -761,7 +761,7 @@ Uni<TransactionResult> result = ds.withTransaction(tx -> {
761761

762762
Transaction execution can be conditioned by _keys_.
763763
When a passed key gets modified during the execution of a transaction, the transaction is discarded.
764-
The keys are passed as `String` as a second parameter to the `withTransaction` method:
764+
The keys are passed as ``String``s to the second, vararg parameter of the `withTransaction` method:
765765

766766
[source,java]
767767
----
@@ -776,19 +776,21 @@ IMPORTANT: You cannot use the pub/sub feature from within a transaction.
776776

777777
==== Implement the optimistic locking pattern
778778

779-
To use optimistic locking, you need to use a variant of the `withTransaction` method, allowing the execution of code before the transaction starts.
779+
To use optimistic locking, you need to use a variant of the `withTransaction` method allowing execution of code before the transaction starts.
780780
In other words, it will be executed as follows:
781781

782782
[source]
783783
----
784784
WATCH key
785785
786786
// Pre-transaction block
787-
// ....
787+
// ...
788788
// Produce a result
789789
790790
MULTI
791-
// In transaction code, receive the result produced by the pre-transaction block.
791+
792+
// In transaction code, receive the result produced by the pre-transaction block.
793+
792794
EXEC
793795
----
794796

@@ -801,14 +803,16 @@ OptimisticLockingTransactionResult<Boolean> result = blocking.withTransaction(ds
801803
HashCommands<String, String, String> hashCommands = ds.hash(String.class);
802804
return hashCommands.hexists(key, "field"); // Produce a result (boolean in this case)
803805
},
804-
(exists, tx) -> { // The transactional block, receives the result and the transactional data source
805-
if (exists) {
806-
tx.hash(String.class).hset(key, "field", "new value");
807-
} else {
808-
tx.discard();
809-
}
810-
},
811-
key); // The watched key
806+
(exists, tx) -> {
807+
// The transactional block, receives the result and the transactional data source
808+
if (exists) {
809+
tx.hash(String.class).hset(key, "field", "new value");
810+
} else {
811+
tx.discard();
812+
}
813+
},
814+
// The watched key
815+
key);
812816
----
813817

814818
If one of the watched keys is touched before or during the execution of the pre-transaction or transactional blocks, the transaction is aborted.
@@ -821,9 +825,35 @@ Consequently, the pre-transaction block must use the passed data source to execu
821825
Thus, the commands are emitted from that connection.
822826
These commands must not modify the watched keys.
823827

824-
The transaction is aborted if the pre-transaction block throws an exception (or produces a failure when using the reactive API).
828+
==== Error handling
829+
830+
If the transaction block returns successfully (or completes with an item in case of the reactive API), the transaction is executed.
831+
Note that in this case, some transaction commands may succeed and some may fail; this is standard Redis behavior.
832+
The `TransactionResult.hasErrors()` method indicates whether at least one command failed.
833+
Results of failed commands in the `TransactionResult` are represented as ``Throwable``s.
834+
If the transaction block throws an exception (or completes with a failure in case of the reactive API), the transaction is discarded automatically and the exception is rethrown (or the resulting `Uni` completes with the same failure in case of the reactive API).
835+
836+
In case of the optimistic locking API, if the pre-transaction block returns successfully (or completes with an item in case of the reactive API), the transaction is started and execution proceeds to the transaction block.
837+
If the pre-transaction block throws an exception (or completes with a failure in case of the reactive API), all watched keys are ``UNWATCH``ed automatically and the exception is rethrown (or the resulting `Uni` completes with the same failure in case of the reactive API).
838+
In this case, the transaction is not started and the transaction block is not executed.
839+
840+
==== Transactions in Redis Cluster
841+
842+
In the cluster mode, transaction commands (`MULTI`, `EXEC`, `DISCARD`, `WATCH`, `UNWATCH`) are handled specially.
843+
844+
By default, transactions in cluster are disabled and transaction commands immediately fail.
845+
846+
It is however possible to enable single-node transactions by setting `quarkus.redis.cluster-transactions` to `single-node`.
847+
Note that this setting only makes sense in the cluster mode and is ignored otherwise.
848+
849+
In this mode, the `MULTI` command is queued and is only issued when the next command is executed.
850+
This next command binds the connection to the corresponding node of the Redis cluster (so it should have keys, otherwise the target node is random).
851+
All subsequent commands are targeted to that node.
852+
If some of the subsequent commands have a key that belongs to another node, the command fails on the server side.
853+
If `WATCH` is used before `MULTI`, its key(s) determine to which node the connection is bound and the subsequent `MULTI` is not queued.
854+
If `WATCH` keys belong to multiple nodes, the command fails on the client side.
825855

826-
==== Execute custom commands
856+
=== Execute custom commands
827857

828858
To execute a custom command, or a command not supported by the API, use the following approach:
829859

@@ -927,8 +957,8 @@ The documentation of the Vert.x Redis Client is available on the https://vertx.i
927957

928958
== Configure Redis hosts programmatically
929959

930-
The `RedisHostsProvider` programmatically provides redis hosts.
931-
This allows for configuration of properties like redis connection password coming from other sources.
960+
The `RedisHostsProvider` programmatically provides Redis hosts.
961+
This allows for configuration of properties like Redis connection password coming from other sources.
932962

933963
[NOTE]
934964
====

extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/ReactiveRedisDataSource.java

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;
3131
import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource;
3232
import io.quarkus.redis.datasource.transactions.TransactionResult;
33-
import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource;
3433
import io.quarkus.redis.datasource.value.ReactiveValueCommands;
3534
import io.smallrye.common.annotation.Experimental;
3635
import io.smallrye.mutiny.Uni;
@@ -50,84 +49,99 @@
5049
public interface ReactiveRedisDataSource {
5150

5251
/**
53-
* Retrieves a {@link ReactiveRedisDataSource} using a single connection with the Redis Server.
54-
* The connection is acquired from the pool and released when the {@code Uni} returned by {@code function} produces
55-
* a {@code null} item or a failure.
52+
* Obtains a {@link ReactiveRedisDataSource} that uses a single connection to the Redis server
53+
* and passes it to the given {@code function}. The connection is acquired from the pool
54+
* and released when the {@code Uni} returned by {@code function} produces an item or a failure.
5655
*
57-
* @param function the function receiving the single-connection data source and producing {@code null} when the
56+
* @param function the function receiving the connection and producing {@code null} when the
5857
* connection can be released.
5958
*/
6059
Uni<Void> withConnection(Function<ReactiveRedisDataSource, Uni<Void>> function);
6160

6261
/**
63-
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
64-
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
65-
* The commands are only executed when the passed block emits the {@code null} item.
62+
* Obtains a {@link ReactiveRedisDataSource} that enqueues commands in a Redis Transaction ({@code MULTI})
63+
* and passes it to the given {@code tx} block. Note that the transaction acquires a single connection
64+
* and all the commands are enqueued on this connection. The commands are only executed when the {@code Uni}
65+
* returned by {@code tx} produces an item.
6666
* <p>
67-
* The results of the commands are retrieved using the produced {@link TransactionResult}.
67+
* The results of the commands can be obtained from the returned {@link TransactionResult}.
68+
* Errors are represented as {@code Throwable}s in the {@code TransactionResult}.
6869
* <p>
69-
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
70-
* In this case, the produced {@link TransactionResult} will be empty.
70+
* The user can discard a transaction using the {@link ReactiveTransactionalRedisDataSource#discard()} method.
71+
* In this case, the produced {@link TransactionResult} is empty.
72+
* If the {@code tx} block completes with a failure, the transaction is discarded automatically and
73+
* the resulting {@code Uni} completes with the same failure.
7174
*
72-
* @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed
73-
* at the end of the block.
75+
* @param tx the consumer receiving the transactional Redis data source. The enqueued commands are only executed
76+
* when this block completes with an item.
7477
*/
7578
Uni<TransactionResult> withTransaction(Function<ReactiveTransactionalRedisDataSource, Uni<Void>> tx);
7679

7780
/**
78-
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
79-
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
80-
* The commands are only executed when the passed block emits the {@code null} item.
81+
* Obtains a {@link ReactiveRedisDataSource} that enqueues commands in a Redis Transaction ({@code WATCH} & {@code MULTI}),
82+
* starts watching the given {@code watchedKeys}, and passes it to the given {@code tx} block.
83+
* Note that the transaction acquires a single connection and all the commands are enqueued on this connection.
84+
* The commands are only executed when the {@code Uni} returned by {@code tx} produces an item.
8185
* <p>
82-
* The results of the commands are retrieved using the produced {@link TransactionResult}.
86+
* The results of the commands can be obtained from the returned {@link TransactionResult}.
87+
* Errors are represented as {@code Throwable}s in the {@code TransactionResult}.
8388
* <p>
84-
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
85-
* In this case, the produced {@link TransactionResult} will be empty.
89+
* The user can discard a transaction using the {@link ReactiveTransactionalRedisDataSource#discard()} method.
90+
* In this case, the produced {@link TransactionResult} is empty.
91+
* If the {@code tx} block completes with a failure, the transaction is discarded automatically and
92+
* the resulting {@code Uni} completes with the same failure.
8693
*
87-
* @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed
88-
* at the end of the block.
94+
* @param tx the consumer receiving the transactional Redis data source. The enqueued commands are only executed
95+
* when this block completes with an item.
8996
* @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before
9097
* the completion of the transaction, the transaction is discarded.
9198
*/
9299
Uni<TransactionResult> withTransaction(Function<ReactiveTransactionalRedisDataSource, Uni<Void>> tx, String... watchedKeys);
93100

94101
/**
95-
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
96-
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
97-
* The commands are only executed when the passed block emits the {@code null} item.
102+
* Obtains a {@link ReactiveRedisDataSource} that enqueues commands in a Redis Transaction ({@code WATCH} & {@code MULTI}),
103+
* starts watching the given {@code watchedKeys}, passes it to the given {@code preTx} block and if that
104+
* doesn't fail, passes it again to the given {@code tx} block. Note that the transaction acquires a single
105+
* connection and all the commands are enqueued on this connection. The commands are only executed when
106+
* the {@code Uni} returned by {@code tx} produces an item.
98107
* <p>
99-
* This variant also allows executing code before the transaction gets started but after the key being watched:
108+
* This variant also allows executing code before the transaction gets started but after the keys are watched:
100109
*
101110
* <pre>
102-
* WATCH key
103-
* // preTxBlock
111+
* WATCH key
112+
* // preTx
104113
* element = ZRANGE k 0 0
105-
* // TxBlock
106-
* MULTI
107-
* ZREM k element
108-
* EXEC
114+
* MULTI
115+
* // tx
116+
* ZREM k element
117+
* EXEC
109118
* </pre>
110119
* <p>
111120
* The {@code preTxBlock} returns a {@link Uni Uni&lt;I&gt;}. The produced value is received by the {@code tx} block,
112121
* which can use that value to execute the appropriate operation in the transaction. The produced value can also be
113-
* retrieved from the produced {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTxBlock }
114-
* must used the passed (single-connection) {@link ReactiveRedisDataSource} instance.
122+
* obtained from the returned {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTx}
123+
* block must use the passed (single-connection) {@link ReactiveRedisDataSource} instance.
115124
* <p>
116-
* If the {@code preTxBlock} throws an exception or emits a failure, the transaction is not executed, and the returned
117-
* {@link OptimisticLockingTransactionResult} is empty.
125+
* If the {@code preTx} block completes with a failure, all watched keys are {@code UNWATCH}ed and the returned
126+
* {@code Uni} completes with the same failure.
118127
* <p>
119128
* This construct allows implementing operation relying on optimistic locking.
120-
* The results of the commands are retrieved using the produced {@link OptimisticLockingTransactionResult}.
129+
* The results of the commands can be obtained from the returned {@link OptimisticLockingTransactionResult}.
130+
* Errors are represented as {@code Throwable}s in the {@code OptimisticLockingTransactionResult}.
121131
* <p>
122-
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
123-
* In this case, the produced {@link OptimisticLockingTransactionResult} will be empty.
124-
*
125-
* @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed
126-
* at the end of the block.
132+
* The user can discard a transaction using the {@link ReactiveTransactionalRedisDataSource#discard()} method.
133+
* In this case, the produced {@link OptimisticLockingTransactionResult} is empty.
134+
* If the {@code tx} block completes with a failure, the transaction is discarded automatically and
135+
* the resulting {@code Uni} completes with the same failure.
136+
*
137+
* @param preTx the consumer receiving the Redis data source before the transaction is started but after
138+
* the {@code watchedKeys} are watched.
139+
* @param tx the consumer receiving the transactional Redis data source after the transaction is started.
140+
* The enqueued commands are only executed when this block completes with an item.
127141
* @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before
128142
* the completion of the transaction, the transaction is discarded.
129143
*/
130-
<I> Uni<OptimisticLockingTransactionResult<I>> withTransaction(Function<ReactiveRedisDataSource, Uni<I>> preTxBlock,
144+
<I> Uni<OptimisticLockingTransactionResult<I>> withTransaction(Function<ReactiveRedisDataSource, Uni<I>> preTx,
131145
BiFunction<I, ReactiveTransactionalRedisDataSource, Uni<Void>> tx,
132146
String... watchedKeys);
133147

0 commit comments

Comments
 (0)