Skip to content

Commit dcc2712

Browse files
authored
support xread command for async interface (#666)
* support xread for async interface * bug fix
1 parent 180d978 commit dcc2712

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed

src/sw/redis++/async_redis.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,6 +1866,41 @@ class AsyncRedis {
18661866
_callback_fmt_command<long long>(std::forward<Callback>(cb), fmt::spublish, channel, message);
18671867
}
18681868

1869+
// Stream commands.
1870+
1871+
template <typename Output>
1872+
Future<Output> xread(const StringView &key,
1873+
const StringView &id,
1874+
long long count) {
1875+
return _command<Output>(fmt::xread, key, id, count);
1876+
}
1877+
1878+
template <typename Output, typename Callback>
1879+
auto xread(const StringView &key,
1880+
const StringView &id,
1881+
long long count,
1882+
Callback &&cb)
1883+
-> typename std::enable_if<IsInvocable<typename std::decay<Callback>::type, Future<Output> &&>::value, void>::type {
1884+
_callback_fmt_command<Output>(std::forward<Callback>(cb), fmt::xread, key, id, count);
1885+
}
1886+
1887+
template <typename Output, typename Input>
1888+
auto xread(Input first, Input last, long long count)
1889+
-> typename std::enable_if<!std::is_convertible<Input, StringView>::value, Future<Output>>::type {
1890+
range_check("XREAD", first, last);
1891+
1892+
return _command<Output>(fmt::xread_range, first, last, count);
1893+
}
1894+
1895+
template <typename Output, typename Input, typename Callback>
1896+
auto xread(Input first, Input last, long long count, Callback &&cb)
1897+
-> typename std::enable_if<IsInvocable<typename std::decay<Callback>::type, Future<Output> &&>::value &&
1898+
!std::is_convertible<Input, StringView>::value, void>::type {
1899+
range_check("XREAD", first, last);
1900+
1901+
_callback_fmt_command<Output>(std::forward<Callback>(cb), fmt::xread_range<Input>, first, last, count);
1902+
}
1903+
18691904
// co_command* are used internally. DO NOT use them.
18701905

18711906
template <typename Result, typename Callback>

src/sw/redis++/async_redis_cluster.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,41 @@ class AsyncRedisCluster {
11971197
_callback_fmt_command<long long>(std::forward<Callback>(cb), fmt::spublish, channel, message);
11981198
}
11991199

1200+
// Stream commands.
1201+
1202+
template <typename Output>
1203+
Future<Output> xread(const StringView &key,
1204+
const StringView &id,
1205+
long long count) {
1206+
return _command<Output>(fmt::xread, key, id, count);
1207+
}
1208+
1209+
template <typename Output, typename Callback>
1210+
auto xread(const StringView &key,
1211+
const StringView &id,
1212+
long long count,
1213+
Callback &&cb)
1214+
-> typename std::enable_if<IsInvocable<typename std::decay<Callback>::type, Future<Output> &&>::value, void>::type {
1215+
_callback_fmt_command<Output>(std::forward<Callback>(cb), fmt::xread, key, id, count);
1216+
}
1217+
1218+
template <typename Output, typename Input>
1219+
auto xread(Input first, Input last, long long count)
1220+
-> typename std::enable_if<!std::is_convertible<Input, StringView>::value, Future<Output>>::type {
1221+
range_check("XREAD", first, last);
1222+
1223+
return _command<Output>(fmt::xread_range, first, last, count);
1224+
}
1225+
1226+
template <typename Output, typename Input, typename Callback>
1227+
auto xread(Input first, Input last, long long count, Callback &&cb)
1228+
-> typename std::enable_if<IsInvocable<typename std::decay<Callback>::type, Future<Output> &&>::value &&
1229+
!std::is_convertible<Input, StringView>::value, void>::type {
1230+
range_check("XREAD", first, last);
1231+
1232+
_callback_fmt_command<Output>(std::forward<Callback>(cb), fmt::xread_range<Input>, first, last, count);
1233+
}
1234+
12001235
// co_command* are used internally. DO NOT use them.
12011236

12021237
template <typename Result, typename Callback>

src/sw/redis++/cmd_formatter.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,31 @@ inline FormattedCommand sunsubscribe_range(Input first, Input last) {
10311031
return format_cmd(args);
10321032
}
10331033

1034+
// Stream commands.
1035+
1036+
inline FormattedCommand xread(const StringView &key, const StringView &id, long long count) {
1037+
return format_cmd("XREAD COUNT %lld STREAMS %b %b",
1038+
count,
1039+
key.data(), key.size(),
1040+
id.data(), id.size());
1041+
}
1042+
1043+
template <typename Input>
1044+
FormattedCommand xread_range(Input first, Input last, long long count) {
1045+
CmdArgs args;
1046+
args << "XREAD" << "COUNT" << count << "STREAMS";
1047+
1048+
for (auto iter = first; iter != last; ++iter) {
1049+
args << iter->first;
1050+
}
1051+
1052+
for (auto iter = first; iter != last; ++iter) {
1053+
args << iter->second;
1054+
}
1055+
1056+
return format_cmd(args);
1057+
}
1058+
10341059
}
10351060

10361061
}

0 commit comments

Comments
 (0)