@@ -196,7 +196,7 @@ class StreamChatClient {
196
196
197
197
StreamSubscription <ConnectionStatus >? _connectionStatusSubscription;
198
198
199
- final _eventController = BehaviorSubject <Event >();
199
+ final _eventController = PublishSubject <Event >();
200
200
201
201
/// Stream of [Event] coming from [_ws] connection
202
202
/// Listen to this or use the [on] method to filter specific event types
@@ -491,10 +491,12 @@ class StreamChatClient {
491
491
final previousState = wsConnectionStatus;
492
492
final currentState = _wsConnectionStatus = status;
493
493
494
- handleEvent (Event (
495
- type: EventType .connectionChanged,
496
- online: status == ConnectionStatus .connected,
497
- ));
494
+ if (previousState != currentState) {
495
+ handleEvent (Event (
496
+ type: EventType .connectionChanged,
497
+ online: status == ConnectionStatus .connected,
498
+ ));
499
+ }
498
500
499
501
if (currentState == ConnectionStatus .connected &&
500
502
previousState != ConnectionStatus .connected) {
@@ -1213,6 +1215,32 @@ class StreamChatClient {
1213
1215
messageId,
1214
1216
);
1215
1217
1218
+ /// Mark the thread with [threadId] in the channel with [channelId] of type
1219
+ /// [channelType] as read.
1220
+ Future <EmptyResponse > markThreadRead (
1221
+ String channelId,
1222
+ String channelType,
1223
+ String threadId,
1224
+ ) =>
1225
+ _chatApi.channel.markThreadRead (
1226
+ channelId,
1227
+ channelType,
1228
+ threadId,
1229
+ );
1230
+
1231
+ /// Mark the thread with [threadId] in the channel with [channelId] of type
1232
+ /// [channelType] as unread.
1233
+ Future <EmptyResponse > markThreadUnread (
1234
+ String channelId,
1235
+ String channelType,
1236
+ String threadId,
1237
+ ) =>
1238
+ _chatApi.channel.markThreadUnread (
1239
+ channelId,
1240
+ channelType,
1241
+ threadId,
1242
+ );
1243
+
1216
1244
/// Creates a new Poll
1217
1245
Future <CreatePollResponse > createPoll (Poll poll) =>
1218
1246
_chatApi.polls.createPoll (poll);
@@ -1659,6 +1687,43 @@ class StreamChatClient {
1659
1687
Future <OGAttachmentResponse > enrichUrl (String url) =>
1660
1688
_chatApi.general.enrichUrl (url);
1661
1689
1690
+ /// Queries threads with the given [options] and [pagination] params.
1691
+ Future <QueryThreadsResponse > queryThreads ({
1692
+ ThreadOptions options = const ThreadOptions (),
1693
+ PaginationParams pagination = const PaginationParams (),
1694
+ }) =>
1695
+ _chatApi.threads.queryThreads (
1696
+ options: options,
1697
+ pagination: pagination,
1698
+ );
1699
+
1700
+ /// Retrieves a thread with the given [messageId] .
1701
+ ///
1702
+ /// Optionally pass [options] to limit the response.
1703
+ Future <GetThreadResponse > getThread (
1704
+ String messageId, {
1705
+ ThreadOptions options = const ThreadOptions (),
1706
+ }) =>
1707
+ _chatApi.threads.getThread (
1708
+ messageId,
1709
+ options: options,
1710
+ );
1711
+
1712
+ /// Partially updates the thread with the given [messageId] .
1713
+ ///
1714
+ /// Use [set] to define values to be set.
1715
+ /// Use [unset] to define values to be unset.
1716
+ Future <UpdateThreadResponse > partialUpdateThread (
1717
+ String messageId, {
1718
+ Map <String , Object ?>? set ,
1719
+ List <String >? unset,
1720
+ }) =>
1721
+ _chatApi.threads.partialUpdateThread (
1722
+ messageId,
1723
+ set : set ,
1724
+ unset: unset,
1725
+ );
1726
+
1662
1727
/// Closes the [_ws] connection and resets the [state]
1663
1728
/// If [flushChatPersistence] is true the client deletes all offline
1664
1729
/// user's data.
@@ -1712,30 +1777,32 @@ class ClientState {
1712
1777
cancelEventSubscription ();
1713
1778
}
1714
1779
1715
- _eventsSubscription = CompositeSubscription ();
1716
- _eventsSubscription!
1717
- ..add (_client
1718
- .on ()
1719
- .where ((event) =>
1720
- event.me != null && event.type != EventType .healthCheck)
1721
- .map ((e) => e.me! )
1722
- .listen ((user) {
1723
- currentUser = currentUser? .merge (user) ?? user;
1724
- }))
1725
- ..add (_client
1726
- .on ()
1727
- .map ((event) => event.unreadChannels)
1728
- .whereType <int >()
1729
- .listen ((count) {
1730
- currentUser = currentUser? .copyWith (unreadChannels: count);
1731
- }))
1732
- ..add (_client
1733
- .on ()
1734
- .map ((event) => event.totalUnreadCount)
1735
- .whereType <int >()
1736
- .listen ((count) {
1737
- currentUser = currentUser? .copyWith (totalUnreadCount: count);
1738
- }));
1780
+ _eventsSubscription = CompositeSubscription ()
1781
+ ..add (
1782
+ _client.on ().listen ((event) {
1783
+ // Update the current user only if the event is not a health check.
1784
+ if (event.me case final user? ) {
1785
+ if (event.type != EventType .healthCheck) {
1786
+ currentUser = currentUser? .merge (user) ?? user;
1787
+ }
1788
+ }
1789
+
1790
+ // Update the total unread count.
1791
+ if (event.totalUnreadCount case final count? ) {
1792
+ currentUser = currentUser? .copyWith (totalUnreadCount: count);
1793
+ }
1794
+
1795
+ // Update the unread channels count.
1796
+ if (event.unreadChannels case final count? ) {
1797
+ currentUser = currentUser? .copyWith (unreadChannels: count);
1798
+ }
1799
+
1800
+ // Update the unread threads count.
1801
+ if (event.unreadThreads case final count? ) {
1802
+ currentUser = currentUser? .copyWith (unreadThreads: count);
1803
+ }
1804
+ }),
1805
+ );
1739
1806
1740
1807
_listenChannelLeft ();
1741
1808
@@ -1873,6 +1940,12 @@ class ClientState {
1873
1940
/// The current unread channels count as a stream
1874
1941
Stream <int > get unreadChannelsStream => _unreadChannelsController.stream;
1875
1942
1943
+ /// The current unread thread count.
1944
+ int get unreadThreads => _unreadThreadsController.value;
1945
+
1946
+ /// The current unread threads count as a stream.
1947
+ Stream <int > get unreadThreadsStream => _unreadThreadsController.stream;
1948
+
1876
1949
/// The current total unread messages count
1877
1950
int get totalUnreadCount => _totalUnreadCountController.value;
1878
1951
@@ -1909,28 +1982,32 @@ class ClientState {
1909
1982
}
1910
1983
1911
1984
void _computeUnreadCounts (OwnUser ? user) {
1912
- final totalUnreadCount = user? .totalUnreadCount;
1913
- if (totalUnreadCount != null ) {
1914
- _totalUnreadCountController.add (totalUnreadCount);
1985
+ if (user? .totalUnreadCount case final count? ) {
1986
+ _totalUnreadCountController.add (count);
1987
+ }
1988
+
1989
+ if (user? .unreadChannels case final count? ) {
1990
+ _unreadChannelsController.add (count);
1915
1991
}
1916
1992
1917
- final unreadChannels = user? .unreadChannels;
1918
- if (unreadChannels != null ) {
1919
- _unreadChannelsController.add (unreadChannels);
1993
+ if (user? .unreadThreads case final count? ) {
1994
+ _unreadThreadsController.add (count);
1920
1995
}
1921
1996
}
1922
1997
1923
1998
final _channelsController = BehaviorSubject <Map <String , Channel >>.seeded ({});
1924
1999
final _currentUserController = BehaviorSubject <OwnUser ?>();
1925
2000
final _usersController = BehaviorSubject <Map <String , User >>.seeded ({});
1926
2001
final _unreadChannelsController = BehaviorSubject <int >.seeded (0 );
2002
+ final _unreadThreadsController = BehaviorSubject <int >.seeded (0 );
1927
2003
final _totalUnreadCountController = BehaviorSubject <int >.seeded (0 );
1928
2004
1929
2005
/// Call this method to dispose this object
1930
2006
void dispose () {
1931
2007
cancelEventSubscription ();
1932
2008
_currentUserController.close ();
1933
2009
_unreadChannelsController.close ();
2010
+ _unreadThreadsController.close ();
1934
2011
_totalUnreadCountController.close ();
1935
2012
1936
2013
final channels = [...this .channels.keys];
0 commit comments