|
1 | 1 | # |
2 | | -# Test ratelimit fields |
| 2 | +# Test ratelimit fields according to latest draft syntax |
| 3 | +# |
| 4 | +# Updated to use the new RateLimit-Policy and RateLimit header syntax: |
| 5 | +# - RateLimit-Policy: "name";q=quota;w=window;pk=:partition_key:;qu="unit" |
| 6 | +# - RateLimit: "name";r=remaining;t=time_to_reset;pk=:partition_key: |
3 | 7 | # |
4 | 8 | import http_sfv |
5 | 9 |
|
6 | 10 |
|
7 | | -def test_policy(): |
| 11 | +def test_ratelimit_policy(): |
| 12 | + # Test RateLimit-Policy header format according to latest draft |
8 | 13 | policies = [ |
9 | | - "1000;w=3600,5000;w=86400", |
10 | | - "100;w=60", |
11 | | - "10;w=1, 50;w=60, 1000;w=3600, 5000;w=86400", |
12 | | - "10;w=1;burst=1000, 1000;w=3600", |
| 14 | + '"hourly";q=1000;w=3600,"daily";q=5000;w=86400', |
| 15 | + '"permin";q=100;w=60', |
| 16 | + '"fast";q=10;w=1,"min";q=50;w=60,"hour";q=1000;w=3600,"day";q=5000;w=86400', |
| 17 | + '"burst";q=10;w=1;burst=1000,"hourly";q=1000;w=3600', |
13 | 18 | ] |
14 | 19 | for p in policies: |
15 | 20 | l = http_sfv.List() |
16 | 21 | l.parse(p.encode()) |
17 | 22 | for i in l: |
18 | | - assert i.value |
19 | | - assert "w" in i.params |
20 | | - print("value: ", i.value, "params:", dict(i.params)) |
21 | | - |
22 | | - |
23 | | -def test_all_in_one(): |
24 | | - values = [ |
25 | | - ( |
26 | | - "limit=10, remaining=10, reset=4," |
27 | | - """policy=(10;w=1 50;w=60 1000;w=3600 5000;w=86400)""" |
28 | | - ), |
29 | | - ] |
30 | | - for v in values: |
31 | | - d = http_sfv.Dictionary() |
32 | | - d.parse(v.encode()) |
33 | | - for k, v in d.items(): |
34 | | - print("parameter:", k, "value:", v) |
35 | | - assert k |
36 | | - assert v |
37 | | - if hasattr(v, "__iter__"): |
38 | | - for i in v: |
39 | | - assert i.value |
40 | | - assert "w" in i.params |
41 | | - print("value: ", i.value, "params:", dict(i.params)) |
42 | | - raise NotImplementedError |
43 | | - |
44 | | - |
45 | | -def find_quota_policy(policies, limit): |
| 23 | + assert i.value # policy name |
| 24 | + assert "q" in i.params # quota |
| 25 | + assert "w" in i.params # window |
| 26 | + print("policy: ", i.value, "params:", dict(i.params)) |
| 27 | + |
| 28 | + |
| 29 | +def test_ratelimit_fields(): |
| 30 | + # Test both RateLimit-Policy and RateLimit headers together |
| 31 | + # According to latest draft syntax |
| 32 | + policy_header = '"default";q=10;w=1,"hourly";q=50;w=60,"daily";q=1000;w=3600,"monthly";q=5000;w=86400' |
| 33 | + ratelimit_header = '"default";r=10;t=4' |
| 34 | + |
| 35 | + # Parse RateLimit-Policy header |
| 36 | + policy = http_sfv.List() |
| 37 | + policy.parse(policy_header.encode()) |
| 38 | + |
| 39 | + # Parse RateLimit header |
| 40 | + ratelimit = http_sfv.List() |
| 41 | + ratelimit.parse(ratelimit_header.encode()) |
| 42 | + |
| 43 | + # Verify policy structure |
| 44 | + for policy_item in policy: |
| 45 | + assert policy_item.value # policy name |
| 46 | + assert "q" in policy_item.params # quota |
| 47 | + assert "w" in policy_item.params # window |
| 48 | + print("policy:", policy_item.value, "quota:", policy_item.params["q"], "window:", policy_item.params["w"]) |
| 49 | + |
| 50 | + # Verify ratelimit structure |
| 51 | + for ratelimit_item in ratelimit: |
| 52 | + assert ratelimit_item.value # policy name |
| 53 | + assert "r" in ratelimit_item.params # remaining |
| 54 | + assert "t" in ratelimit_item.params # time to reset |
| 55 | + print("ratelimit:", ratelimit_item.value, "remaining:", ratelimit_item.params["r"], "reset:", ratelimit_item.params["t"]) |
| 56 | + |
| 57 | + |
| 58 | +def find_policy_by_name(policies, policy_name): |
| 59 | + """Find a policy by its name from a list of policies""" |
46 | 60 | for policy in policies: |
47 | | - if policy.value == limit: |
| 61 | + if policy.value == policy_name: |
48 | 62 | return policy.params |
49 | 63 | return {} |
50 | 64 |
|
51 | 65 |
|
52 | | -def parse_fields(headers): |
53 | | - # 27 µs ± 499 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each) |
54 | | - limit = http_sfv.Item() |
55 | | - limit.parse(headers["limit"].encode()) |
56 | | - |
57 | | - policies = http_sfv.List() |
58 | | - policies.parse(headers["policy"].encode()) |
59 | | - quota_policy = find_quota_policy(policies, limit.value) |
60 | | - return limit, policies, quota_policy |
61 | | - |
62 | | - |
63 | | -def parse_fields_token(headers): |
64 | | - # 24.2 µs ± 506 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each) |
65 | | - limit = http_sfv.Item() |
66 | | - limit.parse(headers["limit"].encode()) |
67 | | - |
68 | | - policies = http_sfv.Dictionary() |
69 | | - policies.parse(headers["policy"].encode()) |
70 | | - quota_policy = policies[f"q{limit.value}"] |
71 | | - return limit, policies, quota_policy.params |
72 | | - |
73 | | - |
74 | | -def parse_fields_int(headers): |
75 | | - # 24.5 µs ± 297 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each) |
76 | | - limit = int(headers["limit"]) |
| 66 | +def parse_ratelimit_headers(headers): |
| 67 | + """Parse RateLimit-Policy and RateLimit headers according to latest draft""" |
| 68 | + # Parse RateLimit-Policy header |
77 | 69 | policies = http_sfv.List() |
78 | | - policies.parse(headers["policy"].encode()) |
79 | | - quota_policy = find_quota_policy(policies, limit) |
80 | | - return limit, policies, quota_policy |
81 | | - |
82 | | - |
83 | | -def test_get_policy(): |
84 | | - # To retreive the policy, I need to iterate over the list. |
| 70 | + policies.parse(headers["RateLimit-Policy"].encode()) |
| 71 | + |
| 72 | + # Parse RateLimit header |
| 73 | + ratelimit = http_sfv.List() |
| 74 | + ratelimit.parse(headers["RateLimit"].encode()) |
| 75 | + |
| 76 | + # Get the current policy name from the RateLimit header |
| 77 | + current_policy_name = ratelimit[0].value if ratelimit else None |
| 78 | + |
| 79 | + # Find the matching policy |
| 80 | + policy_params = find_policy_by_name(policies, current_policy_name) if current_policy_name else {} |
| 81 | + |
| 82 | + return policies, ratelimit, policy_params |
| 83 | + |
| 84 | + |
| 85 | +def test_quota_units_example(): |
| 86 | + """Test RateLimit headers with quota units according to latest draft""" |
| 87 | + headers = dict( |
| 88 | + **{"RateLimit-Policy": '"bandwidth";q=65535;qu="content-bytes";w=10;pk=:sdfjLJUOUH==:'}, |
| 89 | + **{"RateLimit": '"bandwidth";r=30000;t=5;pk=:sdfjLJUOUH==:'} |
| 90 | + ) |
| 91 | + |
| 92 | + policies, ratelimit, policy_params = parse_ratelimit_headers(headers) |
| 93 | + |
| 94 | + # Verify policy structure with quota units |
| 95 | + policy = policies[0] |
| 96 | + assert policy.value == "bandwidth" |
| 97 | + assert policy.params["q"] == 65535 |
| 98 | + assert policy.params["qu"] == "content-bytes" # quota unit |
| 99 | + assert policy.params["w"] == 10 |
| 100 | + |
| 101 | + # Verify ratelimit structure |
| 102 | + current = ratelimit[0] |
| 103 | + assert current.value == "bandwidth" |
| 104 | + assert current.params["r"] == 30000 # remaining bytes |
| 105 | + assert current.params["t"] == 5 |
| 106 | + |
| 107 | + print("policy:", policy.value, "quota:", policy.params["q"], "unit:", policy.params["qu"], "window:", policy.params["w"]) |
| 108 | + print("ratelimit:", current.value, "remaining:", current.params["r"], "reset:", current.params["t"]) |
| 109 | + |
| 110 | + |
| 111 | +def test_multiple_policies_example(): |
| 112 | + """Test multiple policies as shown in draft examples""" |
| 113 | + # Example from draft: RateLimit-Policy: "permin";q=50;w=60,"perhr";q=1000;w=3600 |
| 114 | + headers = dict( |
| 115 | + **{"RateLimit-Policy": '"permin";q=50;w=60,"perhr";q=1000;w=3600'}, |
| 116 | + **{"RateLimit": '"permin";r=25;t=45'} |
| 117 | + ) |
| 118 | + |
| 119 | + policies, ratelimit, policy_params = parse_ratelimit_headers(headers) |
| 120 | + |
| 121 | + # Should have 2 policies |
| 122 | + assert len(policies) == 2 |
| 123 | + |
| 124 | + # Verify first policy |
| 125 | + assert policies[0].value == "permin" |
| 126 | + assert policies[0].params["q"] == 50 |
| 127 | + assert policies[0].params["w"] == 60 |
| 128 | + |
| 129 | + # Verify second policy |
| 130 | + assert policies[1].value == "perhr" |
| 131 | + assert policies[1].params["q"] == 1000 |
| 132 | + assert policies[1].params["w"] == 3600 |
| 133 | + |
| 134 | + # Verify current ratelimit refers to permin policy |
| 135 | + assert ratelimit[0].value == "permin" |
| 136 | + assert ratelimit[0].params["r"] == 25 |
| 137 | + assert ratelimit[0].params["t"] == 45 |
| 138 | + |
| 139 | + print("Multiple policies test passed - found", len(policies), "policies") |
| 140 | + |
| 141 | + |
| 142 | +def test_basic_policy_example(): |
| 143 | + """Test basic policy example from draft""" |
| 144 | + # Example from draft: RateLimit-Policy: "default";q=100;w=10 |
| 145 | + headers = dict( |
| 146 | + **{"RateLimit-Policy": '"default";q=100;w=10'}, |
| 147 | + **{"RateLimit": '"default";r=50;t=30'} |
| 148 | + ) |
| 149 | + |
| 150 | + policies, ratelimit, policy_params = parse_ratelimit_headers(headers) |
| 151 | + |
| 152 | + # Verify policy |
| 153 | + policy = policies[0] |
| 154 | + assert policy.value == "default" |
| 155 | + assert policy.params["q"] == 100 |
| 156 | + assert policy.params["w"] == 10 |
| 157 | + |
| 158 | + # Verify ratelimit |
| 159 | + current = ratelimit[0] |
| 160 | + assert current.value == "default" |
| 161 | + assert current.params["r"] == 50 |
| 162 | + assert current.params["t"] == 30 |
| 163 | + |
| 164 | + print("Basic policy test passed") |
| 165 | + |
| 166 | + |
| 167 | +def test_policy_lookup(): |
| 168 | + """Test retrieving a specific policy from RateLimit-Policy and RateLimit headers""" |
85 | 169 | headers = dict( |
86 | | - policy=( |
87 | | - """10;w=1,""" |
88 | | - """50;w=60,""" |
89 | | - """1000;w=3600;comment="foo", """ |
90 | | - """5000;w=86400""" |
91 | | - ), |
92 | | - limit="""1000""", |
| 170 | + **{"RateLimit-Policy": ( |
| 171 | + '"fast";q=10;w=1,' |
| 172 | + '"permin";q=50;w=60,' |
| 173 | + '"hourly";q=1000;w=3600;comment="primary", ' |
| 174 | + '"daily";q=5000;w=86400' |
| 175 | + )}, |
| 176 | + **{"RateLimit": '"hourly";r=999;t=3540'} |
93 | 177 | ) |
94 | 178 |
|
95 | | - limit, policies, quota_policy = parse_fields(headers) |
96 | | - assert limit.value == 1000 |
| 179 | + policies, ratelimit, policy_params = parse_ratelimit_headers(headers) |
| 180 | + |
| 181 | + # Verify we found the hourly policy |
| 182 | + assert ratelimit[0].value == "hourly" |
| 183 | + assert ratelimit[0].params["r"] == 999 # remaining |
| 184 | + assert ratelimit[0].params["t"] == 3540 # time to reset |
97 | 185 |
|
| 186 | + # Print all policies for verification |
98 | 187 | for policy in policies: |
99 | | - print("parameter:", policy.value, "value:", policy.params["w"]) |
| 188 | + print("policy:", policy.value, "quota:", policy.params["q"], "window:", policy.params["w"]) |
| 189 | + |
| 190 | + # Verify we found the matching policy parameters |
| 191 | + assert policy_params["q"] == 1000 # quota |
| 192 | + assert policy_params["w"] == 3600 # window |
100 | 193 |
|
101 | 194 |
|
102 | | -def test_get_policy_tokenized(): |
103 | | - # To retreive the policy, I need to iterate over the list. |
| 195 | +def test_partition_key_example(): |
| 196 | + """Test RateLimit headers with partition key according to latest draft""" |
104 | 197 | headers = dict( |
105 | | - policy=( |
106 | | - """q10;w=1,""" |
107 | | - """q50;w=60,""" |
108 | | - """q1000;w=3600;comment="foo", """ |
109 | | - """q5000;w=86400""" |
110 | | - ), |
111 | | - limit="""1000""", |
| 198 | + **{"RateLimit-Policy": '"peruser";q=100;w=60;pk=:cHsdsRa894==:'}, |
| 199 | + **{"RateLimit": '"peruser";r=50;t=30;pk=:cHsdsRa894==:'} |
112 | 200 | ) |
113 | | - limit = http_sfv.Item() |
114 | | - limit.parse(headers["limit"].encode()) |
115 | | - assert limit.value == "q1000" |
116 | | - |
117 | | - policies = http_sfv.List() |
118 | | - policies.parse(headers["policy"].encode()) |
119 | | - for policy in policies: |
120 | | - print("parameter:", policy.value, "value:", policy.params["w"]) |
121 | | - |
122 | | - # I could create a dictionary out of the list. |
123 | | - policies_dict = {policy.value: policy.params for policy in policies} |
124 | | - print("policies:", policies_dict) |
125 | | - print("current policy:", policies_dict[limit.value]) |
126 | | - raise NotImplementedError |
| 201 | + |
| 202 | + policies, ratelimit, policy_params = parse_ratelimit_headers(headers) |
| 203 | + |
| 204 | + # Verify policy structure |
| 205 | + policy = policies[0] |
| 206 | + assert policy.value == "peruser" |
| 207 | + assert policy.params["q"] == 100 |
| 208 | + assert policy.params["w"] == 60 |
| 209 | + assert policy.params["pk"] == b'p{\x1d\xb1\x16\xbc\xf7' # decoded base64 |
| 210 | + |
| 211 | + # Verify ratelimit structure |
| 212 | + current = ratelimit[0] |
| 213 | + assert current.value == "peruser" |
| 214 | + assert current.params["r"] == 50 |
| 215 | + assert current.params["t"] == 30 |
| 216 | + assert current.params["pk"] == b'p{\x1d\xb1\x16\xbc\xf7' # decoded base64 |
| 217 | + |
| 218 | + print("policy:", policy.value, "quota:", policy.params["q"], "window:", policy.params["w"]) |
| 219 | + print("ratelimit:", current.value, "remaining:", current.params["r"], "reset:", current.params["t"]) |
| 220 | + |
| 221 | + |
| 222 | +# Test all functions when script is run directly |
| 223 | +if __name__ == "__main__": |
| 224 | + test_functions = [ |
| 225 | + test_ratelimit_policy, |
| 226 | + test_ratelimit_fields, |
| 227 | + test_policy_lookup, |
| 228 | + test_partition_key_example, |
| 229 | + test_quota_units_example, |
| 230 | + test_multiple_policies_example, |
| 231 | + test_basic_policy_example, |
| 232 | + ] |
| 233 | + |
| 234 | + for test_func in test_functions: |
| 235 | + try: |
| 236 | + test_func() |
| 237 | + print(f"✓ {test_func.__name__} passed") |
| 238 | + except Exception as e: |
| 239 | + print(f"✗ {test_func.__name__} failed: {e}") |
| 240 | + import traceback |
| 241 | + traceback.print_exc() |
0 commit comments