File tree Expand file tree Collapse file tree 2 files changed +19
-9
lines changed
src/main/groovy/io/seqera/data/stream/impl Expand file tree Collapse file tree 2 files changed +19
-9
lines changed Original file line number Diff line number Diff line change 1- 1.1.0
1+ 1.1.1
Original file line number Diff line number Diff line change @@ -182,14 +182,24 @@ class RedisMessageStream implements MessageStream<String> {
182182 final params = new XAutoClaimParams ()
183183 // claim one entry at time
184184 .count(1 )
185- final messages = jedis. xautoclaim(
186- streamId,
187- config. getDefaultConsumerGroupName(),
188- consumerName,
189- config. claimTimeoutMillis,
190- STREAM_ENTRY_ZERO ,
191- params
192- )
185+ def messages
186+ try {
187+ messages = jedis. xautoclaim(
188+ streamId,
189+ config. getDefaultConsumerGroupName(),
190+ consumerName,
191+ config. claimTimeoutMillis,
192+ STREAM_ENTRY_ZERO ,
193+ params
194+ )
195+ } catch (JedisDataException e) {
196+ if (e. message. contains(" NOGROUP" )) {
197+ // The group does not exist. We initialize it and avoid printing the exception
198+ log. info " Redis message stream - consume group=$streamId do not exist"
199+ init(streamId)
200+ }
201+ throw e
202+ }
193203 final entry = messages?. getValue()? [0 ]
194204 if ( entry!= null )
195205 log. trace " Redis stream id=$streamId ; claimed entry=$entry "
You can’t perform that action at this time.
0 commit comments