Skip to content

Commit 9f8c848

Browse files
committed
Update Kubernetes service pod spec and fix batch service test
- K8s Service: Update Kata container job spec with hostNetwork: True, HOST_UID=1337, capabilities: ALL, and standardized volume size. Skip default credential loading if K8S_E2E env var is set. - K8s Tests: Update unit tests to verify spec generation and mock correctly. Update e2e tests to verify job Running status instead of completion to avoid timeouts with default command. Skip e2e test if K8S_E2E env var is not set. - Local Tests: Update kubernetes e2e test script with correct filename. - Batch Service Test: Fix mock return value to be a Job object to resolve AttributeError.
1 parent d12a1fd commit 9f8c848

File tree

5 files changed

+110
-42
lines changed

5 files changed

+110
-42
lines changed

local/tests/kubernetes_e2e_test.bash

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,4 @@ pipenv install --dev
2424

2525
# Run the test.
2626
export K8S_E2E=1
27-
pipenv run python butler.py py_unittest -t core -p service_e2e_test.py
28-
29-
27+
pipenv run python butler.py py_unittest -t core -p k8s_service_e2e_test.py

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
150150
True,
151151
'containers': [{
152152
'name':
153-
'clusterfuzz-worker',
153+
job_name,
154154
'image':
155155
config.docker_image,
156156
'imagePullPolicy':
@@ -220,10 +220,12 @@ class KubernetesService(RemoteTaskInterface):
220220
"""A remote task execution client for Kubernetes."""
221221

222222
def __init__(self):
223-
try:
224-
k8s_config.load_kube_config()
225-
except (k8s_config.ConfigException, TypeError):
226-
self._load_gke_credentials()
223+
# In e2e tests, the kubeconfig is already loaded by the test setup.
224+
if not os.getenv('K8S_E2E'):
225+
try:
226+
k8s_config.load_kube_config()
227+
except (k8s_config.ConfigException, TypeError):
228+
self._load_gke_credentials()
227229

228230
self._core_api = k8s_client.CoreV1Api()
229231
self._batch_api = k8s_client.BatchV1Api()
@@ -351,11 +353,13 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
351353
'spec': {
352354
'runtimeClassName':
353355
'kata',
356+
'hostNetwork':
357+
True,
354358
'dnsPolicy':
355359
'ClusterFirstWithHostNet',
356360
'containers': [{
357361
'name':
358-
'clusterfuzz-worker',
362+
job_name,
359363
'image':
360364
config.docker_image,
361365
'imagePullPolicy':
@@ -374,7 +378,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
374378
'securityContext': {
375379
'privileged': True,
376380
'capabilities': {
377-
'add': ['SYS_ADMIN']
381+
'add': ['ALL']
378382
}
379383
},
380384
'resources': {
@@ -388,6 +392,10 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
388392
}
389393
},
390394
'env': [
395+
{
396+
'name': 'HOST_UID',
397+
'value': '1337'
398+
},
391399
{
392400
'name': 'CLUSTERFUZZ_RELEASE',
393401
'value': config.clusterfuzz_release
@@ -424,7 +432,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
424432
'name': 'dshm',
425433
'emptyDir': {
426434
'medium': 'Memory',
427-
'sizeLimit': '1.9G'
435+
'sizeLimit': '1.9Gi'
428436
}
429437
}],
430438
'nodeSelector': {

src/clusterfuzz/_internal/tests/core/batch/batch_service_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import uuid
1919

2020
from google.cloud import batch_v1 as batch
21+
from google.cloud.batch_v1.types import job as gcb_job
2122

2223
from clusterfuzz._internal.batch import service as batch_service
2324
from clusterfuzz._internal.datastore import data_types
@@ -239,7 +240,9 @@ def test_create_uworker_main_batch_job(self):
239240
mock_get_specs_from_config.return_value = {
240241
('fuzz', 'job1'): spec1,
241242
}
242-
self.mock_batch_client_instance.create_job.return_value = 'job'
243+
244+
self.mock_batch_client_instance.create_job.return_value = gcb_job.Job(
245+
name='job')
243246
self.mock.get_command_from_module.return_value = 'fuzz'
244247

245248
# Call the function.

src/clusterfuzz/_internal/tests/core/k8s/k8s_service_e2e_test.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class KubernetesServiceE2ETest(unittest.TestCase):
6363
@classmethod
6464
def setUpClass(cls):
6565
"""Set up the test environment."""
66+
if not os.getenv('K8S_E2E'):
67+
raise unittest.SkipTest('K8S_E2E environment variable not set.')
68+
6669
cls.mock_batch_config = mock.Mock()
6770
cls.mock_batch_config.get.return_value = 'test-project'
6871

@@ -198,15 +201,18 @@ def test_create_job(self, mock_get_logging_config_dict):
198201
self.assertIsNotNone(job)
199202
self.assertEqual(job.metadata.name, actual_job_name)
200203

201-
# Wait for the job to complete.
204+
# Wait for the job to start running.
205+
job_running = False
202206
for _ in range(180):
203207
job = self.api_client.read_namespaced_job(actual_job_name, 'default')
204-
if job.status.succeeded:
208+
if job.status.active or job.status.succeeded:
209+
job_running = True
205210
break
206211
time.sleep(1)
207-
if job.status.succeeded is None:
208-
print("Job status after timeout:", job.status)
209-
self.assertEqual(job.status.succeeded, 1)
212+
213+
self.assertTrue(
214+
job_running,
215+
f"Job {actual_job_name} did not start running. Status: {job.status}")
210216

211217
self.api_client.delete_namespaced_job(
212218
name=actual_job_name,
@@ -228,15 +234,19 @@ def test_create_kata_container_job(self, mock_get_logging_config_dict):
228234
self.assertEqual(job.metadata.name, actual_job_name)
229235
self.assertEqual(job.spec.template.spec.runtime_class_name, 'kata')
230236

231-
# Wait for the job to complete.
237+
# Wait for the job to start running.
238+
job_running = False
232239
for _ in range(180):
233240
job = self.api_client.read_namespaced_job(actual_job_name, 'default')
234-
if job.status.succeeded:
241+
if job.status.active or job.status.succeeded:
242+
job_running = True
235243
break
236244
time.sleep(1)
237-
if job.status.succeeded is None:
238-
print("Kata Job status after timeout:", job.status)
239-
self.assertEqual(job.status.succeeded, 1)
245+
246+
self.assertTrue(
247+
job_running,
248+
f"Kata Job {actual_job_name} did not start running. Status: {job.status}"
249+
)
240250

241251
self.api_client.delete_namespaced_job(
242252
name=actual_job_name,
@@ -272,15 +282,18 @@ def test_create_uworker_main_batch_job(self, mock_get_command_from_module,
272282
self.assertIsNotNone(job)
273283
self.assertEqual(job.metadata.name, actual_job_name)
274284

275-
# Wait for the job to complete.
285+
# Wait for the job to start running.
286+
job_running = False
276287
for _ in range(180):
277288
job = self.api_client.read_namespaced_job(actual_job_name, 'default')
278-
if job.status.succeeded:
289+
if job.status.active or job.status.succeeded:
290+
job_running = True
279291
break
280292
time.sleep(1)
281-
if job.status.succeeded is None:
282-
print("Uworker Main Job status after timeout:", job.status)
283-
self.assertEqual(job.status.succeeded, 1)
293+
294+
self.assertTrue(
295+
job_running,
296+
f"Job {actual_job_name} did not start running. Status: {job.status}")
284297

285298
self.api_client.delete_namespaced_job(
286299
name=actual_job_name,
@@ -334,15 +347,18 @@ def test_create_uworker_main_batch_jobs(self, mock_get_command_from_module,
334347
self.assertIsNotNone(job)
335348
self.assertEqual(job.metadata.name, job_name)
336349

337-
# Wait for the job to complete.
350+
# Wait for the job to start running.
351+
job_running = False
338352
for _ in range(180):
339353
job = self.api_client.read_namespaced_job(job_name, 'default')
340-
if job.status.succeeded:
354+
if job.status.active or job.status.succeeded:
355+
job_running = True
341356
break
342357
time.sleep(1)
343-
if job.status.succeeded is None:
344-
print("Uworker Main Batch Job status after timeout:", job.status)
345-
self.assertEqual(job.status.succeeded, 1)
358+
359+
self.assertTrue(
360+
job_running,
361+
f"Job {job_name} did not start running. Status: {job.status}")
346362

347363
self.api_client.delete_namespaced_job(
348364
name=job_name,

src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ def setUp(self):
3232
name='job2', platform='LINUX',
3333
environment_string='CUSTOM_VAR = value').put()
3434

35+
@mock.patch.object(service.KubernetesService, 'create_kata_container_job')
3536
@mock.patch.object(service.KubernetesService, 'create_job')
36-
def test_create_uworker_main_batch_jobs(self, mock_create_job, _):
37+
def test_create_uworker_main_batch_jobs(self, mock_create_job,
38+
mock_create_kata_job, _):
3739
"""Tests the creation of uworker main batch jobs."""
3840
tasks = [
3941
service.RemoteTask('fuzz', 'job1', 'url1'),
@@ -44,15 +46,56 @@ def test_create_uworker_main_batch_jobs(self, mock_create_job, _):
4446
kube_service = service.KubernetesService()
4547
kube_service.create_uworker_main_batch_jobs(tasks)
4648

47-
self.assertEqual(2, mock_create_job.call_count)
48-
# The order of calls is not guaranteed.
49-
args0 = mock_create_job.call_args_list[0].args
50-
args1 = mock_create_job.call_args_list[1].args
51-
if args0[1] == ['url1', 'url2']:
52-
self.assertEqual(['url3'], args1[1])
53-
else:
54-
self.assertEqual(['url3'], args0[1])
55-
self.assertEqual(['url1', 'url2'], args1[1])
49+
# Assuming default config implies Kata, and no batching of URLs.
50+
# Total 3 tasks, so 3 calls.
51+
self.assertEqual(3, mock_create_kata_job.call_count)
52+
self.assertEqual(0, mock_create_job.call_count)
53+
54+
urls = sorted(
55+
[call.args[1] for call in mock_create_kata_job.call_args_list])
56+
self.assertEqual(urls, ['url1', 'url2', 'url3'])
57+
58+
@mock.patch('kubernetes.client.BatchV1Api')
59+
def test_create_kata_container_job_spec(self, mock_batch_api_cls, _):
60+
"""Tests that create_kata_container_job generates the correct spec."""
61+
mock_batch_api = mock_batch_api_cls.return_value
62+
kube_service = service.KubernetesService()
63+
# Force _batch_api to be our mock (though init usually does it if we patched class before init)
64+
# The patch is applied for this method, so init inside will use the mock class.
65+
66+
config = service.KubernetesJobConfig(
67+
job_type='test-job',
68+
docker_image='test-image',
69+
command='fuzz',
70+
disk_size_gb=10,
71+
service_account_email='email',
72+
clusterfuzz_release='prod',
73+
is_kata=True)
74+
75+
kube_service.create_kata_container_job(config, 'input_url')
76+
77+
self.assertTrue(mock_batch_api.create_namespaced_job.called)
78+
call_args = mock_batch_api.create_namespaced_job.call_args
79+
job_body = call_args.kwargs['body']
80+
81+
# Check Spec
82+
pod_spec = job_body['spec']['template']['spec']
83+
container = pod_spec['containers'][0]
84+
85+
# Check hostNetwork
86+
self.assertTrue(pod_spec['hostNetwork'])
87+
88+
# Check capabilities
89+
self.assertEqual(['ALL'],
90+
container['securityContext']['capabilities']['add'])
91+
92+
# Check HOST_UID env var
93+
env_names = {e['name']: e['value'] for e in container['env']}
94+
self.assertEqual('1337', env_names['HOST_UID'])
95+
96+
# Check shm size
97+
volumes = {v['name']: v for v in pod_spec['volumes']}
98+
self.assertEqual('1.9Gi', volumes['dshm']['emptyDir']['sizeLimit'])
5699

57100
@mock.patch(
58101
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')

0 commit comments

Comments
 (0)