Skip to content
This repository was archived by the owner on Dec 12, 2023. It is now read-only.

Commit 34d89cf

Browse files
committed
listener: improve resiliency
1 parent 7ecadb9 commit 34d89cf

File tree

2 files changed

+68
-37
lines changed

2 files changed

+68
-37
lines changed

linkedin_messaging/api_objects.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ class ReactionSummary:
238238

239239
@dataclass_json(letter_case=LetterCase.CAMEL, undefined=Undefined.EXCLUDE)
240240
@dataclass
241-
class ConversationEvent(DataClassJsonMixin):
241+
class ConversationEvent:
242242
created_at: datetime
243243
entity_urn: URN
244244
event_content: EventContent
@@ -312,3 +312,18 @@ class SendMessageResponse(DataClassJsonMixin):
312312
class UserProfileResponse(DataClassJsonMixin):
313313
plain_id: str
314314
mini_profile: MiniProfile
315+
316+
317+
@dataclass_json(letter_case=LetterCase.CAMEL)
318+
@dataclass
319+
class RealTimeEventStreamEvent(DataClassJsonMixin):
320+
# Message real-time events
321+
previous_event_in_conversation: Optional[URN] = None
322+
event: Optional[ConversationEvent] = None
323+
324+
# Reaction real-time events
325+
reaction_added: Optional[bool] = None
326+
actor_mini_profile_urn: Optional[URN] = None
327+
event_urn: Optional[URN] = None
328+
reaction_summary: Optional[ReactionSummary] = None
329+
viewer_reacted: Optional[bool] = None

linkedin_messaging/linkedin.py

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
import logging
34
import pickle
@@ -7,7 +8,6 @@
78
Any,
89
Awaitable,
910
Callable,
10-
cast,
1111
DefaultDict,
1212
Dict,
1313
List,
@@ -17,16 +17,14 @@
1717

1818
import aiohttp
1919
from bs4 import BeautifulSoup
20-
from dataclasses_json import DataClassJsonMixin
2120

2221
from .api_objects import (
23-
ConversationEvent,
2422
ConversationResponse,
2523
ConversationsResponse,
2624
MessageAttachmentCreate,
2725
MessageCreate,
2826
Picture,
29-
ReactionSummary,
27+
RealTimeEventStreamEvent,
3028
SendMessageResponse,
3129
URN,
3230
UserProfileResponse,
@@ -59,6 +57,8 @@
5957
SEED_URL = "https://www.linkedin.com/uas/login"
6058
LOGIN_URL = "https://www.linkedin.com/checkpoint/lg/login-submit"
6159
VERIFY_URL = "https://www.linkedin.com/checkpoint/challenge/verify"
60+
REALTIME_CONNECT_URL = "https://realtime.www.linkedin.com/realtime/connect"
61+
HEARTBEAT_URL = "https://realtime.www.linkedin.com/realtime/realtimeFrontendClientConnectivityTracking" # noqa: E501
6262

6363
LINKEDIN_BASE_URL = "https://www.linkedin.com"
6464
API_BASE_URL = f"{LINKEDIN_BASE_URL}/voyager/api"
@@ -102,17 +102,30 @@ async def _post(self, relative_url: str, **kwargs) -> aiohttp.ClientResponse:
102102

103103
# region Authentication
104104

105-
async def logged_in(self) -> bool:
105+
@property
106+
def has_auth_cookies(self) -> bool:
106107
cookie_names = {c.key for c in self.session.cookie_jar}
107-
if (
108-
"liap" not in cookie_names
109-
or "li_at" not in cookie_names
110-
or "JSESSIONID" not in cookie_names
111-
):
108+
return (
109+
"liap" in cookie_names
110+
and "li_at" in cookie_names
111+
and "JSESSIONID" in cookie_names
112+
)
113+
114+
async def logged_in(self) -> bool:
115+
if not self.has_auth_cookies:
116+
return False
117+
try:
118+
return bool(await self.get_user_profile())
119+
except Exception as e:
120+
logging.exception(f"Failed getting the user profile: {e}")
112121
return False
113-
return bool(await self.get_user_profile())
114122

115-
async def login(self, email: str, password: str):
123+
async def login(self, email: str, password: str, new_session: bool = True):
124+
if new_session:
125+
if self.session:
126+
await self.session.close()
127+
self.session = aiohttp.ClientSession()
128+
116129
# Get the CSRF token.
117130
async with self.session.get(SEED_URL) as seed_response:
118131
if seed_response.status != 200:
@@ -132,7 +145,7 @@ async def login(self, email: str, password: str):
132145
) as login_response:
133146
# Check to see if the user was successfully logged in with just email and
134147
# password.
135-
if self.logged_in():
148+
if self.has_auth_cookies:
136149
for c in self.session.cookie_jar:
137150
if c.key == "JSESSIONID":
138151
self.session.headers["csrf-token"] = c.value.strip('"')
@@ -157,9 +170,11 @@ async def login(self, email: str, password: str):
157170
"challengeData",
158171
"challengeDetails",
159172
"failureRedirectUri",
173+
"flowTreeId",
160174
)
161175
}
162-
self.two_factor_payload["language"] = ("en-US",)
176+
self.two_factor_payload["language"] = "en-US"
177+
self.two_factor_payload["recognizedDevice"] = "on"
163178
raise ChallengeException()
164179

