|
| 1 | +import re |
| 2 | +import requests |
| 3 | +from urllib.parse import urlparse, unquote |
| 4 | +import logging |
| 5 | + |
| 6 | +logger = logging.getLogger(__name__) |
| 7 | + |
class GoogleSearchAPI:
    """Small client for the Google Custom Search JSON API.

    The query parameters shared by every request (page size, engine ID and
    API key) are built once in the constructor and copied per search.
    """

    # The Custom Search JSON API reference states that it never returns more
    # than 100 results, so a ``nextPage`` pointing past this index can only
    # produce an error response and a wasted request.
    _MAX_START_INDEX = 100

    def __init__(self, key: str, cx: str, timeout: float = 10.0):
        """Store the credentials and per-request defaults.

        Args:
            key: API key, sent as the ``key`` query parameter.
            cx: Search engine ID, sent as the ``cx`` query parameter.
            timeout: Seconds to wait for each HTTP response before the
                request is abandoned.
        """
        self._cx = cx
        self._key = key
        self._timeout = timeout
        self._api_url = "https://www.googleapis.com/customsearch/v1"
        self._params = {
            "num": 10,
            "cx": self._cx,
            "key": self._key,
        }

    def _hit_api(self, linkedin_id: str) -> list:
        """Collect the search items returned for ``/in/<linkedin_id>``.

        The query sets ``exactTerms`` to ``/in/<linkedin_id>`` and follows the
        ``nextPage`` entries of each response until there are none left, a
        non-200 response arrives, or the result window is exhausted.

        Args:
            linkedin_id: LinkedIn public identifier to search for.

        Returns:
            The ``items`` of every page fetched, in order. Failures are
            logged rather than raised, so the list may be partial or empty.
        """
        results = []
        if not linkedin_id:
            # "/in/" alone would match arbitrary profiles and burn API quota.
            return results

        params = self._params.copy()
        params["exactTerms"] = f"/in/{linkedin_id}"
        try:
            while True:
                # Without a timeout a stalled connection blocks forever.
                resp = requests.get(
                    self._api_url, params=params, timeout=self._timeout
                )
                if resp.status_code != 200:
                    logger.warning(
                        "Google Custom Search API error: %s - %s",
                        resp.status_code,
                        resp.text,
                    )
                    break

                data = resp.json()
                results.extend(data.get("items", []))

                next_page = data.get("queries", {}).get("nextPage", [])
                if not next_page:
                    break
                next_start = next_page[0].get("startIndex")
                if not next_start or int(next_start) > self._MAX_START_INDEX:
                    break
                params["start"] = next_start
        except Exception:
            # Deliberately broad: callers rely on this method never raising
            # and on getting back whatever was collected before the failure.
            logger.exception("Error in _hit_api:")
        return results
