Skip to content

Commit 55fcbd7

Browse files
authored
[PBE-1321] support campaign endpoints (#156)
* [PBE-1321] align campaign api with backend impl * query_campaigns & query_segments => POST requests * [PBE-1321] create resource objects; test segment CRUD * [PBE-1321] support sync channel & segment res objects * [PBE-1321] support async layer * [PBE-1321] rename targets to target_ids * [PBE-1321] add targets query support, fix tests * [PBE-1321] lint fixes * [PBE-1321] lint fixes * [PBE-1321] lint fixes * [PBE-1321] return module scope * [PBE-1321] add custom fields * [PBE-1321] align with API * [PBE-1321] align query_targets * [PBE-1321] use SortOrder * [PBE-1321] align query API * [PBE-1321] lint fixes * [PBE-1321] revert async test init * [PBE-1321] lint fixes * [PBE-1321] v3.7 compatibility issues * [PBE-1321] disable new endpoint tests * [PBE-1321] test fixes * [PBE-1321] more scope changes
1 parent 376512e commit 55fcbd7

File tree

18 files changed

+1485
-84
lines changed

18 files changed

+1485
-84
lines changed

stream_chat/async_chat/campaign.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import datetime
2+
from typing import Any, Optional, Union
3+
4+
from stream_chat.base.campaign import CampaignInterface
5+
from stream_chat.types.campaign import CampaignData
6+
from stream_chat.types.stream_response import StreamResponse
7+
8+
9+
class Campaign(CampaignInterface):
10+
async def create(
11+
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
12+
) -> StreamResponse:
13+
if campaign_id is not None:
14+
self.campaign_id = campaign_id
15+
if data is not None:
16+
self.data = data
17+
state = await self.client.create_campaign( # type: ignore
18+
campaign_id=self.campaign_id, data=self.data
19+
)
20+
21+
if self.campaign_id is None and state.is_ok() and "campaign" in state:
22+
self.campaign_id = state["campaign"]["id"]
23+
return state
24+
25+
async def get(self) -> StreamResponse:
26+
return await self.client.get_campaign( # type: ignore
27+
campaign_id=self.campaign_id
28+
)
29+
30+
async def update(self, data: CampaignData) -> StreamResponse:
31+
return await self.client.update_campaign( # type: ignore
32+
campaign_id=self.campaign_id, data=data
33+
)
34+
35+
async def delete(self, **options: Any) -> StreamResponse:
36+
return await self.client.delete_campaign( # type: ignore
37+
campaign_id=self.campaign_id, **options
38+
)
39+
40+
async def start(
41+
self, scheduled_for: Optional[Union[str, datetime.datetime]] = None
42+
) -> StreamResponse:
43+
return await self.client.start_campaign( # type: ignore
44+
campaign_id=self.campaign_id, scheduled_for=scheduled_for
45+
)
46+
47+
async def stop(self) -> StreamResponse:
48+
return await self.client.stop_campaign( # type: ignore
49+
campaign_id=self.campaign_id
50+
)

stream_chat/async_chat/client.py

Lines changed: 135 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,21 @@
1313
Optional,
1414
Type,
1515
Union,
16+
cast,
1617
)
1718
from urllib.parse import urlparse
1819

20+
from stream_chat.async_chat.campaign import Campaign
21+
from stream_chat.async_chat.segment import Segment
22+
from stream_chat.types.base import SortParam
23+
from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
24+
from stream_chat.types.segment import (
25+
QuerySegmentsOptions,
26+
QuerySegmentTargetsOptions,
27+
SegmentData,
28+
SegmentType,
29+
)
30+
1931
if sys.version_info >= (3, 8):
2032
from typing import Literal
2133
else:
@@ -537,45 +549,143 @@ async def delete_role(self, name: str) -> StreamResponse:
537549
async def list_roles(self) -> StreamResponse:
538550
return await self.get("roles")
539551

540-
async def create_segment(self, segment: Dict) -> StreamResponse:
541-
return await self.post("segments", data={"segment": segment})
552+
def segment( # type: ignore
553+
self,
554+
segment_type: SegmentType,
555+
segment_id: Optional[str] = None,
556+
data: Optional[SegmentData] = None,
557+
) -> Segment:
558+
return Segment(
559+
client=self, segment_type=segment_type, segment_id=segment_id, data=data
560+
)
561+
562+
async def create_segment(
563+
self,
564+
segment_type: SegmentType,
565+
segment_id: Optional[str] = None,
566+
data: Optional[SegmentData] = None,
567+
) -> StreamResponse:
568+
payload = {"type": segment_type.value}
569+
if segment_id is not None:
570+
payload["id"] = segment_id
571+
if data is not None:
572+
payload.update(cast(dict, data))
573+
return await self.post("segments", data=payload)
542574

543-
async def query_segments(self, **params: Any) -> StreamResponse:
544-
return await self.get("segments", params={"payload": json.dumps(params)})
575+
async def get_segment(self, segment_id: str) -> StreamResponse:
576+
return await self.get(f"segments/{segment_id}")
545577

