diff --git a/.github/workflows/build-apiserver.yaml b/.github/workflows/build-apiserver.yaml index 234b8bd..8bfeb0f 100644 --- a/.github/workflows/build-apiserver.yaml +++ b/.github/workflows/build-apiserver.yaml @@ -7,7 +7,7 @@ on: jobs: validate-kustomize: - uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.7.2 + uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.9.0 publish-container-image: # No point in trying to build the container image if the deployment @@ -19,7 +19,7 @@ jobs: contents: read packages: write attestations: write - uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.7.2 + uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.9.0 with: image-name: activity secrets: inherit @@ -35,7 +35,7 @@ jobs: id-token: write contents: read packages: write - uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.7.2 + uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.9.0 with: bundle-name: ghcr.io/datum-cloud/activity-kustomize bundle-path: config diff --git a/config/components/clickhouse-migrations/configmap.yaml b/config/components/clickhouse-migrations/configmap.yaml index 49b60cc..b56fe64 100644 --- a/config/components/clickhouse-migrations/configmap.yaml +++ b/config/components/clickhouse-migrations/configmap.yaml @@ -315,10 +315,10 @@ data: 001_initial_schema.sql: | -- Migration: 001_initial_schema - -- Description: High-volume multi-tenant audit events table + -- Description: High-volume multi-tenant audit events table with projections for + -- platform-wide querying and user-specific querying. -- Author: Scot Wells -- Date: 2025-12-11 - -- Strategy: Simplified scope-based partitioning with hash-based distribution CREATE DATABASE IF NOT EXISTS audit; @@ -358,9 +358,6 @@ data: audit_id UUID MATERIALIZED toUUIDOrZero(coalesce(JSONExtractString(event_json, 'auditID'), '')), - stage LowCardinality(String) MATERIALIZED - coalesce(JSONExtractString(event_json, 'stage'), ''), - -- Common filters verb LowCardinality(String) MATERIALIZED coalesce(JSONExtractString(event_json, 'verb'), ''), @@ -380,23 +377,79 @@ data: status_code UInt16 MATERIALIZED toUInt16OrZero(JSONExtractString(event_json, 'responseStatus', 'code')), - -- Simple bucketing to spread parts/merges - bucket UInt8 MATERIALIZED (cityHash64(audit_id) % 16), + -- ======================================================================== + -- Skip Indexes: Optimized for different query patterns + -- ======================================================================== + + -- Timestamp minmax index for time range queries + INDEX idx_timestamp_minmax timestamp TYPE minmax GRANULARITY 4, + + -- Bloom filters with GRANULARITY 1 for high precision (critical filters) + INDEX idx_verb_set verb TYPE set(10) GRANULARITY 4, + INDEX idx_resource_bloom resource TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_verb_resource_bloom (verb, resource) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_user_bloom user TYPE bloom_filter(0.001) GRANULARITY 1, - -- Minimal skip indexes - INDEX bf_user user TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_scope (scope_type, scope_name) TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_audit_id audit_id TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 4 + -- Set indexes for low-cardinality columns + INDEX idx_status_code_set status_code TYPE set(100) GRANULARITY 4, + -- Minmax index for status_code range queries + INDEX idx_status_code_minmax status_code TYPE minmax GRANULARITY 4, ) - ENGINE = MergeTree - PARTITION BY (toYYYYMM(timestamp), bucket) - ORDER BY (timestamp, scope_type, scope_name, user, audit_id, stage) + ENGINE = ReplacingMergeTree + PARTITION BY toYYYYMMDD(timestamp) + -- Primary key optimized for tenant-scoped queries with hour bucketing + -- Hour bucketing improves compression, data locality, and deduplication efficiency + -- Deduplication occurs on the full ORDER BY key during merges + ORDER BY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id, timestamp) + PRIMARY KEY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id) -- Move parts to cold S3-backed volume after 90 days TTL timestamp + INTERVAL 90 DAY TO VOLUME 'cold' SETTINGS storage_policy = 'hot_cold', - ttl_only_drop_parts = 1; + ttl_only_drop_parts = 1, + deduplicate_merge_projection_mode = 'rebuild'; + + -- ============================================================================ + -- Step 3: Add Platform Query Projection + -- ============================================================================ + -- This projection is optimized for platform-wide queries that filter by + -- timestamp, api_group, and resource (common for cross-tenant analytics). + -- + -- Sort order: (timestamp, api_group, resource, audit_id) + -- Use cases: + -- - "All events for 'apps' API group and 'deployments' resource in last 24 hours" + -- - "All events for core API 'pods' resource" + -- - Platform-wide verb/resource filtering + -- + + ALTER TABLE audit.events + ADD PROJECTION platform_query_projection + ( + SELECT * + ORDER BY (timestamp, api_group, resource, audit_id) + ); + + -- ============================================================================ + -- Step 4: Add User Query Projection + -- ============================================================================ + -- This projection is optimized for user-specific queries within time ranges. + -- + -- Sort order: (timestamp, user, api_group, resource) + -- Use cases: + -- - "What did alice@example.com do in the last 24 hours?" + -- - "All events by system:serviceaccount:kube-system:default" + -- - User-specific verb/resource filtering + -- + -- ClickHouse automatically chooses the best projection for each query based + -- on the WHERE clause filters. + + ALTER TABLE audit.events + ADD PROJECTION user_query_projection + ( + SELECT * + ORDER BY (timestamp, user, api_group, resource) + ); diff --git a/config/components/k6-performance-tests/generated/query-load-test.js b/config/components/k6-performance-tests/generated/query-load-test.js index bc03225..96e0482 100644 --- a/config/components/k6-performance-tests/generated/query-load-test.js +++ b/config/components/k6-performance-tests/generated/query-load-test.js @@ -118,64 +118,93 @@ if (TLS_CERT_FILE && TLS_KEY_FILE) { } // Query templates with different complexity levels +// Optimized for platform-wide queries that leverage platform_query_projection const queryTemplates = [ - // Simple queries + // Platform-wide queries using api_group and resource (leverage platform_query_projection) + // These should use: ORDER BY (timestamp, api_group, resource, audit_id) { - name: 'simple_verb_filter', - filter: "verb == 'create'", + name: 'platform_core_pods', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", // Core API pods limit: 100, }, { - name: 'simple_namespace_filter', - filter: "objectRef.namespace == 'default'", + name: 'platform_apps_deployments', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments'", limit: 100, }, { - name: 'simple_resource_filter', - filter: "objectRef.resource == 'pods'", + name: 'platform_apps_statefulsets', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'statefulsets'", limit: 100, }, - // Medium complexity queries { - name: 'medium_combined_filter', - filter: "verb == 'delete' && objectRef.namespace == 'default'", + name: 'platform_batch_jobs', + filter: "objectRef.apiGroup == 'batch' && objectRef.resource == 'jobs'", limit: 100, }, { - name: 'medium_multi_verb', - filter: "verb in ['create', 'update', 'delete']", + name: 'platform_core_services', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'services'", limit: 100, }, + + // Platform queries with verb filters (still leverages projection) + { + name: 'platform_pod_creates', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb == 'create'", + limit: 100, + }, + { + name: 'platform_deployment_updates', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && verb in ['update', 'patch']", + limit: 100, + }, + { + name: 'platform_secret_access', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'secrets' && verb in ['get', 'list']", + limit: 100, + }, + + // Platform queries by API group only { - name: 'medium_user_filter', - filter: "user.username.startsWith('system:') && verb == 'get'", + name: 'platform_apps_apigroup', + filter: "objectRef.apiGroup == 'apps'", // All apps API group resources limit: 100, }, - // Complex queries { - name: 'complex_multi_condition', - filter: "objectRef.namespace in ['default', 'kube-system'] && objectRef.resource == 'deployments' && verb in ['create', 'update']", + name: 'platform_batch_apigroup', + filter: "objectRef.apiGroup == 'batch'", // All batch API group resources limit: 100, }, + + // Complex platform queries (3+ conditions) { - name: 'complex_timestamp_range', - filter: `stageTimestamp >= timestamp('2024-01-01T00:00:00Z') && verb == 'delete'`, + name: 'platform_deployment_errors', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && responseStatus.code >= 400", limit: 50, }, { - name: 'complex_secrets_audit', - filter: "objectRef.resource == 'secrets' && stage == 'ResponseComplete' && verb in ['get', 'list']", + name: 'platform_pod_lifecycle', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb in ['create', 'delete']", limit: 100, }, - // Pagination queries (smaller limits for testing pagination) + + // Resource-only queries (should still benefit from projection) { - name: 'pagination_small', - filter: "verb == 'get'", - limit: 10, + name: 'simple_resource_configmaps', + filter: "objectRef.resource == 'configmaps'", + limit: 100, }, { - name: 'pagination_medium', - filter: "objectRef.namespace == 'default'", + name: 'simple_resource_namespaces', + filter: "objectRef.resource == 'namespaces'", + limit: 100, + }, + + // Pagination test for platform queries + { + name: 'pagination_platform_pods', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", limit: 25, }, ]; @@ -326,22 +355,31 @@ function executeQueryWithPagination(template, maxPages = 3) { // Main test function export default function() { - // Randomly select a query template (weighted towards simpler queries) + // Randomly select a query template + // Weighted towards platform-wide api_group+resource queries to stress-test projections const rand = Math.random(); let templateIndex; - if (rand < 0.5) { - // 50% simple queries - templateIndex = Math.floor(Math.random() * 3); - } else if (rand < 0.8) { - // 30% medium queries - templateIndex = 3 + Math.floor(Math.random() * 3); + if (rand < 0.40) { + // 40% - Basic platform queries by api_group + resource (indices 0-4) + // These directly test the platform_query_projection + templateIndex = Math.floor(Math.random() * 5); + } else if (rand < 0.70) { + // 30% - Platform queries with verb filters (indices 5-7) + // Still leverage projection but add filtering + templateIndex = 5 + Math.floor(Math.random() * 3); + } else if (rand < 0.85) { + // 15% - API group-only queries (indices 8-9) + templateIndex = 8 + Math.floor(Math.random() * 2); } else if (rand < 0.95) { - // 15% complex queries - templateIndex = 6 + Math.floor(Math.random() * 3); + // 10% - Complex platform queries (indices 10-11) + templateIndex = 10 + Math.floor(Math.random() * 2); + } else if (rand < 0.98) { + // 3% - Resource-only queries (indices 12-13) + templateIndex = 12 + Math.floor(Math.random() * 2); } else { - // 5% pagination queries - templateIndex = 9 + Math.floor(Math.random() * 2); + // 2% - Pagination queries (index 14) + templateIndex = 14; } const template = queryTemplates[templateIndex]; diff --git a/config/components/nats-streams/audit-stream.yaml b/config/components/nats-streams/audit-stream.yaml index af48130..7efee05 100644 --- a/config/components/nats-streams/audit-stream.yaml +++ b/config/components/nats-streams/audit-stream.yaml @@ -34,7 +34,11 @@ spec: allowDirect: true # Deduplication window - prevents duplicate messages - duplicateWindow: 2m + # Extended to 10 minutes to handle webhook retries with exponential backoff + # and Vector restarts. NATS uses message IDs (set to Kubernetes auditID) to + # detect duplicates within this window, providing pipeline-level de-duplication + # before events reach ClickHouse. + duplicateWindow: 10m # Maximum number of consumers maxConsumers: 10 diff --git a/config/components/vector-sidecar/vector-sidecar-hr.yaml b/config/components/vector-sidecar/vector-sidecar-hr.yaml index 2390cd1..ccf050b 100644 --- a/config/components/vector-sidecar/vector-sidecar-hr.yaml +++ b/config/components/vector-sidecar/vector-sidecar-hr.yaml @@ -121,6 +121,15 @@ spec: healthcheck: enabled: true + jetstream: + enabled: true + headers: + # NATS message ID for de-duplication + # Uses the Kubernetes auditID as the NATS message ID to enable + # JetStream's duplicate detection within the duplicateWindow (10m) + # This prevents duplicate events from webhook retries or Vector restarts + message_id: "{{ .auditID }}" + # Buffer for durability # 10GB disk buffer to survive NATS outages buffer: diff --git a/docs/api.md b/docs/api.md index 1079ee2..f12e80d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -78,7 +78,7 @@ _Appears in:_ | --- | --- | --- | --- | | `startTime` _string_ | StartTime is the beginning of your search window (inclusive).