| 41 | + |
class ProfilePicture:
    """Find a LinkedIn profile's picture and basic details via Google search.

    Search results come from ``GoogleSearchAPI``; only results whose link
    resolves to the requested LinkedIn ID are considered.
    """

    # Captures the path segment that follows "/in/".
    _ID_PATTERN = re.compile(r"/in/([^/]+)/?")
    # Hosts accepted for profile pictures: media-exp<N>.licdn.com, plus the
    # plain media.licdn.com host.
    # NOTE(review): the plain host appears to be what current LinkedIn image
    # URLs use — confirm against live search results.
    _PICTURE_HOST_PATTERN = re.compile(r"media(?:-exp\d+)?\.licdn\.com")
    # Path fragment that marks a profile display photo.
    _PICTURE_MARKER = "profile-displayphoto-shrink_"

    def __init__(self, key: str, cx: str):
        """Create the search client.

        Args:
            key: Google API key.
            cx: Custom Search engine ID.
        """
        self._api_obj = GoogleSearchAPI(key, cx)

    def extract_id(self, link: str) -> str:
        """Return a clean LinkedIn ID from a profile URL or a bare ID.

        When the URL path contains ``/in/<id>``, that segment is used;
        otherwise the input itself is treated as the ID. Surrounding
        whitespace and slashes are removed and percent-escapes are decoded.

        Args:
            link: Profile URL or bare identifier.

        Returns:
            The cleaned identifier, or ``""`` when ``link`` is empty.
        """
        if not link:
            return ""
        found = self._ID_PATTERN.search(urlparse(link).path)
        candidate = found.group(1) if found else link
        return unquote(candidate.strip().strip("/"))

    def _check_picture_url(self, link: str) -> bool:
        """Tell whether ``link`` looks like a LinkedIn profile photo URL.

        The host is validated on the parsed URL rather than by searching the
        whole string, so a LinkedIn host name embedded in another site's
        path or query string is not accepted.
        """
        if not isinstance(link, str):
            return False
        try:
            parsed = urlparse(link)
            host = parsed.hostname or ""
        except ValueError:
            # urlparse rejects some malformed network locations.
            return False
        if parsed.scheme not in ("http", "https"):
            return False
        if not self._PICTURE_HOST_PATTERN.fullmatch(host):
            return False
        return self._PICTURE_MARKER in parsed.path

    def _check_url_exists(self, link: str) -> bool:
        """Return True when a HEAD request to ``link`` answers with 200."""
        try:
            resp = requests.head(link, timeout=5)
            return resp.status_code == 200
        except requests.RequestException:
            return False

    def _matching_items(self, linkedin_id: str, res: list):
        """Yield the search items whose link resolves to ``linkedin_id``."""
        if not linkedin_id:
            # An empty ID would otherwise "match" items that have no link.
            return
        for item in res or []:
            if self.extract_id(item.get("link", "")) == linkedin_id:
                yield item

    def _extract_profile_picture(self, linkedin_id: str, res: list) -> str:
        """Return the first reachable profile photo URL for ``linkedin_id``.

        Args:
            linkedin_id: Identifier the result link must resolve to.
            res: Search items as returned by the search client.

        Returns:
            The ``og:image`` URL of the first matching item that passes both
            the URL-shape check and the HEAD check, or ``""`` if none does.
        """
        for item in self._matching_items(linkedin_id, res):
            metatags = item.get("pagemap", {}).get("metatags", [])
            for tag in metatags:
                url = tag.get("og:image")
                # The cheap shape check runs first so that only plausible
                # URLs trigger a network request.
                if (
                    url
                    and self._check_picture_url(url)
                    and self._check_url_exists(url)
                ):
                    return url
        return ""

    def _extract_profile_info(self, linkedin_id: str, res: list) -> dict:
        """Return basic details from the first item matching ``linkedin_id``.

        Returns:
            A dict with ``name`` (the item's title), ``headline`` (the item's
            snippet) and ``public_url`` (the item's link), or ``{}`` when no
            item matches.
        """
        for item in self._matching_items(linkedin_id, res):
            return {
                "name": item.get("title"),
                "headline": item.get("snippet"),
                "public_url": item.get("link", ""),
            }
        return {}

    def get_profile_picture(self, link: str) -> str:
        """Search for the profile behind ``link`` and return its photo URL.

        Returns ``""`` when no ID can be derived or no photo is found.
        """
        linkedin_id = self.extract_id(link)
        if not linkedin_id:
            return ""
        api_resp = self._api_obj._hit_api(linkedin_id)
        return self._extract_profile_picture(linkedin_id, api_resp)

    def get_profile_info(self, link: str) -> dict:
        """Search for the profile behind ``link`` and return its details.

        Returns ``{}`` when no ID can be derived or no result matches.
        """
        linkedin_id = self.extract_id(link)
        if not linkedin_id:
            return {}
        api_resp = self._api_obj._hit_api(linkedin_id)
        return self._extract_profile_info(linkedin_id, api_resp)
0 commit comments