Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build-apiserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:

jobs:
validate-kustomize:
uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.9.0

publish-container-image:
# No point in trying to build the container image if the deployment
Expand All @@ -19,7 +19,7 @@ jobs:
contents: read
packages: write
attestations: write
uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.9.0
with:
image-name: activity
secrets: inherit
Expand All @@ -35,7 +35,7 @@ jobs:
id-token: write
contents: read
packages: write
uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.9.0
with:
bundle-name: ghcr.io/datum-cloud/activity-kustomize
bundle-path: config
Expand Down
83 changes: 67 additions & 16 deletions config/components/clickhouse-migrations/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,10 @@ data:

001_initial_schema.sql: |
-- Migration: 001_initial_schema
-- Description: High-volume multi-tenant audit events table
-- Description: High-volume multi-tenant audit events table with projections for
-- platform-wide querying and user-specific querying.
-- Author: Scot Wells <[email protected]>
-- Date: 2025-12-11
-- Strategy: Simplified scope-based partitioning with hash-based distribution

CREATE DATABASE IF NOT EXISTS audit;

Expand Down Expand Up @@ -358,9 +358,6 @@ data:
audit_id UUID MATERIALIZED
toUUIDOrZero(coalesce(JSONExtractString(event_json, 'auditID'), '')),

stage LowCardinality(String) MATERIALIZED
coalesce(JSONExtractString(event_json, 'stage'), ''),

-- Common filters
verb LowCardinality(String) MATERIALIZED
coalesce(JSONExtractString(event_json, 'verb'), ''),
Expand All @@ -380,23 +377,77 @@ data:
status_code UInt16 MATERIALIZED
toUInt16OrZero(JSONExtractString(event_json, 'responseStatus', 'code')),

-- Simple bucketing to spread parts/merges
bucket UInt8 MATERIALIZED (cityHash64(audit_id) % 16),
-- ========================================================================
-- Skip Indexes: Optimized for different query patterns
-- ========================================================================

-- Timestamp minmax index for time range queries
INDEX idx_timestamp_minmax timestamp TYPE minmax GRANULARITY 4,

-- Bloom filters with GRANULARITY 1 for high precision (critical filters)
INDEX idx_verb_set verb TYPE set(10) GRANULARITY 4,
INDEX idx_resource_bloom resource TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_verb_resource_bloom (verb, resource) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_user_bloom user TYPE bloom_filter(0.001) GRANULARITY 1,

-- Minimal skip indexes
INDEX bf_user user TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_scope (scope_type, scope_name) TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_audit_id audit_id TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 4
-- Set indexes for low-cardinality columns
INDEX idx_status_code_set status_code TYPE set(100) GRANULARITY 4,
-- Minmax index for status_code range queries
INDEX idx_status_code_minmax status_code TYPE minmax GRANULARITY 4,
)
ENGINE = MergeTree
PARTITION BY (toYYYYMM(timestamp), bucket)
ORDER BY (timestamp, scope_type, scope_name, user, audit_id, stage)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMMDD(timestamp)
-- Primary key optimized for tenant-scoped queries
-- Deduplication occurs on the full ORDER BY key during merges
ORDER BY (timestamp, scope_type, scope_name, user, audit_id)

-- Move parts to cold S3-backed volume after 90 days
TTL timestamp + INTERVAL 90 DAY TO VOLUME 'cold'

SETTINGS
storage_policy = 'hot_cold',
ttl_only_drop_parts = 1;
ttl_only_drop_parts = 1,
deduplicate_merge_projection_mode = 'rebuild';

-- ============================================================================
-- Step 3: Add Platform Query Projection
-- ============================================================================
-- This projection is optimized for platform-wide queries that filter by
-- timestamp, api_group, and resource (common for cross-tenant analytics).
--
-- Sort order: (timestamp, api_group, resource, audit_id)
-- Use cases:
-- - "All events for 'apps' API group and 'deployments' resource in last 24 hours"
-- - "All events for core API 'pods' resource"
-- - Platform-wide verb/resource filtering
--

ALTER TABLE audit.events
ADD PROJECTION platform_query_projection
(
SELECT *
ORDER BY (timestamp, api_group, resource, audit_id)
);