Format Options:
- Relative: "now-30d", "now-2h", "now-30m" (units: s, m, h, d, w)
Use for dashboards and recurring queries - they adjust automatically.
- Absolute: "2024-01-01T00:00:00Z" (RFC3339 with timezone)
Use for historical analysis of specific time periods.

Examples:
"now-30d" → 30 days ago
"2024-06-15T14:30:00-05:00" → specific time with timezone offset | | | | `endTime` _string_ | EndTime is the end of your search window (exclusive).

Uses the same formats as StartTime. Commonly "now" for current moment.
Must be greater than StartTime.

Examples:
"now" → current time
"2024-01-02T00:00:00Z" → specific end point | | | -| `filter` _string_ | Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.

Available Fields:
verb - API action: get, list, create, update, patch, delete, watch
auditID - unique event identifier
stage - request phase: RequestReceived, ResponseStarted, ResponseComplete, Panic
stageTimestamp - when this stage occurred (RFC3339 timestamp)
user.username - who made the request (user or service account)
responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)
objectRef.namespace - target resource namespace
objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)
objectRef.name - specific resource name

Operators: ==, !=, <, >, <=, >=, &&, \|\|, in
String Functions: startsWith(), endsWith(), contains()

Common Patterns:
"verb == 'delete'" - All deletions
"objectRef.namespace == 'production'" - Activity in production namespace
"verb in ['create', 'update', 'delete', 'patch']" - All write operations
"responseStatus.code >= 400" - Failed requests
"user.username.startsWith('system:serviceaccount:')" - Service account activity
"objectRef.resource == 'secrets'" - Secret access
"verb == 'delete' && objectRef.namespace == 'production'" - Production deletions

