|
1 | 1 | # XXX TypeErrors on calling handlers, or on bad return values from a |
2 | 2 | # handler, are obscure and unhelpful. |
3 | 3 |
|
| 4 | +import functools |
4 | 5 | import os |
5 | 6 | import re |
6 | 7 | import sys |
7 | 8 | import sysconfig |
8 | 9 | import unittest |
9 | | -import textwrap |
10 | 10 | import traceback |
11 | 11 | from io import BytesIO |
12 | 12 | from test import support |
@@ -823,128 +823,156 @@ def start_element(name, _): |
823 | 823 | self.assertEqual(started, ['doc']) |
824 | 824 |
|
825 | 825 |
|
826 | | -class AttackProtectionTest(unittest.TestCase): |
| 826 | +class AttackProtectionTestCases: |
| 827 | + """Generic interface for testing XML Expat protections. |
827 | 828 |
|
828 | | - def exponential_expansion_payload(self, ncols, nrows, text='.'): |
| 829 | + The protections being tested should mitigate attacks based |
| 830 | + on Billion Laughs payloads. |
| 831 | + """ |
| 832 | + |
| 833 | + @staticmethod |
| 834 | + def exponential_expansion_payload(ncols, nrows, text='.'): |
829 | 835 | """Create a billion laughs attack payload. |
830 | 836 |
|
831 | 837 | Be careful: the number of total items is pow(n, k), thereby |
832 | 838 | requiring at least pow(ncols, nrows) * sizeof(text) memory! |
833 | 839 | """ |
834 | | - # 'indent' affects the peak amplification factor and allocation |
835 | | - indent = ' ' * 2 |
836 | | - body = textwrap.indent('\n'.join( |
| 840 | + body = '\n'.join( |
837 | 841 | f'<!ENTITY row{i + 1} "{f"&row{i};" * ncols}">' |
838 | 842 | for i in range(nrows) |
839 | | - ), indent) |
| 843 | + ) |
840 | 844 | return f"""\ |
841 | 845 | <?xml version="1.0"?> |
842 | 846 | <!DOCTYPE doc [ |
843 | | -{indent}<!ENTITY row0 "{text}"> |
844 | | -{indent}<!ELEMENT doc (#PCDATA)> |
| 847 | +<!ENTITY row0 "{text}"> |
| 848 | +<!ELEMENT doc (#PCDATA)> |
845 | 849 | {body} |
846 | 850 | ]> |
847 | | -<doc>&row{nrows};</doc> |
848 | | -""" |
849 | | - |
850 | | - # With the default Expat configuration, the billion laughs protection may |
851 | | - # hit before the allocation limiter if exponential_expansion_payload() is |
852 | | - # not carefully parametrized. In particular, use the following assert_*() |
853 | | - # methods to check the error message of the active protection. |
| 851 | +<doc>&row{nrows};</doc>""" |
854 | 852 |
|
855 | 853 | def assert_root_parser_failure(self, func, /, *args, **kwargs): |
856 | 854 | """Check that func(*args, **kwargs) is invalid for a sub-parser.""" |
857 | 855 | msg = "parser must be a root parser" |
858 | 856 | self.assertRaisesRegex(expat.ExpatError, msg, func, *args, **kwargs) |
859 | 857 |
|
860 | | - def assert_alloc_limit_reached(self, func, /, *args, **kwargs): |
861 | | - """Check that fnuc(*args, **kwargs) hits the allocation limit.""" |
862 | | - msg = r"out of memory: line \d+, column \d+" |
863 | | - self.assertRaisesRegex(expat.ExpatError, msg, func, *args, **kwargs) |
| 858 | + def assert_active_protection(self, func, /, *args, **kwargs): |
| 859 | + """Assert that func(*args, **kwargs) triggers the attack protection.""" |
| 860 | + raise NotImplementedError |
864 | 861 |
|
865 | | - def test_set_alloc_tracker_maximum_amplification(self): |
866 | | - # On WASI, the maximum amplification factor of the payload may differ, |
867 | | - # so we craft a payload that is likely to yield an amplification factor |
868 | | - # way larger than 1.0 and way smaller than 10^4. |
869 | | - payload = self.exponential_expansion_payload(1, 2) |
| 862 | + def set_activation_threshold(self, parser, threshold): |
| 863 | + """Set the activation threshold for the tested protection.""" |
| 864 | + raise NotImplementedError |
870 | 865 |
|
871 | | - p = expat.ParserCreate() |
872 | | - # Unconditionally enable maximum amplification factor. |
873 | | - p.SetAllocTrackerActivationThreshold(0) |
874 | | - # Use a max amplification factor likely to be below the real one. |
875 | | - self.assertIsNone(p.SetAllocTrackerMaximumAmplification(1.0)) |
876 | | - self.assert_alloc_limit_reached(p.Parse, payload, True) |
| 866 | + def set_maximum_amplification(self, parser, max_factor): |
| 867 | + """Set the maximum amplification factor for the tested protection.""" |
| 868 | + raise NotImplementedError |
877 | 869 |
|
878 | | - # Re-create a parser as the current parser is now in an error state. |
879 | | - p = expat.ParserCreate() |
880 | | - # Unconditionally enable maximum amplification factor. |
881 | | - p.SetAllocTrackerActivationThreshold(0) |
882 | | - # Use a max amplification factor likely to be above the real one. |
883 | | - self.assertIsNone(p.SetAllocTrackerMaximumAmplification(10_000)) |
884 | | - self.assertIsNotNone(p.Parse(payload, True)) |
| 870 | + def test_set_maximum_amplification_reached(self): |
| 871 | + parser = expat.ParserCreate() |
| 872 | + # Unconditionally enable maximum activation factor. |
| 873 | + self.set_activation_threshold(parser, 0) |
| 874 | + # Choose a max amplification factor expected to always be exceeded. |
| 875 | + self.assertIsNone(self.set_maximum_amplification(parser, 1.0)) |
| 876 | + # Craft a payload for which the peak amplification factor is > 1.0. |
| 877 | + payload = self.exponential_expansion_payload(1, 2) |
| 878 | + self.assert_active_protection(parser.Parse, payload, True) |
885 | 879 |
|
886 | | - def test_set_alloc_tracker_maximum_amplification_infinity(self): |
| 880 | + def test_set_maximum_amplification_ignored(self): |
| 881 | + parser = expat.ParserCreate() |
| 882 | + # Unconditionally enable maximum activation factor. |
| 883 | + self.set_activation_threshold(parser, 0) |
| 884 | + # Choose a max amplification factor expected to never be exceeded. |
| 885 | + self.assertIsNone(self.set_maximum_amplification(parser, 1e4)) |
| 886 | + # Craft a payload for which the peak amplification factor is < 1e4. |
| 887 | + payload = self.exponential_expansion_payload(1, 2) |
| 888 | + self.assertIsNotNone(parser.Parse(payload, True)) |
| 889 | + |
| 890 | + def test_set_maximum_amplification_infinity(self): |
887 | 891 | inf = float('inf') # an 'inf' threshold is allowed by Expat |
888 | 892 | parser = expat.ParserCreate() |
889 | | - self.assertIsNone(parser.SetAllocTrackerMaximumAmplification(inf)) |
| 893 | + self.assertIsNone(self.set_maximum_amplification(parser, inf)) |
890 | 894 |
|
891 | | - def test_set_alloc_tracker_maximum_amplification_arg_invalid_type(self): |
| 895 | + def test_set_maximum_amplification_arg_invalid_type(self): |
892 | 896 | parser = expat.ParserCreate() |
893 | | - f = parser.SetAllocTrackerMaximumAmplification |
| 897 | + setter = functools.partial(self.set_maximum_amplification, parser) |
894 | 898 |
|
895 | | - self.assertRaises(TypeError, f, None) |
896 | | - self.assertRaises(TypeError, f, 'abc') |
| 899 | + self.assertRaises(TypeError, setter, None) |
| 900 | + self.assertRaises(TypeError, setter, 'abc') |
897 | 901 |
|
898 | | - def test_set_alloc_tracker_maximum_amplification_arg_invalid_range(self): |
| 902 | + def test_set_maximum_amplification_arg_invalid_range(self): |
899 | 903 | parser = expat.ParserCreate() |
900 | | - f = parser.SetAllocTrackerMaximumAmplification |
| 904 | + setter = functools.partial(self.set_maximum_amplification, parser) |
901 | 905 |
|
902 | 906 | msg = re.escape("'max_factor' must be at least 1.0") |
903 | | - self.assertRaisesRegex(expat.ExpatError, msg, f, float('nan')) |
904 | | - self.assertRaisesRegex(expat.ExpatError, msg, f, 0.99) |
| 907 | + self.assertRaisesRegex(expat.ExpatError, msg, setter, float('nan')) |
| 908 | + self.assertRaisesRegex(expat.ExpatError, msg, setter, 0.99) |
905 | 909 |
|
906 | | - def test_set_alloc_tracker_maximum_amplification_fail_for_subparser(self): |
| 910 | + def test_set_maximum_amplification_fail_for_subparser(self): |
907 | 911 | parser = expat.ParserCreate() |
908 | 912 | subparser = parser.ExternalEntityParserCreate(None) |
909 | | - fsub = subparser.SetAllocTrackerMaximumAmplification |
910 | | - self.assert_root_parser_failure(fsub, 123.45) |
| 913 | + setter = functools.partial(self.set_maximum_amplification, subparser) |
| 914 | + self.assert_root_parser_failure(setter, 123.45) |
911 | 915 |
|
912 | | - def test_set_alloc_tracker_activation_threshold(self): |
913 | | - # The payload is expected to have a peak allocation of |
914 | | - # at least 3 bytes but strictly less than 10^5 bytes. |
| 916 | + def test_set_attack_protection_threshold_reached(self): |
| 917 | + parser = expat.ParserCreate() |
| 918 | + # Choose a threshold expected to be always reached. |
| 919 | + self.set_activation_threshold(parser, 3) |
| 920 | + # Check that the threshold is reached by choosing a small factor |
| 921 | + # and a payload whose peak amplification factor exceeds it. |
| 922 | + self.assertIsNone(self.set_maximum_amplification(parser, 1.0)) |
915 | 923 | payload = self.exponential_expansion_payload(10, 4) |
| 924 | + self.assert_active_protection(parser.Parse, payload, True) |
916 | 925 |
|
917 | | - p = expat.ParserCreate() |
918 | | - p.SetAllocTrackerActivationThreshold(pow(10, 5)) |
919 | | - self.assertIsNone(p.SetAllocTrackerMaximumAmplification(1.0)) |
920 | | - # Check that we never reach the activation threshold. |
921 | | - self.assertIsNotNone(p.Parse(payload, True)) |
922 | | - |
923 | | - p = expat.ParserCreate() |
924 | | - p.SetAllocTrackerActivationThreshold(2) |
925 | | - # Check that we always reach the activation threshold. |
926 | | - self.assertIsNone(p.SetAllocTrackerMaximumAmplification(1.0)) |
927 | | - self.assert_alloc_limit_reached(p.Parse, payload, True) |
| 926 | + def test_set_attack_protection_threshold_ignored(self): |
| 927 | + parser = expat.ParserCreate() |
| 928 | + # Choose a threshold expected to be never reached. |
| 929 | + self.set_activation_threshold(parser, pow(10, 5)) |
| 930 | + # Check that the threshold is reached by choosing a small factor |
| 931 | + # and a payload whose peak amplification factor exceeds it. |
| 932 | + self.assertIsNone(self.set_maximum_amplification(parser, 1.0)) |
| 933 | + payload = self.exponential_expansion_payload(10, 4) |
| 934 | + self.assertIsNotNone(parser.Parse(payload, True)) |
928 | 935 |
|
929 | | - def test_set_alloc_tracker_activation_threshold_arg_invalid_type(self): |
| 936 | + def test_set_attack_protection_threshold_arg_invalid_type(self): |
930 | 937 | parser = expat.ParserCreate() |
931 | | - f = parser.SetAllocTrackerActivationThreshold |
| 938 | + setter = functools.partial(self.set_activation_threshold, parser) |
932 | 939 |
|
933 | | - self.assertRaises(TypeError, f, 1.0) |
934 | | - self.assertRaises(TypeError, f, -1.5) |
935 | | - self.assertRaises(ValueError, f, -5) |
| 940 | + self.assertRaises(TypeError, setter, 1.0) |
| 941 | + self.assertRaises(TypeError, setter, -1.5) |
| 942 | + self.assertRaises(ValueError, setter, -5) |
936 | 943 |
|
937 | | - def test_set_alloc_tracker_activation_threshold_arg_invalid_range(self): |
| 944 | + def test_set_attack_protection_threshold_arg_invalid_range(self): |
938 | 945 | _testcapi = import_helper.import_module("_testcapi") |
939 | 946 | parser = expat.ParserCreate() |
940 | | - f = parser.SetAllocTrackerActivationThreshold |
941 | | - self.assertRaises(OverflowError, f, _testcapi.ULLONG_MAX + 1) |
| 947 | + setter = functools.partial(self.set_activation_threshold, parser) |
942 | 948 |
|
943 | | - def test_set_alloc_tracker_activation_threshold_fail_for_subparser(self): |
| 949 | + self.assertRaises(OverflowError, setter, _testcapi.ULLONG_MAX + 1) |
| 950 | + |
| 951 | + def test_set_attack_protection_threshold_fail_for_subparser(self): |
944 | 952 | parser = expat.ParserCreate() |
945 | 953 | subparser = parser.ExternalEntityParserCreate(None) |
946 | | - fsub = subparser.SetAllocTrackerActivationThreshold |
947 | | - self.assert_root_parser_failure(fsub, 12345) |
| 954 | + setter = functools.partial(self.set_activation_threshold, subparser) |
| 955 | + self.assert_root_parser_failure(setter, 12345) |
| 956 | + |
| 957 | + |
| 958 | +@unittest.skipIf(expat.version_info < (2, 7, 2), "requires Expat >= 2.7.2") |
| 959 | +class MemoryProtectionTest(AttackProtectionTestCases, unittest.TestCase): |
| 960 | + |
| 961 | + # With the default Expat configuration, the billion laughs protection may |
| 962 | + # hit before the allocation limiter if exponential_expansion_payload() is |
| 963 | + # not carefully parametrized. In particular, use the following assert_*() |
| 964 | + # methods to check the error message of the active protection. |
| 965 | + |
| 966 | + def assert_active_protection(self, func, /, *args, **kwargs): |
| 967 | + """Check that fnuc(*args, **kwargs) hits the allocation limit.""" |
| 968 | + msg = r"out of memory: line \d+, column \d+" |
| 969 | + self.assertRaisesRegex(expat.ExpatError, msg, func, *args, **kwargs) |
| 970 | + |
| 971 | + def set_maximum_amplification(self, parser, max_factor): |
| 972 | + return parser.SetAllocTrackerMaximumAmplification(max_factor) |
| 973 | + |
| 974 | + def set_activation_threshold(self, parser, threshold): |
| 975 | + return parser.SetAllocTrackerActivationThreshold(threshold) |
948 | 976 |
|
949 | 977 |
|
950 | 978 | if __name__ == "__main__": |
|
0 commit comments