Skip to content

Commit a6ed98a

Browse files
authored
Incremental semantic index ingestion. (#2055)
* Stage * Stage * Support multiple index strategies * Small cleanups * Add documentation
1 parent c70e074 commit a6ed98a

File tree

18 files changed

+761
-351
lines changed

18 files changed

+761
-351
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<PackageVersion Include="Elastic.Aspire.Hosting.Elasticsearch" Version="9.3.0" />
3030
<PackageVersion Include="Elastic.Clients.Elasticsearch" Version="9.1.4" />
3131
<PackageVersion Include="FakeItEasy" Version="8.3.0" />
32-
<PackageVersion Include="Elastic.Ingest.Elasticsearch" Version="0.14.0" />
32+
<PackageVersion Include="Elastic.Ingest.Elasticsearch" Version="0.15.1" />
3333
<PackageVersion Include="InMemoryLogger" Version="1.0.66" />
3434
<PackageVersion Include="MartinCostello.Logging.XUnit.v3" Version="0.6.0" />
3535
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.7" />
27.2 KB
Loading
29.2 KB
Loading
30.9 KB
Loading
31.4 KB
Loading
30.1 KB
Loading
27.6 KB
Loading

docs/development/ingest/index.md

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
---
2+
navigation_title: "Elasticsearch Ingest"
3+
---
4+
5+
# Elasticsearch Ingestion
6+
7+
## Elasticsearch Integration
8+
9+
The Elasticsearch integration consists of two primary exporters that work together to maintain both lexical and semantic search indices:
10+
11+
1. **ElasticsearchLexicalExporter** - Handles traditional full-text search indexing with hash-based change detection
12+
2. **ElasticsearchSemanticExporter** - Manages semantic search indices using inference models for vector embeddings
13+
14+
These exporters are coordinated by the `ElasticsearchMarkdownExporter` class, which implements the `IMarkdownExporter` interface.
15+
16+
### Architecture Overview
17+
18+
Both exporters inherit from the abstract `ElasticsearchExporter<TChannelOptions, TChannel>` base class (defined in `src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs`), which provides:
19+
20+
- **Channel-based ingestion**: Uses the `Elastic.Channels` library for high-performance buffered writes to Elasticsearch
21+
- **Configurable concurrency**: Respects the `IndexNumThreads` setting to optimize throughput
22+
- **Error handling**: Callbacks for export failures, server rejections, and retries
23+
- **Progress tracking**: Logs buffer exports at configured intervals
24+
25+
### Hash-Based Change Detection
26+
27+
The lexical exporter implements an intelligent hash-based upsert strategy (`ScriptedHashBulkUpsertLookup`) that:
28+
29+
1. Computes a hash from the document's URL, body content, and headings
30+
2. On index, compares the computed hash with the stored hash
31+
3. If hashes match: Only updates the `batch_index_date` field (minimal overhead)
32+
4. If hashes differ: Performs a full document update with new `last_updated` timestamp
33+
34+
This approach allows us to incrementally synchronize only the changed documents and deletions over to our semantic index.
35+
36+
### Shutdown and Synchronization Logic
37+
38+
The `StopAsync` method in `ElasticsearchMarkdownExporter` orchestrates a complex multi-phase synchronization sequence:
39+
40+
#### Phase 1: Drain and Finalize Lexical Index
41+
42+
```csharp
43+
var stopped = await _lexicalChannel.StopAsync(ctx);
44+
```
45+
46+
This calls the base `ElasticsearchExporter.StopAsync` method, which performs three critical operations:
47+
48+
1. **Drain in-flight exports** (`WaitForDrainAsync`): Waits for all buffered documents to be flushed to Elasticsearch
49+
2. **Refresh the index** (`RefreshAsync`): Makes all indexed documents immediately searchable
50+
3. **Apply aliases** (`ApplyAliasesAsync`): Swaps index aliases to point to the newly created time-stamped index
51+
52+
#### Phase 2: Semantic Index Bootstrap
53+
54+
```csharp
55+
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
56+
{
57+
// Bootstrap semantic index if it doesn't exist
58+
await _semanticChannel.Channel.BootstrapElasticsearchAsync(...);
59+
await _transport.PutAsync<StringResponse>(semanticIndex, ...);
60+
await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
61+
}
62+
```
63+
64+
If the semantic index doesn't exist yet, it's created and configured with the appropriate inference model settings.
65+
66+
#### Phase 3: Incremental Sync - Updates
67+
68+
```csharp
69+
_reindex updates: '{SourceIndex}' => '{DestinationIndex}'
70+
```
71+
72+
Uses Elasticsearch's `_reindex` API to copy **only changed documents** from the lexical index to the semantic index:
73+
74+
- **Query filter**: `last_updated >= _batchIndexDate`
75+
- **Result**: Only documents that were actually modified (not just batch-tracked) are synced
76+
- This triggers semantic embedding generation for new/changed content
77+
78+
#### Phase 4: Incremental Sync - Deletions
79+
80+
```csharp
81+
_reindex deletions: '{SourceIndex}' => '{DestinationIndex}'
82+
```
83+
84+
Uses `_reindex` with a script to propagate deletions:
85+
86+
- **Query filter**: `batch_index_date < _batchIndexDate` (documents not in current batch)
87+
- **Script**: `ctx.op = "delete"` - converts reindex operations to deletions
88+
- **Result**: Documents removed from the documentation are deleted from semantic index
89+
90+
#### Phase 5: Cleanup Lexical Index
91+
92+
```csharp
93+
await DoDeleteByQuery(lexicalWriteAlias, ctx);
94+
```
95+
96+
Removes stale documents from the lexical index using `_delete_by_query`:
97+
98+
- **Query filter**: `batch_index_date < _batchIndexDate`
99+
- **Result**: Lexical index only contains documents from the current batch
100+
101+
### Task Monitoring
102+
103+
Both `DoReindex` and `DoDeleteByQuery` methods use Elasticsearch's task API to monitor long-running operations:
104+
105+
1. Submit the operation with `wait_for_completion=false` to get a task ID
106+
2. Poll the `/_tasks/{taskId}` endpoint every 5 seconds
107+
3. Log progress metrics: total documents, created, updated, deleted, batches, and elapsed time
108+
4. Continue until `completed: true`
109+
110+
This provides real-time visibility into large-scale index operations without blocking the application.
111+
112+
### Index Naming Strategy
113+
114+
Both exporters use time-stamped index names with write aliases:
115+
116+
- **Lexical**: `{prefix}-lexical-{namespace}-{timestamp}` with alias `{prefix}-lexical-{namespace}`
117+
- **Semantic**: `{prefix}-semantic-{namespace}-{timestamp}` with alias `{prefix}-semantic-{namespace}`
118+
119+
The `-latest` formatted alias (e.g., `...-{yyyy.MM.dd.HHmmss}`) is used as a write alias during the current indexing operation, then swapped to the read alias upon completion. This enables zero-downtime reindexing.
120+
121+
### Error Handling
122+
123+
The `StopAsync` sequence includes comprehensive error tracking:
124+
125+
- Failed drains, refreshes, or alias operations emit global errors via `IDiagnosticsCollector`
126+
- The lexical channel stop must succeed (`stopped == true`) or an exception is thrown
127+
- Task failures during reindex/delete operations are logged and recorded as global errors
128+
129+
This ensures that indexing problems are visible and prevent silent data corruption.
130+
131+
## Indexing Flow Visualization
132+
133+
::::{stepper}
134+
135+
136+
137+
::::{stepper}
138+
139+
:::{step} Initial state: Both indexes contain existing documents
140+
141+
![images/step1.png](images/step1.png)
142+
:::
143+
:::{step} Lexical Index processing
144+
145+
![images/step2.png](images/step2.png)
146+
147+
* ID 1: Hash matches → Only batch_index_date updated (blue)
148+
* ID 2: Hash changed → Full upsert with new last_updated (green)
149+
* ID 3: No incoming data → Untouched (gray)
150+
* ID 4: New document → Insert (green)
151+
* ID 5: Not included in current batch → Untouched (gray)
152+
153+
:::
154+
155+
:::{step} Sync updates to Semantic Index
156+
157+
![images/step3.png](images/step3.png)
158+
159+
* Copy documents from Lexical Index where last_updated >= 2024-10-15
160+
* Only IDs 2 and 4 synced (ID 1 has old last_updated date)
161+
162+
:::
163+
164+
:::{step} Mark deletions in both indexes
165+
166+
![images/step4.png](images/step4.png)
167+
168+
* Lexical Index: Mark IDs 3 and 5 (batch_index_date < 2024-10-15) as red
169+
* Semantic Index: Sync deletion of ID 5 from Lexical Index, mark as red
170+
171+
:::
172+
173+
:::{step} Delete from Semantic Index first
174+
175+
![images/step5.png](images/step5.png)
176+
177+
* Remove ID 5 from Semantic Index
178+
* Lexical Index still has IDs 3 and 5 marked for deletion
179+
180+
:::
181+
182+
:::{step} Complete deletion and final sync
183+
184+
![images/step6.png](images/step6.png)
185+
186+
* Delete IDs 3 and 5 from Lexical Index
187+
* Semantic Index remains as-is (batch_index_date not updated there)
188+
* Both indexes now synchronized with same document IDs
189+
:::
190+
::::

docs/development/toc.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
toc:
22
- file: index.md
3+
- folder: ingest
4+
children:
5+
- file: index.md
36
- toc: link-validation

src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,5 @@ public class ElasticsearchEndpoint
4343
public X509Certificate? Certificate { get; set; }
4444
public bool CertificateIsNotRoot { get; set; }
4545
public int? BootstrapTimeout { get; set; }
46+
public bool NoSemantic { get; set; }
4647
}

0 commit comments

Comments
 (0)