Skip to content

Latest commit

 

History

History
386 lines (296 loc) · 26.8 KB

File metadata and controls

386 lines (296 loc) · 26.8 KB

十、为 Python 工具添加永久性

Python 具有巨大的功能,作为评估人员,我们只触及了可用工具和技术的表面。我们将介绍 Python 语言中一些对我们有帮助的更高级的特性。具体来说,我们将重点介绍如何将日志构建到脚本中,然后开发多线程和多处理工具。添加这些更高级的功能意味着您开发的工具将更能经受时间的考验,并且与其他解决方案不同。

理解 Python 中的日志记录

当您编写自己的模块时,比如第 9 章中突出强调的模块使用 Python自动化报告和任务时,您希望能够轻松跟踪错误、警告和调试消息。记录器库允许您跟踪事件并将其输出到标准错误标准错误)、文件和标准输出标准错误)。使用 logger 的好处是可以轻松定义格式,并使用特定的消息类型将其发送到相关输出。这些消息类似于 syslog 消息,它们模拟相同的日志记录级别。

有关记录器库的更多详细信息,请访问https://docs.python.org/2/library/logging.html

了解多线程和多处理的区别

中有两种不同的方式可以在 Python 中同时执行请求:多线程和多处理。通常,这两个项目相互混淆,当你读到它们时,你会在博客和新闻组上看到类似的反应。如果您谈论的是使用多处理器和处理内核,那么您就是在谈论多处理。如果您停留在同一个内存块中,但不使用多个内核或进程,那么您谈论的是多线程。多线程反过来运行并发代码,但由于 Python 解释器的设计,不能并行执行任务。

提示

如果您回顾第 8 章利用 Python、Metasploit 和Immunity进行开发,并查看 Windows 内存的定义区域,您将更好地了解线程和进程在 Windows 内存结构中的工作方式。请记住,其他操作系统****操作系统处理这些内存位置的方式是不同的。

用 Python 创建多线程脚本

要理解多线程的限制,您必须理解 Python 解释器。Python 解释器使用全局解释器锁GIL),这意味着当字节码由线程执行时,它一次由线程执行。

为了更好地理解 GIL,请查看中的文档 https://docs.python.org/2/glossary.html#term-全局解释器锁

这可以防止与一次由多个线程操作数据结构相关的问题。假设数据被写入字典,并且在并发线程中通过同一个键引用不同的数据段。你会把你打算写进字典的一些数据删掉。

对于多线程 Python 应用,您将听到一个称为线程安全的术语。这意味着,“某个线程是否可以修改某个内容而不影响数据的完整性或可用性?”即使某个内容被认为不是线程安全的,您也可以使用锁(稍后将介绍)根据需要控制数据输入。

我们将使用之前在第 6 章中创建的head_request.py脚本,使用 Python评估 Web 应用,并将其作为一个新脚本进行成熟。此脚本将使用队列来保存所有需要处理的任务,这些任务将在执行期间动态分配。该队列是通过从文件中读取值并存储这些值以供以后处理而构建的。我们将合并新的记录器库,以便在脚本执行时将详细信息输出到results.log 文件。以下屏幕截图显示了此新脚本执行后的结果:

Creating a multithreaded script in Python

此外,以下突出显示的日志文件包含脚本的详细执行和并发线程的输出:

Creating a multithreaded script in Python

此脚本可在找到 https://raw.githubusercontent.com/funkandwagnalls/pythonpentest/master/multi_threaded.py

现在,随着目标的实现,我们从库需要导入什么开始,并配置两个全局变量。第一个变量保存排队的工作负载,第二个变量用于锁定线程一段时间,以便在屏幕上打印数据:

请记住:并发处理意味着要处理项目。细节是按执行的方式提供的,在控制台上显示可能会出现混乱。为了解决这个问题,我们使用一个锁来充分暂停执行,以返回必要的细节。记录器是线程安全库,但打印不是,其他库也可能不是。因此,在适当的地方使用锁。

import urllib2, argparse, sys, threading, logging, Queue, time
queue = Queue.Queue()
lock = threading.Lock()

在此之后,我们需要创建将产生线程的类,唯一新的构造函数概念是threading.Thread.__init__(self)

class Agent(threading.Thread):
    def __init__(self, queue, logger, verbose):
        threading.Thread.__init__(self)
        self.queue = queue
        self.logger = logger
        self.verbose = verbose

