Skip to content

Commit a997487

Browse files
committed
better detection of watch operations
1 parent 011109c commit a997487

File tree

1 file changed

+98
-57
lines changed

1 file changed

+98
-57
lines changed

experiment/audit/audit_log_parser.py

Lines changed: 98 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -282,41 +282,23 @@ def _normalize_audit_path(self, uri):
282282
return '/'.join(result_parts)
283283

284284
def _normalize_watch_path(self, uri):
285-
"""Normalize watch operation URI to match Swagger watch path format."""
286-
# Remove query parameters
287-
uri = uri.split('?')[0]
285+
"""Normalize watch operation URI to match Swagger watch path format.
288286
289-
# Convert regular resource path to watch path
290-
# /apis/group/version/resources -> /apis/group/version/watch/resources
291-
# /apis/group/version/namespaces/{namespace}/resources -> /apis/group/version/watch/namespaces/{namespace}/resources
292-
# /api/v1/resources -> /api/v1/watch/resources
293-
# /api/v1/namespaces/{namespace}/resources -> /api/v1/watch/namespaces/{namespace}/resources
287+
NOTE: Modern Kubernetes clients use regular resource endpoints with ?watch=true,
288+
but the OpenAPI spec still defines deprecated /watch/ paths. This method only
289+
strips the query string and normalizes the regular resource path; it does not insert /watch/.
290+
"""
291+
# Remove query parameters first
292+
clean_uri = uri.split('?')[0]
294293

295-
if uri.startswith('/apis/'):
296-
# Pattern: /apis/group/version/...
297-
parts = uri.split('/')
298-
if len(parts) >= 4: # /apis/group/version/...
299-
if len(parts) >= 5 and parts[4] == 'namespaces':
300-
# /apis/group/version/namespaces/... -> /apis/group/version/watch/namespaces/...
301-
watch_uri = '/'.join(parts[:4]) + '/watch/' + '/'.join(parts[4:])
302-
else:
303-
# /apis/group/version/resources -> /apis/group/version/watch/resources
304-
watch_uri = '/'.join(parts[:4]) + '/watch/' + '/'.join(parts[4:])
305-
else:
306-
watch_uri = uri
307-
elif uri.startswith('/api/v1/'):
308-
# Pattern: /api/v1/...
309-
if '/namespaces/' in uri:
310-
# /api/v1/namespaces/... -> /api/v1/watch/namespaces/...
311-
watch_uri = uri.replace('/api/v1/', '/api/v1/watch/')
312-
else:
313-
# /api/v1/resources -> /api/v1/watch/resources
314-
watch_uri = uri.replace('/api/v1/', '/api/v1/watch/')
315-
else:
316-
watch_uri = uri
294+
# This method only strips the query string and normalizes the path.
295+
# Lookup order is decided in get_operation_id: first the deprecated
296+
# /watch/ path format, then the regular resource path as a plain GET.
317297

318-
# Now apply normal normalization to the watch path
319-
return self._normalize_audit_path(watch_uri)
298+
# The watch parameter in query indicates this is a watch operation,
299+
# but the path itself is a regular resource path. Most watch operations
300+
# should be matched as regular GET operations on collections.
301+
return self._normalize_audit_path(clean_uri)
320302

321303
def _k8s_verb_to_http_method(self, k8s_verb, uri): # pylint: disable=unused-argument,no-self-use
322304
"""Convert Kubernetes audit verb to HTTP method for Swagger lookup."""
@@ -342,25 +324,48 @@ def get_operation_id(self, method, uri):
342324
if not self.swagger_spec:
343325
return None
344326

