Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build-apiserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:

jobs:
validate-kustomize:
uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/validate-kustomize.yaml@v1.9.0

publish-container-image:
# No point in trying to build the container image if the deployment
Expand All @@ -19,7 +19,7 @@ jobs:
contents: read
packages: write
attestations: write
uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/publish-docker.yaml@v1.9.0
with:
image-name: activity
secrets: inherit
Expand All @@ -35,7 +35,7 @@ jobs:
id-token: write
contents: read
packages: write
uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.7.2
uses: datum-cloud/actions/.github/workflows/publish-kustomize-bundle.yaml@v1.9.0
with:
bundle-name: ghcr.io/datum-cloud/activity-kustomize
bundle-path: config
Expand Down
85 changes: 69 additions & 16 deletions config/components/clickhouse-migrations/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,10 @@ data:

001_initial_schema.sql: |
-- Migration: 001_initial_schema
-- Description: High-volume multi-tenant audit events table
-- Description: High-volume multi-tenant audit events table with projections for
-- platform-wide querying and user-specific querying.
-- Author: Scot Wells <[email protected]>
-- Date: 2025-12-11
-- Strategy: Simplified scope-based partitioning with hash-based distribution

CREATE DATABASE IF NOT EXISTS audit;

Expand Down Expand Up @@ -358,9 +358,6 @@ data:
audit_id UUID MATERIALIZED
toUUIDOrZero(coalesce(JSONExtractString(event_json, 'auditID'), '')),

stage LowCardinality(String) MATERIALIZED
coalesce(JSONExtractString(event_json, 'stage'), ''),

-- Common filters
verb LowCardinality(String) MATERIALIZED
coalesce(JSONExtractString(event_json, 'verb'), ''),
Expand All @@ -380,23 +377,79 @@ data:
status_code UInt16 MATERIALIZED
toUInt16OrZero(JSONExtractString(event_json, 'responseStatus', 'code')),

-- Simple bucketing to spread parts/merges
bucket UInt8 MATERIALIZED (cityHash64(audit_id) % 16),
-- ========================================================================
-- Skip Indexes: Optimized for different query patterns
-- ========================================================================

-- Timestamp minmax index for time range queries
INDEX idx_timestamp_minmax timestamp TYPE minmax GRANULARITY 4,

-- Set index for verb, plus bloom filters with GRANULARITY 1 for high precision (critical filters)
INDEX idx_verb_set verb TYPE set(10) GRANULARITY 4,
INDEX idx_resource_bloom resource TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_verb_resource_bloom (verb, resource) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_user_bloom user TYPE bloom_filter(0.001) GRANULARITY 1,

-- Minimal skip indexes
INDEX bf_user user TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_scope (scope_type, scope_name) TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_audit_id audit_id TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX bf_api_resource (api_group, resource) TYPE bloom_filter(0.01) GRANULARITY 4
-- Set indexes for low-cardinality columns
INDEX idx_status_code_set status_code TYPE set(100) GRANULARITY 4,
-- Minmax index for status_code range queries
INDEX idx_status_code_minmax status_code TYPE minmax GRANULARITY 4,
)
ENGINE = MergeTree
PARTITION BY (toYYYYMM(timestamp), bucket)
ORDER BY (timestamp, scope_type, scope_name, user, audit_id, stage)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMMDD(timestamp)
-- Primary key optimized for tenant-scoped queries with hour bucketing
-- Hour bucketing improves compression, data locality, and deduplication efficiency
-- Deduplication occurs on the full ORDER BY key during merges
ORDER BY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id, timestamp)
PRIMARY KEY (toStartOfHour(timestamp), scope_type, scope_name, user, audit_id)

-- Move parts to cold S3-backed volume after 90 days
TTL timestamp + INTERVAL 90 DAY TO VOLUME 'cold'

SETTINGS
storage_policy = 'hot_cold',
ttl_only_drop_parts = 1;
ttl_only_drop_parts = 1,
deduplicate_merge_projection_mode = 'rebuild';

-- ============================================================================
-- Step 3: Add Platform Query Projection
-- ============================================================================
-- This projection is optimized for platform-wide queries that filter by
-- timestamp, api_group, and resource (common for cross-tenant analytics).
--
-- Sort order: (timestamp, api_group, resource, audit_id)
-- Use cases:
-- - "All events for 'apps' API group and 'deployments' resource in last 24 hours"
-- - "All events for core API 'pods' resource"
-- - Platform-wide verb/resource filtering
--
-- IF NOT EXISTS keeps this migration safe to re-run, matching the
-- CREATE DATABASE IF NOT EXISTS statement earlier in this file.
-- NOTE(review): ADD PROJECTION only changes table metadata; parts written
-- before this statement are not rebuilt unless MATERIALIZE PROJECTION is run.
-- Confirm the table is still empty when this migration is applied.