然后,我们需要创建一个函数来处理每个线程中的实际数据。该函数首先定义初始值,如您所见,这些值是从队列中提取的。它们表示从文件加载到队列中的互联网协议IP)地址:

    def run(self):
        while True:
            host = self.queue.get()
            print("[*] Testing %s") % (str(host))
            target = "http://" + host
            target_secure = "https://" + host

从这里开始,我们将处理主机潜在网站的不安全和安全版本。以下代码用于网站的不安全部分,其工作类似于第 6 章使用 Python评估 Web 应用中突出显示的脚本。唯一的区别在于,我们添加了新的记录器功能,以将详细信息打印到结果日志文件中。正如您在下面的代码中所看到的,将详细信息写入记录器与写入 print 语句几乎相同。您还将注意到,我们使用了with语句来锁定线程进程,以便打印详细信息。这对于 I**/O**是不必要的,但如果不是这样,则很难阅读:

            try:
                request = urllib2.Request(target)
                request.get_method = lambda : 'HEAD'
                response = urllib2.urlopen(request)
            except:
                with lock:
                    self.logger.debug("[-] No web server at %s 
                        reported by thread %s" % (str(target), str
                            (threading.current_thread().name)))
                    print("[-] No web server at %s reported by thread %s") % 
                        (str(target), str(threading.current_thread().name))
                response = None
            if response != None:
                with lock:
                    self.logger.debug("[+] Response from %s reported by 
                        thread %s" % (str(target), str(threading.current_thread().
                          name)))
                    print("[*] Response from insecure service on %s reported by 
                        thread %s") % (str(target), str(threading.current_thread().name))
                self.logger.debug(response.info())

请求-响应指令的安全部分与代码的非安全部分几乎相同,如下所示:

            try:
                target_secure = urllib2.urlopen(target_secure)
                request_secure.get_method = lambda : 'HEAD'
                response_secure = urllib2.urlopen(request_secure)
            except:
                with lock:
                    self.logger.debug("[-] No secure web server at %s reported by 
                        thread %s" % (str(target_secure), str(threading.current_thread().name)))
                    print("[-] No secure web server at %s reported by 
                        thread %s") % (str(target_secure), str(threading.current_thread().name))
                response_secure = None
            if response_secure != None:
                with lock:
                    self.logger.debug("[+] Secure web server at %s reported by 
                        thread %s" % (str(target_secure), str(threading.current_thread().name)))
                    print("[*] Response from secure service on %s reported by thread %s") 
                        % (str(target_secure), str(threading.current_thread().name))
                self.logger.debug(response_secure.info())

最后,此函数列出了已完成的任务:

            self.queue.task_done()

正如前面突出显示的一样,参数和选项的配置与其他脚本非常相似。因此,为了简洁起见,省略了这些内容,但是可以在前面的链接中找到它们。但是,更改的是记录器的配置。我们设置了一个变量,该变量可以通过参数传递日志文件的名称。然后,我们配置记录器,使其处于输出到文件的适当级别,并且该格式标记线程的输出,以包括时间、线程名称、日志记录级别和实际消息。最后,我们配置将用作所有日志记录操作参考的对象:

    log = args.log                                                                                    # Configure the log output file
    if ".log" not in log:
        log = log + ".log"
    level = logging.DEBUG                                                                             # Logging level
    format = logging.Formatter("%(asctime)s [%(threadName)-12.12s] 
      [%(levelname)-5.5s]  %(message)s") 
    logger_obj = logging.getLogger()                                                                  # Getter for logging agent
    file_handler = logging.FileHandler(args.log)                                                                                                         
    targets_list = []
    # Configure logger formats for STDERR and output file
    file_handler.setFormatter(format)
    # Configure logger object
    logger_obj.addHandler(file_handler)
    logger_obj.setLevel(level)

设置好记录器后,我们可以实际设置使脚本多线程化所需的最后几行代码。我们将所有目标从文件加载到一个列表中,然后将该列表解析到队列中。我们可以做得更紧一些,但是下面的格式更容易阅读。然后我们生成 worker 并将setDaemon设置为True,以便脚本在主线程完成后终止,从而防止脚本挂起:

    # Load the targets into a list and remove trailing "\n"
    with open(targets) as f:
        targets_list = [line.rstrip() for line in f.readlines()]
    # Spawn workers to access site
    for thread in range(0, threads):
        worker = Agent(queue, logger_obj, verbose)
        worker.setDaemon(True)
        worker.start()
    # Build queue of work
    for target in targets_list:
        queue.put(target)
    # Wait for the queue to finish processing
    queue.join()
if __name__ == '__main__':
    main()

前面的详细信息创建了一个功能性多线程 Python 脚本,但存在一些问题。Python 多线程非常容易出错。即使使用编写良好的脚本,您也可以在每次迭代中返回不同的错误。此外,如前面的代码所示,完成相对较短的任务需要大量的代码。最后,根据执行脚本的情况和操作系统,线程可能不会提高处理性能。另一种解决方案是使用多处理而不是多线程,多线程更容易编码,更不容易出错,并且(再次)可以使用多个内核或处理器。

Python 有许多库可以支持并发性,从而简化编码。例如,使用货币处理 URL 可以通过简单请求(完成 http://pythonhosted.org/simple-requests/ ),已于建成 http://www.gevent.org/ 。前面的代码示例用于显示如何修改并发脚本以包含多线程支持。在使脚本成熟时,您应该看看其他库是否可以直接启用更好的功能,以便提高您的个人知识并创建仍然相关的脚本。

用 Python 创建多处理脚本

在进入使用 Python 创建多处理脚本之前,您应该了解大多数人遇到的陷阱。这将帮助您在将来尝试成熟您的工具集。Python 中的多处理脚本将遇到四个主要问题:

  • 对象序列化
  • 并行写入或读取数据并处理锁
  • 具有相关并行性的操作系统细微差别应用接口API
  • 将当前脚本(线程或非线程脚本)转换为利用并行性的脚本

在用 Python 编写多处理脚本时,最大的障碍是处理对象的序列化(称为 pickle)和反序列化(称为 unpickle)。当您编写自己的与多处理相关的代码时,可能会看到对 pickle 库的引用错误。这意味着您遇到了与数据序列化方式相关的问题。

Python 中的某些对象无法序列化,因此必须找到解决方法。最常见的引用方式是使用copy_reg库。此库提供了一种定义函数的方法,以便可以序列化这些函数。

可以想象,就像并发代码一样,将数据写入和读取到单个文件或其他输入/输出I/O资源将导致问题。这是因为每个核心或处理器都是同时处理数据的,而且在大多数情况下,这是在其他进程不知道的情况下处理的。因此,如果您正在编写需要输出详细信息的代码,则可以锁定进程,以便适当地处理详细信息。此功能通过使用multiprocessing.Lock()功能来处理。

除了 I/O 之外,还有一个额外的问题,即进程之间使用共享内存。由于这些过程相对独立地运行(取决于实现),因此在内存中引用的可扩展数据可能会有问题。谢天谢地,multiprocessing图书馆提供了许多工具来帮助我们。基本解决方案是使用multiprocessing.Values()multiprocessing.Arrays(),可以跨流程共享。

有关共享内存和多处理的更多详细信息,请参见https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.SharedTypes

在处理和内存管理方面,并非所有的操作系统都是平等创建的。对于系统工程师和开发人员来说,了解这些不同的操作系统在这些级别上是如何工作的是非常必要的。作为评估人员,我们在开发更高级的工具和创建漏洞利用时也有同样的需求,正如前面所强调的。

想想有多少次你看到一个新的工具或脚本出现,而它只在一个操作系统或发行版上测试过;当您使用它时,该产品在其他地方不起作用。多处理脚本也不例外,在编写这些脚本时,请记住最终目标。如果您不打算让脚本在 Kali 之外的任何地方运行,那么请确保在那里进行测试。如果要在 Windows 上运行它,则需要验证相同的脚本设计方法是否也适用于 Windows。具体来说,多处理代码的入口点需要在main()函数内,或者本质上在检查__name__是否等于'__main__':的下方。如果不是这样,您可能正在创建一个分叉炸弹,或者一个无限循环的生成过程,最终导致系统崩溃。

为了更好地理解 Windows 对进程分叉和 Python 多处理的限制,您可以参考https://docs.python.org/2/library/multiprocessing.html#windows

最后要考虑的是将已建立的脚本转换为多处理脚本。尽管互联网上有大量演示,显示用户使用线程或非线程脚本并将其转换为多处理脚本,但它们通常只适用于演示。将功能代码转换为稳定且有用的多处理脚本通常需要重写。这是因为前面提到的要点突出了您必须克服的挑战。

那么你从这一切中学到了什么?

  • 将并行执行的函数必须是可拾取的
  • 在处理 I/O 时,可能需要合并锁,而共享内存需要来自多处理库的特定函数
  • 需要保护并行进程的主要入口点
  • 脚本不容易从线程或非线程格式转换为多处理格式,因此,应该考虑重新设计脚本

为简洁起见,参数和选项的详细信息已被删除,但完整的详细信息可在中找到 https://raw.githubusercontent.com/funkandwagnalls/pythonpentest/master/multi_process.py

考虑到中的所有这些,我们现在可以重写head_request.py脚本,以适应多个多处理。run()函数的代码在很大程度上被重写,以便容纳对象,以便对其进行酸洗。这是因为host_request函数是由每个子流程运行的。urllib2请求和响应是不可拾取的对象,因此,在传递之前需要将数据转换为字符串。此外,对于多处理脚本,必须处理记录器,而不是直接调用记录器。通过这种方式,子流程知道使用通用文件名引用写入什么。

此格式防止多个进程同时写入文件。首先,我们创建一个时间戳,它将在抓取日志处理程序时用作参考。以下代码突出显示了初始值的配置以及不安全的服务请求和响应说明:

import multiprocessing, urllib2, argparse, sys, logging, datetime, time
def host_request(host):
    print("[*] Testing %s") % (str(host))
    target = "http://" + host
    target_secure = "https://" + host
    timenow = time.time()
    record = datetime.datetime.fromtimestamp(timenow).strftime
      ('%Y-%m-%d %H:%M:%S')
    logger = logging.getLogger(record)
    try:
        request = urllib2.Request(target)
        request.get_method = lambda : 'HEAD'
        response = urllib2.urlopen(request)
        response_data = str(response.info())
        logger.debug("[*] %s" % response_data)
        response.close()
    except:
        response = None
        response_data = None

在不安全请求和响应指令之后是安全服务请求和响应指令,如下所示:

    try:
        request_secure = urllib2.urlopen(target_secure)
        request_secure.get_method = lambda : 'HEAD'
        response_secure = str(urllib2.urlopen(request_secure).read())
        response_secure_data = str(response.info())
        logger.debug("[*] %s" % response_secure_data)
        response_secure.close()
    except:
        response_secure = None
        response_secure_data = None

捕获请求和响应详细信息后,返回详细信息并适当记录:

    if response_data != None and response_secure_data != None:
        r = "[+] Insecure webserver detected at %s reported by %s" % 
          (target, str(multiprocessing.Process().name))
        rs = "[+] Secure webserver detected at %s reported by %s" % 
          (target_secure, str(multiprocessing.Process().name))
        logger.debug("[+] Insecure web server detected at %s and reported 
          by process %s" % (str(target), str(multiprocessing.Process().name)))
        logger.debug("[+] Secure web server detected at %s and reported by process 
          %s" % (str(target_secure), str(multiprocessing.Process().name)))
        return(r, rs)
    elif response_data == None and response_secure_data == None:
        r = "[-] No insecure webserver at %s reported by %s" % (target, 
          str(multiprocessing.Process().name))
        rs = "[-] No secure webserver at %s reported by %s" % (target_secure, 
          str(multiprocessing.Process().name))
        logger.debug("[-] Insecure web server was not detected at %s and reported 
          by process %s" % (str(target), str(multiprocessing.Process().name)))
        logger.debug("[-] Secure web server was not detected at %s and reported 
          by process %s" % (str(target_secure), str(multiprocessing.Process().name)))
        return(r, rs)
    elif response_data != None and response_secure_data == None:
        r = "[+] Insecure webserver detected at %s reported by %s" % 
          (target, str(multiprocessing.Process().name))
        rs = "[-] No secure webserver at %s reported by %s" % (target_secure, 
          str(multiprocessing.Process().name))
        logger.debug("[+] Insecure web server detected at %s and reported by 
          process %s" % (str(target), str(multiprocessing.Process().name)))
        logger.debug("[-] Secure web server was not detected at %s and reported 
          by process %s" % (str(target_secure), str(multiprocessing.Process().name)))
        return(r, rs)
    elif response_secure_data != None and response_data == None:
        response = "[-] No insecure webserver at %s reported by %s" % 
          (target, str(multiprocessing.Process().name))
        rs = "[+] Secure webserver detected at %s reported by %s" % (target_secure, 
          str(multiprocessing.Process().name))
        logger.debug("[-] Insecure web server was not detected at %s and reported by 
          process %s" % (str(target), str(multiprocessing.Process().name)))
        logger.debug("[+] Secure web server detected at %s and reported by process %s" 
          % (str(target_secure), str(multiprocessing.Process().name)))
        return(r, rs)
    else:
        logger.debug("[-] No results were recorded for %s or %s" % (str(target), str(target_secure)))

如前所述,记录器使用处理程序,我们通过创建定义记录器设计的函数来完成。然后,每个子流程将使用multiprocessing.map中的initializer参数调用此函数。这意味着我们可以跨进程完全控制记录器,这可以防止需要传递的不可拾取对象出现问题:

def log_init(log):
    level = logging.DEBUG                                                                            
    format = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]  %(message)s") # Log format
    logger_obj = logging.getLogger()                                                                 
    file_handler = logging.FileHandler(log)                                                                                                           
    targets_list = []
    # Configure logger formats for STDERR and output file
    file_handler.setFormatter(format)
    # Configure logger object
    logger_obj.addHandler(file_handler)
    logger_obj.setLevel(level)

现在,通过main()函数中的所有这些细节,我们为参数和选项定义了命令行界面CLI。然后,我们从目标文件和参数变量生成将要测试的数据:

    # Set Constructors
    targets = args.targets                                                                            
    verbose = args.verbose                                                                            
    processes = args.multiprocess                                                                            
    log = args.log                                                                                    
    if ".log" not in log:
        log = log + ".log"
    # Load the targets into a list and remove trailing "\n"
    with open(targets) as f:
        targets_list = [line.rstrip() for line in f.readlines()]

最后,下面的代码使用map函数,在迭代目标列表时调用函数host_requestmap函数允许多处理脚本以类似于先前多线程脚本的方式对工作进行排队。然后,我们可以使用 CLI 参数加载的 processs 变量来定义要生成的子进程的数量,这允许我们动态控制分叉的进程的数量。这是一个非常猜测和检查的过程控制方法。

提示

如果您想更具体一些,另一种方法是确定 CPU 的数量,并将其加倍以确定进程的数量。这可以通过以下方式实现:processes = multiprocessing.cpu_count() *2

    # Establish pool list
    pool = multiprocessing.Pool(processes=threads, 
      initializer=log_init(log))
    # Queue up the targets to assess
    results = pool.map(host_request, targets_list)
    for result in results:
        for value in result:
            print(value)
if __name__ == '__main__':
    main()

生成代码后,我们可以输出帮助文件以决定脚本需要如何运行,如以下屏幕截图所示:

Creating a multiprocessing script in Python

当脚本运行时,输出逐条列出请求成功、失败和相关过程,如以下屏幕截图所示:

Creating a multiprocessing script in Python

最后,results.log文件包含与脚本生成的活动相关的详细信息,如以下屏幕截图所示:

Creating a multiprocessing script in Python

我们现在已经完成了我们的多处理脚本,它可以以受控的方式处理日志记录。这是创建行业标准工具的正确方向。通过额外的时间,我们可以将这个脚本附加到我们在上一章中创建的nmap_parser.py脚本,甚至以nmap_doc_generator.py脚本为例生成详细的报告。这些功能的结合将使该工具更加有用。

建筑行业标准工具

Python 是一种奇妙的语言,这些高级技术强调控制线程、进程、I/O 和日志记录,对于为脚本添加永久性至关重要。业内有很多例子可以帮助评估安全性,比如 Sulley。这是一个自动化应用模糊化的工具,旨在帮助识别安全弱点,其结果可用于编写 Metasploit 等框架。其他工具通过改进代码库帮助强化安全性,例如开放式 Web 应用安全项目的OWASPPython 安全项目。这些都是一些工具的例子,这些工具一开始是为了满足缺失的需求,后来获得了强大的追随者。这里提到这些工具是为了突出您的工具在正确的重点下可以成为什么。

提示

当你开发你自己的工具时,记住你的目标是什么,从小处着手,增加能力。这将帮助你使项目易于管理并取得成功,与小成功相关的小奖励将促使你参与更大的创新。最后,不要害怕重新开始。很多时候,一旦你意识到你做某事的方式可能不合适,代码就会引导你走向正确的方向。

总结

第 2 章Python 脚本基础第 10 章为 Python 工具添加永久性,我们重点介绍了改进渗透测试脚本的增量方法。知识的有机增长展示了如何改进代码以满足当今环境的评估需求。它还强调了这样一个事实,即脚本在某些特定的地方适合评估员的需要,并且目前有现成的工具或项目可以完成预期的任务。在本章中,我们见证了前面示例的高潮,这些示例开发了能够运行并发代码和并行进程的工具,可以一直有效地记录数据。我希望你喜欢这本书,就像我喜欢写它一样。