Skip to content

Commit e036bcd

Browse files
authored
Merge pull request #799 from supercaracal/add-streams-support
Add Streams support
2 parents 206f50c + 3a16a0e commit e036bcd

File tree

6 files changed

+1122
-186
lines changed

6 files changed

+1122
-186
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ matrix:
5252
env: DRIVER=ruby REDIS_BRANCH=3.2 LOW_TIMEOUT=0.3
5353
- rvm: jruby-9.1.17.0
5454
env: DRIVER=ruby REDIS_BRANCH=4.0 LOW_TIMEOUT=0.3
55+
- rvm: 2.5.3
56+
env: DRIVER=ruby REDIS_BRANCH=5.0
5557

5658
notifications:
5759
irc:

lib/redis.rb

Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,6 +2808,322 @@ def geodist(key, member1, member2, unit = 'm')
28082808
end
28092809
end
28102810

2811+
# Returns the stream information each subcommand.
2812+
#
2813+
# @example stream
2814+
# redis.xinfo(:stream, 'mystream')
2815+
# @example groups
2816+
# redis.xinfo(:groups, 'mystream')
2817+
# @example consumers
2818+
# redis.xinfo(:consumers, 'mystream', 'mygroup')
2819+
#
2820+
# @param subcommand [String] e.g. `stream` `groups` `consumers`
2821+
# @param key [String] the stream key
2822+
# @param group [String] the consumer group name, required if subcommand is `consumers`
2823+
#
2824+
# @return [Hash] information of the stream if subcommand is `stream`
2825+
# @return [Array<Hash>] information of the consumer groups if subcommand is `groups`
2826+
# @return [Array<Hash>] information of the consumers if subcommand is `consumers`
2827+
def xinfo(subcommand, key, group = nil)
2828+
args = [:xinfo, subcommand, key, group].compact
2829+
synchronize do |client|
2830+
client.call(args) do |reply|
2831+
case subcommand.to_s.downcase
2832+
when 'stream' then Hashify.call(reply)
2833+
when 'groups', 'consumers' then reply.map { |arr| Hashify.call(arr) }
2834+
else reply
2835+
end
2836+
end
2837+
end
2838+
end
2839+
2840+
# Add new entry to the stream.
2841+
#
2842+
# @example Without options
2843+
# redis.xadd('mystream', f1: 'v1', f2: 'v2')
2844+
# @example With options
2845+
# redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, approximate: true)
2846+
#
2847+
# @param key [String] the stream key
2848+
# @param entry [Hash] one or multiple field-value pairs
2849+
# @param opts [Hash] several options for `XADD` command
2850+
#
2851+
# @option opts [String] :id the entry id, default value is `*`, it means auto generation
2852+
# @option opts [Integer] :maxlen max length of entries
2853+
# @option opts [Boolean] :approximate whether to add `~` modifier of maxlen or not
2854+
#
2855+
# @return [String] the entry id
2856+
def xadd(key, entry, opts = {})
2857+
args = [:xadd, key]
2858+
args.concat(['MAXLEN', (opts[:approximate] ? '~' : nil), opts[:maxlen]].compact) if opts[:maxlen]
2859+
args << (opts[:id] || '*')
2860+
args.concat(entry.to_a.flatten)
2861+
synchronize { |client| client.call(args) }
2862+
end
2863+
2864+
# Trims older entries of the stream if needed.
2865+
#
2866+
# @example Without options
2867+
# redis.xtrim('mystream', 1000)
2868+
# @example With options
2869+
# redis.xtrim('mystream', 1000, approximate: true)
2870+
#
2871+
# @param key [String] the stream key
2872+
# @param mexlen [Integer] max length of entries
2873+
# @param approximate [Boolean] whether to add `~` modifier of maxlen or not
2874+
#
2875+
# @return [Integer] the number of entries actually deleted
2876+
def xtrim(key, maxlen, approximate: false)
2877+
args = [:xtrim, key, 'MAXLEN', (approximate ? '~' : nil), maxlen].compact
2878+
synchronize { |client| client.call(args) }
2879+
end
2880+
2881+
# Delete entries by entry ids.
2882+
#
2883+
# @example With splatted entry ids
2884+
# redis.xdel('mystream', '0-1', '0-2')
2885+
# @example With arrayed entry ids
2886+
# redis.xdel('mystream', ['0-1', '0-2'])
2887+
#
2888+
# @param key [String] the stream key
2889+
# @param ids [Array<String>] one or multiple entry ids
2890+
#
2891+
# @return [Integer] the number of entries actually deleted
2892+
def xdel(key, *ids)
2893+
args = [:xdel, key].concat(ids.flatten)
2894+
synchronize { |client| client.call(args) }
2895+
end
2896+
2897+
# Fetches entries of the stream.
2898+
#
2899+
# @example Without options
2900+
# redis.xrange('mystream')
2901+
# @example With first entry id option
2902+
# redis.xrange('mystream', first: '0-1')
2903+
# @example With first and last entry id options
2904+
# redis.xrange('mystream', first: '0-1', last: '0-3')
2905+
# @example With count options
2906+
# redis.xrange('mystream', count: 10)
2907+
#
2908+
# @param key [String] the stream key
2909+
# @param first [String] first entry id of range, default value is `-`
2910+
# @param last [String] last entry id of range, default value is `+`
2911+
# @param count [Integer] the number of entries as limit
2912+
#
2913+
# @return [Hash{String => Hash}] the entries
2914+
def xrange(key, first: '-', last: '+', count: nil)
2915+
args = [:xrange, key, first, last]
2916+
args.concat(['COUNT', count]) if count
2917+
synchronize { |client| client.call(args, &HashifyStreamEntries) }
2918+
end
2919+
2920+
# Fetches entries of the stream in descending order.
2921+
#
2922+
# @example Without options
2923+
# redis.xrevrange('mystream')
2924+
# @example With first entry id option
2925+
# redis.xrevrange('mystream', first: '0-1')
2926+
# @example With first and last entry id options
2927+
# redis.xrevrange('mystream', first: '0-1', last: '0-3')
2928+
# @example With count options
2929+
# redis.xrevrange('mystream', count: 10)
2930+
#
2931+
# @param key [String] the stream key
2932+
# @param first [String] first entry id of range, default value is `-`
2933+
# @param last [String] last entry id of range, default value is `+`
2934+
# @param count [Integer] the number of entries as limit
2935+
#
2936+
# @return [Hash{String => Hash}] the entries
2937+
def xrevrange(key, first: '-', last: '+', count: nil)
2938+
args = [:xrevrange, key, last, first]
2939+
args.concat(['COUNT', count]) if count
2940+
synchronize { |client| client.call(args, &HashifyStreamEntries) }
2941+
end
2942+
2943+
# Returns the number of entries inside a stream.
2944+
#
2945+
# @example With key
2946+
# redis.xlen('mystream')
2947+
#
2948+
# @param key [String] the stream key
2949+
#
2950+
# @return [Integer] the number of entries
2951+
def xlen(key)
2952+
synchronize { |client| client.call([:xlen, key]) }
2953+
end
2954+
2955+
# Fetches entries from one or multiple streams. Optionally blocking.
2956+
#
2957+
# @example With a key
2958+
# redis.xread('mystream', '0-0')
2959+
# @example With multiple keys
2960+
# redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])
2961+
# @example With count option
2962+
# redis.xread('mystream', '0-0', count: 2)
2963+
# @example With block option
2964+
# redis.xread('mystream', '$', block: 1000)
2965+
#
2966+
# @param keys [Array<String>] one or multiple stream keys
2967+
# @param ids [Array<String>] one or multiple entry ids
2968+
# @param count [Integer] the number of entries as limit per stream
2969+
# @param block [Integer] the number of milliseconds as blocking timeout
2970+
#
2971+
# @return [Hash{String => Hash{String => Hash}}] the entries
2972+
def xread(keys, ids, count: nil, block: nil)
2973+
args = [:xread]
2974+
args.concat(['COUNT', count]) if count
2975+
args.concat(['BLOCK', block.to_i]) if block
2976+
_xread(args, keys, ids, block)
2977+
end
2978+
2979+
# Manages the consumer group of the stream.
2980+
#
2981+
# @example With `create` subcommand
2982+
# redis.xgroup(:create, 'mystream', 'mygroup', '$')
2983+
# @example With `setid` subcommand
2984+
# redis.xgroup(:setid, 'mystream', 'mygroup', '$')
2985+
# @example With `destroy` subcommand
2986+
# redis.xgroup(:destroy, 'mystream', 'mygroup')
2987+
# @example With `delconsumer` subcommand
2988+
# redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')
2989+
#
2990+
# @param subcommand [String] `create` `setid` `destroy` `delconsumer`
2991+
# @param key [String] the stream key
2992+
# @param group [String] the consumer group name
2993+
# @param id_or_consumer [String]
2994+
# * the entry id or `$`, required if subcommand is `create` or `setid`
2995+
# * the consumer name, required if subcommand is `delconsumer`
2996+
# @param mkstream [Boolean] whether to create an empty stream automatically or not
2997+
#
2998+
# @return [String] `OK` if subcommand is `create` or `setid`
2999+
# @return [Integer] effected count if subcommand is `destroy` or `delconsumer`
3000+
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
3001+
args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact
3002+
synchronize { |client| client.call(args) }
3003+
end
3004+
3005+
# Fetches a subset of the entries from one or multiple streams related with the consumer group.
3006+
# Optionally blocking.
3007+
#
3008+
# @example With a key
3009+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')
3010+
# @example With multiple keys
3011+
# redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])
3012+
# @example With count option
3013+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)
3014+
# @example With block option
3015+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)
3016+
# @example With noack option
3017+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)
3018+
#
3019+
# @param group [String] the consumer group name
3020+
# @param consumer [String] the consumer name
3021+
# @param keys [Array<String>] one or multiple stream keys
3022+
# @param ids [Array<String>] one or multiple entry ids
3023+
# @param opts [Hash] several options for `XREADGROUP` command
3024+
#
3025+
# @option opts [Integer] :count the number of entries as limit
3026+
# @option opts [Integer] :block the number of milliseconds as blocking timeout
3027+
# @option opts [Boolean] :noack whether message loss is acceptable or not
3028+
#
3029+
# @return [Hash{String => Hash{String => Hash}}] the entries
3030+
def xreadgroup(group, consumer, keys, ids, opts = {})
3031+
args = [:xreadgroup, 'GROUP', group, consumer]
3032+
args.concat(['COUNT', opts[:count]]) if opts[:count]
3033+
args.concat(['BLOCK', opts[:block].to_i]) if opts[:block]
3034+
args << 'NOACK' if opts[:noack]
3035+
_xread(args, keys, ids, opts[:block])
3036+
end
3037+
3038+
# Removes one or multiple entries from the pending entries list of a stream consumer group.
3039+
#
3040+
# @example With a entry id
3041+
# redis.xack('mystream', 'mygroup', '1526569495631-0')
3042+
# @example With splatted entry ids
3043+
# redis.xack('mystream', 'mygroup', '0-1', '0-2')
3044+
# @example With arrayed entry ids
3045+
# redis.xack('mystream', 'mygroup', %w[0-1 0-2])
3046+
#
3047+
# @param key [String] the stream key
3048+
# @param group [String] the consumer group name
3049+
# @param ids [Array<String>] one or multiple entry ids
3050+
#
3051+
# @return [Integer] the number of entries successfully acknowledged
3052+
def xack(key, group, *ids)
3053+
args = [:xack, key, group].concat(ids.flatten)
3054+
synchronize { |client| client.call(args) }
3055+
end
3056+
3057+
# Changes the ownership of a pending entry
3058+
#
3059+
# @example With splatted entry ids
3060+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')
3061+
# @example With arrayed entry ids
3062+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])
3063+
# @example With idle option
3064+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)
3065+
# @example With time option
3066+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)
3067+
# @example With retrycount option
3068+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)
3069+
# @example With force option
3070+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)
3071+
# @example With justid option
3072+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)
3073+
#
3074+
# @param key [String] the stream key
3075+
# @param group [String] the consumer group name
3076+
# @param consumer [String] the consumer name
3077+
# @param min_idle_time [Integer] the number of milliseconds
3078+
# @param ids [Array<String>] one or multiple entry ids
3079+
# @param opts [Hash] several options for `XCLAIM` command
3080+
#
3081+
# @option opts [Integer] :idle the number of milliseconds as last time it was delivered of the entry
3082+
# @option opts [Integer] :time the number of milliseconds as a specific Unix Epoch time
3083+
# @option opts [Integer] :retrycount the number of retry counter
3084+
# @option opts [Boolean] :force whether to create the pending entry to the pending entries list or not
3085+
# @option opts [Boolean] :justid whether to fetch just an array of entry ids or not
3086+
#
3087+
# @return [Hash{String => Hash}] the entries successfully claimed
3088+
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
3089+
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
3090+
args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten)
3091+
args.concat(['IDLE', opts[:idle].to_i]) if opts[:idle]
3092+
args.concat(['TIME', opts[:time].to_i]) if opts[:time]
3093+
args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount]
3094+
args << 'FORCE' if opts[:force]
3095+
args << 'JUSTID' if opts[:justid]
3096+
blk = opts[:justid] ? Noop : HashifyStreamEntries
3097+
synchronize { |client| client.call(args, &blk) }
3098+
end
3099+
3100+
# Fetches not acknowledging pending entries
3101+
#
3102+
# @example With key and group
3103+
# redis.xpending('mystream', 'mygroup')
3104+
# @example With range options
3105+
# redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
3106+
# @example With range and consumer options
3107+
# redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
3108+
#
3109+
# @param key [String] the stream key
3110+
# @param group [String] the consumer group name
3111+
# @param consumer [String] the consumer name
3112+
# @param opts [Hash] several options for `XPENDING` command
3113+
#
3114+
# @option opts [String] :first first entry id of range
3115+
# @option opts [String] :last last entry id of range
3116+
# @option opts [Integer] :count the number of entries as limit
3117+
#
3118+
# @return [Hash] the summary of pending entries
3119+
# @return [Array<Hash>] the pending entries details if options were specified
3120+
def xpending(key, group, consumer = nil, **opts)
3121+
args = [:xpending, key, group, opts[:first], opts[:last], opts[:count], consumer].compact
3122+
summary_needed = consumer.nil? && opts.empty?
3123+
blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
3124+
synchronize { |client| client.call(args, &blk) }
3125+
end
3126+
28113127
# Interact with the sentinel command (masters, master, slaves, failover)
28123128
#
28133129
# @param [String] subcommand e.g. `masters`, `master`, `slaves`
@@ -2953,6 +3269,43 @@ def method_missing(command, *args)
29533269
end.compact]
29543270
}
29553271

