diff --git a/libs/core/download.py b/libs/core/download.py index cd8ded1..101b3de 100644 --- a/libs/core/download.py +++ b/libs/core/download.py @@ -21,23 +21,41 @@ def __init__(self, input_path, file_name, cache_path, types): self.file_name = file_name def __requset__(self): + """ + 发起HTTP/HTTPS请求并处理响应。 + + 该方法根据config中的配置信息发起POST或GET请求,下载文件或获取HTML内容,并保存到缓存路径。 + 对于HTTP和HTTPS请求,设置最大重试次数以提高请求成功的概率。此外,针对不同的操作系统类型, + 采取不同的处理方式:对于'Android'或'iOS',以二进制形式下载文件并显示下载进度;对于其他情况, + 则直接保存响应的HTML文本。 + + Raises: + Exception: 如果请求过程中发生任何异常,均抛出异常。 + """ try: + # 创建一个Session对象,以保持会话状态 session = requests.Session() + # 为HTTP和HTTPS请求分别添加重试适配器,以提高请求的容错性 session.mount('http://', HTTPAdapter(max_retries=3)) session.mount('https://', HTTPAdapter(max_retries=3)) + # 禁用keep-alive,以避免长连接带来的潜在问题 session.keep_alive = False + # 设置默认的重试次数,以应对网络问题 session.adapters.DEFAULT_RETRIES = 5 + # 禁用urllib3的警告,减少不必要的日志输出 urllib3.disable_warnings() + # 根据配置中的请求方法,发起POST或GET请求 if config.method.upper() == "POST": - resp = session.post( - url=self.url, params=config.data, headers=config.headers, timeout=30) + resp = session.post(url=self.url, params=config.data, headers=config.headers, timeout=30) else: - resp = session.get(url=self.url, data=config.data, - headers=config.headers, timeout=30) + resp = session.get(url=self.url, data=config.data, headers=config.headers, timeout=30) + # 检查响应状态码,确保请求成功 if resp.status_code == requests.codes.ok: + # 根据操作系统类型,处理响应内容 if self.types == "Android" or self.types == "iOS": + # 下载文件并显示下载进度 count = 0 progress_tmp = 0 length = float(resp.headers['content-length']) @@ -50,17 +68,19 @@ def __requset__(self): if progress != progress_tmp: progress_tmp = progress print("\r", end="") - print( - "[*] Download progress: {}%: ".format(progress), "▋" * (progress // 2), end="") + print("[*] Download progress: {}%: ".format(progress), "▋" * (progress // 2), end="") sys.stdout.flush() - f.close() + f.close() else: + # 直接保存HTML文本 html = resp.text with open(self.cache_path, "w", encoding='utf-8', errors='ignore') as f: f.write(html) f.close() + # 设置下载标志为True,表示下载成功 cores.download_flag = True except Exception as e: + # 捕获并抛出异常,便于上层调用者处理 raise Exception(e) def run(self): diff --git a/libs/core/net.py b/libs/core/net.py index 52b3a2d..e59ce16 100644 --- a/libs/core/net.py +++ b/libs/core/net.py @@ -19,79 +19,130 @@ def __init__(self, threadID, name, domain_queue, worksheet): self.worksheet = worksheet def __get_Http_info__(self, threadLock): + """ + 获取HTTP信息并更新Excel工作表。 + + 从域名队列中获取域名和URL IP,然后请求URL的信息。 + 如果请求成功且结果不为错误,则将信息写入Excel工作表中。 + + 参数: + - threadLock: 线程锁,用于同步对共享资源的访问。 + + 返回值: + 无 + """ while True: + # 检查域名队列是否为空,如果为空则退出循环 if self.domain_queue.empty(): break + + # 从队列中获取域名信息,包括域名和URL IP domains = self.domain_queue.get(timeout=5) domain = domains["domain"] url_ip = domains["url_ip"] + + # 短暂休眠,以模拟处理时间或减轻目标服务器的负担 time.sleep(2) + + # 发起HTTP请求并获取结果 result = self.__get_request_result__(url_ip) print("[+] Processing URL address:"+url_ip) + + # 如果结果不为错误,则尝试获取线程锁以更新Excel工作表 + # 检查结果是否为错误 if result != "error": + # 尝试获取线程锁,以便安全地写入Excel if self.lock.acquire(True): + # 更新Excel行号并写入域名、URL IP等信息 cores.excel_row = cores.excel_row + 1 - self.worksheet.cell(row=cores.excel_row, - column=1, value=cores.excel_row-1) - self.worksheet.cell(row=cores.excel_row, - column=2, value=url_ip) - self.worksheet.cell(row=cores.excel_row, - column=3, value=domain) + self.worksheet.cell(row=cores.excel_row, column=1, value=cores.excel_row-1) + self.worksheet.cell(row=cores.excel_row, column=2, value=url_ip) + self.worksheet.cell(row=cores.excel_row, column=3, value=domain) + # 如果结果不是超时,则写入更多详细信息 if result != "timeout": - self.worksheet.cell( - row=cores.excel_row, column=4, value=result["status"]) - self.worksheet.cell( - row=cores.excel_row, column=5, value=result["des_ip"]) - self.worksheet.cell( - row=cores.excel_row, column=6, value=result["server"]) - self.worksheet.cell( - row=cores.excel_row, column=7, value=result["title"]) - self.worksheet.cell( - row=cores.excel_row, column=8, value=result["cdn"]) + self.worksheet.cell(row=cores.excel_row, column=4, value=result["status"]) + self.worksheet.cell(row=cores.excel_row, column=5, value=result["des_ip"]) + self.worksheet.cell(row=cores.excel_row, column=6, value=result["server"]) + self.worksheet.cell(row=cores.excel_row, column=7, value=result["title"]) + self.worksheet.cell(row=cores.excel_row, column=8, value=result["cdn"]) + # 释放线程锁,允许其他线程进行写入操作 self.lock.release() - def __get_request_result__(self, url): - result = {"status": "", "server": "", "cookie": "", - "cdn": "", "des_ip": "", "sou_ip": "", "title": ""} - cdn = "" - try: - with requests.get(url, timeout=5, stream=True) as rsp: - status_code = rsp.status_code - result["status"] = status_code - headers = rsp.headers - if "Server" in headers: - result["server"] = headers['Server'] - if "Cookie" in headers: - result["cookie"] = headers['Cookie'] - if "X-Via" in headers: - cdn = cdn + headers['X-Via'] - if "Via" in headers: - cdn = cdn + headers['Via'] - result["cdn"] = cdn - sock = rsp.raw._connection.sock - - if sock: - des_ip = sock.getpeername()[0] - sou_ip = sock.getsockname()[0] - if des_ip: - result["des_ip"] = des_ip - if sou_ip: - result["sou_ip"] = sou_ip - sock.close() - html = rsp.text - title = re.findall('(.+)', html) - if title: - result["title"] = title[0] - rsp.close() - return result - except requests.exceptions.InvalidURL as e: - return "error" - except requests.exceptions.ConnectionError as e1: - return "timeout" - except requests.exceptions.ReadTimeout as e2: - return "timeout" + class SomeClass: + """ + 一个示例类,用于演示如何添加注释。 + """ + + def __get_request_result__(self, url): + """ + 发送GET请求并获取结果。 + + 此函数通过requests库发送GET请求,并解析响应头和内容,提取相关信息,如服务器类型、Cookie、CDN信息、目标IP和源IP以及页面标题。 + + 参数: + url (str): 目标URL地址。 + + 返回: + dict: 包含请求状态、服务器信息、CDN信息、目标IP、源IP和页面标题的字典。 + 如果URL无效或请求失败,则返回"error"或"timeout"。 + """ + # 初始化结果字典,用于存储请求结果和解析出的信息 + result = {"status": "", "server": "", "cookie": "", + "cdn": "", "des_ip": "", "sou_ip": "", "title": ""} + # 初始化CDN信息字符串 + cdn = "" + try: + # 发送GET请求,设置超时时间和流式响应 + with requests.get(url, timeout=5, stream=True) as rsp: + # 获取并记录HTTP状态码 + status_code = rsp.status_code + result["status"] = status_code + # 获取响应头 + headers = rsp.headers + # 检查并记录服务器类型 + if "Server" in headers: + result["server"] = headers['Server'] + # 检查并记录Cookie信息 + if "Cookie" in headers: + result["cookie"] = headers['Cookie'] + # 检查并累加CDN信息 + if "X-Via" in headers: + cdn = cdn + headers['X-Via'] + if "Via" in headers: + cdn = cdn + headers['Via'] + result["cdn"] = cdn + # 获取底层socket对象 + sock = rsp.raw._connection.sock + # 如果socket对象存在,获取目标IP和源IP信息 + if sock: + des_ip = sock.getpeername()[0] + sou_ip = sock.getsockname()[0] + if des_ip: + result["des_ip"] = des_ip + if sou_ip: + result["sou_ip"] = sou_ip + # 关闭socket连接 + sock.close() + # 获取页面内容 + html = rsp.text + # 使用正则表达式提取页面标题 + title = re.findall('(.+)', html) + if title: + result["title"] = title[0] + # 关闭响应 + rsp.close() + # 返回结果字典 + return result + # 异常处理:无效URL + except requests.exceptions.InvalidURL as e: + return "error" + # 异常处理:连接错误或读取超时 + except requests.exceptions.ConnectionError as e1: + return "timeout" + except requests.exceptions.ReadTimeout as e2: + return "timeout" def run(self): threadLock = threading.Lock() diff --git a/libs/task/ios_task.py b/libs/task/ios_task.py index c99e397..94e03d5 100644 --- a/libs/task/ios_task.py +++ b/libs/task/ios_task.py @@ -23,72 +23,180 @@ def __init__(self, path): self.permissions = [] def start(self): + """ + 启动文件处理流程。 + + 本函数根据文件类型(ipa或Mach-o)来执行相应的处理流程。 + 它首先检查文件扩展名,然后根据文件内容或类型进行解码或扫描。 + + Returns: + dict: 包含处理结果的字典,包括shell_flag、file_queue等信息。 + """ + # 获取文件路径 file_path = self.path + + # 判断文件是否为ipa文件 if file_path.split(".")[-1] == 'ipa': + # 对ipa文件进行解码 self.__decode_ipa__(cores.output_path) + # 扫描解码后的ipa文件 self.__scanner_file_by_ipa__(cores.output_path) - elif self.__get_file_header__(file_path): - self.file_queue.put(file_path) else: - raise Exception( - "Retrieval of this file type is not supported. Select IPA file or Mach-o file.") - return {"shell_flag": self.shell_flag, "file_queue": self.file_queue, "comp_list": [], "packagename": None, "file_identifier": self.file_identifier, "permissions": self.permissions} + # 判断文件是否为Mach-o文件 + if self.__get_file_header__(file_path): + # 将文件路径放入文件队列中 + self.file_queue.put(file_path) + else: + # 抛出异常,提示不支持的文件类型 + raise Exception( + "Retrieval of this file type is not supported. Select IPA file or Mach-o file.") + # 返回包含处理结果的字典 + return {"shell_flag": self.shell_flag, "file_queue": self.file_queue, "comp_list": [], "packagename": None, "file_identifier": self.file_identifier, "permissions": self.permissions} def __get_file_header__(self, file_path): + """ + 获取文件头信息并判断是否为Mach-O文件。 + + 参数: + file_path (str): 文件路径。 + + 返回: + bool: 如果文件是Mach-O格式,则返回True,否则返回False。 + """ + # 初始化文件头位置指针 hex_hand = 0x0 + + # 提取文件名作为文件标识符 macho_name = os.path.split(file_path)[-1] self.file_identifier.append(macho_name) + + # 打开二进制文件以读取文件头信息 with open(file_path, "rb") as macho_file: + # 移动文件读取指针到文件头 macho_file.seek(hex_hand, 0) + + # 读取并转换文件头4字节为十六进制表示 magic = binascii.hexlify(macho_file.read(4)).decode().upper() + + # 定义Mach-O文件的魔数列表 macho_magics = ["CFFAEDFE", "CEFAEDFE", "BEBAFECA", "CAFEBABE"] + + # 检查文件头魔数是否匹配Mach-O文件格式 if magic in macho_magics: + # 如果是Mach-O文件,调用私有方法进行进一步处理 self.__shell_test__(macho_file, hex_hand) + + # 关闭文件并返回True表示处理成功 macho_file.close() return True + + # 如果不是Mach-O文件,关闭文件并返回False macho_file.close() return False def __shell_test__(self, macho_file, hex_hand): + """ + 检测给定的macho文件是否包含特定的加密信息,以判断是否具有特定的shell功能。 + + 参数: + - macho_file: 文件对象,指向待检测的macho文件。 + - hex_hand: int,文件内部的初始读取位置指针。 + + 该方法通过读取和解析macho文件的特定部分,判断文件是否具有特定的加密标识, + 从而确定文件是否包含shell功能。这一过程涉及对文件的二进制内容进行解析,并根据 + 解析结果更新类实例的属性。 + """ while True: + # 读取文件的前4字节并将其转换为十六进制字符串,用于判断文件类型 magic = binascii.hexlify(macho_file.read(4)).decode().upper() if magic == "2C000000": + # 当文件类型匹配时,重置文件读取位置到指定的加密信息位置 macho_file.seek(hex_hand, 0) + # 读取并转换加密信息命令的24字节到十六进制字符串 encryption_info_command = binascii.hexlify( macho_file.read(24)).decode() + # 提取加密信息命令的最后8个字符,用于判断加密标识 cryptid = encryption_info_command[-8:len( encryption_info_command)] if cryptid == "01000000": + # 当加密标识匹配时,设置shell标志为True,表示文件包含shell功能 self.shell_flag = True break + # 如果当前文件类型不匹配,移动文件读取位置指针,继续搜索 hex_hand = hex_hand + 4 def __scanner_file_by_ipa__(self, output): + """ + 根据IPA文件扫描特定后缀的文件。 + + 本函数关注于从IPA文件解压后的输出目录中,扫描特定后缀的文件。 + 这些文件可能包含敏感信息或需要进一步处理。 + + 参数: + - output: 解压IPA文件后的输出目录路径。 + + 返回: + 无直接返回值。但通过调用`self.__get_scanner_file__`函数处理扫描结果。 + """ + # 定义需要扫描的文件后缀列表,这些文件类型可能包含需要分析的内容 scanner_file_suffix = ["plist", "js", "xml", "html"] + + # 构造扫描目录路径,"Payload"是IPA解压后包含应用数据的目录 scanner_dir = os.path.join(output, "Payload") + + # 调用内部函数处理具体的文件扫描逻辑 self.__get_scanner_file__(scanner_dir, scanner_file_suffix) def __get_scanner_file__(self, scanner_dir, file_suffix): + """ + 递归获取指定目录下的特定后缀文件。 + + :param scanner_dir: 需要扫描的目录路径 + :param file_suffix: 需要获取的文件后缀名列表 + """ + # 获取目录下的所有文件和子目录 dir_or_files = os.listdir(scanner_dir) for dir_file in dir_or_files: + # 构造完整的文件或目录路径 dir_file_path = os.path.join(scanner_dir, dir_file) + # 如果是目录,则递归调用自身 if os.path.isdir(dir_file_path): + # 如果目录名以.app结尾,提取 ELF 文件名 if dir_file.endswith(".app"): self.elf_file_name = dir_file.replace(".app", "") self.__get_scanner_file__(dir_file_path, file_suffix) else: + # 如果文件名与ELF文件名相同,获取文件头信息并加入处理队列 if self.elf_file_name == dir_file: self.__get_file_header__(dir_file_path) self.file_queue.put(dir_file_path) continue + # 如果资源标志为真,对文件后缀进行处理 if cores.resource_flag: dir_file_suffix = dir_file.split(".") + # 对有后缀的文件进行判断 if len(dir_file_suffix) > 1: + # 如果文件后缀在指定的后缀列表中,获取文件头信息并加入处理队列 if dir_file_suffix[-1] in file_suffix: self.__get_file_header__(dir_file_path) self.file_queue.put(dir_file_path) def __decode_ipa__(self, output_path): + """ + 解压IPA文件到指定的输出路径。 + + 该函数首先将IPA文件(实际上是一个zip文件)解压到输出路径中。 + 然后它会尝试根据文件名的编码问题进行处理,以确保文件名正确显示。 + 接着,函数会检查解压出来的文件,特别是位于"Payload"目录下的文件, + 并根据需要对它们进行重命名和移动,以确保文件结构与预期一致。 + 最后,函数会清理解压过程中产生的临时目录,以保持文件系统的整洁。 + + 参数: + output_path (str): IPA文件解压后的输出路径。 + + 返回: + 无 + """ with zipfile.ZipFile(self.path, "r") as zip_files: zip_file_names = zip_files.namelist() zip_files.extract(zip_file_names[0], output_path) @@ -135,10 +243,30 @@ def __decode_ipa__(self, output_path): shutil.rmtree(old_ext_path) def __get_parse_dir__(self, output_path, file_path): + """ + 根据给定的文件路径和输出路径,获取解析后的目录路径。 + + 该方法首先定位到文件路径中的 "Payload/" 目录和 ".app" 后缀的位置, + 然后根据操作系统对路径进行相应的处理,最后拼接输出路径和处理后的路径, + 返回解析后的目录路径。 + + 参数: + output_path (str): 解析结果的输出路径。 + file_path (str): 需要解析的文件的路径。 + + 返回: + str: 解析后的目录路径。 + """ + # 定位到 "Payload/" 目录的起始位置 start = file_path.index("Payload/") + # 定位到 ".app" 后缀的结束位置 end = file_path.index(".app") + # 提取根目录路径 root_dir = file_path[start:end] + # 如果是 Windows 系统,则将路径中的斜杠替换为反斜杠 if platform.system() == "Windows": root_dir = root_dir.replace("/", "\\") - old_root_dir = os.path.join(output_path, root_dir+".app") + # 拼接输出路径和处理后的根目录路径 + old_root_dir = os.path.join(output_path, root_dir + ".app") + # 返回解析后的目录路径 return old_root_dir diff --git a/libs/task/web_task.py b/libs/task/web_task.py index b47e882..bec0443 100644 --- a/libs/task/web_task.py +++ b/libs/task/web_task.py @@ -20,29 +20,61 @@ def __init__(self, path): self.permissions = [] def start(self): + """ + 根据配置和路径初始化扫描参数。 + + 此方法首先检查配置中的网页文件后缀列表是否为空。如果为空,则使用默认的后缀列表。 + 如果指定路径是目录,则递归扫描该目录下的文件。如果路径是文件,检查其后缀是否在后缀列表中。 + 如果文件后缀不在列表中,抛出异常;否则,将文件路径加入待处理队列。 + + Returns: + dict: 包含扫描参数的字典,如组件列表、是否包含shell脚本标志、文件队列等。 + """ + # 检查配置中的网页文件后缀列表是否为空 if len(config.web_file_suffix) <= 0: scanner_file_suffix = ["html", "js", "html", "xml"] + else: + scanner_file_suffix = config.web_file_suffix - scanner_file_suffix = config.web_file_suffix + # 判断路径是否为目录 if os.path.isdir(self.path): + # 如果是目录,则调用私有方法扫描目录下的文件 self.__get_scanner_file__(self.path, scanner_file_suffix) else: + # 如果不是目录,检查文件后缀是否在后缀列表中 if not (self.path.split(".")[-1] in scanner_file_suffix): + # 如果文件后缀不在列表中,构造错误信息并抛出异常 err_info = ("Retrieval of this file type is not supported. Select a file or directory with a suffix of %s" % ",".join(scanner_file_suffix)) raise Exception(err_info) + # 将符合条件的文件路径加入文件队列 self.file_queue.put(self.path) + + # 返回包含扫描参数的字典 return {"comp_list": [], "shell_flag": False, "file_queue": self.file_queue, "packagename": None, "file_identifier": self.file_identifier, "permissions": self.permissions} def __get_scanner_file__(self, scanner_dir, file_suffix): + """ + 递归扫描指定目录下的所有文件,特别是处理特定后缀的文件。 + + :param scanner_dir: 需要扫描的目录路径 + :param file_suffix: 关注的文件后缀名列表 + """ + # 获取目录下的所有文件和子目录 dir_or_files = os.listdir(scanner_dir) for dir_file in dir_or_files: + # 构造完整的文件或目录路径 dir_file_path = os.path.join(scanner_dir, dir_file) + # 如果是目录,则递归调用自身 if os.path.isdir(dir_file_path): self.__get_scanner_file__(dir_file_path, file_suffix) else: + # 如果文件有后缀名 if len(dir_file.split(".")) > 1: + # 如果文件后缀名在关注的后缀名列表中 if dir_file.split(".")[-1] in file_suffix: + # 打开文件,计算并获取MD5值 with open(dir_file_path,'rb') as f: dex_md5 = str(hashlib.md5().update(f.read()).hexdigest()).upper() self.file_identifier.append(dex_md5) - self.file_queue.put(dir_file_path) + # 将文件路径放入队列中 + self.file_queue.put(dir_file_path) \ No newline at end of file