Note: Use single quotes for strings. Field names are case-sensitive.
CEL reference: https://cel.dev | | | +| `filter` _string_ | Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.

Available Fields:
verb - API action: get, list, create, update, patch, delete, watch
auditID - unique event identifier
stageTimestamp - when this stage occurred (RFC3339 timestamp)
user.username - who made the request (user or service account)
responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)
objectRef.namespace - target resource namespace
objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)
objectRef.name - specific resource name

Operators: ==, !=, <, >, <=, >=, &&, \|\|, in
String Functions: startsWith(), endsWith(), contains()

Common Patterns:
"verb == 'delete'" - All deletions
"objectRef.namespace == 'production'" - Activity in production namespace
"verb in ['create', 'update', 'delete', 'patch']" - All write operations
"responseStatus.code >= 400" - Failed requests
"user.username.startsWith('system:serviceaccount:')" - Service account activity
"objectRef.resource == 'secrets'" - Secret access
"verb == 'delete' && objectRef.namespace == 'production'" - Production deletions

Note: Use single quotes for strings. Field names are case-sensitive.
CEL reference: https://cel.dev | | | | `limit` _integer_ | Limit sets the maximum number of results per page.
Default: 100, Maximum: 1000.

Use smaller values (10-50) for exploration, larger (500-1000) for data collection.
Use continue to fetch additional pages. | | | | `continue` _string_ | Continue is the pagination cursor for fetching additional pages.

