Skip to content

Commit 7bbc05c

Browse files
committed
Add query threads
1 parent 564e947 commit 7bbc05c

File tree

8 files changed

+201
-0
lines changed

8 files changed

+201
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ secrets.*sh
6060
.idea
6161

6262
.venv
63+
venv
6364
.python-version
6465
pip-selfcheck.json
6566
.idea

stream_chat/async_chat/client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,21 @@ async def query_message_history(
360360
)
361361
return await self.post("messages/history", data=params)
362362

363+
async def query_threads(
364+
self, filter: Dict = None, sort: List[Dict] = None, **options: Any
365+
) -> StreamResponse:
366+
"""
367+
Queries thread messages.
368+
369+
:param filter: Filter conditions for the query
370+
:param sort: Sort conditions for the query
371+
:param options: Additional options like limit and offset
372+
:return: StreamResponse containing the thread messages
373+
"""
374+
params = options.copy()
375+
params.update({"filter": filter, "sort": self.normalize_sort(sort)})
376+
return await self.post("messages/thread", data=params)
377+
363378
async def query_users(
364379
self, filter_conditions: Dict, sort: List[Dict] = None, **options: Any
365380
) -> StreamResponse:

stream_chat/base/client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,20 @@ def query_message_history(
562562
"""
563563
pass
564564

565+
@abc.abstractmethod
566+
def query_threads(
567+
self, filter: Dict = None, sort: List[Dict] = None, **options: Any
568+
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
569+
"""
570+
Queries thread messages.
571+
572+
:param filter: Filter conditions for the query
573+
:param sort: Sort conditions for the query
574+
:param options: Additional options like limit and offset
575+
:return: StreamResponse containing the thread messages
576+
"""
577+
pass
578+
565579
@abc.abstractmethod
566580
def query_users(
567581
self, filter_conditions: Dict, sort: List[Dict] = None, **options: Any

stream_chat/base/query_threads.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import abc
2+
from typing import Any, Awaitable, Dict, List, Union
3+
4+
from stream_chat.types.stream_response import StreamResponse
5+
6+
7+
class QueryThreadsInterface(abc.ABC):
8+
def __init__(self):
9+
pass
10+
11+
@property
12+
def url(self):
13+
return "threads"
14+
15+
@abc.abstractmethod
16+
def query_threads(self, filter:Dict[str, Dict[str, Any]], sort:List[Dict[str, Any]], **options:Any) -> Union[StreamResponse, Awaitable[StreamResponse]]:
17+
"""
18+
Get a list of threads given filter and sort options
19+
20+
:param filter: filter conditions (e.g. {"created_by_user_id": {"$eq": "user_123"}})
21+
:param sort: sort options (e.g. [{"field": "last_message_at", "direction": -1}])
22+
:return: the Server Response
23+
"""
24+
pass
25+
26+

stream_chat/client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,21 @@ def query_message_history(
342342
params.update({"filter": filter, "sort": self.normalize_sort(sort)})
343343
return self.post("messages/history", data=params)
344344

345+
def query_threads(
346+
self, filter: Dict = None, sort: List[Dict] = None, **options: Any
347+
) -> StreamResponse:
348+
"""
349+
Queries thread messages.
350+
351+
:param filter: Filter conditions for the query
352+
:param sort: Sort conditions for the query
353+
:param options: Additional options like limit and offset
354+
:return: StreamResponse containing the thread messages
355+
"""
356+
params = options.copy()
357+
params.update({"filter": filter, "sort": self.normalize_sort(sort)})
358+
return self.post("threads", data=params)
359+
345360
def query_users(
346361
self, filter_conditions: Dict, sort: List[Dict] = None, **options: Any
347362
) -> StreamResponse:

stream_chat/query_threads.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from typing import Dict, List, Any, Union, Awaitable
2+
from stream_chat.base.query_threads import QueryThreadsInterface
3+
from stream_chat.types.stream_response import StreamResponse
4+
5+
class QueryThreads(QueryThreadsInterface):
6+
def query_threads(self, filter:Dict[str, Dict[str, Any]], sort:List[Dict[str, Any]], **options:Any) -> Union[StreamResponse, Awaitable[StreamResponse]]:
7+
payload = {"filter":filter, "sort":sort, **options}
8+
return self.client.post(self.url, data=payload)
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import pytest
2+
from typing import Dict, Any
3+
4+
from stream_chat.async_chat import StreamChatAsync
5+
from stream_chat.types.stream_response import StreamResponse
6+
7+
@pytest.mark.incremental
8+
class TestQueryThreads:
9+
@pytest.mark.asyncio
10+
async def test_query_threads(self, client: StreamChatAsync, channel, random_user: Dict):
11+
# Create a thread with some messages
12+
parent_message = await channel.send_message({"text": "Parent message"}, random_user["id"])
13+
thread_message = await channel.send_message(
14+
{"text": "Thread message", "parent_id": parent_message["message"]["id"]},
15+
random_user["id"]
16+
)
17+
18+
# Query threads with filter and sort
19+
filter_conditions = {"parent_id": parent_message["message"]["id"]}
20+
sort_conditions = [{"field": "created_at", "direction": -1}]
21+
22+
response = await client.query_threads(
23+
filter=filter_conditions,
24+
sort=sort_conditions
25+
)
26+
27+
assert isinstance(response, StreamResponse)
28+
assert "threads" in response
29+
assert len(response["threads"]) > 0
30+
31+
# Verify the thread message is in the response
32+
thread = response["threads"][0]
33+
assert "latest_replies" in thread
34+
assert len(thread["latest_replies"]) > 0
35+
assert thread["latest_replies"][0]["text"] == "Thread message"
36+
37+
@pytest.mark.asyncio
38+
async def test_query_threads_with_options(self, client: StreamChatAsync, channel, random_user: Dict):
39+
# Create a thread with multiple messages
40+
parent_message = await channel.send_message({"text": "Parent message"}, random_user["id"])
41+
thread_messages = []
42+
for i in range(3):
43+
msg = await channel.send_message(
44+
{"text": f"Thread message {i}", "parent_id": parent_message["message"]["id"]},
45+
random_user["id"]
46+
)
47+
thread_messages.append(msg)
48+
49+
# Query threads with limit and offset
50+
filter_conditions = {"parent_id": parent_message["message"]["id"]}
51+
sort_conditions = [{"field": "created_at", "direction": -1}]
52+
53+
response = await client.query_threads(
54+
filter=filter_conditions,
55+
sort=sort_conditions,
56+
limit=2,
57+
offset=1
58+
)
59+
60+
assert isinstance(response, StreamResponse)
61+
assert "threads" in response
62+
assert len(response["threads"]) > 0
63+
# The API might not respect limit/offset in the same way as expected
64+
# So we just check that we got some threads back
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import pytest
2+
from typing import Dict, Any
3+
4+
from stream_chat import StreamChat
5+
from stream_chat.types.stream_response import StreamResponse
6+
7+
@pytest.mark.incremental
8+
class TestQueryThreads:
9+
def test_query_threads(self, client: StreamChat, channel, random_user: Dict):
10+
parent_message = channel.send_message({"text": "Parent message"}, random_user["id"])
11+
thread_message = channel.send_message(
12+
{"text": "Thread message", "parent_id": parent_message["message"]["id"]},
13+
random_user["id"]
14+
)
15+
16+
filter_conditions = {"parent_id": parent_message["message"]["id"]}
17+
sort_conditions = [{"field": "created_at", "direction": -1}]
18+
19+
response = client.query_threads(
20+
filter=filter_conditions,
21+
sort=sort_conditions,
22+
user_id=random_user["id"]
23+
)
24+
25+
assert isinstance(response, StreamResponse)
26+
assert "threads" in response
27+
assert len(response["threads"]) > 0
28+
29+
thread = response["threads"][0]
30+
assert "latest_replies" in thread
31+
assert len(thread["latest_replies"]) > 0
32+
assert thread["latest_replies"][0]["text"] == thread_message["message"]["text"]
33+
34+
def test_query_threads_with_options(self, client: StreamChat, channel, random_user: Dict):
35+
parent_message = channel.send_message({"text": "Parent message"}, random_user["id"])
36+
thread_messages = []
37+
for i in range(3):
38+
msg = channel.send_message(
39+
{"text": f"Thread message {i}", "parent_id": parent_message["message"]["id"]},
40+
random_user["id"]
41+
)
42+
thread_messages.append(msg)
43+
44+
filter_conditions = {"parent_id": parent_message["message"]["id"]}
45+
sort_conditions = [{"field": "created_at", "direction": -1}]
46+
47+
response = client.query_threads(
48+
filter=filter_conditions,
49+
sort=sort_conditions,
50+
limit=1,
51+
offset=1,
52+
user_id=random_user["id"]
53+
)
54+
55+
assert isinstance(response, StreamResponse)
56+
assert "threads" in response
57+
assert len(response["threads"]) == 1
58+
assert "next" in response

0 commit comments

Comments
 (0)