Skip to content

Commit b6cb609

Browse files
committed
More tests
1 parent 69e9221 commit b6cb609

File tree

1 file changed

+224
-0
lines changed

1 file changed

+224
-0
lines changed

Lib/test/test_asyncio/test_tools.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,3 +547,227 @@ def test_asyncio_utils(self):
547547
for input_, table in TEST_INPUTS_TABLE:
548548
with self.subTest(input_):
549549
self.assertEqual(tools.build_task_table(input_), table)
550+
551+
552+
class TestAsyncioToolsBasic(unittest.TestCase):
553+
def test_empty_input_tree(self):
554+
"""Test print_async_tree with empty input."""
555+
result = []
556+
expected_output = []
557+
self.assertEqual(tools.print_async_tree(result), expected_output)
558+
559+
def test_empty_input_table(self):
560+
"""Test build_task_table with empty input."""
561+
result = []
562+
expected_output = []
563+
self.assertEqual(tools.build_task_table(result), expected_output)
564+
565+
def test_only_independent_tasks_tree(self):
566+
input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
567+
expected = [["└── (T) taskA"], ["└── (T) taskB"]]
568+
result = tools.print_async_tree(input_)
569+
self.assertEqual(sorted(result), sorted(expected))
570+
571+
def test_only_independent_tasks_table(self):
572+
input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
573+
self.assertEqual(tools.build_task_table(input_), [])
574+
575+
def test_single_task_tree(self):
576+
"""Test print_async_tree with a single task and no awaits."""
577+
result = [
578+
(
579+
1,
580+
[
581+
(2, "Task-1", []),
582+
],
583+
)
584+
]
585+
expected_output = [
586+
[
587+
"└── (T) Task-1",
588+
]
589+
]
590+
self.assertEqual(tools.print_async_tree(result), expected_output)
591+
592+
def test_single_task_table(self):
593+
"""Test build_task_table with a single task and no awaits."""
594+
result = [
595+
(
596+
1,
597+
[
598+
(2, "Task-1", []),
599+
],
600+
)
601+
]
602+
expected_output = []
603+
self.assertEqual(tools.build_task_table(result), expected_output)
604+
605+
def test_cycle_detection(self):
606+
"""Test print_async_tree raises CycleFoundException for cyclic input."""
607+
result = [
608+
(
609+
1,
610+
[
611+
(2, "Task-1", [[["main"], 3]]),
612+
(3, "Task-2", [[["main"], 2]]),
613+
],
614+
)
615+
]
616+
with self.assertRaises(tools.CycleFoundException) as context:
617+
tools.print_async_tree(result)
618+
self.assertEqual(context.exception.cycles, [[3, 2, 3]])
619+
620+
def test_complex_tree(self):
621+
"""Test print_async_tree with a more complex tree structure."""
622+
result = [
623+
(
624+
1,
625+
[
626+
(2, "Task-1", []),
627+
(3, "Task-2", [[["main"], 2]]),
628+
(4, "Task-3", [[["main"], 3]]),
629+
],
630+
)
631+
]
632+
expected_output = [
633+
[
634+
"└── (T) Task-1",
635+
" └── main",
636+
" └── (T) Task-2",
637+
" └── main",
638+
" └── (T) Task-3",
639+
]
640+
]
641+
self.assertEqual(tools.print_async_tree(result), expected_output)
642+
643+
def test_complex_table(self):
644+
"""Test build_task_table with a more complex tree structure."""
645+
result = [
646+
(
647+
1,
648+
[
649+
(2, "Task-1", []),
650+
(3, "Task-2", [[["main"], 2]]),
651+
(4, "Task-3", [[["main"], 3]]),
652+
],
653+
)
654+
]
655+
expected_output = [
656+
[1, "0x3", "Task-2", "main", "Task-1", "0x2"],
657+
[1, "0x4", "Task-3", "main", "Task-2", "0x3"],
658+
]
659+
self.assertEqual(tools.build_task_table(result), expected_output)
660+
661+
def test_deep_coroutine_chain(self):
662+
input_ = [
663+
(
664+
1,
665+
[
666+
(10, "leaf", [[["c1", "c2", "c3", "c4", "c5"], 11]]),
667+
(11, "root", []),
668+
],
669+
)
670+
]
671+
expected = [
672+
[
673+
"└── (T) root",
674+
" └── c5",
675+
" └── c4",
676+
" └── c3",
677+
" └── c2",
678+
" └── c1",
679+
" └── (T) leaf",
680+
]
681+
]
682+
result = tools.print_async_tree(input_)
683+
self.assertEqual(result, expected)
684+
685+
def test_multiple_cycles_same_node(self):
686+
input_ = [
687+
(
688+
1,
689+
[
690+
(1, "Task-A", [[["call1"], 2]]),
691+
(2, "Task-B", [[["call2"], 3]]),
692+
(3, "Task-C", [[["call3"], 1], [["call4"], 2]]),
693+
],
694+
)
695+
]
696+
with self.assertRaises(tools.CycleFoundException) as ctx:
697+
tools.print_async_tree(input_)
698+
cycles = ctx.exception.cycles
699+
self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles))
700+
701+
def test_table_output_format(self):
702+
input_ = [(1, [(1, "Task-A", [[["foo"], 2]]), (2, "Task-B", [])])]
703+
table = tools.build_task_table(input_)
704+
for row in table:
705+
self.assertEqual(len(row), 6)
706+
self.assertIsInstance(row[0], int) # thread ID
707+
self.assertTrue(
708+
isinstance(row[1], str) and row[1].startswith("0x")
709+
) # hex task ID
710+
self.assertIsInstance(row[2], str) # task name
711+
self.assertIsInstance(row[3], str) # coroutine chain
712+
self.assertIsInstance(row[4], str) # awaiter name
713+
self.assertTrue(
714+
isinstance(row[5], str) and row[5].startswith("0x")
715+
) # hex awaiter ID
716+
717+
718+
class TestAsyncioToolsEdgeCases(unittest.TestCase):
719+
720+
def test_task_awaits_self(self):
721+
"""A task directly awaits itself – should raise a cycle."""
722+
input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])]
723+
with self.assertRaises(tools.CycleFoundException) as ctx:
724+
tools.print_async_tree(input_)
725+
self.assertIn([1, 1], ctx.exception.cycles)
726+
727+
def test_task_with_missing_awaiter_id(self):
728+
"""Awaiter ID not in task list – should not crash, just show 'Unknown'."""
729+
input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined
730+
table = tools.build_task_table(input_)
731+
self.assertEqual(len(table), 1)
732+
self.assertEqual(table[0][4], "Unknown")
733+
734+
def test_duplicate_coroutine_frames(self):
735+
"""Same coroutine frame repeated under a parent – should deduplicate."""
736+
input_ = [
737+
(
738+
1,
739+
[
740+
(1, "Task-1", [[["frameA"], 2], [["frameA"], 3]]),
741+
(2, "Task-2", []),
742+
(3, "Task-3", []),
743+
],
744+
)
745+
]
746+
tree = tools.print_async_tree(input_)
747+
# Both children should be under the same coroutine node
748+
flat = "\n".join(tree[0])
749+
self.assertIn("frameA", flat)
750+
self.assertIn("Task-2", flat)
751+
self.assertIn("Task-1", flat)
752+
753+
flat = "\n".join(tree[1])
754+
self.assertIn("frameA", flat)
755+
self.assertIn("Task-3", flat)
756+
self.assertIn("Task-1", flat)
757+
758+
def test_task_with_no_name(self):
759+
"""Task with no name in id2name – should still render with fallback."""
760+
input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])]
761+
# If name is None, fallback to string should not crash
762+
tree = tools.print_async_tree(input_)
763+
self.assertIn("(T) None", "\n".join(tree[0]))
764+
765+
def test_tree_rendering_with_custom_emojis(self):
766+
"""Pass custom emojis to the tree renderer."""
767+
input_ = [(1, [(1, "MainTask", [[["f1", "f2"], 2]]), (2, "SubTask", [])])]
768+
tree = tools.print_async_tree(input_, task_emoji="🧵", cor_emoji="🔁")
769+
flat = "\n".join(tree[0])
770+
self.assertIn("🧵 MainTask", flat)
771+
self.assertIn("🔁 f1", flat)
772+
self.assertIn("🔁 f2", flat)
773+
self.assertIn("🧵 SubTask", flat)

0 commit comments

Comments
 (0)