ALTER TABLE audit.events
ADD PROJECTION IF NOT EXISTS platform_query_projection
(
    SELECT *
    ORDER BY (timestamp, api_group, resource, audit_id)
);

-- ============================================================================
-- Step 4: Add User Query Projection
-- ============================================================================
-- This projection is optimized for user-specific queries within time ranges.
--
-- Sort order: (timestamp, user, api_group, resource)
-- Use cases:
-- - "What did alice@example.com do in the last 24 hours?"
-- - "All events by system:serviceaccount:kube-system:default"
-- - User-specific verb/resource filtering
--
-- ClickHouse automatically chooses the best projection for each query based
-- on the WHERE clause filters.
--
-- IF NOT EXISTS keeps this migration safe to re-run, matching the
-- CREATE DATABASE IF NOT EXISTS statement earlier in this file.
-- NOTE(review): ADD PROJECTION only changes table metadata; parts written
-- before this statement are not rebuilt unless MATERIALIZE PROJECTION is run.
-- Confirm the table is still empty when this migration is applied.

ALTER TABLE audit.events
ADD PROJECTION IF NOT EXISTS user_query_projection
(
    SELECT *
    ORDER BY (timestamp, user, api_group, resource)
);

114 changes: 76 additions & 38 deletions config/components/k6-performance-tests/generated/query-load-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,64 +118,93 @@ if (TLS_CERT_FILE && TLS_KEY_FILE) {
}