345-
# Handle watch operations - convert to watch path format
346-
if method.lower() == 'watch':
347-
normalized_uri = self._normalize_watch_path(uri)
348-
else:
349-
normalized_uri = self._normalize_audit_path(uri)
350-
351-
# Convert Kubernetes verb to HTTP method
327+
# Check if this is a watch operation by looking at query parameters
328+
is_watch_operation = 'watch=true' in uri.lower()
329+
330+
# Normalize the URI (removes query parameters)
331+
normalized_uri = self._normalize_audit_path(uri)
332+
333+
# For watch operations, try multiple approaches
334+
if method.lower() == 'watch' or is_watch_operation:
335+
# Approach 1: Try the deprecated /watch/ path format from OpenAPI spec
336+
watch_uri = SwaggerEndpointMapper._convert_to_deprecated_watch_path(normalized_uri)
337+
watch_key = f"get:{watch_uri}"
338+
if watch_key in self.path_to_operation:
339+
return self.path_to_operation[watch_key]
340+
341+
# Approach 2: Try as a regular GET operation (most common)
342+
# Watch operations are typically GET requests on collections
343+
get_key = f"get:{normalized_uri}"
344+
if get_key in self.path_to_operation:
345+
return self.path_to_operation[get_key]
346+
347+
# Approach 3: Retry the same GET key with and without a trailing slash
348+
list_variations = [
349+
get_key,
350+
f"get:{normalized_uri.rstrip('/')}",
351+
f"get:{normalized_uri}/" if not normalized_uri.endswith('/') else get_key,
352+
]
353+
354+
for variation in list_variations:
355+
if variation in self.path_to_operation:
356+
return self.path_to_operation[variation]
357+
358+
# For non-watch operations or if watch matching failed, use regular approach
352359
http_method = self._k8s_verb_to_http_method(method, uri).lower()
353360
key = f"{http_method}:{normalized_uri}"
354361

355362
# Direct match
356363
if key in self.path_to_operation:
357364
return self.path_to_operation[key]
358365

359-
# Try some common variations
366+
# Try common variations
360367
variations = [
361-
# Try without trailing slash
362368
key.rstrip('/'),
363-
# Try with trailing slash if not present
364369
key if key.endswith('/') else key + '/',
365370
]
366371

@@ -370,7 +375,6 @@ def get_operation_id(self, method, uri):
370375

371376
# For specific resource instance operations, try with {name} placeholder
372377
if '/{name}' not in normalized_uri and http_method == 'get':
373-
# Try adding {name} for individual resource gets
374378
name_variation = f"{http_method}:{normalized_uri}/{{name}}"
375379
if name_variation in self.path_to_operation:
376380
return self.path_to_operation[name_variation]
@@ -413,18 +417,51 @@ def _path_similarity(self, path1, path2): # pylint: disable=no-self-use
413417

414418
return matches / len(parts1) if parts1 else 0
415419

420+
@staticmethod
421+
def _convert_to_deprecated_watch_path(uri):
422+
"""Convert a regular resource path to the deprecated /watch/ path format.
423+
424+
This converts:
425+
/api/v1/namespaces/{namespace}/pods -> /api/v1/watch/namespaces/{namespace}/pods
426+
/apis/apps/v1/namespaces/{namespace}/deployments -> /apis/apps/v1/watch/namespaces/{namespace}/deployments
427+
"""
428+
if uri.startswith('/apis/'):
429+
# Pattern: /apis/group/version/...
430+
parts = uri.split('/')
431+
if len(parts) >= 4: # /apis/group/version/...
432+
# Insert 'watch' after version
433+
new_parts = parts[:4] + ['watch'] + parts[4:]
434+
return '/'.join(new_parts)
435+
elif uri.startswith('/api/v1/'):
436+
# Pattern: /api/v1/...
437+
# Insert 'watch' after /api/v1
438+
return uri.replace('/api/v1/', '/api/v1/watch/')
439+
440+
return uri
441+
416442

417443
def convert_to_k8s_endpoint_fallback(verb, uri): # pylint: disable=too-many-branches,too-many-statements,too-many-return-statements
418444
"""
419445
Fallback method: Convert HTTP verb and URI to Kubernetes endpoint format.
420446
Used when Swagger specification is not available.
421447
"""
422-
# This is the same logic as the original script
423-
uri = uri.split('?')[0]
424-
uri = re.sub(r'/namespaces/[^/]+', '/namespaces/{namespace}', uri)
425-
uri = re.sub(r'/nodes/[^/]+', '/nodes/{node}', uri)
448+
# Check if this is a watch operation from query parameters
449+
is_watch_operation = 'watch=true' in uri.lower()
450+
451+
# Clean the URI by removing query parameters
452+
clean_uri = uri.split('?')[0]
453+
clean_uri = re.sub(r'/namespaces/[^/]+', '/namespaces/{namespace}', clean_uri)
454+
clean_uri = re.sub(r'/nodes/[^/]+', '/nodes/{node}', clean_uri)
426455

456+
# For watch operations, we prefix with 'watch'
427457
verb = verb.lower()
458+
if is_watch_operation or verb == 'watch':
459+
verb_prefix = 'watch'
460+
# Use the clean URI for processing
461+
uri = clean_uri
462+
else:
463+
verb_prefix = verb
464+
uri = clean_uri
428465