546-
async def update_segment(self, segment_id: str, data: Dict) -> StreamResponse:
547-
return await self.put(f"segments/{segment_id}", data={"segment": data})
578+
async def query_segments(
579+
self,
580+
filter_conditions: Optional[Dict[str, Any]] = None,
581+
sort: Optional[List[SortParam]] = None,
582+
options: Optional[QuerySegmentsOptions] = None,
583+
) -> StreamResponse:
584+
payload = {}
585+
if filter_conditions is not None:
586+
payload["filter"] = filter_conditions
587+
if sort is not None:
588+
payload["sort"] = sort # type: ignore
589+
if options is not None:
590+
payload.update(cast(dict, options))
591+
return await self.post("segments/query", data=payload)
592+
593+
async def update_segment(
594+
self, segment_id: str, data: SegmentData
595+
) -> StreamResponse:
596+
return await self.put(f"segments/{segment_id}", data=data)
548597

549598
async def delete_segment(self, segment_id: str) -> StreamResponse:
550599
return await self.delete(f"segments/{segment_id}")
551600

552-
async def create_campaign(self, campaign: Dict) -> StreamResponse:
553-
return await self.post("campaigns", data={"campaign": campaign})
601+
async def segment_target_exists(
602+
self, segment_id: str, target_id: str
603+
) -> StreamResponse:
604+
return await self.get(f"segments/{segment_id}/target/{target_id}")
554605

555-
async def query_campaigns(self, **params: Any) -> StreamResponse:
556-
return await self.get("campaigns", params={"payload": json.dumps(params)})
606+
async def add_segment_targets(
607+
self, segment_id: str, target_ids: List[str]
608+
) -> StreamResponse:
609+
return await self.post(
610+
f"segments/{segment_id}/addtargets", data={"target_ids": target_ids}
611+
)
557612

558-
async def update_campaign(self, campaign_id: str, data: Dict) -> StreamResponse:
559-
return await self.put(f"campaigns/{campaign_id}", data={"campaign": data})
613+
async def query_segment_targets(
614+
self,
615+
segment_id: str,
616+
filter_conditions: Optional[Dict[str, Any]] = None,
617+
sort: Optional[List[SortParam]] = None,
618+
options: Optional[QuerySegmentTargetsOptions] = None,
619+
) -> StreamResponse:
620+
payload = {}
621+
if filter_conditions is not None:
622+
payload["filter"] = filter_conditions
623+
if sort is not None:
624+
payload["sort"] = sort # type: ignore
625+
if options is not None:
626+
payload.update(cast(dict, options))
627+
return await self.post(f"segments/{segment_id}/targets/query", data=payload)
628+
629+
async def remove_segment_targets(
630+
self, segment_id: str, target_ids: List[str]
631+
) -> StreamResponse:
632+
return await self.post(
633+
f"segments/{segment_id}/deletetargets", data={"target_ids": target_ids}
634+
)
560635

561-
async def delete_campaign(self, campaign_id: str, **options: Any) -> StreamResponse:
562-
return await self.delete(f"campaigns/{campaign_id}", params=options)
636+
def campaign( # type: ignore
637+
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
638+
) -> Campaign:
639+
return Campaign(client=self, campaign_id=campaign_id, data=data)
563640

