11"""
2- Tests for custom header filtering functionality.
2+ Tests for custom header forwarding functionality.
33"""
44import fnmatch
55import pytest
@@ -16,60 +16,45 @@ def test_custom_headers_config_creation(self) -> None:
1616
1717 assert config .strategy == "allowlist"
1818 assert config .allowed_headers == []
19- assert config .max_header_size == 8192
20- assert config .max_headers_count == 50
2119
2220 def test_custom_headers_config_with_headers (self ) -> None :
2321 """Test CustomHeadersConfig with allowed headers."""
2422 config = CustomHeadersConfig (
25- allowed_headers = ["x-user-email" , "x-tenant-id" ],
26- max_header_size = 4096 ,
27- max_headers_count = 10
23+ allowed_headers = ["x-user-email" , "x-tenant-id" ]
2824 )
2925
3026 assert config .strategy == "allowlist"
3127 assert config .allowed_headers == ["x-user-email" , "x-tenant-id" ]
32- assert config .max_header_size == 4096
33- assert config .max_headers_count == 10
34-
35- def test_custom_headers_config_validation (self ) -> None :
36- """Test CustomHeadersConfig validation for positive values."""
37- # Test negative max_header_size
38- with pytest .raises (ValueError , match = "max_header_size must be greater than 0" ):
39- CustomHeadersConfig (max_header_size = - 1 )
40-
41- # Test zero max_headers_count
42- with pytest .raises (ValueError , match = "max_headers_count must be greater than 0" ):
43- CustomHeadersConfig (max_headers_count = 0 )
4428
4529
4630def filter_headers_standalone (
4731 headers : dict [str , str ] | None ,
48- config : CustomHeadersConfig
32+ config : CustomHeadersConfig | None
4933) -> dict [str , str ]:
50- """Standalone header filtering function for testing ."""
34+ """Standalone header filtering function matching the production implementation ."""
5135 if not headers :
5236 return {}
5337
38+ # Pass-through behavior: if no config, forward all headers
39+ if config is None :
40+ return headers
41+
42+ # Apply filtering based on allowlist
5443 if not config .allowed_headers :
5544 return {}
5645
5746 filtered = {}
5847 for header_name , header_value in headers .items ():
59- # Check size limit
60- if len (header_value ) > config .max_header_size :
61- continue
62-
63- # Check if header matches any allowed pattern (case-insensitive)
64- for allowed_pattern in config .allowed_headers :
65- if fnmatch .fnmatch (header_name .lower (), allowed_pattern .lower ()):
66- filtered [header_name ] = header_value
48+ # Check against allowlist patterns (case-insensitive)
49+ header_allowed = False
50+ for pattern in config .allowed_headers :
51+ if fnmatch .fnmatch (header_name .lower (), pattern .lower ()):
52+ header_allowed = True
6753 break
68-
69- # Check count limit
70- if len (filtered ) >= config .max_headers_count :
71- break
72-
54+
55+ if header_allowed :
56+ filtered [header_name ] = header_value
57+
7358 return filtered
7459
7560
@@ -85,8 +70,20 @@ def test_filter_headers_no_headers(self) -> None:
8570 result = filter_headers_standalone ({}, config )
8671 assert result == {}
8772
88- def test_filter_headers_no_allowed_headers (self ) -> None :
89- """Test header filtering with no allowed headers (secure by default)."""
73+ def test_filter_headers_pass_through_by_default (self ) -> None :
74+ """Test header filtering with no config (pass-through behavior)."""
75+ headers = {
76+ "x-user-email" :
"[email protected] " ,
77+ "x-admin-token" : "secret" ,
78+ "authorization" : "Bearer token" ,
79+ "x-custom-header" : "value"
80+ }
81+ result = filter_headers_standalone (headers , None )
82+ # All headers should pass through when no config is provided
83+ assert result == headers
84+
85+ def test_filter_headers_empty_allowlist (self ) -> None :
86+ """Test header filtering with empty allowed headers."""
9087 config = CustomHeadersConfig (allowed_headers = [])
9188 headers = {
"x-user-email" :
"[email protected] " ,
"x-admin-token" :
"secret" }
9289 result = filter_headers_standalone (headers , config )
@@ -127,40 +124,102 @@ def test_filter_headers_case_insensitive_patterns(self) -> None:
127124 }
128125 assert result == expected
129126
130- def test_filter_headers_size_limit (self ) -> None :
131- """Test header filtering with size limits."""
132- config = CustomHeadersConfig (
133- allowed_headers = ["x-data" , "x-large-data" ],
134- max_header_size = 10 # Very small limit for testing
135- )
127+ def test_filter_headers_wildcard_patterns (self ) -> None :
128+ """Test header filtering with wildcard patterns."""
129+ config = CustomHeadersConfig (allowed_headers = ["x-user-*" , "authorization" ])
136130 headers = {
137- "x-data" : "small" , # 5 chars - should pass
138- "x-large-data" : "this header value is way too long for the configured limit" # Should be rejected due to size
131+ "x-user-id" : "123" ,
132+ "x-user-email" :
"[email protected] " ,
133+ "x-user-role" : "admin" ,
134+ "authorization" : "Bearer token" ,
135+ "x-system-info" : "blocked" , # Should be filtered out
136+ "content-type" : "application/json" # Should be filtered out
139137 }
140138 result = filter_headers_standalone (headers , config )
141139
142- expected = {"x-data" : "small" }
140+ expected = {
141+ "x-user-id" : "123" ,
142+ "x-user-email" :
"[email protected] " ,
143+ "x-user-role" : "admin" ,
144+ "authorization" : "Bearer token"
145+ }
143146 assert result == expected
144147
145- def test_filter_headers_count_limit (self ) -> None :
146- """Test header filtering with count limits."""
147- config = CustomHeadersConfig (
148- allowed_headers = ["x-header-*" ],
149- max_headers_count = 2
150- )
148+ def test_filter_headers_complex_patterns (self ) -> None :
149+ """Test header filtering with complex fnmatch patterns."""
150+ config = CustomHeadersConfig (allowed_headers = ["x-tenant-*" , "x-user-[abc]*" , "auth*" ])
151151 headers = {
152- "x-header-1" : "value1" ,
153- "x-header-2" : "value2" ,
154- "x-header-3" : "value3" , # Should be ignored due to count limit
155- "x-header-4" : "value4" # Should be ignored due to count limit
152+ "x-tenant-id" : "tenant1" , # Matches x-tenant-*
153+ "x-tenant-name" : "acme" , # Matches x-tenant-*
154+ "x-user-admin" : "true" , # Matches x-user-[abc]*
155+ "x-user-beta" : "false" , # Matches x-user-[abc]*
156+ "x-user-delta" : "test" , # Does NOT match x-user-[abc]*
157+ "authorization" : "Bearer x" , # Matches auth*
158+ "authenticate" : "digest" , # Matches auth*
159+ "content-type" : "json" , # No match
156160 }
157161 result = filter_headers_standalone (headers , config )
158162
159- # Should only get first 2 headers that match
160- assert len (result ) == 2
161- assert "x-header-1" in result
162- assert "x-header-2" in result
163+ expected = {
164+ "x-tenant-id" : "tenant1" ,
165+ "x-tenant-name" : "acme" ,
166+ "x-user-admin" : "true" ,
167+ "x-user-beta" : "false" ,
168+ "authorization" : "Bearer x" ,
169+ "authenticate" : "digest"
170+ }
171+ assert result == expected
172+
173+ def test_filter_headers_all_types (self ) -> None :
174+ """Test that any header type can be allowed, not just x- prefixed ones."""
175+ config = CustomHeadersConfig (allowed_headers = ["authorization" , "accept-language" , "custom-*" ])
176+ headers = {
177+ "authorization" : "Bearer token" ,
178+ "accept-language" : "en-US" ,
179+ "custom-header" : "value" ,
180+ "custom-auth" : "token" ,
181+ "content-type" : "application/json" , # Should be blocked
182+ "x-blocked" : "value" # Should be blocked
183+ }
184+ result = filter_headers_standalone (headers , config )
185+
186+ expected = {
187+ "authorization" : "Bearer token" ,
188+ "accept-language" : "en-US" ,
189+ "custom-header" : "value" ,
190+ "custom-auth" : "token"
191+ }
192+ assert result == expected
193+
194+
195+ class TestRequestStructure :
196+ """Test the new request.headers API structure."""
197+
198+ def test_request_headers_structure (self ) -> None :
199+ """Test that headers are properly nested in request structure."""
200+ headers = {"x-user-id" : "123" , "x-tenant-id" : "tenant1" }
201+ request : dict [str , dict [str , str ]] = {"headers" : headers }
202+
203+ # Verify structure
204+ assert "headers" in request
205+ assert request ["headers" ] == headers
206+
207+ # Test extraction
208+ extracted_headers = request .get ("headers" )
209+ assert extracted_headers == headers
210+
211+ def test_request_with_no_headers (self ) -> None :
212+ """Test request structure when no headers are provided."""
213+ request : dict [str , dict [str , str ]] = {}
214+ extracted_headers = request .get ("headers" )
215+ assert extracted_headers is None
216+
217+ def test_request_with_empty_headers (self ) -> None :
218+ """Test request structure with empty headers dict."""
219+ request : dict [str , dict [str , str ]] = {"headers" : {}}
220+ extracted_headers = request .get ("headers" )
221+ assert extracted_headers == {}
163222
164223
165224if __name__ == "__main__" :
166- pytest .main ([__file__ ])
225+ pytest .main ([__file__ ])
0 commit comments