// Query templates with different complexity levels
// Optimized for platform-wide queries that leverage platform_query_projection
const queryTemplates = [
// Simple queries
// Platform-wide queries using api_group and resource (leverage platform_query_projection)
// These should use: ORDER BY (timestamp, api_group, resource, audit_id)
{
name: 'simple_verb_filter',
filter: "verb == 'create'",
name: 'platform_core_pods',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'", // Core API pods
limit: 100,
},
{
name: 'simple_namespace_filter',
filter: "objectRef.namespace == 'default'",
name: 'platform_apps_deployments',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments'",
limit: 100,
},
{
name: 'simple_resource_filter',
filter: "objectRef.resource == 'pods'",
name: 'platform_apps_statefulsets',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'statefulsets'",
limit: 100,
},
// Medium complexity queries
{
name: 'medium_combined_filter',
filter: "verb == 'delete' && objectRef.namespace == 'default'",
name: 'platform_batch_jobs',
filter: "objectRef.apiGroup == 'batch' && objectRef.resource == 'jobs'",
limit: 100,
},
{
name: 'medium_multi_verb',
filter: "verb in ['create', 'update', 'delete']",
name: 'platform_core_services',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'services'",
limit: 100,
},

// Platform queries with verb filters (still leverages projection)
{
name: 'platform_pod_creates',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb == 'create'",
limit: 100,
},
{
name: 'platform_deployment_updates',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && verb in ['update', 'patch']",
limit: 100,
},
{
name: 'platform_secret_access',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'secrets' && verb in ['get', 'list']",
limit: 100,
},

// Platform queries by API group only
{
name: 'medium_user_filter',
filter: "user.username.startsWith('system:') && verb == 'get'",
name: 'platform_apps_apigroup',
filter: "objectRef.apiGroup == 'apps'", // All apps API group resources
limit: 100,
},
// Complex queries
{
name: 'complex_multi_condition',
filter: "objectRef.namespace in ['default', 'kube-system'] && objectRef.resource == 'deployments' && verb in ['create', 'update']",
name: 'platform_batch_apigroup',
filter: "objectRef.apiGroup == 'batch'", // All batch API group resources
limit: 100,
},

// Complex platform queries (3+ conditions)
{
name: 'complex_timestamp_range',
filter: `stageTimestamp >= timestamp('2024-01-01T00:00:00Z') && verb == 'delete'`,
name: 'platform_deployment_errors',
filter: "objectRef.apiGroup == 'apps' && objectRef.resource == 'deployments' && responseStatus.code >= 400",
limit: 50,
},
{
name: 'complex_secrets_audit',
filter: "objectRef.resource == 'secrets' && stage == 'ResponseComplete' && verb in ['get', 'list']",
name: 'platform_pod_lifecycle',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods' && verb in ['create', 'delete']",
limit: 100,
},
// Pagination queries (smaller limits for testing pagination)

// Resource-only queries (should still benefit from projection)
{
name: 'pagination_small',
filter: "verb == 'get'",
limit: 10,
name: 'simple_resource_configmaps',
filter: "objectRef.resource == 'configmaps'",
limit: 100,
},
{
name: 'pagination_medium',
filter: "objectRef.namespace == 'default'",
name: 'simple_resource_namespaces',
filter: "objectRef.resource == 'namespaces'",
limit: 100,
},

// Pagination test for platform queries
{
name: 'pagination_platform_pods',
filter: "objectRef.apiGroup == '' && objectRef.resource == 'pods'",
limit: 25,
},
];
Expand Down Expand Up @@ -326,22 +355,31 @@ function executeQueryWithPagination(template, maxPages = 3) {

// Main test function
export default function() {
// Randomly select a query template (weighted towards simpler queries)
// Randomly select a query template
// Weighted towards platform-wide api_group+resource queries to stress-test projections
const rand = Math.random();
let templateIndex;

if (rand < 0.5) {
// 50% simple queries
templateIndex = Math.floor(Math.random() * 3);
} else if (rand < 0.8) {
// 30% medium queries
templateIndex = 3 + Math.floor(Math.random() * 3);
if (rand < 0.40) {
// 40% - Basic platform queries by api_group + resource (indices 0-4)
// These directly test the platform_query_projection
templateIndex = Math.floor(Math.random() * 5);
} else if (rand < 0.70) {
// 30% - Platform queries with verb filters (indices 5-7)
// Still leverage projection but add filtering
templateIndex = 5 + Math.floor(Math.random() * 3);
} else if (rand < 0.85) {
// 15% - API group-only queries (indices 8-9)
templateIndex = 8 + Math.floor(Math.random() * 2);
} else if (rand < 0.95) {
// 15% complex queries
templateIndex = 6 + Math.floor(Math.random() * 3);
// 10% - Complex platform queries (indices 10-11)
templateIndex = 10 + Math.floor(Math.random() * 2);
} else if (rand < 0.98) {
// 3% - Resource-only queries (indices 12-13)
templateIndex = 12 + Math.floor(Math.random() * 2);
} else {
// 5% pagination queries
templateIndex = 9 + Math.floor(Math.random() * 2);
// 2% - Pagination queries (index 14)
templateIndex = 14;
}

const template = queryTemplates[templateIndex];
Expand Down
6 changes: 5 additions & 1 deletion config/components/nats-streams/audit-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ spec:
allowDirect: true

# Deduplication window - prevents duplicate messages
duplicateWindow: 2m
# Extended to 10 minutes to handle webhook retries with exponential backoff
# and Vector restarts. NATS uses message IDs (set to Kubernetes auditID) to
# detect duplicates within this window, providing pipeline-level de-duplication
# before events reach ClickHouse.
duplicateWindow: 10m

# Maximum number of consumers
maxConsumers: 10
Expand Down
9 changes: 9 additions & 0 deletions config/components/vector-sidecar/vector-sidecar-hr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ spec:
healthcheck:
enabled: true

jetstream:
enabled: true
headers:
# NATS message ID for de-duplication
# Uses the Kubernetes auditID as the NATS message ID to enable
# JetStream's duplicate detection within the duplicateWindow (10m)
# This prevents duplicate events from webhook retries or Vector restarts
message_id: "{{ .auditID }}"

# Buffer for durability
# 10GB disk buffer to survive NATS outages
buffer:
Expand Down
2 changes: 1 addition & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ _Appears in:_
| --- | --- | --- | --- |
| `startTime` _string_ | StartTime is the beginning of your search window (inclusive).<br /><br />Format Options:<br />- Relative: "now-30d", "now-2h", "now-30m" (units: s, m, h, d, w)<br /> Use for dashboards and recurring queries - they adjust automatically.<br />- Absolute: "2024-01-01T00:00:00Z" (RFC3339 with timezone)<br /> Use for historical analysis of specific time periods.<br /><br />Examples:<br /> "now-30d" → 30 days ago<br /> "2024-06-15T14:30:00-05:00" → specific time with timezone offset | | |
| `endTime` _string_ | EndTime is the end of your search window (exclusive).<br /><br />Uses the same formats as StartTime. Commonly "now" for current moment.<br />Must be greater than StartTime.<br /><br />Examples:<br /> "now" → current time<br /> "2024-01-02T00:00:00Z" → specific end point | | |
| `filter` _string_ | Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.<br /><br />Available Fields:<br /> verb - API action: get, list, create, update, patch, delete, watch<br /> auditID - unique event identifier<br /> stage - request phase: RequestReceived, ResponseStarted, ResponseComplete, Panic<br /> stageTimestamp - when this stage occurred (RFC3339 timestamp)<br /> user.username - who made the request (user or service account)<br /> responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)<br /> objectRef.namespace - target resource namespace<br /> objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)<br /> objectRef.name - specific resource name<br /><br />Operators: ==, !=, <, >, <=, >=, &&, \|\|, in<br />String Functions: startsWith(), endsWith(), contains()<br /><br />Common Patterns:<br /> "verb == 'delete'" - All deletions<br /> "objectRef.namespace == 'production'" - Activity in production namespace<br /> "verb in ['create', 'update', 'delete', 'patch']" - All write operations<br /> "responseStatus.code >= 400" - Failed requests<br /> "user.username.startsWith('system:serviceaccount:')" - Service account activity<br /> "objectRef.resource == 'secrets'" - Secret access<br /> "verb == 'delete' && objectRef.namespace == 'production'" - Production deletions<br /><br />Note: Use single quotes for strings. Field names are case-sensitive.<br />CEL reference: https://cel.dev | | |
| `filter` _string_ | Filter narrows results using CEL (Common Expression Language). Leave empty to get all events.<br /><br />Available Fields:<br /> verb - API action: get, list, create, update, patch, delete, watch<br /> auditID - unique event identifier<br /> stageTimestamp - when this stage occurred (RFC3339 timestamp)<br /> user.username - who made the request (user or service account)<br /> responseStatus.code - HTTP response code (200, 201, 404, 500, etc.)<br /> objectRef.apiGroup - target resource API group (apps, batch, etc.; empty string for the core API group)<br /> objectRef.namespace - target resource namespace<br /> objectRef.resource - resource type (pods, deployments, secrets, configmaps, etc.)<br /> objectRef.name - specific resource name<br /><br />Operators: ==, !=, <, >, <=, >=, &&, \|\|, in<br />String Functions: startsWith(), endsWith(), contains()<br /><br />Common Patterns:<br /> "verb == 'delete'" - All deletions<br /> "objectRef.namespace == 'production'" - Activity in production namespace<br /> "verb in ['create', 'update', 'delete', 'patch']" - All write operations<br /> "responseStatus.code >= 400" - Failed requests<br /> "user.username.startsWith('system:serviceaccount:')" - Service account activity<br /> "objectRef.resource == 'secrets'" - Secret access<br /> "verb == 'delete' && objectRef.namespace == 'production'" - Production deletions<br /><br />Note: Use single quotes for strings. Field names are case-sensitive.<br />CEL reference: https://cel.dev | | |
| `limit` _integer_ | Limit sets the maximum number of results per page.<br />Default: 100, Maximum: 1000.<br /><br />Use smaller values (10-50) for exploration, larger (500-1000) for data collection.<br />Use continue to fetch additional pages. | | |
| `continue` _string_ | Continue is the pagination cursor for fetching additional pages.<br /><br />Leave empty for the first page. If status.continue is non-empty after a query,<br />copy that value here in a new query with identical parameters to get the next page.<br />Repeat until status.continue is empty.<br /><br />Important: Keep all other parameters (startTime, endTime, filter, limit) identical<br />across paginated requests. The cursor is opaque - copy it exactly without modification. | | |

Expand Down
2 changes: 1 addition & 1 deletion docs/components/apiserver-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Manages all ClickHouse interactions:
Translates CEL expressions to ClickHouse SQL.

**Supported fields:**
- `auditID`, `verb`, `stage`, `stageTimestamp`
- `auditID`, `verb`, `stageTimestamp`
- `objectRef.{apiGroup,namespace,resource,name}`
- `user.username`
- `responseStatus.code`
Expand Down
9 changes: 5 additions & 4 deletions internal/cel/cel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ func TestCELFilterWorkflow(t *testing.T) {
wantErr: true,
},
{
name: "invalid - unavailable field for SQL",
filter: "objectRef.apiGroup == 'apps'",
wantErr: true,
name: "objectRef.apiGroup is available",
filter: "objectRef.apiGroup == 'apps'",
wantSQL: "api_group = {arg1}",
wantArgCount: 1,
wantErr: false,
},
}

Expand Down Expand Up @@ -264,7 +266,6 @@ func TestEnvironment(t *testing.T) {
validExpressions := []string{
"auditID == 'test'",
"verb == 'delete'",
"stage == 'ResponseComplete'",
"stageTimestamp > timestamp('2024-01-01T00:00:00Z')",
"objectRef.namespace == 'default'",
"objectRef.resource == 'pods'",
Expand Down
2 changes: 1 addition & 1 deletion internal/cel/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func formatFilterError(err error) string {
msg.WriteString(fmt.Sprintf("Invalid filter: %s", errMsg))
}

msg.WriteString(". Available fields: auditID, verb, stage, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code")
msg.WriteString(". Available fields: auditID, verb, stageTimestamp, objectRef.namespace, objectRef.resource, objectRef.name, user.username, user.groups, responseStatus.code")
msg.WriteString(". See https://cel.dev for CEL syntax")

return msg.String()
Expand Down
Loading