diff --git a/src/Osintgram.py b/src/Osintgram.py index 9904ddb..bad2c0d 100644 --- a/src/Osintgram.py +++ b/src/Osintgram.py @@ -13,6 +13,12 @@ from geopy.geocoders import Nominatim from instagram_private_api import Client as AppClient from instagram_private_api import ClientCookieExpiredError, ClientLoginRequiredError, ClientError, ClientThrottledError +try: + from instagram_private_api import ClientCheckpointRequiredError +except ImportError: + # If ClientCheckpointRequiredError doesn't exist in this version, create a dummy class + class ClientCheckpointRequiredError(ClientError): + pass from prettytable import PrettyTable @@ -65,28 +71,42 @@ def setTarget(self, target): def __get_feed__(self): data = [] - result = self.api.user_feed(str(self.target_id)) - data.extend(result.get('items', [])) + try: + result = self.api.user_feed(str(self.target_id)) + data.extend(result.get('items', [])) - next_max_id = result.get('next_max_id') - while next_max_id: - results = self.api.user_feed(str(self.target_id), max_id=next_max_id) - data.extend(results.get('items', [])) - next_max_id = results.get('next_max_id') + next_max_id = result.get('next_max_id') + while next_max_id: + results = self.api.user_feed(str(self.target_id), max_id=next_max_id) + data.extend(results.get('items', [])) + next_max_id = results.get('next_max_id') + except ClientCheckpointRequiredError as e: + self.handle_checkpoint_error(e) + sys.exit(2) + except ClientError as e: + self.handle_api_error(e, "Error fetching user feed") + sys.exit(2) return data def __get_comments__(self, media_id): comments = [] - result = self.api.media_comments(str(media_id)) - comments.extend(result.get('comments', [])) + try: + result = self.api.media_comments(str(media_id)) + comments.extend(result.get('comments', [])) - next_max_id = result.get('next_max_id') - while next_max_id: - results = self.api.media_comments(str(media_id), max_id=next_max_id) - comments.extend(results.get('comments', [])) - next_max_id = results.get('next_max_id') + next_max_id = result.get('next_max_id') + while next_max_id: + results = self.api.media_comments(str(media_id), max_id=next_max_id) + comments.extend(results.get('comments', [])) + next_max_id = results.get('next_max_id') + except ClientCheckpointRequiredError as e: + self.handle_checkpoint_error(e) + raise # Re-raise to be handled by caller + except ClientError as e: + self.handle_api_error(e, f"Error fetching comments for post {media_id}") + raise # Re-raise to be handled by caller return comments @@ -274,7 +294,11 @@ def get_comment_data(self): return pc.printout("Retrieving all comments, this may take a moment...\n") - data = self.__get_feed__() + try: + data = self.__get_feed__() + except (ClientCheckpointRequiredError, ClientError): + # Error already handled in __get_feed__ + return _comments = [] t = PrettyTable(['POST ID', 'ID', 'Username', 'Comment']) @@ -283,18 +307,29 @@ def get_comment_data(self): t.align["Username"] = "l" t.align["Comment"] = "l" - for post in data: - post_id = post.get('id') - comments = self.api.media_n_comments(post_id) - for comment in comments: - t.add_row([post_id, comment.get('user_id'), comment.get('user').get('username'), comment.get('text')]) - comment = { - "post_id": post_id, - "user_id":comment.get('user_id'), - "username": comment.get('user').get('username'), - "comment": comment.get('text') - } - _comments.append(comment) + try: + for post in data: + post_id = post.get('id') + try: + comments = self.api.media_n_comments(post_id) + for comment in comments: + t.add_row([post_id, comment.get('user_id'), comment.get('user').get('username'), comment.get('text')]) + comment = { + "post_id": post_id, + "user_id":comment.get('user_id'), + "username": comment.get('user').get('username'), + "comment": comment.get('text') + } + _comments.append(comment) + except ClientCheckpointRequiredError as e: + self.handle_checkpoint_error(e) + return + except ClientError as e: + self.handle_api_error(e, f"Error fetching comments for post {post_id}") + continue + except Exception as e: + pc.printout(f"Unexpected error: {str(e)}\n", pc.RED) + return print(t) if self.writeFile: @@ -582,6 +617,34 @@ def get_user_info(self): json.dump(user, f) except ClientError as e: + try: + # Try to parse error response + if hasattr(e, 'error_response') and e.error_response: + try: + error = json.loads(e.error_response) + except (json.JSONDecodeError, ValueError): + # If error_response is not valid JSON (empty, HTML, etc.), create a simple error dict + error = {'message': str(e.msg) if hasattr(e, 'msg') else 'Unknown error', 'code': e.code if hasattr(e, 'code') else None} + else: + # No error_response available + error = {'message': str(e.msg) if hasattr(e, 'msg') else 'Unknown error', 'code': e.code if hasattr(e, 'code') else None} + + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error: + self.handle_challenge_error(error) + exit(2) + except Exception as parse_error: + # If anything goes wrong parsing the error, check message directly + if hasattr(e, 'msg') and ('checkpoint' in str(e.msg).lower() or 'challenge' in str(e.msg).lower()): + error = {'message': str(e.msg)} + self.handle_challenge_error(error) + exit(2) + + # Other ClientErrors print(e) pc.printout("Oops... " + str(self.target) + " non exist, please enter a valid username.", pc.RED) pc.printout("\n") @@ -945,9 +1008,38 @@ def get_user_propic(self): pc.printout("Sorry! No results found :-(\n", pc.RED) except ClientError as e: - error = json.loads(e.error_response) - print(error['message']) - print(error['error_title']) + try: + # Try to parse error response + if hasattr(e, 'error_response') and e.error_response: + try: + error = json.loads(e.error_response) + except (json.JSONDecodeError, ValueError): + # If error_response is not valid JSON (empty, HTML, etc.), create a simple error dict + error = {'message': str(e.msg) if hasattr(e, 'msg') else 'Unknown error', 'code': e.code if hasattr(e, 'code') else None} + else: + # No error_response available + error = {'message': str(e.msg) if hasattr(e, 'msg') else 'Unknown error', 'code': e.code if hasattr(e, 'code') else None} + + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error: + self.handle_challenge_error(error) + exit(2) + except Exception as parse_error: + # If anything goes wrong parsing the error, check message directly + if hasattr(e, 'msg') and ('checkpoint' in str(e.msg).lower() or 'challenge' in str(e.msg).lower()): + error = {'message': str(e.msg)} + self.handle_challenge_error(error) + exit(2) + + # Other ClientErrors + if 'message' in error: + print(error['message']) + if 'error_title' in error: + print(error['error_title']) exit(2) def get_user_stories(self): @@ -1067,14 +1159,27 @@ def get_user(self, username): return user except ClientError as e: + try: + error = json.loads(e.error_response) + except (json.JSONDecodeError, TypeError, AttributeError): + # If error_response is not valid JSON, treat as string + error = {'message': str(e.error_response)} + + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error: + self.handle_challenge_error(error) + sys.exit(2) + + # Other ClientErrors pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED) - error = json.loads(e.error_response) if 'message' in error: print(error['message']) if 'error_title' in error: print(error['error_title']) - if 'challenge' in error: - print("Please follow this link to complete the challenge: " + error['challenge']['url']) sys.exit(2) @@ -1102,6 +1207,158 @@ def set_json_dump(self, flag): self.jsonDump = flag + def handle_checkpoint_error(self, error_exception): + """Handle ClientCheckpointRequiredError exception""" + try: + # Try to extract error information from the exception + if hasattr(error_exception, 'error_response') and error_exception.error_response: + try: + error = json.loads(error_exception.error_response) + except (json.JSONDecodeError, ValueError): + error = {'message': str(error_exception.msg) if hasattr(error_exception, 'msg') else 'checkpoint_required'} + else: + error = {'message': str(error_exception.msg) if hasattr(error_exception, 'msg') else 'checkpoint_required'} + + self.handle_challenge_error(error) + except Exception: + # If anything fails, show a basic checkpoint message + error = {'message': 'checkpoint_required'} + self.handle_challenge_error(error) + + def handle_api_error(self, error_exception, context_message=""): + """Handle general API errors""" + try: + if hasattr(error_exception, 'error_response') and error_exception.error_response: + try: + error = json.loads(error_exception.error_response) + except (json.JSONDecodeError, ValueError): + error = {'message': str(error_exception.msg) if hasattr(error_exception, 'msg') else 'Unknown error'} + else: + error = {'message': str(error_exception.msg) if hasattr(error_exception, 'msg') else 'Unknown error'} + + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error or \ + (hasattr(error_exception, 'msg') and ('checkpoint' in str(error_exception.msg).lower() or 'challenge' in str(error_exception.msg).lower())): + self.handle_challenge_error(error) + sys.exit(2) + + # Other errors + if context_message: + pc.printout(f"{context_message}: ", pc.RED) + pc.printout(f"ClientError {error.get('message', 'Unknown error')}\n", pc.RED) + except Exception: + if context_message: + pc.printout(f"{context_message}: ", pc.RED) + pc.printout(f"Unknown error: {str(error_exception)}\n", pc.RED) + + def handle_challenge_error(self, error): + """Handle Instagram challenge_required errors""" + pc.printout("\n" + "="*70 + "\n", pc.YELLOW) + pc.printout("CHECKPOINT CHALLENGE REQUIRED\n", pc.RED) + pc.printout("="*70 + "\n", pc.YELLOW) + pc.printout("Instagram detected suspicious activity and requires verification.\n", pc.YELLOW) + pc.printout("You need to complete a security challenge to continue.\n\n", pc.YELLOW) + + challenge_url = None + api_path = None + + # Try to extract challenge/checkpoint URL from different possible structures + if isinstance(error, dict): + # Check for checkpoint_url (new format) + if 'checkpoint_url' in error: + challenge_url = error.get('checkpoint_url', '') + + # Check for challenge object (old format) + if 'challenge' in error: + challenge_obj = error['challenge'] + if isinstance(challenge_obj, dict): + challenge_url = challenge_url or challenge_obj.get('url', '') + api_path = challenge_obj.get('api_path', '') + elif isinstance(challenge_obj, str): + challenge_url = challenge_url or challenge_obj + + pc.printout("COMPLETE SOLUTION (STEP BY STEP):\n", pc.GREEN) + pc.printout("-" * 70 + "\n", pc.YELLOW) + + pc.printout("STEP 1 - IMPORTANT: Log in to the browser FIRST:\n", pc.CYAN) + pc.printout("1. Open Instagram.com in your browser\n", pc.YELLOW) + pc.printout("2. If you're not logged in, LOG IN first\n", pc.RED) + pc.printout("3. Instagram should automatically request the challenge\n", pc.YELLOW) + pc.printout("4. Complete ALL verification:\n", pc.YELLOW) + pc.printout(" - Confirm that it was you who tried to log in\n", pc.YELLOW) + pc.printout(" - Enter the code sent by email/SMS if requested\n", pc.YELLOW) + pc.printout(" - Confirm recent activities if requested\n", pc.YELLOW) + pc.printout("5. VERIFY that the challenge was actually completed:\n", pc.RED) + pc.printout(" - You should be able to access your account normally in the browser\n", pc.YELLOW) + pc.printout(" - No more verification messages should appear\n", pc.YELLOW) + pc.printout("6. Wait 15-30 MINUTES after completing (Instagram needs to process)\n\n", pc.RED) + + if challenge_url: + pc.printout("Alternative URL (may expire quickly):\n", pc.CYAN) + pc.printout(challenge_url + "\n", pc.CYAN) + pc.printout("NOTE: If 'page not available' (404) appears, the link has expired.\n", pc.RED) + pc.printout(" Use the browser method above.\n\n", pc.YELLOW) + + pc.printout("STEP 2 - Use the Instagram mobile app (OPTIONAL but recommended):\n", pc.CYAN) + pc.printout("1. Open the Instagram app on your phone\n", pc.YELLOW) + pc.printout("2. Verify that you can access normally\n", pc.YELLOW) + pc.printout("3. This helps normalize your account in Instagram's system\n\n", pc.YELLOW) + + pc.printout("STEP 3 - IMPORTANT: Clear cookies and log in again:\n", pc.CYAN) + pc.printout("After completing the challenge in the browser AND waiting 15-30 minutes:\n\n", pc.YELLOW) + pc.printout("Run the command with the -C flag to clear cookies:\n", pc.GREEN) + pc.printout(" python main.py -C\n\n", pc.GREEN) + pc.printout("OR manually clear the file:\n", pc.YELLOW) + pc.printout(" rm config/settings.json\n", pc.GREEN) + pc.printout(" (or delete the config/settings.json file)\n\n", pc.YELLOW) + + pc.printout("STEP 4 - If it still asks for challenge, try these solutions:\n", pc.CYAN) + pc.printout("1. Wait longer (up to 1 hour) - Instagram may take time to process\n", pc.YELLOW) + pc.printout("2. Check security notifications on Instagram:\n", pc.YELLOW) + pc.printout(" - Go to Settings > Security > Login Activity\n", pc.YELLOW) + pc.printout(" - See if there are any pending verifications\n", pc.YELLOW) + pc.printout("3. Try using a different network (without VPN/proxy)\n", pc.YELLOW) + pc.printout("4. Check if your account is not temporarily blocked\n\n", pc.YELLOW) + + pc.printout("WHY IS THIS NECESSARY?\n", pc.RED) + pc.printout("-" * 70 + "\n", pc.YELLOW) + pc.printout("When you complete the challenge in the browser, the browser\n", pc.YELLOW) + pc.printout("cookies are updated, but the cookies saved by the API are not.\n", pc.YELLOW) + pc.printout("The tool needs to log in again to get the updated cookies\n", pc.YELLOW) + pc.printout("after the challenge is completed.\n\n", pc.YELLOW) + + pc.printout("QUICK SUMMARY:\n", pc.GREEN) + pc.printout("1. LOG IN to Instagram.com in the browser (if not logged in)\n", pc.YELLOW) + pc.printout("2. Complete ALL challenge verification\n", pc.YELLOW) + pc.printout("3. Verify that you can access normally in the browser\n", pc.YELLOW) + pc.printout("4. Wait 15-30 MINUTES (Instagram needs to process)\n", pc.RED) + pc.printout("5. Run: python main.py -C\n", pc.GREEN) + pc.printout(" (the -C flag clears cookies and forces a new login)\n\n", pc.YELLOW) + + pc.printout("IMPORTANT NOTES:\n", pc.RED) + pc.printout("- Challenge URLs expire quickly (minutes)\n", pc.YELLOW) + pc.printout("- The most reliable method is to access Instagram directly in the browser\n", pc.YELLOW) + pc.printout("- It is CRUCIAL to log in to the browser BEFORE using the API\n", pc.RED) + pc.printout("- Wait 15-30 minutes after completing (not just 5-10 minutes)\n", pc.RED) + pc.printout("- ALWAYS use -C after completing the challenge to update the session\n", pc.YELLOW) + pc.printout("- Use the same account credentials\n", pc.YELLOW) + pc.printout("- Avoid VPN or proxies that may trigger verifications\n", pc.YELLOW) + pc.printout("- If it keeps asking, it may be a temporary block (wait 24h)\n\n", pc.YELLOW) + + if isinstance(error, dict): + if 'challenge' in error or 'checkpoint_url' in error: + pc.printout("Error details for debug:\n", pc.CYAN) + if 'challenge' in error: + pc.printout("Challenge: " + str(error.get('challenge', {})) + "\n", pc.YELLOW) + if 'checkpoint_url' in error: + pc.printout("Checkpoint URL: " + str(error.get('checkpoint_url', '')) + "\n", pc.YELLOW) + + pc.printout("="*70 + "\n", pc.YELLOW) + def login(self, u, p): try: settings_file = "config/settings.json" @@ -1133,14 +1390,28 @@ def login(self, u, p): on_login=lambda x: self.onlogin_callback(x, settings_file)) except ClientError as e: + try: + error = json.loads(e.error_response) + except (json.JSONDecodeError, TypeError, AttributeError): + # If error_response is not valid JSON, treat as string + error = {'message': str(e.error_response)} + + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error: + self.handle_challenge_error(error) + exit(9) + + # Other ClientErrors pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED) - error = json.loads(e.error_response) - pc.printout(error['message'], pc.RED) - pc.printout(": ", pc.RED) + if 'message' in error: + pc.printout(error['message'], pc.RED) + pc.printout(": ", pc.RED) pc.printout(e.msg, pc.RED) pc.printout("\n") - if 'challenge' in error: - print("Please follow this link to complete the challenge: " + error['challenge']['url']) exit(9) def to_json(self, python_object): @@ -1163,8 +1434,47 @@ def onlogin_callback(self, api, new_settings_file): def check_following(self): if str(self.target_id) == self.api.authenticated_user_id: return True - endpoint = 'users/{user_id!s}/full_detail_info/'.format(**{'user_id': self.target_id}) - return self.api._call_api(endpoint)['user_detail']['user']['friendship_status']['following'] + try: + endpoint = 'users/{user_id!s}/full_detail_info/'.format(**{'user_id': self.target_id}) + result = self.api._call_api(endpoint) + return result['user_detail']['user']['friendship_status']['following'] + except ClientError as e: + # Handle errors gracefully - if we can't check, assume not following + try: + # Try to parse error response + if hasattr(e, 'error_response') and e.error_response: + try: + error = json.loads(e.error_response) + except (json.JSONDecodeError, ValueError): + # If error_response is not valid JSON (empty, HTML, etc.), create a simple error dict + error = {'message': str(e.msg) if hasattr(e, 'msg') else 'Unknown error', 'code': e.code if hasattr(e, 'code') else None} + else: + # Check for checkpoint challenge + if error.get('error_type') == 'checkpoint_challenge_required' or \ + error.get('message') == 'challenge_required' or \ + error.get('message') == 'checkpoint_required' or \ + 'challenge' in error or \ + 'checkpoint_url' in error: + self.handle_challenge_error(error) + sys.exit(2) + else: + # No error_response available, check if it's a checkpoint by message + if hasattr(e, 'msg') and ('checkpoint' in str(e.msg).lower() or 'challenge' in str(e.msg).lower()): + error = {'message': str(e.msg)} + self.handle_challenge_error(error) + sys.exit(2) + except Exception as parse_error: + # If anything goes wrong parsing the error, just continue + pass + + # For other errors (404, etc.), return False (not following) + return False + except (KeyError, TypeError) as e: + # If the response structure is unexpected, return False + return False + except Exception as e: + # For any other unexpected errors, return False + return False def check_private_profile(self): if self.is_private and not self.following: @@ -1663,11 +1973,15 @@ def get_comments(self): pc.printout("Sorry! No results found :-(\n", pc.RED) def clear_cache(self): + """Clear cached session cookies""" try: - f = open("config/settings.json",'w') - f.write("{}") - pc.printout("Cache Cleared.\n",pc.GREEN) - except FileNotFoundError: - pc.printout("Settings.json don't exist.\n",pc.RED) - finally: - f.close() + settings_file = "config/settings.json" + if os.path.isfile(settings_file): + f = open(settings_file, 'w') + f.write("{}") + f.close() + pc.printout("Cache Cleared. Next login will create a fresh session.\n", pc.GREEN) + else: + pc.printout("Settings.json doesn't exist. No cache to clear.\n", pc.YELLOW) + except Exception as e: + pc.printout(f"Error clearing cache: {e}\n", pc.RED)