564-
async def schedule_campaign(
565-
self, campaign_id: str, scheduled_for: int = None
641+
async def create_campaign(
642+
self, campaign_id: Optional[str] = None, data: Optional[CampaignData] = None
566643
) -> StreamResponse:
567-
return await self.patch(
568-
f"campaigns/{campaign_id}/schedule", data={"scheduled_for": scheduled_for}
569-
)
644+
payload = {"id": campaign_id}
645+
if data is not None:
646+
payload.update(cast(dict, data))
647+
return await self.post("campaigns", data=payload)
570648

571-
async def query_recipients(self, **params: Any) -> StreamResponse:
572-
return await self.get("recipients", params={"payload": json.dumps(params)})
649+
async def get_campaign(self, campaign_id: str) -> StreamResponse:
650+
return await self.get(f"campaigns/{campaign_id}")
573651

574-
async def stop_campaign(self, campaign_id: str) -> StreamResponse:
575-
return await self.patch(f"campaigns/{campaign_id}/stop")
652+
async def query_campaigns(
653+
self,
654+
filter_conditions: Optional[Dict[str, Any]] = None,
655+
sort: Optional[List[SortParam]] = None,
656+
options: QueryCampaignsOptions = None,
657+
) -> StreamResponse:
658+
payload = {}
659+
if filter_conditions is not None:
660+
payload["filter"] = filter_conditions
661+
if sort is not None:
662+
payload["sort"] = sort # type: ignore
663+
if options is not None:
664+
payload.update(cast(dict, options))
665+
return await self.post("campaigns/query", data=payload)
666+
667+
async def update_campaign(
668+
self, campaign_id: str, data: CampaignData
669+
) -> StreamResponse:
670+
return await self.put(f"campaigns/{campaign_id}", data=data)
671+
672+
async def delete_campaign(self, campaign_id: str, **options: Any) -> StreamResponse:
673+
return await self.delete(f"campaigns/{campaign_id}", options)
576674

577-
async def resume_campaign(self, campaign_id: str) -> StreamResponse:
578-
return await self.patch(f"campaigns/{campaign_id}/resume")
675+
async def start_campaign(
676+
self,
677+
campaign_id: str,
678+
scheduled_for: Optional[Union[str, datetime.datetime]] = None,
679+
) -> StreamResponse:
680+
payload = {}
681+
if scheduled_for is not None:
682+
if isinstance(scheduled_for, datetime.datetime):
683+
scheduled_for = scheduled_for.isoformat()
684+
payload["scheduled_for"] = scheduled_for
685+
return await self.post(f"campaigns/{campaign_id}/start", data=payload)
686+
687+
async def stop_campaign(self, campaign_id: str) -> StreamResponse:
688+
return await self.post(f"campaigns/{campaign_id}/stop")
579689

580690
async def test_campaign(
581691
self, campaign_id: str, users: Iterable[str]

stream_chat/async_chat/segment.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from typing import Dict, List, Optional
2+
3+
from stream_chat.base.segment import SegmentInterface
4+
from stream_chat.types.base import SortParam
5+
from stream_chat.types.segment import QuerySegmentTargetsOptions, SegmentData
6+
from stream_chat.types.stream_response import StreamResponse
7+
8+
9+
class Segment(SegmentInterface):
10+
async def create(
11+
self, segment_id: Optional[str] = None, data: Optional[SegmentData] = None
12+
) -> StreamResponse:
13+
if segment_id is not None:
14+
self.segment_id = segment_id
15+
if data is not None:
16+
self.data = data
17+
18+
state = await self.client.create_segment( # type: ignore
19+
segment_type=self.segment_type, segment_id=self.segment_id, data=self.data
20+
)
21+
22+
if self.segment_id is None and state.is_ok() and "segment" in state:
23+
self.segment_id = state["segment"]["id"]
24+
return state
25+
26+
async def get(self) -> StreamResponse:
27+
return await self.client.get_segment(segment_id=self.segment_id) # type: ignore
28+
29+
async def update(self, data: SegmentData) -> StreamResponse:
30+
return await self.client.update_segment( # type: ignore
31+
segment_id=self.segment_id, data=data
32+
)
33+
34+
async def delete(self) -> StreamResponse:
35+
return await self.client.delete_segment( # type: ignore
36+
segment_id=self.segment_id
37+
)
38+
39+
async def target_exists(self, target_id: str) -> StreamResponse:
40+
return await self.client.segment_target_exists( # type: ignore
41+
segment_id=self.segment_id, target_id=target_id
42+
)
43+
44+
async def add_targets(self, target_ids: list) -> StreamResponse:
45+
return await self.client.add_segment_targets( # type: ignore
46+
segment_id=self.segment_id, target_ids=target_ids
47+
)
48+
49+
async def query_targets(
50+
self,
51+
filter_conditions: Optional[Dict] = None,
52+
sort: Optional[List[SortParam]] = None,
53+
options: Optional[QuerySegmentTargetsOptions] = None,
54+
) -> StreamResponse:
55+
return await self.client.query_segment_targets( # type: ignore
56+
segment_id=self.segment_id,
57+
filter_conditions=filter_conditions,
58+
sort=sort,
59+
options=options,
60+
)
61+
62+
async def remove_targets(self, target_ids: list) -> StreamResponse:
63+
return await self.client.remove_segment_targets( # type: ignore
64+
segment_id=self.segment_id, target_ids=target_ids
65+
)

stream_chat/base/campaign.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import abc
2+
import datetime
3+
from typing import Awaitable, Optional, Union
4+
5+
from stream_chat.base.client import StreamChatInterface
6+
from stream_chat.types.campaign import CampaignData
7+
from stream_chat.types.stream_response import StreamResponse
8+
9+
10+
class CampaignInterface(abc.ABC):
11+
def __init__(
12+
self,
13+
client: StreamChatInterface,
14+
campaign_id: Optional[str] = None,
15+
data: CampaignData = None,
16+
):
17+
self.client = client
18+
self.campaign_id = campaign_id
19+
self.data = data
20+
21+
@abc.abstractmethod
22+
def create(
23+
self, campaign_id: Optional[str], data: Optional[CampaignData]
24+
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
25+
pass
26+
27+
@abc.abstractmethod
28+
def get(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
29+
pass
30+
31+
@abc.abstractmethod
32+
def update(
33+
self, data: CampaignData
34+
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
35+
pass
36+
37+
@abc.abstractmethod
38+
def delete(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
39+
pass
40+
41+
@abc.abstractmethod
42+
def start(
43+
self, scheduled_for: Optional[Union[str, datetime.datetime]] = None
44+
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
45+
pass
46+
47+
@abc.abstractmethod
48+
def stop(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
49+
pass

0 commit comments

Comments
 (0)