|  | 
|  | 1 | +import os | 
|  | 2 | +import unittest | 
|  | 3 | + | 
|  | 4 | +from test.support import import_helper, threading_helper | 
|  | 5 | +from test.support.threading_helper import run_concurrently | 
|  | 6 | +from uuid import SafeUUID | 
|  | 7 | + | 
|  | 8 | +c_uuid = import_helper.import_module("_uuid") | 
|  | 9 | + | 
|  | 10 | +NTHREADS = 10 | 
|  | 11 | +UUID_PER_THREAD = 1000 | 
|  | 12 | + | 
|  | 13 | + | 
|  | 14 | +@threading_helper.requires_working_threading() | 
|  | 15 | +class UUIDTests(unittest.TestCase): | 
|  | 16 | +    @unittest.skipUnless(os.name == "posix", "POSIX only") | 
|  | 17 | +    def test_generate_time_safe(self): | 
|  | 18 | +        uuids = [] | 
|  | 19 | + | 
|  | 20 | +        def worker(): | 
|  | 21 | +            local_uuids = [] | 
|  | 22 | +            for _ in range(UUID_PER_THREAD): | 
|  | 23 | +                uuid, is_safe = c_uuid.generate_time_safe() | 
|  | 24 | +                self.assertIs(type(uuid), bytes) | 
|  | 25 | +                self.assertEqual(len(uuid), 16) | 
|  | 26 | +                # Collect the UUID only if it is safe. If not, we cannot ensure | 
|  | 27 | +                # UUID uniqueness. According to uuid_generate_time_safe() man | 
|  | 28 | +                # page, it is theoretically possible for two concurrently | 
|  | 29 | +                # running processes to generate the same UUID(s) if the return | 
|  | 30 | +                # value is not 0. | 
|  | 31 | +                if is_safe == SafeUUID.safe: | 
|  | 32 | +                    local_uuids.append(uuid) | 
|  | 33 | + | 
|  | 34 | +            # Merge all safe uuids | 
|  | 35 | +            uuids.extend(local_uuids) | 
|  | 36 | + | 
|  | 37 | +        run_concurrently(worker_func=worker, nthreads=NTHREADS) | 
|  | 38 | +        self.assertEqual(len(uuids), len(set(uuids))) | 
|  | 39 | + | 
|  | 40 | +    @unittest.skipUnless(os.name == "nt", "Windows only") | 
|  | 41 | +    def test_UuidCreate(self): | 
|  | 42 | +        uuids = [] | 
|  | 43 | + | 
|  | 44 | +        def worker(): | 
|  | 45 | +            local_uuids = [] | 
|  | 46 | +            for _ in range(UUID_PER_THREAD): | 
|  | 47 | +                uuid = c_uuid.UuidCreate() | 
|  | 48 | +                self.assertIs(type(uuid), bytes) | 
|  | 49 | +                self.assertEqual(len(uuid), 16) | 
|  | 50 | +                local_uuids.append(uuid) | 
|  | 51 | + | 
|  | 52 | +            # Merge all uuids | 
|  | 53 | +            uuids.extend(local_uuids) | 
|  | 54 | + | 
|  | 55 | +        run_concurrently(worker_func=worker, nthreads=NTHREADS) | 
|  | 56 | +        self.assertEqual(len(uuids), len(set(uuids))) | 
|  | 57 | + | 
|  | 58 | + | 
|  | 59 | +if __name__ == "__main__": | 
|  | 60 | +    unittest.main() | 
0 commit comments