3272+
HashifyStreams =
3273+
lambda { |reply|
3274+
return {} if reply.nil?
3275+
reply.map do |stream_key, entries|
3276+
[stream_key, HashifyStreamEntries.call(entries)]
3277+
end.to_h
3278+
}
3279+
3280+
HashifyStreamEntries =
3281+
lambda { |reply|
3282+
reply.map do |entry_id, values|
3283+
[entry_id, values.each_slice(2).to_h]
3284+
end.to_h
3285+
}
3286+
3287+
HashifyStreamPendings =
3288+
lambda { |reply|
3289+
{
3290+
'size' => reply[0],
3291+
'min_entry_id' => reply[1],
3292+
'max_entry_id' => reply[2],
3293+
'consumers' => reply[3].nil? ? {} : Hash[reply[3]]
3294+
}
3295+
}
3296+
3297+
HashifyStreamPendingDetails =
3298+
lambda { |reply|
3299+
reply.map do |arr|
3300+
{
3301+
'entry_id' => arr[0],
3302+
'consumer' => arr[1],
3303+
'elapsed' => arr[2],
3304+
'count' => arr[3]
3305+
}
3306+
end
3307+
}
3308+
29563309
HashifyClusterNodeInfo =
29573310
lambda { |str|
29583311
arr = str.split(' ')
@@ -3018,6 +3371,21 @@ def _subscription(method, timeout, channels, block)
30183371
@client = original
30193372
end
30203373
end
3374+
3375+
def _xread(args, keys, ids, blocking_timeout_msec)
3376+
keys = keys.is_a?(Array) ? keys : [keys]
3377+
ids = ids.is_a?(Array) ? ids : [ids]
3378+
args.concat(['STREAMS'], keys, ids)
3379+
3380+
synchronize do |client|
3381+
if blocking_timeout_msec.nil?
3382+
client.call(args, &HashifyStreams)
3383+
else
3384+
timeout = client.timeout.to_f + blocking_timeout_msec.to_f / 1000.0
3385+
client.call_with_timeout(args, timeout, &HashifyStreams)
3386+
end
3387+
end
3388+
end
30213389
end
30223390

30233391
require_relative "redis/version"

0 commit comments

Comments
 (0)