|
16 | 16 |
|
17 | 17 | package org.springframework.integration.redis.outbound; |
18 | 18 |
|
| 19 | +import java.util.function.Function; |
| 20 | + |
19 | 21 | import reactor.core.publisher.Mono; |
20 | 22 |
|
21 | 23 | import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; |
| 24 | +import org.springframework.data.redis.connection.RedisStreamCommands; |
22 | 25 | import org.springframework.data.redis.connection.stream.Record; |
23 | 26 | import org.springframework.data.redis.connection.stream.StreamRecords; |
24 | 27 | import org.springframework.data.redis.core.ReactiveRedisTemplate; |
@@ -60,6 +63,9 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa |
60 | 63 | @Nullable |
61 | 64 | private HashMapper<String, ?, ?> hashMapper; |
62 | 65 |
|
| 66 | + @Nullable |
| 67 | + private Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction; |
| 68 | + |
63 | 69 | /** |
64 | 70 | * Create an instance based on provided {@link ReactiveRedisConnectionFactory} and key for stream. |
65 | 71 | * @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use |
@@ -106,6 +112,16 @@ public void setExtractPayload(boolean extractPayload) { |
106 | 112 | this.extractPayload = extractPayload; |
107 | 113 | } |
108 | 114 |
|
| 115 | + /** |
| 116 | + * Set a function to create a {@link RedisStreamCommands.XAddOptions} based on the request message. |
| 117 | + * Cannot be null and cannot return null. |
| 118 | + * @param addOptionsFunction the function to provide a {@link RedisStreamCommands.XAddOptions}. |
| 119 | + * @since 6.5 |
| 120 | + */ |
| 121 | + public void setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) { |
| 122 | + this.addOptionsFunction = addOptionsFunction; |
| 123 | + } |
| 124 | + |
109 | 125 | @Override |
110 | 126 | public String getComponentType() { |
111 | 127 | return "redis:stream-outbound-channel-adapter"; |
@@ -145,7 +161,12 @@ protected Mono<Void> handleMessageInternal(Message<?> message) { |
145 | 161 | StreamRecords.objectBacked(value) |
146 | 162 | .withStreamKey(streamKey); |
147 | 163 |
|
148 | | - return this.reactiveStreamOperations.add(record); |
| 164 | + if (this.addOptionsFunction == null) { |
| 165 | + return this.reactiveStreamOperations.add(record); |
| 166 | + } |
| 167 | + else { |
| 168 | + return this.reactiveStreamOperations.add(record, this.addOptionsFunction.apply(message)); |
| 169 | + } |
149 | 170 | }) |
150 | 171 | .then(); |
151 | 172 | } |
|
0 commit comments