429466
# Handle core API v1
430467
if uri.startswith('/api/v1/'):
@@ -441,10 +478,10 @@ def convert_to_k8s_endpoint_fallback(verb, uri): # pylint: disable=too-many-bra
441478
if subresource in ['status', 'scale', 'log', 'exec', 'attach', 'portforward', 'proxy', 'binding', 'eviction', 'ephemeralcontainers']:
442479
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
443480
subresource_name = subresource[0].upper() + subresource[1:] if len(subresource) > 1 else subresource.upper()
444-
return f'{verb}CoreV1Namespaced{resource_name}{subresource_name}'
481+
return f'{verb_prefix}CoreV1Namespaced{resource_name}{subresource_name}'
445482

446483
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
447-
return f'{verb}CoreV1Namespaced{resource_name}'
484+
return f'{verb_prefix}CoreV1Namespaced{resource_name}'
448485

449486
else:
450487
resource = resource_part.split('/')[0]
@@ -455,10 +492,10 @@ def convert_to_k8s_endpoint_fallback(verb, uri): # pylint: disable=too-many-bra
455492
if subresource in ['status', 'scale']:
456493
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
457494
subresource_name = subresource[0].upper() + subresource[1:] if len(subresource) > 1 else subresource.upper()
458-
return f'{verb}CoreV1{resource_name}{subresource_name}'
495+
return f'{verb_prefix}CoreV1{resource_name}{subresource_name}'
459496

460497
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
461-
return f'{verb}CoreV1{resource_name}'
498+
return f'{verb_prefix}CoreV1{resource_name}'
462499

463500
# Handle APIs group
464501
elif uri.startswith('/apis/'):
@@ -481,10 +518,10 @@ def convert_to_k8s_endpoint_fallback(verb, uri): # pylint: disable=too-many-bra
481518
if subresource in ['status', 'scale', 'binding']:
482519
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
483520
subresource_name = subresource[0].upper() + subresource[1:] if len(subresource) > 1 else subresource.upper()
484-
return f'{verb}{group_clean.capitalize()}{version_clean}Namespaced{resource_name}{subresource_name}'
521+
return f'{verb_prefix}{group_clean.capitalize()}{version_clean}Namespaced{resource_name}{subresource_name}'
485522

486523
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
487-
return f'{verb}{group_clean.capitalize()}{version_clean}Namespaced{resource_name}'
524+
return f'{verb_prefix}{group_clean.capitalize()}{version_clean}Namespaced{resource_name}'
488525

489526
else:
490527
resource = rest.split('/')[0]
@@ -495,10 +532,10 @@ def convert_to_k8s_endpoint_fallback(verb, uri): # pylint: disable=too-many-bra
495532
if subresource in ['status', 'scale']:
496533
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
497534
subresource_name = subresource[0].upper() + subresource[1:] if len(subresource) > 1 else subresource.upper()
498-
return f'{verb}{group_clean.capitalize()}{version_clean}{resource_name}{subresource_name}'
535+
return f'{verb_prefix}{group_clean.capitalize()}{version_clean}{resource_name}{subresource_name}'
499536

500537
resource_name = resource[0].upper() + resource[1:] if len(resource) > 1 else resource.upper()
501-
return f'{verb}{group_clean.capitalize()}{version_clean}{resource_name}'
538+
return f'{verb_prefix}{group_clean.capitalize()}{version_clean}{resource_name}'
502539

503540
return None
504541

@@ -555,14 +592,18 @@ def parse_audit_logs(file_paths, swagger_mapper=None): # pylint: disable=too-ma
555592
request_uri = entry.get('requestURI', '')
556593

557594
if verb and request_uri:
595+
# Check if this is a watch operation based on query parameters
596+
# Modern Kubernetes watch operations use ?watch=true parameter
597+
is_watch_via_query = 'watch=true' in request_uri.lower()
598+
effective_verb = 'watch' if is_watch_via_query else verb
558599
# Use Swagger-based mapping (required)
559-
operation_id = swagger_mapper.get_operation_id(verb, request_uri)
600+
operation_id = swagger_mapper.get_operation_id(effective_verb, request_uri)
560601
if operation_id:
561602
endpoint_counts[operation_id] += 1
562603
swagger_matches += 1
563604
else:
564605
# Try fallback parsing for edge cases
565-
fallback_endpoint = convert_to_k8s_endpoint_fallback(verb, request_uri)
606+
fallback_endpoint = convert_to_k8s_endpoint_fallback(effective_verb, request_uri)
566607
if fallback_endpoint:
567608
endpoint_counts[fallback_endpoint] += 1
568609
fallback_matches += 1

0 commit comments

Comments
 (0)