165180
# TODO (#1) can we scrape anything from the page?
@@ -169,7 +184,7 @@ async def enter_2fa(self, two_factor_code: str):
169184
async with self.session.post(
170185
VERIFY_URL, data={**self.two_factor_payload, "pin": two_factor_code}
171186
):
172-
if self.logged_in():
187+
if self.has_auth_cookies:
173188
for c in self.session.cookie_jar:
174189
if c.key == "JSESSIONID":
175190
self.session.headers["csrf-token"] = c.value.strip('"')
@@ -333,11 +348,6 @@ def add_event_listener(
333348
):
334349
self.event_listeners[payload_key].append(fn)
335350

336-
object_translation_map: Dict[str, DataClassJsonMixin] = {
337-
"event": cast(DataClassJsonMixin, ConversationEvent),
338-
"reactionSummary": cast(DataClassJsonMixin, ReactionSummary),
339-
}
340-
341351
async def _fire(self, payload_key: str, event: Any):
342352
for listener in self.event_listeners[payload_key]:
343353
await listener(event)
@@ -346,35 +356,41 @@ async def _listen_to_event_stream(self):
346356
logging.info("Starting event stream listener")
347357

348358
async with self.session.get(
349-
"https://realtime.www.linkedin.com/realtime/connect",
359+
REALTIME_CONNECT_URL,
350360
headers={"content-type": "text/event-stream", **REQUEST_HEADERS},
351-
timeout=2 ** 128,
352361
) as resp:
362+
if resp.status != 200:
363+
raise Exception(f"Failed to connect. Status {resp.status}.")
364+
353365
while True:
354-
chunk = await resp.content.readline()
355-
if not chunk:
366+
line = await resp.content.readline()
367+
if resp.content.at_eof():
356368
break
357-
if not chunk.startswith(b"data:"):
369+
370+
if not line.startswith(b"data:"):
358371
continue
359-
data = json.loads(chunk.decode("utf-8")[6:])
372+
data = json.loads(line.decode("utf-8")[6:])
373+
360374
event_payload = data.get(
361375
"com.linkedin.realtimefrontend.DecoratedEvent", {}
362376
).get("payload", {})
363377

364-
# TODO this should probably pass the entire event_payload to the
365-
# translation map
366-
for key, translate_to in self.object_translation_map.items():
367-
value = event_payload.get(key)
368-
if value is not None:
369-
await self._fire(key, translate_to.from_dict(value))
378+
for key in self.event_listeners.keys():
379+
if event_payload.get(key):
380+
await self._fire(
381+
key, RealTimeEventStreamEvent.from_dict(event_payload)
382+
)
370383

371384
logging.info("Event stream closed")
372385

373386
async def start_listener(self):
374-
try:
375-
while True:
387+
while True:
388+
try:
376389
await self._listen_to_event_stream()
377-
except Exception as e:
378-
logging.exception(f"Error listening to event stream: {e}")
390+
except asyncio.exceptions.TimeoutError:
391+
continue
392+
except Exception as e:
393+
logging.exception(f"Error listening to event stream: {e}")
394+
continue
379395

380396
# endregion

0 commit comments

Comments
 (0)