Leave empty for the first page. If status.continue is non-empty after a query,
copy that value here in a new query with identical parameters to get the next page.
Repeat until status.continue is empty.

Important: Keep all other parameters (startTime, endTime, filter, limit) identical
across paginated requests. The cursor is opaque - copy it exactly without modification. | | | diff --git a/docs/components/apiserver-architecture.md b/docs/components/apiserver-architecture.md index 7fbcf76..a16c3a4 100644 --- a/docs/components/apiserver-architecture.md +++ b/docs/components/apiserver-architecture.md @@ -97,7 +97,7 @@ Manages all ClickHouse interactions: Translates CEL expressions to ClickHouse SQL. **Supported fields:** -- `auditID`, `verb`, `stage`, `stageTimestamp` +- `auditID`, `verb`, `stageTimestamp` - `objectRef.{namespace,resource,name}` - `user.username` - `responseStatus.code` diff --git a/internal/cel/cel_test.go b/internal/cel/cel_test.go index 35373b9..71eac4f 100644 --- a/internal/cel/cel_test.go +++ b/internal/cel/cel_test.go @@ -94,9 +94,11 @@ func TestCELFilterWorkflow(t *testing.T) { wantErr: true, }, { - name: "invalid - unavailable field for SQL", - filter: "objectRef.apiGroup == 'apps'", - wantErr: true, + name: "objectRef.apiGroup is available", + filter: "objectRef.apiGroup == 'apps'", + wantSQL: "api_group = {arg1}", + wantArgCount: 1, + wantErr: false, }, } @@ -264,7 +266,6 @@ func TestEnvironment(t *testing.T) { validExpressions := []string{ "auditID == 'test'", "verb == 'delete'", - "stage == 'ResponseComplete'", "stageTimestamp > timestamp('2024-01-01T00:00:00Z')", "objectRef.namespace == 'default'", "objectRef.resource == 'pods'", diff --git a/internal/cel/errors.go b/internal/cel/errors.go index 250fc64..da0dacf 100644 --- a/internal/cel/errors.go +++ b/internal/cel/errors.go @@ -50,7 +50,7 @@ func formatFilterError(err error) string { msg.WriteString(fmt.Sprintf("Invalid filter: %s", errMsg)) } - msg.WriteString(". Available fields: auditID, verb, stage, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code") + msg.WriteString(". Available fields: auditID, verb, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code") msg.WriteString(". See https://cel.dev for CEL syntax") return msg.String() diff --git a/internal/cel/filter.go b/internal/cel/filter.go index f4deaca..528daaa 100644 --- a/internal/cel/filter.go +++ b/internal/cel/filter.go @@ -20,7 +20,7 @@ var tracer = otel.Tracer("activity-cel-filter") // Environment creates a CEL environment for audit event filtering. // -// Available fields: auditID, verb, stage, stageTimestamp, +// Available fields: auditID, verb, stageTimestamp, // objectRef.{namespace,resource,name}, user.username, responseStatus.code // // Supports standard CEL operators (==, &&, ||, in) and string methods @@ -33,7 +33,6 @@ func Environment() (*cel.Env, error) { return cel.NewEnv( cel.Variable("auditID", cel.StringType), cel.Variable("verb", cel.StringType), - cel.Variable("stage", cel.StringType), cel.Variable("stageTimestamp", cel.TimestampType), cel.Variable("objectRef", objectRefType), @@ -321,8 +320,6 @@ func (c *sqlConverter) convertIdentExpr(ident *expr.Expr_Ident) (string, error) return "audit_id", nil case "verb": return "verb", nil - case "stage": - return "stage", nil case "stageTimestamp": return "timestamp", nil @@ -379,6 +376,8 @@ func (c *sqlConverter) convertSelectExpr(sel *expr.Expr_Select) (string, error) return "resource", nil case baseObject == "objectRef" && field == "name": return "resource_name", nil + case baseObject == "objectRef" && field == "apiGroup": + return "api_group", nil case baseObject == "user" && field == "username": return "user", nil @@ -388,7 +387,7 @@ func (c *sqlConverter) convertSelectExpr(sel *expr.Expr_Select) (string, error) default: // Provide helpful suggestions for common fields that aren't filterable - return "", fmt.Errorf("field '%s.%s' is not available for filtering. Available fields: auditID, verb, stage, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code", baseObject, field) + return "", fmt.Errorf("field '%s.%s' is not available for filtering. Available fields: auditID, verb, stageTimestamp, objectRef.apiGroup, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code", baseObject, field) } } diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 5f13e83..01a5dbf 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -450,6 +450,20 @@ func (s *ClickHouseStorage) QueryAuditLogs(ctx context.Context, spec v1alpha1.Au }, nil } +// hasUserFilter checks if the CEL filter contains user-based filtering +func hasUserFilter(filter string) bool { + if filter == "" { + return false + } + // Check for common user filter patterns in CEL expressions + // This is a heuristic - doesn't need to be perfect, just helpful for optimization + return strings.Contains(filter, "user.username") || + strings.Contains(filter, "user.groups") || + strings.Contains(filter, "user.uid") || + // Also match if someone uses the materialized column directly + (strings.Contains(filter, "user") && (strings.Contains(filter, "==") || strings.Contains(filter, "!="))) +} + // buildQuery constructs a ClickHouse SQL query from the query spec func (s *ClickHouseStorage) buildQuery(ctx context.Context, spec v1alpha1.AuditLogQuerySpec, scope ScopeContext) (string, []interface{}, error) { var args []interface{} @@ -506,21 +520,40 @@ func (s *ClickHouseStorage) buildQuery(ctx context.Context, spec v1alpha1.AuditL } } + // Cursor pagination must handle hour-bucketed primary key correctly. + // Use timestamp comparison to avoid hour boundary edge cases where events + // in the same hour bucket but with different timestamps would be skipped. if spec.Continue != "" { cursorTime, cursorAuditID, err := decodeCursor(spec.Continue, spec) if err != nil { return "", nil, err } - conditions = append(conditions, "(timestamp < ? OR (timestamp = ? AND audit_id < ?))") - args = append(args, cursorTime, cursorTime, cursorAuditID) + conditions = append(conditions, "(toStartOfHour(timestamp) < toStartOfHour(?) OR (toStartOfHour(timestamp) = toStartOfHour(?) AND timestamp < ?) OR (timestamp = ? AND audit_id < ?))") + args = append(args, cursorTime, cursorTime, cursorTime, cursorTime, cursorAuditID) } if len(conditions) > 0 { query += " WHERE " + strings.Join(conditions, " AND ") } - query += " ORDER BY timestamp DESC, scope_type DESC, scope_name DESC, user DESC, audit_id DESC" + // ORDER BY must match projection/primary key sort order for ClickHouse + // to efficiently use indexes and projections. + if scope.Type == "platform" { + if hasUserFilter(spec.Filter) { + // User filter present: use user_query_projection + query += " ORDER BY timestamp DESC, user DESC, api_group DESC, resource DESC" + } else { + // No user filter: use platform_query_projection + query += " ORDER BY timestamp DESC, api_group DESC, resource DESC, audit_id DESC" + } + } else if scope.Type == "user" { + // User-scoped: use user_query_projection + query += " ORDER BY timestamp DESC, user DESC, api_group DESC, resource DESC" + } else { + // Tenant-scoped: match hour-bucketed primary key for efficient index use + query += " ORDER BY toStartOfHour(timestamp) DESC, scope_type DESC, scope_name DESC, user DESC, audit_id DESC, timestamp DESC" + } limit := spec.Limit if limit <= 0 { diff --git a/migrations/001_initial_schema.sql b/migrations/001_initial_schema.sql index 1826888..18899c6 100644 --- a/migrations/001_initial_schema.sql +++ b/migrations/001_initial_schema.sql @@ -1,8 +1,8 @@ -- Migration: 001_initial_schema --- Description: High-volume multi-tenant audit events table +-- Description: High-volume multi-tenant audit events table with projections for +-- platform-wide querying and user-specific querying. -- Author: Scot Wells -- Date: 2025-12-11 --- Strategy: Simplified scope-based partitioning with hash-based distribution CREATE DATABASE IF NOT EXISTS audit; @@ -42,9 +42,6 @@ CREATE TABLE IF NOT EXISTS audit.events audit_id UUID MATERIALIZED toUUIDOrZero(coalesce(JSONExtractString(event_json, 'auditID'), '')), - stage LowCardinality(String) MATERIALIZED - coalesce(JSONExtractString(event_json, 'stage'), ''), - -- Common filters verb LowCardinality(String) MATERIALIZED coalesce(JSONExtractString(event_json, 'verb'), ''), @@ -64,22 +61,78 @@ CREATE TABLE IF NOT EXISTS audit.events status_code UInt16 MATERIALIZED toUInt16OrZero(JSONExtractString(event_json, 'responseStatus', 'code')), - -- Simple bucketing to spread parts/merges - bucket UInt8 MATERIALIZED (cityHash64(audit_id) % 16), + -- ======================================================================== + -- Skip Indexes: Optimized for different query patterns + -- ======================================================================== + + -- Timestamp minmax index for time range queries + INDEX idx_timestamp_minmax timestamp TYPE minmax GRANULARITY 4, + + -- Bloom filters with GRANULARITY 1 for high precision (critical filters) + INDEX idx_verb_set verb TYPE set(10) GRANULARITY 4, + INDEX idx_resource_bloom resource TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_verb_resource_bloom (verb, resource) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_user_bloom user TYPE bloom_filter(0.001) GRANULARITY 1, - -- Minimal skip indexes - INDEX bf_user user TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_scope (scope_type, scope_name) TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_audit_id audit_id TYPE bloom_filter(0.01) GRANULARITY 4, - INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 4 + -- Set indexes for low-cardinality columns + INDEX idx_status_code_set status_code TYPE set(100) GRANULARITY 4, + -- Minmax index for status_code range queries + INDEX idx_status_code_minmax status_code TYPE minmax GRANULARITY 4, ) -ENGINE = MergeTree -PARTITION BY (toYYYYMM(timestamp), bucket) -ORDER BY (timestamp, scope_type, scope_name, user, audit_id, stage) +ENGINE = ReplacingMergeTree +PARTITION BY toYYYYMMDD(timestamp) +-- Primary key optimized for tenant-scoped queries with hour bucketing +-- Hour bucketing improves compression, data locality, and deduplication efficiency +-- Deduplication occurs on the full ORDER BY key during merges +ORDER BY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id, timestamp) +PRIMARY KEY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id) -- Move parts to cold S3-backed volume after 90 days TTL timestamp + INTERVAL 90 DAY TO VOLUME 'cold' SETTINGS storage_policy = 'hot_cold', - ttl_only_drop_parts = 1; + ttl_only_drop_parts = 1, + deduplicate_merge_projection_mode = 'rebuild'; + +-- ============================================================================ +-- Step 3: Add Platform Query Projection +-- ============================================================================ +-- This projection is optimized for platform-wide queries that filter by +-- timestamp, api_group, and resource (common for cross-tenant analytics). +-- +-- Sort order: (timestamp, api_group, resource, audit_id) +-- Use cases: +-- - "All events for 'apps' API group and 'deployments' resource in last 24 hours" +-- - "All events for core API 'pods' resource" +-- - Platform-wide verb/resource filtering +-- + +ALTER TABLE audit.events +ADD PROJECTION platform_query_projection +( + SELECT * + ORDER BY (timestamp, api_group, resource, audit_id) +); + +-- ============================================================================ +-- Step 4: Add User Query Projection +-- ============================================================================ +-- This projection is optimized for user-specific queries within time ranges. +-- +-- Sort order: (timestamp, user, api_group, resource) +-- Use cases: +-- - "What did alice@example.com do in the last 24 hours?" +-- - "All events by system:serviceaccount:kube-system:default" +-- - User-specific verb/resource filtering +-- +-- ClickHouse automatically chooses the best projection for each query based +-- on the WHERE clause filters. + +ALTER TABLE audit.events +ADD PROJECTION user_query_projection +( + SELECT * + ORDER BY (timestamp, user, api_group, resource) +); diff --git a/pkg/apis/activity/v1alpha1/types.go b/pkg/apis/activity/v1alpha1/types.go index 761f92f..bbe0e46 100644 --- a/pkg/apis/activity/v1alpha1/types.go +++ b/pkg/apis/activity/v1alpha1/types.go @@ -80,7 +80,6 @@ type AuditLogQuerySpec struct { // Available Fields: // verb - API action: get, list, create, update, patch, delete, watch // auditID - unique event identifier - // stage - request phase: RequestReceived, ResponseStarted, ResponseComplete, Panic // stageTimestamp - when this stage occurred (RFC3339 timestamp) // user.username - who made the request (user or service account) // responseStatus.code - HTTP response code (200, 201, 404, 500, etc.) diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 5c9a8dc..97123e5 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -468,7 +468,7 @@ func schema_pkg_apis_activity_v1alpha1_AuditLogQuerySpec(ref common.ReferenceCal }, "filter": { SchemaProps: spec.SchemaProps{ - Description: "Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.\n\nAvailable Fields:\n verb - API action: get, list, create, update, patch, delete, watch\n auditID - unique event identifier\n stage - request phase: RequestReceived, ResponseStarted, ResponseComplete, Panic\n stageTimestamp - when this stage occurred (RFC3339 timestamp)\n user.username - who made the request (user or service account)\n responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)\n objectRef.namespace - target resource namespace\n objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)\n objectRef.name - specific resource name\n\nOperators: ==, !=, <, >, <=, >=, &&, ||, in String Functions: startsWith(), endsWith(), contains()\n\nCommon Patterns:\n \"verb == 'delete'\" - All deletions\n \"objectRef.namespace == 'production'\" - Activity in production namespace\n \"verb in ['create', 'update', 'delete', 'patch']\" - All write operations\n \"responseStatus.code >= 400\" - Failed requests\n \"user.username.startsWith('system:serviceaccount:')\" - Service account activity\n \"objectRef.resource == 'secrets'\" - Secret access\n \"verb == 'delete' && objectRef.namespace == 'production'\" - Production deletions\n\nNote: Use single quotes for strings. Field names are case-sensitive. CEL reference: https://cel.dev", + Description: "Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.\n\nAvailable Fields:\n verb - API action: get, list, create, update, patch, delete, watch\n auditID - unique event identifier\n stageTimestamp - when this stage occurred (RFC3339 timestamp)\n user.username - who made the request (user or service account)\n responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)\n objectRef.namespace - target resource namespace\n objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)\n objectRef.name - specific resource name\n\nOperators: ==, !=, <, >, <=, >=, &&, ||, in String Functions: startsWith(), endsWith(), contains()\n\nCommon Patterns:\n \"verb == 'delete'\" - All deletions\n \"objectRef.namespace == 'production'\" - Activity in production namespace\n \"verb in ['create', 'update', 'delete', 'patch']\" - All write operations\n \"responseStatus.code >= 400\" - Failed requests\n \"user.username.startsWith('system:serviceaccount:')\" - Service account activity\n \"objectRef.resource == 'secrets'\" - Secret access\n \"verb == 'delete' && objectRef.namespace == 'production'\" - Production deletions\n\nNote: Use single quotes for strings. Field names are case-sensitive. CEL reference: https://cel.dev", Type: []string{"string"}, Format: "", }, diff --git a/test/load/src/query-load-test.js b/test/load/src/query-load-test.js index f31b5e4..910debf 100644 --- a/test/load/src/query-load-test.js +++ b/test/load/src/query-load-test.js @@ -115,64 +115,93 @@ if (TLS_CERT_FILE && TLS_KEY_FILE) { } // Query templates with different complexity levels +// Optimized for platform-wide queries that leverage platform_query_projection const queryTemplates = [ - // Simple queries + // Platform-wide queries using api_group and resource (leverage platform_query_projection) + // These should use: ORDER BY (timestamp, api_group, resource, audit_id) { - name: 'simple_verb_filter', - filter: "verb == 'create'", + name: 'platform_core_pods', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", // Core API pods limit: 100, }, { - name: 'simple_namespace_filter', - filter: "objectRef.namespace == 'default'", + name: 'platform_apps_deployments', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments'", limit: 100, }, { - name: 'simple_resource_filter', - filter: "objectRef.resource == 'pods'", + name: 'platform_apps_statefulsets', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'statefulsets'", limit: 100, }, - // Medium complexity queries { - name: 'medium_combined_filter', - filter: "verb == 'delete' && objectRef.namespace == 'default'", + name: 'platform_batch_jobs', + filter: "objectRef.apiGroup == 'batch' && objectRef.resource == 'jobs'", limit: 100, }, { - name: 'medium_multi_verb', - filter: "verb in ['create', 'update', 'delete']", + name: 'platform_core_services', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'services'", limit: 100, }, + + // Platform queries with verb filters (still leverages projection) + { + name: 'platform_pod_creates', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb == 'create'", + limit: 100, + }, + { + name: 'platform_deployment_updates', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && verb in ['update', 'patch']", + limit: 100, + }, + { + name: 'platform_secret_access', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'secrets' && verb in ['get', 'list']", + limit: 100, + }, + + // Platform queries by API group only { - name: 'medium_user_filter', - filter: "user.username.startsWith('system:') && verb == 'get'", + name: 'platform_apps_apigroup', + filter: "objectRef.apiGroup == 'apps'", // All apps API group resources limit: 100, }, - // Complex queries { - name: 'complex_multi_condition', - filter: "objectRef.namespace in ['default', 'kube-system'] && objectRef.resource == 'deployments' && verb in ['create', 'update']", + name: 'platform_batch_apigroup', + filter: "objectRef.apiGroup == 'batch'", // All batch API group resources limit: 100, }, + + // Complex platform queries (3+ conditions) { - name: 'complex_timestamp_range', - filter: `stageTimestamp >= timestamp('2024-01-01T00:00:00Z') && verb == 'delete'`, + name: 'platform_deployment_errors', + filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && responseStatus.code >= 400", limit: 50, }, { - name: 'complex_secrets_audit', - filter: "objectRef.resource == 'secrets' && stage == 'ResponseComplete' && verb in ['get', 'list']", + name: 'platform_pod_lifecycle', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb in ['create', 'delete']", limit: 100, }, - // Pagination queries (smaller limits for testing pagination) + + // Resource-only queries (should still benefit from projection) { - name: 'pagination_small', - filter: "verb == 'get'", - limit: 10, + name: 'simple_resource_configmaps', + filter: "objectRef.resource == 'configmaps'", + limit: 100, }, { - name: 'pagination_medium', - filter: "objectRef.namespace == 'default'", + name: 'simple_resource_namespaces', + filter: "objectRef.resource == 'namespaces'", + limit: 100, + }, + + // Pagination test for platform queries + { + name: 'pagination_platform_pods', + filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", limit: 25, }, ]; @@ -323,22 +352,31 @@ function executeQueryWithPagination(template, maxPages = 3) { // Main test function export default function() { - // Randomly select a query template (weighted towards simpler queries) + // Randomly select a query template + // Weighted towards platform-wide api_group+resource queries to stress-test projections const rand = Math.random(); let templateIndex; - if (rand < 0.5) { - // 50% simple queries - templateIndex = Math.floor(Math.random() * 3); - } else if (rand < 0.8) { - // 30% medium queries - templateIndex = 3 + Math.floor(Math.random() * 3); + if (rand < 0.40) { + // 40% - Basic platform queries by api_group + resource (indices 0-4) + // These directly test the platform_query_projection + templateIndex = Math.floor(Math.random() * 5); + } else if (rand < 0.70) { + // 30% - Platform queries with verb filters (indices 5-7) + // Still leverage projection but add filtering + templateIndex = 5 + Math.floor(Math.random() * 3); + } else if (rand < 0.85) { + // 15% - API group-only queries (indices 8-9) + templateIndex = 8 + Math.floor(Math.random() * 2); } else if (rand < 0.95) { - // 15% complex queries - templateIndex = 6 + Math.floor(Math.random() * 3); + // 10% - Complex platform queries (indices 10-11) + templateIndex = 10 + Math.floor(Math.random() * 2); + } else if (rand < 0.98) { + // 3% - Resource-only queries (indices 12-13) + templateIndex = 12 + Math.floor(Math.random() * 2); } else { - // 5% pagination queries - templateIndex = 9 + Math.floor(Math.random() * 2); + // 2% - Pagination queries (index 14) + templateIndex = 14; } const template = queryTemplates[templateIndex];