-- ============================================================================
-- Step 4: Add User Query Projection
-- ============================================================================
-- This projection is optimized for user-specific queries within time ranges.
--
-- Sort order: (timestamp, user, api_group, resource)
-- Use cases:
-- - "What did [email protected] do in the last 24 hours?"
-- - "All events by system:serviceaccount:kube-system:default"
-- - User-specific verb/resource filtering
--
-- ClickHouse automatically chooses the best projection for each query based
-- on the WHERE clause filters.

ALTER TABLE audit.events
ADD PROJECTION user_query_projection
(
SELECT *
ORDER BY (timestamp, user, api_group, resource)
);

114 changes: 76 additions & 38 deletions config/components/k6-performance-tests/generated/query-load-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,64 +118,93 @@ if (TLS_CERT_FILE && TLS_KEY_FILE) {
}

// Query templates with different complexity levels
// Optimized for platform-wide queries that leverage platform_query_projection
const queryTemplates = [
// Simple queries
// Platform-wide queries using api_group and resource (leverage platform_query_projection)
// These should use: ORDER BY (timestamp, api_group, resource, audit_id)
{
name: 'simple_verb_filter',
filter: "verb == 'create'",
name: 'platform_core_pods',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", // Core API pods
limit: 100,
},
{
name: 'simple_namespace_filter',
filter: "objectRef.namespace == 'default'",
name: 'platform_apps_deployments',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments'",
limit: 100,
},
{
name: 'simple_resource_filter',
filter: "objectRef.resource == 'pods'",
name: 'platform_apps_statefulsets',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'statefulsets'",
limit: 100,
},
// Medium complexity queries
{
name: 'medium_combined_filter',
filter: "verb == 'delete' && objectRef.namespace == 'default'",
name: 'platform_batch_jobs',
filter: "objectRef.apiGroup == 'batch' && objectRef.resource == 'jobs'",
limit: 100,
},
{
name: 'medium_multi_verb',
filter: "verb in ['create', 'update', 'delete']",
name: 'platform_core_services',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'services'",
limit: 100,
},

// Platform queries with verb filters (still leverages projection)
{
name: 'platform_pod_creates',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb == 'create'",
limit: 100,
},
{
name: 'platform_deployment_updates',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && verb in ['update', 'patch']",
limit: 100,
},
{
name: 'platform_secret_access',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'secrets' && verb in ['get', 'list']",
limit: 100,
},

// Platform queries by API group only
{
name: 'medium_user_filter',
filter: "user.username.startsWith('system:') && verb == 'get'",
name: 'platform_apps_apigroup',
filter: "objectRef.apiGroup == 'apps'", // All apps API group resources
limit: 100,
},
// Complex queries
{
name: 'complex_multi_condition',
filter: "objectRef.namespace in ['default', 'kube-system'] && objectRef.resource == 'deployments' && verb in ['create', 'update']",
name: 'platform_batch_apigroup',
filter: "objectRef.apiGroup == 'batch'", // All batch API group resources
limit: 100,
},

// Complex platform queries (3+ conditions)
{
name: 'complex_timestamp_range',
filter: `stageTimestamp >= timestamp('2024-01-01T00:00:00Z') && verb == 'delete'`,
name: 'platform_deployment_errors',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && responseStatus.code >= 400",
limit: 50,
},
{
name: 'complex_secrets_audit',
filter: "objectRef.resource == 'secrets' && stage == 'ResponseComplete' && verb in ['get', 'list']",
name: 'platform_pod_lifecycle',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb in ['create', 'delete'] && stage == 'ResponseComplete'",
limit: 100,
},
// Pagination queries (smaller limits for testing pagination)

// Resource-only queries (should still benefit from projection)
{
name: 'pagination_small',
filter: "verb == 'get'",
limit: 10,
name: 'simple_resource_configmaps',
filter: "objectRef.resource == 'configmaps'",
limit: 100,
},
{
name: 'pagination_medium',
filter: "objectRef.namespace == 'default'",
name: 'simple_resource_namespaces',
filter: "objectRef.resource == 'namespaces'",
limit: 100,
},

// Pagination test for platform queries
{
name: 'pagination_platform_pods',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'",
limit: 25,
},
];
Expand Down Expand Up @@ -326,22 +355,31 @@ function executeQueryWithPagination(template, maxPages = 3) {

// Main test function
export default function() {
// Randomly select a query template (weighted towards simpler queries)
// Randomly select a query template
// Weighted towards platform-wide api_group+resource queries to stress-test projections
const rand = Math.random();
let templateIndex;

if (rand < 0.5) {
// 50% simple queries
templateIndex = Math.floor(Math.random() * 3);
} else if (rand < 0.8) {
// 30% medium queries
templateIndex = 3 + Math.floor(Math.random() * 3);
if (rand < 0.40) {
// 40% - Basic platform queries by api_group + resource (indices 0-4)
// These directly test the platform_query_projection
templateIndex = Math.floor(Math.random() * 5);
} else if (rand < 0.70) {
// 30% - Platform queries with verb filters (indices 5-7)
// Still leverage projection but add filtering
templateIndex = 5 + Math.floor(Math.random() * 3);
} else if (rand < 0.85) {
// 15% - API group-only queries (indices 8-9)
templateIndex = 8 + Math.floor(Math.random() * 2);
} else if (rand < 0.95) {
// 15% complex queries
templateIndex = 6 + Math.floor(Math.random() * 3);
// 10% - Complex platform queries (indices 10-11)
templateIndex = 10 + Math.floor(Math.random() * 2);
} else if (rand < 0.98) {
// 3% - Resource-only queries (indices 12-13)
templateIndex = 12 + Math.floor(Math.random() * 2);
} else {
// 5% pagination queries
templateIndex = 9 + Math.floor(Math.random() * 2);
// 2% - Pagination queries (index 14)
templateIndex = 14;
}

const template = queryTemplates[templateIndex];
Expand Down
6 changes: 5 additions & 1 deletion config/components/nats-streams/audit-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ spec:
allowDirect: true

# Deduplication window - prevents duplicate messages
duplicateWindow: 2m
# Extended to 10 minutes to handle webhook retries with exponential backoff
# and Vector restarts. NATS uses message IDs (set to Kubernetes auditID) to
# detect duplicates within this window, providing pipeline-level de-duplication
# before events reach ClickHouse.
duplicateWindow: 10m

# Maximum number of consumers
maxConsumers: 10
Expand Down
9 changes: 9 additions & 0 deletions config/components/vector-sidecar/vector-sidecar-hr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ spec:
healthcheck:
enabled: true

jetstream:
enabled: true
headers:
# NATS message ID for de-duplication
# Uses the Kubernetes auditID as the NATS message ID to enable
# JetStream's duplicate detection within the duplicateWindow (10m)
# This prevents duplicate events from webhook retries or Vector restarts
message_id: "{{ .auditID }}"

# Buffer for durability
# 10GB disk buffer to survive NATS outages
buffer:
Expand Down
9 changes: 5 additions & 4 deletions internal/cel/cel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ func TestCELFilterWorkflow(t *testing.T) {
wantErr: true,
},
{
name: "invalid - unavailable field for SQL",
filter: "objectRef.apiGroup == 'apps'",
wantErr: true,
name: "objectRef.apiGroup is available",
filter: "objectRef.apiGroup == 'apps'",
wantSQL: "api_group = {arg1}",
wantArgCount: 1,
wantErr: false,
},
}

Expand Down Expand Up @@ -264,7 +266,6 @@ func TestEnvironment(t *testing.T) {
validExpressions := []string{
"auditID == 'test'",
"verb == 'delete'",
"stage == 'ResponseComplete'",
"stageTimestamp > timestamp('2024-01-01T00:00:00Z')",
"objectRef.namespace == 'default'",
"objectRef.resource == 'pods'",
Expand Down
2 changes: 1 addition & 1 deletion internal/cel/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func formatFilterError(err error) string {
msg.WriteString(fmt.Sprintf("Invalid filter: %s", errMsg))
}

msg.WriteString(". Available fields: auditID, verb, stage, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code")
msg.WriteString(". Available fields: auditID, verb, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code")
msg.WriteString(". See https://cel.dev for CEL syntax")

return msg.String()
Expand Down
Loading