|  | 
|  | 1 | +import unittest | 
|  | 2 | +from test.support import import_helper, threading_helper | 
|  | 3 | + | 
|  | 4 | +resource = import_helper.import_module("resource") | 
|  | 5 | + | 
|  | 6 | + | 
|  | 7 | +NTHREADS = 10 | 
|  | 8 | +LOOP_PER_THREAD = 1000 | 
|  | 9 | + | 
|  | 10 | + | 
|  | 11 | +@threading_helper.requires_working_threading() | 
|  | 12 | +class ResourceTest(unittest.TestCase): | 
|  | 13 | +    @unittest.skipUnless(hasattr(resource, "getrusage"), "needs getrusage") | 
|  | 14 | +    @unittest.skipUnless( | 
|  | 15 | +        hasattr(resource, "RUSAGE_THREAD"), "needs RUSAGE_THREAD" | 
|  | 16 | +    ) | 
|  | 17 | +    def test_getrusage(self): | 
|  | 18 | +        ru_utime_lst = [] | 
|  | 19 | + | 
|  | 20 | +        def dummy_work(ru_utime_lst): | 
|  | 21 | +            for _ in range(LOOP_PER_THREAD): | 
|  | 22 | +                pass | 
|  | 23 | + | 
|  | 24 | +            usage_process = resource.getrusage(resource.RUSAGE_SELF) | 
|  | 25 | +            usage_thread = resource.getrusage(resource.RUSAGE_THREAD) | 
|  | 26 | +            # Process user time should be greater than thread user time | 
|  | 27 | +            self.assertGreater(usage_process.ru_utime, usage_thread.ru_utime) | 
|  | 28 | +            ru_utime_lst.append(usage_thread.ru_utime) | 
|  | 29 | + | 
|  | 30 | +        threading_helper.run_concurrently( | 
|  | 31 | +            worker_func=dummy_work, args=(ru_utime_lst,), nthreads=NTHREADS | 
|  | 32 | +        ) | 
|  | 33 | + | 
|  | 34 | +        usage_process = resource.getrusage(resource.RUSAGE_SELF) | 
|  | 35 | +        self.assertEqual(len(ru_utime_lst), NTHREADS) | 
|  | 36 | +        # Process user time should be greater than sum of all thread user times | 
|  | 37 | +        self.assertGreater(usage_process.ru_utime, sum(ru_utime_lst)) | 
|  | 38 | + | 
|  | 39 | + | 
|  | 40 | +if __name__ == "__main__": | 
|  | 41 | +    unittest.main() | 
0 commit comments