
Commit e0121f6

Merge pull request #11 from datum-cloud/feat/resource-indexer-architecture
feat: define resource indexer architecture
2 parents 46dd09f + 53d8474 commit e0121f6

File tree

5 files changed

+370
-1
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,10 @@ using CEL-based filtering. The service integrates natively with kubectl/RBAC and
88
targets Meilisearch as the search backend.
99

1010
![](./docs/diagrams/SearchServiceContext.png)
11+
12+
## Documentation
13+
14+
- [Architecture](./docs/architecture.md) — High-level design and component
15+
overview
16+
- [Resource Indexer](./docs/components/resource-indexer.md) — Detailed design
17+
for the indexing component

docs/architecture.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ using powerful indexing and real-time event processing.
4848
- Filter events based on active index policies
4949
- Evaluate [CEL expressions][CEL] for conditional indexing
5050
- Extract and transform resource data for indexing
51-
- Write to index backend with proper error handling and retries
51+
- Manage documents in index backend with proper error handling and retries
5252
- Manage index lifecycle (creation, updates, deletion)
5353
- Bootstrap indexes from existing state
5454

55+
See the [Resource Indexer Architecture](./components/resource-indexer.md) for
56+
detailed design documentation.
57+
5558
### Controller Manager
5659

5760
**Purpose**: Manages and validates resources for the search service
docs/components/resource-indexer.md

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
<!-- omit from toc -->
2+
# Resource Indexer Architecture
3+
4+
- [Overview](#overview)
5+
- [Design Goals](#design-goals)
6+
- [Core Responsibilities](#core-responsibilities)
7+
- [Event Processing Flow](#event-processing-flow)
8+
- [Event Consumption](#event-consumption)
9+
- [Horizontal Scaling](#horizontal-scaling)
10+
- [Kubernetes Configuration](#kubernetes-configuration)
11+
- [Policy Management](#policy-management)
12+
- [CEL Compilation](#cel-compilation)
13+
- [Document Lifecycle](#document-lifecycle)
14+
- [Transformation](#transformation)
15+
- [Persistence and Acknowledgment](#persistence-and-acknowledgment)
16+
- [Batching](#batching)
17+
- [Duplicate Handling](#duplicate-handling)
18+
- [Bootstrap Process](#bootstrap-process)
19+
- [Multi-Cluster Bootstrap](#multi-cluster-bootstrap)
20+
- [Error Handling](#error-handling)
21+
- [Integration Points](#integration-points)
22+
- [Observability](#observability)
23+
- [Future Considerations](#future-considerations)
24+
25+
26+
## Overview
27+
28+
The Resource Indexer is a core component of the Search service responsible for
29+
maintaining a searchable index of platform resources. It consumes audit log
30+
events from NATS JetStream, applies policy-based filtering, and manages indexed
31+
documents in the search backend.
32+
33+
## Design Goals
34+
35+
- **Real-time indexing**: Process resource changes within seconds of occurrence
36+
- **Policy-driven**: Index only resources matching active index policy
37+
configurations
38+
- **Reliable delivery**: Guarantee at-least-once processing of all events
39+
- **Graceful recovery**: Resume processing from last known position after
40+
restarts
41+
- **Horizontal scalability**: Scale throughput by adding instances without
42+
coordination
43+
44+
## Core Responsibilities
45+
46+
The Resource Indexer handles:
47+
48+
- Consuming audit log events from NATS JetStream
49+
- Watching index policy resources and evaluating CEL filters
50+
- Transforming Kubernetes resources into searchable documents
51+
- Persisting documents to the index backend
52+
- Acknowledging events only after successful persistence
53+
54+
### Event Processing Flow
55+
56+
The following diagram illustrates how the indexer processes events, including
57+
policy matching, batching, and acknowledgment handling:
58+
59+
```mermaid
60+
sequenceDiagram
61+
participant JS as NATS JetStream
62+
participant Indexer as Resource Indexer
63+
participant Cache as Policy Cache
64+
participant Meili as Meilisearch
65+
66+
rect rgb(240, 248, 255)
67+
note right of JS: Create/Update Matches Policy
68+
JS->>Indexer: Deliver audit event
69+
Indexer->>Cache: Evaluate policies
70+
Cache-->>Indexer: Policy match + compiled CEL
71+
Indexer->>Indexer: Evaluate CEL filter
72+
Indexer->>Indexer: Transform resource to document
73+
Indexer->>Indexer: Queue document for upsert
74+
75+
alt Upsert queue ready (size or time threshold)
76+
Indexer->>Meili: Add/replace documents batch
77+
Meili-->>Indexer: Success
78+
Indexer->>JS: Ack all events in batch
79+
end
80+
end
81+
82+
rect rgb(245, 245, 245)
83+
note right of JS: Create Does Not Match Policy
84+
JS->>Indexer: Deliver audit event (create)
85+
Indexer->>Cache: Evaluate policies
86+
Cache-->>Indexer: No matching policy
87+
Indexer->>JS: Ack (never indexed)
88+
end
89+
90+
rect rgb(255, 248, 240)
91+
note right of JS: Update No Longer Matches Policy
92+
JS->>Indexer: Deliver audit event (update)
93+
Indexer->>Cache: Evaluate policies
94+
Cache-->>Indexer: No matching policy
95+
Indexer->>Indexer: Queue delete by UID
96+
97+
alt Delete queue ready (size or time threshold)
98+
Indexer->>Meili: Delete documents batch
99+
Meili-->>Indexer: Success
100+
Indexer->>JS: Ack all events in batch
101+
end
102+
end
103+
104+
rect rgb(255, 245, 238)
105+
note right of JS: Resource Deleted
106+
JS->>Indexer: Deliver audit event (delete)
107+
Indexer->>Indexer: Queue delete by UID
108+
109+
alt Delete queue ready (size or time threshold)
110+
Indexer->>Meili: Delete documents batch
111+
Meili-->>Indexer: Success
112+
Indexer->>JS: Ack all events in batch
113+
end
114+
end
115+
116+
rect rgb(255, 240, 240)
117+
note right of JS: Persistence Failure
118+
JS->>Indexer: Deliver audit event
119+
Indexer->>Indexer: Transform and queue
120+
Indexer->>Meili: Flush queue
121+
Meili-->>Indexer: Error
122+
note right of Indexer: Do not ack — JetStream<br/>redelivers after timeout
123+
end
124+
```
125+
126+
## Event Consumption
127+
128+
The indexer consumes audit log events from [NATS JetStream][jetstream] using
129+
[durable consumers][durable-consumers]. JetStream provides:
130+
131+
- **Delivery guarantees**: At-least-once delivery with configurable ack timeouts
132+
- **Position tracking**: Durable consumers track acknowledged messages; on
133+
restart, consumption resumes from the last acknowledged position
134+
- **Backpressure**: Pull-based consumption allows the indexer to control its
135+
processing rate
136+
137+
[jetstream]: https://docs.nats.io/nats-concepts/jetstream
138+
[durable-consumers]: https://docs.nats.io/nats-concepts/jetstream/consumers#durable-consumers
139+
140+
### Horizontal Scaling
141+
142+
The indexer uses JetStream [queue groups] for horizontal scaling. When multiple
143+
instances join the same queue group, JetStream distributes messages across them
144+
automatically, and each message is delivered to one instance at a time; an unacknowledged message may be redelivered to a different instance.
145+
146+
<p align="center">
147+
<img src="../diagrams/ResourceIndexerScaling.png" alt="Resource Indexer horizontal scaling diagram">
148+
</p>
149+
150+
This lets throughput scale with the number of instances, without coordination between them, up to the write capacity of the shared index backend.
151+
152+
[queue groups]: https://docs.nats.io/nats-concepts/core-nats/queue
153+
154+
### Kubernetes Configuration
155+
156+
In Kubernetes environments, JetStream resources (Streams, Consumers) can be
157+
managed declaratively using [NACK] (NATS Controllers for Kubernetes). NACK
158+
provides CRDs for defining [Streams] and [Consumers] as Kubernetes resources,
159+
enabling GitOps workflows and consistent configuration across environments.
160+
161+
[NACK]: https://github.com/nats-io/nack
162+
[Streams]: https://github.com/nats-io/nack/blob/main/docs/api.md#stream
163+
[Consumers]: https://github.com/nats-io/nack/blob/main/docs/api.md#consumer
164+
165+
## Policy Management
166+
167+
Index policy resources define what to index. The indexer watches these resources
168+
using a Kubernetes [informer], which provides:
169+
170+
- **List-watch semantics**: Initial list of all policies followed by a watch
171+
stream for changes
172+
- **Local cache**: In-memory store for fast lookups during event processing
173+
- **Automatic resync**: Periodic re-list to correct any drift
174+
175+
Each indexer instance maintains its own policy cache. Since events can be routed
176+
to any instance (via queue groups), each instance caches all policies.
177+
Index policy resources are typically small and few in number, so this
178+
replication is acceptable.
179+
180+
### CEL Compilation
181+
182+
[CEL expressions][CEL] in policies must be compiled before evaluation. To avoid
183+
recompilation on every event, compile expressions when policies are added or
184+
updated and cache the compiled programs alongside the policy.
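
A minimal sketch of the compile-once cache described above, using only the standard library. `Program`, `CompileFunc`, and `PolicyCache` are hypothetical names; the compile step is injected as a function so that the real CEL library call can be plugged in. Keeping the previously cached program when a recompile fails is an assumption, not something the design specifies.

```go
package main

import (
	"fmt"
	"sync"
)

// Program stands in for a compiled CEL program (hypothetical type).
type Program func(resource map[string]any) (bool, error)

// CompileFunc stands in for the real CEL compile step (hypothetical signature).
type CompileFunc func(expr string) (Program, error)

// PolicyCache keeps one compiled program per policy name.
type PolicyCache struct {
	mu       sync.RWMutex
	compile  CompileFunc
	programs map[string]Program
}

func NewPolicyCache(compile CompileFunc) *PolicyCache {
	return &PolicyCache{compile: compile, programs: make(map[string]Program)}
}

// OnAddOrUpdate compiles the expression once per policy change.
// If compilation fails, any previously cached program is left in place.
func (c *PolicyCache) OnAddOrUpdate(name, expr string) error {
	prog, err := c.compile(expr)
	if err != nil {
		return fmt.Errorf("compile policy %q: %w", name, err)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.programs[name] = prog
	return nil
}

// OnDelete drops the cached program for a removed policy.
func (c *PolicyCache) OnDelete(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.programs, name)
}

// Get returns the cached program without recompiling.
func (c *PolicyCache) Get(name string) (Program, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	prog, ok := c.programs[name]
	return prog, ok
}

func main() {
	compiles := 0
	cache := NewPolicyCache(func(expr string) (Program, error) {
		compiles++
		if expr == "" {
			return nil, fmt.Errorf("empty expression")
		}
		// Stub program that accepts every resource.
		return func(map[string]any) (bool, error) { return true, nil }, nil
	})
	if err := cache.OnAddOrUpdate("example-policy", "true"); err != nil {
		fmt.Println(err)
		return
	}
	for i := 0; i < 3; i++ {
		if prog, ok := cache.Get("example-policy"); ok {
			matched, _ := prog(nil)
			fmt.Println("matched:", matched)
		}
	}
	fmt.Println("compile calls:", compiles)
}
```

In this sketch the informer's add and update handlers would call `OnAddOrUpdate`, and the event path would only ever call `Get`.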
185+
186+
The indexer should wait for the informer cache to sync before processing events
187+
to ensure all active policies are available for matching.
188+
189+
[informer]: https://pkg.go.dev/k8s.io/client-go/tools/cache#SharedInformer
190+
[CEL]: https://cel.dev
191+
192+
## Document Lifecycle
193+
194+
The indexer manages documents in the search index based on audit events:
195+
196+
| Event Type | Policy Match | Action |
197+
|------------|--------------|--------|
198+
| Create | Yes | Upsert document |
199+
| Create | No | Acknowledge only (never indexed) |
200+
| Update | Yes | Upsert document |
201+
| Update | No | Delete document (may have been indexed) |
202+
| Delete | N/A | Delete document |
203+
204+
When a resource is updated and no longer matches any policy (e.g., labels
205+
changed, CEL filter no longer passes), the indexer queues a delete. Since the
206+
indexer doesn't track what was previously indexed, it always attempts deletion
207+
for non-matching updates — Meilisearch treats deletes of non-existent documents
208+
as no-ops.
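
The table above can be sketched as a single decision function. The verb strings, the `Action` type, and the choice to acknowledge unknown verbs without indexing are assumptions made for illustration.

```go
package main

import "fmt"

// Action is what the indexer does with an event (illustrative names).
type Action string

const (
	ActionUpsert  Action = "upsert"
	ActionDelete  Action = "delete"
	ActionAckOnly Action = "ack-only"
)

// decide maps an event verb and the policy-match result to an action,
// mirroring the document lifecycle table.
func decide(verb string, matchesPolicy bool) Action {
	switch verb {
	case "create":
		if matchesPolicy {
			return ActionUpsert
		}
		return ActionAckOnly // never indexed, nothing to remove
	case "update":
		if matchesPolicy {
			return ActionUpsert
		}
		return ActionDelete // may have been indexed by an earlier event
	case "delete":
		return ActionDelete // policy match is irrelevant
	default:
		return ActionAckOnly // assumption: unknown verbs are not indexed
	}
}

func main() {
	fmt.Println(decide("update", false))
}
```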
209+
210+
### Transformation
211+
212+
When an event matches a policy, the indexer transforms the Kubernetes resource
213+
into a searchable document:
214+
215+
- Extract fields specified in the index policy field mappings
216+
- Normalize metadata (labels, annotations) into searchable formats
217+
- Use the resource's UID as the document identifier
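
As an illustration of the steps above, the sketch below builds a document from a decoded resource. The mapping format (target field to dotted source path), the `id` field name, and the decision to skip missing paths silently are all assumptions; the actual index policy schema is not defined in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup walks a dotted path such as "metadata.name" through nested maps.
func lookup(obj map[string]any, path string) (any, bool) {
	var cur any = obj
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// toDocument builds a flat document: "id" is the resource UID, and each
// mapping copies the value at a source path into a target field.
// Missing source paths are skipped (assumption).
func toDocument(resource map[string]any, mappings map[string]string) (map[string]any, error) {
	uid, _ := lookup(resource, "metadata.uid")
	uidStr, isStr := uid.(string)
	if !isStr || uidStr == "" {
		return nil, fmt.Errorf("resource has no metadata.uid")
	}
	doc := map[string]any{"id": uidStr}
	for field, path := range mappings {
		if v, found := lookup(resource, path); found {
			doc[field] = v
		}
	}
	return doc, nil
}

func main() {
	resource := map[string]any{
		"kind": "Deployment",
		"metadata": map[string]any{
			"uid":    "example-uid",
			"name":   "web",
			"labels": map[string]any{"app": "web"},
		},
	}
	doc, err := toDocument(resource, map[string]string{
		"name":   "metadata.name",
		"kind":   "kind",
		"labels": "metadata.labels",
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(doc)
}
```

Label keys often contain dots, so this sketch copies the whole `labels` map and does not address individual labels by path.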
218+
219+
## Persistence and Acknowledgment
220+
221+
Documents are persisted to the index backend ([Meilisearch]). To guarantee
222+
at-least-once delivery, events are only acknowledged after successful
223+
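persistence. Meilisearch processes document writes asynchronously as tasks, so the implementation needs to decide whether "successful" means the write was accepted or the task succeeded.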
224+
225+
[Meilisearch]: https://www.meilisearch.com/docs
226+
227+
### Batching
228+
229+
For efficiency, batch multiple operations before persisting. The indexer
230+
maintains separate queues for upserts and deletes since Meilisearch requires
231+
separate API calls for each operation type. When either queue reaches its size
232+
or time threshold:
233+
234+
1. Flush pending upserts via the [add documents][meilisearch-add-documents]
235+
endpoint
236+
2. Flush pending deletes via the [delete documents][meilisearch-delete-documents]
237+
endpoint
238+
3. On success, acknowledge all events whose operations were flushed
239+
4. On failure, do not acknowledge — JetStream redelivers after ack timeout
240+
241+
Create events that don't match any policy can be acknowledged immediately since
242+
the resource was never indexed. Update events that match no policy, and all
243+
delete events, still queue a delete operation: the indexer cannot know whether
244+
the resource was previously indexed, and Meilisearch delete operations are idempotent.
245+
246+
[meilisearch-add-documents]: https://www.meilisearch.com/docs/reference/api/documents#add-or-replace-documents
247+
[meilisearch-delete-documents]: https://www.meilisearch.com/docs/reference/api/documents#delete-documents-by-batch
248+
249+
### Duplicate Handling
250+
251+
At-least-once delivery means duplicates are possible (e.g., after a failure
252+
before acknowledgment). The index backend handles this via [document primary
253+
keys][meilisearch-primary-key] — reindexing the same resource overwrites the
254+
existing document.
255+
256+
[meilisearch-primary-key]: https://www.meilisearch.com/docs/learn/core_concepts/primary_key
257+
258+
## Bootstrap Process
259+
260+
On startup or when a new index policy is created, the indexer must populate the
261+
index with existing resources. The platform spans multiple project control
262+
planes, so bootstrap must list resources from each cluster.
263+
264+
### Multi-Cluster Bootstrap
265+
266+
The indexer uses the [multicluster-runtime] provider pattern to discover
267+
project control planes. For each discovered cluster:
268+
269+
1. List resources matching the policy selector from that cluster's API
270+
2. Transform and index each resource
271+
3. Handle concurrent modifications during bootstrap gracefully
272+
273+
The provider handles dynamic cluster discovery — as clusters come online or go
274+
offline, the indexer bootstraps or cleans up accordingly.
275+
276+
After bootstrap completes, real-time indexing continues via the JetStream event
277+
stream, which already aggregates events from all control planes.
278+
279+
[multicluster-runtime]: https://github.com/kubernetes-sigs/multicluster-runtime
280+
281+
## Error Handling
282+
283+
- **Transient failures**: Retry with exponential backoff for network errors and
284+
temporary unavailability
285+
- **Malformed events**: Log and skip events that cannot be parsed; acknowledge
286+
to prevent redelivery loops
287+
- **Backend unavailability**: Buffer events in memory (bounded) while attempting
288+
reconnection; pause consumption if buffer fills
289+
- **Policy evaluation errors**: Log and skip events with CEL evaluation
290+
failures; do not block processing of other events
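
A minimal sketch of the retry policy for transient failures, assuming a doubling delay with an upper bound. `ErrTransient`, `backoffDelay`, and `retry` are illustrative names, and the attempt limit is an assumption; the design above does not specify one.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTransient marks a failure worth retrying (illustrative sentinel).
var ErrTransient = errors.New("transient failure")

// backoffDelay returns base doubled once per prior attempt, capped at max.
// attempt starts at 0.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// retry runs op until it succeeds, fails with a non-transient error,
// or the attempt limit is reached.
func retry(attempts int, base, max time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if !errors.Is(err, ErrTransient) {
			return err // not retryable
		}
		if i < attempts-1 {
			time.Sleep(backoffDelay(i, base, max))
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	err := retry(5, 10*time.Millisecond, time.Second, func() error {
		calls++
		if calls < 3 {
			return ErrTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Adding random jitter to the delay is a common refinement when several instances retry against the same backend; it is left out here to keep the sketch short.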
291+
292+
## Integration Points
293+
294+
| System | Protocol | Purpose |
295+
|--------|----------|---------|
296+
| [NATS JetStream][jetstream] | NATS | Consume audit log events (aggregated from all clusters) |
297+
| Search API Server | HTTPS | Watch index policy resources |
298+
| Project Control Planes | HTTPS | Bootstrap existing resources |
299+
| [Meilisearch] | HTTPS/JSON | Manage indexed documents |
300+
301+
## Observability
302+
303+
The indexer should expose metrics following the [RED method][red-method]:
304+
305+
- **Rate**: Events consumed per second, documents indexed per second
306+
- **Errors**: Failed persistence operations, malformed events, policy evaluation
307+
errors
308+
- **Duration**: Event processing latency, batch flush latency, end-to-end
309+
indexing lag
310+
311+
Specific metrics, instrumentation libraries, and observability tooling will be
312+
defined during implementation.
313+
314+
[red-method]: https://www.weave.works/blog/the-red-method-key-metrics-for-microservices-architecture/
315+
316+
## Future Considerations
317+
318+
- **Control plane deletion**: When a project control plane is deleted, indexed
319+
resources from that cluster must be cleaned up. Ideally, the platform emits
320+
deletion events for all resources before the control plane is removed,
321+
allowing event-driven cleanup. If this isn't guaranteed, the indexer may need
322+
to track source cluster metadata and delete documents when a cluster is
323+
disengaged.
324+
- **Dead letter handling**: Route persistently failing events to a dead letter
325+
queue for manual inspection
326+
- **Multi-tenancy**: Support tenant-isolated indexes with policy-based routing
327+
- **Policy-based sharding**: For very large deployments, assign subsets of
328+
policies to instances using consistent hashing
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
@startuml ResourceIndexerScaling
2+
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Container.puml
3+
4+
System_Ext(jetstream, "NATS JetStream", "Audit log event stream aggregated from all control planes")
5+
6+
System_Boundary(indexerGroup, "Queue Group: resource-indexer") {
7+
Container(indexer1, "Indexer #1", "Go", "Policy matching, CEL filtering, batch writes")
8+
Container(indexer2, "Indexer #2", "Go", "Policy matching, CEL filtering, batch writes")
9+
Container(indexer3, "Indexer #3", "Go", "Policy matching, CEL filtering, batch writes")
10+
}
11+
12+
note right of indexerGroup
13+
Each event delivered to
14+
exactly one instance —
15+
scales linearly without
16+
coordination
17+
end note
18+
19+
System_Ext(meilisearch, "Meilisearch", "Persistent search index")
20+
21+
Rel_D(jetstream, indexer1, "Distributes event", "NATS")
22+
Rel_D(jetstream, indexer2, "Distributes event", "NATS")
23+
Rel_D(jetstream, indexer3, "Distributes event", "NATS")
24+
25+
Rel_D(indexer1, meilisearch, "Batched upserts/deletes", "HTTPS")
26+
Rel_D(indexer2, meilisearch, "Batched upserts/deletes", "HTTPS")
27+
Rel_D(indexer3, meilisearch, "Batched upserts/deletes", "HTTPS")
28+
29+
SHOW_LEGEND()
30+
31+
@enduml
