diff --git a/Directory.Packages.props b/Directory.Packages.props index a33c0da98..958a09fdc 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -27,7 +27,7 @@ - + diff --git a/docs/development/ingest/images/step1.png b/docs/development/ingest/images/step1.png new file mode 100644 index 000000000..c3f42e50b Binary files /dev/null and b/docs/development/ingest/images/step1.png differ diff --git a/docs/development/ingest/images/step2.png b/docs/development/ingest/images/step2.png new file mode 100644 index 000000000..1d3bcdcba Binary files /dev/null and b/docs/development/ingest/images/step2.png differ diff --git a/docs/development/ingest/images/step3.png b/docs/development/ingest/images/step3.png new file mode 100644 index 000000000..f80ec4db4 Binary files /dev/null and b/docs/development/ingest/images/step3.png differ diff --git a/docs/development/ingest/images/step4.png b/docs/development/ingest/images/step4.png new file mode 100644 index 000000000..d1daa6ea0 Binary files /dev/null and b/docs/development/ingest/images/step4.png differ diff --git a/docs/development/ingest/images/step5.png b/docs/development/ingest/images/step5.png new file mode 100644 index 000000000..b98f35570 Binary files /dev/null and b/docs/development/ingest/images/step5.png differ diff --git a/docs/development/ingest/images/step6.png b/docs/development/ingest/images/step6.png new file mode 100644 index 000000000..651627369 Binary files /dev/null and b/docs/development/ingest/images/step6.png differ diff --git a/docs/development/ingest/index.md b/docs/development/ingest/index.md new file mode 100644 index 000000000..72ae839f6 --- /dev/null +++ b/docs/development/ingest/index.md @@ -0,0 +1,190 @@ +--- +navigation_title: "Elasticsearch Ingest" +--- + +# Elasticsearch Ingestion + +## Elasticsearch Integration + +The Elasticsearch integration consists of two primary exporters that work together to maintain both lexical and semantic search indices: + +1. **ElasticsearchLexicalExporter** - Handles traditional full-text search indexing with hash-based change detection +2. **ElasticsearchSemanticExporter** - Manages semantic search indices using inference models for vector embeddings + +These exporters are coordinated by the `ElasticsearchMarkdownExporter` class, which implements the `IMarkdownExporter` interface. + +### Architecture Overview + +Both exporters inherit from the abstract `ElasticsearchExporter` base class (defined in `src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs`), which provides: + +- **Channel-based ingestion**: Uses the `Elastic.Channels` library for high-performance buffered writes to Elasticsearch +- **Configurable concurrency**: Respects the `IndexNumThreads` setting to optimize throughput +- **Error handling**: Callbacks for export failures, server rejections, and retries +- **Progress tracking**: Logs buffer exports at configured intervals + +### Hash-Based Change Detection + +The lexical exporter implements an intelligent hash-based upsert strategy (`ScriptedHashBulkUpsertLookup`) that: + +1. Computes a hash from the document's URL, body content, and headings +2. On index, compares the computed hash with the stored hash +3. If hashes match: Only updates the `batch_index_date` field (minimal overhead) +4. If hashes differ: Performs a full document update with new `last_updated` timestamp + +This approach allows us to incrementally synchronize only the changed documents and deletions over to our semantic index. 
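+
+Concretely, this logic lives in the `ScriptedHashBulkUpsertLookup` delegate configured on the lexical channel. The sketch below is adapted from `ElasticsearchLexicalExporter` and slightly trimmed; `HashedBulkUpdate` is the `Elastic.Ingest.Elasticsearch` helper used to pair the stored hash field with the script that runs when the hash is unchanged (the parameter dictionary's generic type arguments are not visible in the diff and are assumed here):
+
+```csharp
+ScriptedHashBulkUpsertLookup = (d, channelHash) =>
+{
+    // The hash covers the URL, the body, and the (sorted) headings,
+    // so any content change produces a different hash.
+    d.Hash = HashedBulkUpdate.CreateHash(channelHash, string.Empty, d.Url,
+        d.Body ?? string.Empty, string.Join(",", d.Headings.OrderBy(h => h)));
+    d.LastUpdated = batchIndexDate;
+    d.BatchIndexDate = batchIndexDate;
+
+    // If the stored "hash" field equals d.Hash, only the script runs and bumps
+    // batch_index_date; otherwise the full document is upserted.
+    return new HashedBulkUpdate("hash", d.Hash,
+        "ctx._source.batch_index_date = params.batch_index_date",
+        new Dictionary<string, string> { { "batch_index_date", d.BatchIndexDate.ToString("o") } });
+};
+```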
+
+### Shutdown and Synchronization Logic
+
+The `StopAsync` method in `ElasticsearchMarkdownExporter` orchestrates a five-phase synchronization sequence:
+
+#### Phase 1: Drain and Finalize Lexical Index
+
+```csharp
+var stopped = await _lexicalChannel.StopAsync(ctx);
+```
+
+This calls the base `ElasticsearchExporter.StopAsync` method, which performs three critical operations:
+
+1. **Drain in-flight exports** (`WaitForDrainAsync`): Waits for all buffered documents to be flushed to Elasticsearch
+2. **Refresh the index** (`RefreshAsync`): Makes all indexed documents immediately searchable
+3. **Apply aliases** (`ApplyAliasesAsync`): Swaps index aliases to point to the newly created time-stamped index
+
+#### Phase 2: Semantic Index Bootstrap
+
+```csharp
+if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
+{
+    // Bootstrap semantic index if it doesn't exist
+    await _semanticChannel.Channel.BootstrapElasticsearchAsync(...);
+    await _transport.PutAsync(semanticIndex, ...);
+    await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
+}
+```
+
+If the semantic index doesn't exist yet, it is created and configured with the appropriate inference model settings.
+
+#### Phase 3: Incremental Sync - Updates
+
+```text
+_reindex updates: '{SourceIndex}' => '{DestinationIndex}'
+```
+
+Uses Elasticsearch's `_reindex` API to copy **only changed documents** from the lexical index to the semantic index:
+
+- **Query filter**: `last_updated >= _batchIndexDate`
+- **Result**: Only documents that were actually modified (not just batch-tracked) are synced
+- This triggers semantic embedding generation for new and changed content
+
+#### Phase 4: Incremental Sync - Deletions
+
+```text
+_reindex deletions: '{SourceIndex}' => '{DestinationIndex}'
+```
+
+Uses `_reindex` with a script to propagate deletions:
+
+- **Query filter**: `batch_index_date < _batchIndexDate` (documents not in the current batch)
+- **Script**: `ctx.op = "delete"` converts each reindex operation into a deletion
+- **Result**: Documents removed from the documentation are deleted from the semantic index
+
+#### Phase 5: Cleanup Lexical Index
+
+```csharp
+await DoDeleteByQuery(lexicalWriteAlias, ctx);
+```
+
+Removes stale documents from the lexical index using `_delete_by_query`:
+
+- **Query filter**: `batch_index_date < _batchIndexDate`
+- **Result**: The lexical index only contains documents from the current batch
+
+### Task Monitoring
+
+Both the `DoReindex` and `DoDeleteByQuery` methods use Elasticsearch's task API to monitor long-running operations:
+
+1. Submit the operation with `wait_for_completion=false` to get a task ID
+2. Poll the `/_tasks/{taskId}` endpoint every 5 seconds
+3. Log progress metrics: total documents, created, updated, deleted, batches, and elapsed time
+4. Continue until `completed: true`
+
+This provides real-time visibility into large-scale index operations without blocking the application.
+
+### Index Naming Strategy
+
+Both exporters use time-stamped index names with write aliases:
+
+- **Lexical**: `{prefix}-lexical-{namespace}-{timestamp}` with alias `{prefix}-lexical-{namespace}`
+- **Semantic**: `{prefix}-semantic-{namespace}-{timestamp}` with alias `{prefix}-semantic-{namespace}`
+
+The write alias is the index pattern formatted with the literal string `latest` (for example `{prefix}-lexical-{namespace}-latest`). It receives writes during the current indexing operation, and the read alias is swapped to the new time-stamped index upon completion. This enables zero-downtime reindexing.
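+
+To make the update sync concrete, the Phase 3 request that `StopAsync` hands to `DoReindex` is built roughly as follows (adapted from `ElasticsearchMarkdownExporter`; the batch size of 100 and the round-trip date format come straight from the exporter):
+
+```csharp
+// Reindex only documents touched by the current batch into the semantic index.
+var request = PostData.Serializable(new
+{
+    dest = new { index = semanticWriteAlias },
+    source = new
+    {
+        index = lexicalWriteAlias,
+        size = 100,
+        query = new
+        {
+            range = new { last_updated = new { gte = _batchIndexDate.ToString("o") } }
+        }
+    }
+});
+// Submitted as POST /_reindex?wait_for_completion=false&require_alias=true&scroll=10m
+await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "updates", ctx);
+```
+
+The Phase 4 deletion sync uses the same shape, but filters on `batch_index_date < _batchIndexDate` and adds `script = new { source = "ctx.op = \"delete\"" }` so the matched documents are deleted from the semantic index instead of copied.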
+
+### Error Handling
+
+The `StopAsync` sequence includes comprehensive error tracking:
+
+- Failed drains, refreshes, or alias operations emit global errors via `IDiagnosticsCollector`
+- The lexical channel stop must succeed (`stopped == true`) or an exception is thrown
+- Task failures during reindex/delete operations are logged and recorded as global errors
+
+This ensures that indexing problems are visible and prevents silent data corruption.
+
+## Indexing Flow Visualization
+
+::::{stepper}
+
+:::{step} Initial state: Both indexes contain existing documents
+
+![Step 1: initial state](images/step1.png)
+:::
+:::{step} Lexical Index processing
+
+![Step 2: lexical index processing](images/step2.png)
+
+* ID 1: Hash matches → Only batch_index_date updated (blue)
+* ID 2: Hash changed → Full upsert with new last_updated (green)
+* ID 3: No incoming data → Untouched (gray)
+* ID 4: New document → Insert (green)
+* ID 5: Not included in current batch → Untouched (gray)
+
+:::
+
+:::{step} Sync updates to Semantic Index
+
+![Step 3: sync updates to the semantic index](images/step3.png)
+
+* Copy documents from Lexical Index where last_updated >= 2024-10-15
+* Only IDs 2 and 4 synced (ID 1 has an old last_updated date)
+
+:::
+
+:::{step} Mark deletions in both indexes
+
+![Step 4: mark deletions in both indexes](images/step4.png)
+
+* Lexical Index: Mark IDs 3 and 5 (batch_index_date < 2024-10-15) as red
+* Semantic Index: Sync deletion of ID 5 from Lexical Index, mark as red
+
+:::
+
+:::{step} Delete from Semantic Index first
+
+![Step 5: delete from the semantic index first](images/step5.png)
+
+* Remove ID 5 from Semantic Index
+* Lexical Index still has IDs 3 and 5 marked for deletion
+
+:::
+
+:::{step} Complete deletion and final sync
+
+![Step 6: complete deletion and final sync](images/step6.png)
+
+* Delete IDs 3 and 5 from Lexical Index
+* Semantic Index remains as-is (batch_index_date not updated there)
+* Both indexes now synchronized with the same document IDs
+:::
+::::
\ No newline at end of file
diff --git a/docs/development/toc.yml b/docs/development/toc.yml
index f9ff0490e..8e571cd12 100644
--- a/docs/development/toc.yml
+++ b/docs/development/toc.yml
@@ -1,3 +1,6 @@
 toc:
   - file: index.md
+  - folder: ingest
+    children:
+      - file: index.md
   - toc: link-validation
diff --git a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs
index 21afc6ac4..c3fc345f1 100644
--- a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs
+++ b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs
@@ -43,4 +43,5 @@ public class ElasticsearchEndpoint
 	public X509Certificate? Certificate { get; set; }
 	public bool CertificateIsNotRoot { get; set; }
 	public int?
BootstrapTimeout { get; set; } + public bool NoSemantic { get; set; } } diff --git a/src/Elastic.Documentation/Exporter.cs b/src/Elastic.Documentation/Exporter.cs index 6af9ed524..ef4f23d72 100644 --- a/src/Elastic.Documentation/Exporter.cs +++ b/src/Elastic.Documentation/Exporter.cs @@ -11,7 +11,6 @@ public enum Exporter Html, LLMText, Elasticsearch, - ElasticsearchNoSemantic, Configuration, DocumentationState, LinkMetadata, diff --git a/src/Elastic.Documentation/Search/DocumentationDocument.cs b/src/Elastic.Documentation/Search/DocumentationDocument.cs index b6264a442..8a0eb6781 100644 --- a/src/Elastic.Documentation/Search/DocumentationDocument.cs +++ b/src/Elastic.Documentation/Search/DocumentationDocument.cs @@ -23,6 +23,15 @@ public record DocumentationDocument [JsonPropertyName("url")] public string Url { get; set; } = string.Empty; + /// The date of the batch update this document was part of last. + /// This date could be higher than the date_last_updated. + [JsonPropertyName("batch_index_date")] + public DateTimeOffset BatchIndexDate { get; set; } + + /// The date this document was last updated, + [JsonPropertyName("last_updated")] + public DateTimeOffset LastUpdated { get; set; } + // TODO make this required once all doc_sets have published again [JsonPropertyName("hash")] public string Hash { get; set; } = string.Empty; @@ -45,7 +54,7 @@ public record DocumentationDocument [JsonPropertyName("body")] public string? Body { get; set; } - // Stripped body is the body with markdown removed, suitable for search indexing + // Stripped body is the body with Markdown removed, suitable for search indexing [JsonPropertyName("stripped_body")] public string? StrippedBody { get; set; } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs new file mode 100644 index 000000000..f731b9b10 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs @@ -0,0 +1,262 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Channels; +using Elastic.Documentation.Configuration; +using Elastic.Documentation.Diagnostics; +using Elastic.Documentation.Search; +using Elastic.Documentation.Serialization; +using Elastic.Ingest.Elasticsearch.Catalog; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Semantic; +using Elastic.Transport; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch; + +public class ElasticsearchLexicalExporter( + ILoggerFactory logFactory, + IDiagnosticsCollector collector, + ElasticsearchEndpoint endpoint, + string indexNamespace, + DistributedTransport transport, + DateTimeOffset batchIndexDate +) + : ElasticsearchExporter, CatalogIndexChannel> + (logFactory, collector, endpoint, transport, o => new(o), t => new(t) + { + BulkOperationIdLookup = d => d.Url, + ScriptedHashBulkUpsertLookup = (d, channelHash) => + { + var rand = string.Empty; + //if (d.Url.StartsWith("/docs/reference/search-connectors")) + // rand = Guid.NewGuid().ToString("N"); + d.Hash = HashedBulkUpdate.CreateHash(channelHash, rand, d.Url, d.Body ?? 
string.Empty, string.Join(",", d.Headings.OrderBy(h => h))); + d.LastUpdated = batchIndexDate; + d.BatchIndexDate = batchIndexDate; + return new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date", + new Dictionary + { + { "batch_index_date", d.BatchIndexDate.ToString("o") } + }); + }, + GetMapping = () => CreateMapping(null), + GetMappingSettings = CreateMappingSetting, + IndexFormat = + $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", + ActiveSearchAlias = $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}" + }); + +public class ElasticsearchSemanticExporter( + ILoggerFactory logFactory, + IDiagnosticsCollector collector, + ElasticsearchEndpoint endpoint, + string indexNamespace, + DistributedTransport transport +) + : ElasticsearchExporter, SemanticIndexChannel> + (logFactory, collector, endpoint, transport, o => new(o), t => new(t) + { + BulkOperationIdLookup = d => d.Url, + GetMapping = (inferenceId, _) => CreateMapping(inferenceId), + GetMappingSettings = (_, _) => CreateMappingSetting(), + IndexFormat = $"{endpoint.IndexNamePrefix.ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", + ActiveSearchAlias = $"{endpoint.IndexNamePrefix}-{indexNamespace.ToLowerInvariant()}", + IndexNumThreads = endpoint.IndexNumThreads, + SearchNumThreads = endpoint.SearchNumThreads, + InferenceCreateTimeout = TimeSpan.FromMinutes(endpoint.BootstrapTimeout ?? 4) + }); + + +public abstract class ElasticsearchExporter : IDisposable + where TChannelOptions : CatalogIndexChannelOptionsBase + where TChannel : CatalogIndexChannel +{ + private readonly IDiagnosticsCollector _collector; + public TChannel Channel { get; } + private readonly ILogger _logger; + + protected ElasticsearchExporter( + ILoggerFactory logFactory, + IDiagnosticsCollector collector, + ElasticsearchEndpoint endpoint, + DistributedTransport transport, + Func createChannel, + Func createOptions + ) + { + _collector = collector; + _logger = logFactory.CreateLogger>(); + //The max num threads per allocated node, from testing its best to limit our max concurrency + //producing to this number as well + var options = createOptions(transport); + var i = 0; + options.BufferOptions = new BufferOptions + { + OutboundBufferMaxSize = endpoint.BufferSize, + ExportMaxConcurrency = endpoint.IndexNumThreads, + ExportMaxRetries = endpoint.MaxRetries + }; + options.SerializerContext = SourceGenerationContext.Default; + options.ExportBufferCallback = () => + { + var count = Interlocked.Increment(ref i); + _logger.LogInformation("Exported {Count} documents to Elasticsearch index {Format}", + count * endpoint.BufferSize, string.Format(options.IndexFormat, "latest")); + }; + options.ExportExceptionCallback = e => + { + _logger.LogError(e, "Failed to export document"); + _collector.EmitGlobalError("Elasticsearch export: failed to export document", e); + }; + options.ServerRejectionCallback = items => _logger.LogInformation("Server rejection: {Rejection}", items.First().Item2); + Channel = createChannel(options); + _logger.LogInformation($"Bootstrapping {nameof(SemanticIndexChannel)} Elasticsearch target for indexing"); + } + + public async ValueTask StopAsync(Cancel ctx = default) + { + _logger.LogInformation("Waiting to drain all inflight exports to Elasticsearch"); + var drained = await Channel.WaitForDrainAsync(null, ctx); + if (!drained) + 
_collector.EmitGlobalError("Elasticsearch export: failed to complete indexing in a timely fashion while shutting down"); + + _logger.LogInformation("Refreshing target index {Index}", Channel.IndexName); + var refreshed = await Channel.RefreshAsync(ctx); + if (!refreshed) + _collector.EmitGlobalError($"Refreshing target index {Channel.IndexName} did not complete successfully"); + + _logger.LogInformation("Applying aliases to {Index}", Channel.IndexName); + var swapped = await Channel.ApplyAliasesAsync(ctx); + if (!swapped) + _collector.EmitGlobalError($"${nameof(ElasticsearchMarkdownExporter)} failed to apply aliases to index {Channel.IndexName}"); + + return drained && refreshed && swapped; + } + + public async ValueTask RefreshAsync(Cancel ctx = default) => await Channel.RefreshAsync(ctx); + + public async ValueTask TryWrite(DocumentationDocument document, Cancel ctx = default) + { + if (Channel.TryWrite(document)) + return true; + + if (await Channel.WaitToWriteAsync(ctx)) + return Channel.TryWrite(document); + return false; + } + + + protected static string CreateMappingSetting() => + // language=json + """ + { + "analysis": { + "analyzer": { + "synonyms_analyzer": { + "tokenizer": "whitespace", + "filter": [ + "lowercase", + "synonyms_filter" + ] + }, + "highlight_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "english_stop" + ] + }, + "hierarchy_analyzer": { "tokenizer": "path_tokenizer" } + }, + "filter": { + "synonyms_filter": { + "type": "synonym", + "synonyms_set": "docs", + "updateable": true + }, + "english_stop": { + "type": "stop", + "stopwords": "_english_" + } + }, + "tokenizer": { + "path_tokenizer": { + "type": "path_hierarchy", + "delimiter": "/" + } + } + } + } + """; + + protected static string CreateMapping(string? inferenceId) => + $$""" + { + "properties": { + "url" : { + "type": "keyword", + "fields": { + "match": { "type": "text" }, + "prefix": { "type": "text", "analyzer" : "hierarchy_analyzer" } + } + }, + "hash" : { "type" : "keyword" }, + "title": { + "type": "text", + "search_analyzer": "synonyms_analyzer", + "fields": { + "keyword": { + "type": "keyword" + } + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $$""", "semantic_text": {{{InferenceMapping(inferenceId)}}}""" : "")}} + } + }, + "url_segment_count": { + "type": "integer" + }, + "body": { + "type": "text" + }, + "stripped_body": { + "type": "text", + "search_analyzer": "highlight_analyzer", + "term_vector": "with_positions_offsets" + } + {{(!string.IsNullOrWhiteSpace(inferenceId) ? AbstractInferenceMapping(inferenceId) : AbstractMapping())}} + } + } + """; + + private static string AbstractMapping() => + """ + , "abstract": { + "type": "text" + } + """; + + private static string InferenceMapping(string inferenceId) => + $""" + "type": "semantic_text", + "inference_id": "{inferenceId}" + """; + + private static string AbstractInferenceMapping(string inferenceId) => + // langugage=json + $$""" + , "abstract": { + {{InferenceMapping(inferenceId)}} + } + """; + + + public void Dispose() + { + Channel.Complete(); + Channel.Dispose(); + + GC.SuppressFinalize(this); + } + +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs new file mode 100644 index 000000000..804b46f13 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -0,0 +1,284 @@ +// Licensed to Elasticsearch B.V under one or more agreements. 
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.IO.Abstractions; +using Elastic.Documentation.Configuration; +using Elastic.Documentation.Diagnostics; +using Elastic.Documentation.Search; +using Elastic.Ingest.Elasticsearch; +using Elastic.Markdown.Helpers; +using Elastic.Markdown.IO; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Markdig.Syntax; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch; + +public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable +{ + private readonly IDiagnosticsCollector _collector; + private readonly ILogger _logger; + private readonly ElasticsearchLexicalExporter _lexicalChannel; + private readonly ElasticsearchSemanticExporter _semanticChannel; + + private readonly ElasticsearchEndpoint _endpoint; + + private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow; + private readonly DistributedTransport _transport; + + public ElasticsearchMarkdownExporter( + ILoggerFactory logFactory, + IDiagnosticsCollector collector, + DocumentationEndpoints endpoints, + string indexNamespace + ) + { + _collector = collector; + _logger = logFactory.CreateLogger(); + _endpoint = endpoints.Elasticsearch; + + var es = endpoints.Elasticsearch; + + var configuration = new ElasticsearchConfiguration(es.Uri) + { + Authentication = es.ApiKey is { } apiKey + ? new ApiKey(apiKey) + : es is { Username: { } username, Password: { } password } + ? new BasicAuthentication(username, password) + : null, + EnableHttpCompression = true, + DebugMode = _endpoint.DebugMode, + CertificateFingerprint = _endpoint.CertificateFingerprint, + ProxyAddress = _endpoint.ProxyAddress, + ProxyPassword = _endpoint.ProxyPassword, + ProxyUsername = _endpoint.ProxyUsername, + ServerCertificateValidationCallback = _endpoint.DisableSslVerification + ? CertificateValidations.AllowAll + : _endpoint.Certificate is { } cert + ? _endpoint.CertificateIsNotRoot + ? 
CertificateValidations.AuthorityPartOfChain(cert) + : CertificateValidations.AuthorityIsRoot(cert) + : null + }; + + _transport = new DistributedTransport(configuration); + + _lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport, _batchIndexDate); + _semanticChannel = new ElasticsearchSemanticExporter(logFactory, collector, es, indexNamespace, _transport); + + } + + /// + public async ValueTask StartAsync(Cancel ctx = default) => + await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); + + /// + public async ValueTask StopAsync(Cancel ctx = default) + { + var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); + var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest"); + + var semanticIndex = _semanticChannel.Channel.IndexName; + var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx); + + if (_endpoint.NoSemantic) + { + _logger.LogInformation("--no-semantic was specified so exiting early before syncing to {Index}", semanticIndex); + return; + } + + var stopped = await _lexicalChannel.StopAsync(ctx); + if (!stopped) + throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}"); + + if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex); + _ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); + var semanticIndexPut = await _transport.PutAsync(semanticIndex, PostData.String("{}"), ctx); + if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode) + throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}"); + _ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx); + } + + _logger.LogInformation("_reindex updates: '{SourceIndex}' => '{DestinationIndex}'", lexicalWriteAlias, semanticWriteAlias); + var request = PostData.Serializable(new + { + dest = new { index = semanticWriteAlias }, + source = new + { + index = lexicalWriteAlias, + size = 100, + query = new + { + range = new + { + last_updated = new { gte = _batchIndexDate.ToString("o") } + } + } + } + }); + await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "updates", ctx); + + _logger.LogInformation("_reindex deletions: '{SourceIndex}' => '{DestinationIndex}'", lexicalWriteAlias, semanticWriteAlias); + request = PostData.Serializable(new + { + dest = new { index = semanticWriteAlias }, + script = new { source = "ctx.op = \"delete\"" }, + source = new + { + index = lexicalWriteAlias, + size = 100, + query = new + { + range = new + { + batch_index_date = new { lt = _batchIndexDate.ToString("o") } + } + } + } + }); + await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx); + + await DoDeleteByQuery(lexicalWriteAlias, ctx); + } + + private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) + { + // delete all documents with batch_index_date < _batchIndexDate + // they weren't part of the current export + _logger.LogInformation("Delete data in '{SourceIndex}' not part of batch date: {Date}", lexicalWriteAlias, _batchIndexDate.ToString("o")); + var request = PostData.Serializable(new + { + query = new + { + range = new + { + batch_index_date = new { lt = _batchIndexDate.ToString("o") } + } + } + }); + var reindexUrl = $"/{lexicalWriteAlias}/_delete_by_query?wait_for_completion=false"; + var 
reindexNewChanges = await _transport.PostAsync(reindexUrl, request, ctx); + var taskId = reindexNewChanges.Body.Get("task"); + if (string.IsNullOrWhiteSpace(taskId)) + { + _collector.EmitGlobalError($"Failed to delete data in '{lexicalWriteAlias}' not part of batch date: {_batchIndexDate:o}"); + return; + } + _logger.LogInformation("_delete_by_query task id: {TaskId}", taskId); + bool completed; + do + { + var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); + completed = reindexTask.Body.Get("completed"); + var total = reindexTask.Body.Get("task.status.total"); + var updated = reindexTask.Body.Get("task.status.updated"); + var created = reindexTask.Body.Get("task.status.created"); + var deleted = reindexTask.Body.Get("task.status.deleted"); + var batches = reindexTask.Body.Get("task.status.batches"); + var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); + var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); + _logger.LogInformation("_delete_by_query '{SourceIndex}': {RunningTimeInNanos} Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + lexicalWriteAlias, time.ToString(@"hh\:mm\:ss"), total, updated, created, deleted, batches); + if (!completed) + await Task.Delay(TimeSpan.FromSeconds(5), ctx); + + } while (!completed); + } + + private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, string semanticWriteAlias, string typeOfSync, Cancel ctx) + { + var reindexUrl = "/_reindex?wait_for_completion=false&require_alias=true&scroll=10m"; + var reindexNewChanges = await _transport.PostAsync(reindexUrl, request, ctx); + var taskId = reindexNewChanges.Body.Get("task"); + if (string.IsNullOrWhiteSpace(taskId)) + { + _collector.EmitGlobalError($"Failed to reindex {typeOfSync} data to '{semanticWriteAlias}'"); + return; + } + _logger.LogInformation("_reindex {Type} task id: {TaskId}", typeOfSync, taskId); + bool completed; + do + { + var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); + completed = reindexTask.Body.Get("completed"); + var total = reindexTask.Body.Get("task.status.total"); + var updated = reindexTask.Body.Get("task.status.updated"); + var created = reindexTask.Body.Get("task.status.created"); + var deleted = reindexTask.Body.Get("task.status.deleted"); + var batches = reindexTask.Body.Get("task.status.batches"); + var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); + var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); + _logger.LogInformation("_reindex {Type}: {RunningTimeInNanos} '{SourceIndex}' => '{DestinationIndex}'. 
Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + typeOfSync, time.ToString(@"hh\:mm\:ss"), lexicalWriteAlias, semanticWriteAlias, total, updated, created, deleted, batches); + if (!completed) + await Task.Delay(TimeSpan.FromSeconds(5), ctx); + + } while (!completed); + } + + public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, Cancel ctx) + { + var file = fileContext.SourceFile; + var url = file.Url; + + if (url is "/docs" or "/docs/404") + { + // Skip the root and 404 pages + _logger.LogInformation("Skipping export for {Url}", url); + return true; + } + + IPositionalNavigation navigation = fileContext.DocumentationSet; + + // Remove the first h1 because we already have the title + // and we don't want it to appear in the body + var h1 = fileContext.Document.Descendants().FirstOrDefault(h => h.Level == 1); + if (h1 is not null) + _ = fileContext.Document.Remove(h1); + + var body = LlmMarkdownExporter.ConvertToLlmMarkdown(fileContext.Document, fileContext.BuildContext); + + var headings = fileContext.Document.Descendants() + .Select(h => h.GetData("header") as string ?? string.Empty) // TODO: Confirm that 'header' data is correctly set for all HeadingBlock instances and that this extraction is reliable. + .Where(text => !string.IsNullOrEmpty(text)) + .ToArray(); + + var @abstract = !string.IsNullOrEmpty(body) + ? body[..Math.Min(body.Length, 400)] + " " + string.Join(" \n- ", headings) + : string.Empty; + + var doc = new DocumentationDocument + { + Url = url, + Title = file.Title, + Body = body, + StrippedBody = body.StripMarkdown(), + Description = fileContext.SourceFile.YamlFrontMatter?.Description, + Abstract = @abstract, + Applies = fileContext.SourceFile.YamlFrontMatter?.AppliesTo, + UrlSegmentCount = url.Split('/', StringSplitOptions.RemoveEmptyEntries).Length, + Parents = navigation.GetParentsOfMarkdownFile(file).Select(i => new ParentDocument + { + Title = i.NavigationTitle, + Url = i.Url + }).Reverse().ToArray(), + Headings = headings + }; + return await _lexicalChannel.TryWrite(doc, ctx); + } + + /// + public ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Cancel ctx) => ValueTask.FromResult(true); + + /// + public void Dispose() + { + _lexicalChannel.Dispose(); + _semanticChannel.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/src/Elastic.Markdown/Exporters/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/ElasticsearchMarkdownExporter.cs deleted file mode 100644 index 8c3367a95..000000000 --- a/src/Elastic.Markdown/Exporters/ElasticsearchMarkdownExporter.cs +++ /dev/null @@ -1,343 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. 
-// See the LICENSE file in the project root for more information - -using System.IO.Abstractions; -using Elastic.Channels; -using Elastic.Documentation.Configuration; -using Elastic.Documentation.Diagnostics; -using Elastic.Documentation.Extensions; -using Elastic.Documentation.Search; -using Elastic.Documentation.Serialization; -using Elastic.Ingest.Elasticsearch; -using Elastic.Ingest.Elasticsearch.Catalog; -using Elastic.Ingest.Elasticsearch.Semantic; -using Elastic.Markdown.Helpers; -using Elastic.Markdown.IO; -using Elastic.Transport; -using Elastic.Transport.Products.Elasticsearch; -using Markdig.Syntax; -using Microsoft.Extensions.Logging; - -namespace Elastic.Markdown.Exporters; - -public class ElasticsearchMarkdownExporter(ILoggerFactory logFactory, IDiagnosticsCollector collector, string indexNamespace, DocumentationEndpoints endpoints) - : ElasticsearchMarkdownExporterBase, CatalogIndexChannel> - (logFactory, collector, endpoints) -{ - /// - protected override CatalogIndexChannelOptions NewOptions(DistributedTransport transport) => new(transport) - { - BulkOperationIdLookup = d => d.Url, - GetMapping = () => CreateMapping(null), - GetMappingSettings = CreateMappingSetting, - IndexFormat = $"{Endpoint.IndexNamePrefix.ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", - ActiveSearchAlias = $"{Endpoint.IndexNamePrefix}-{indexNamespace.ToLowerInvariant()}", - }; - - /// - protected override CatalogIndexChannel NewChannel(CatalogIndexChannelOptions options) => new(options); -} - -public class ElasticsearchMarkdownSemanticExporter(ILoggerFactory logFactory, IDiagnosticsCollector collector, string indexNamespace, DocumentationEndpoints endpoints) - : ElasticsearchMarkdownExporterBase, SemanticIndexChannel> - (logFactory, collector, endpoints) -{ - /// - protected override SemanticIndexChannelOptions NewOptions(DistributedTransport transport) => new(transport) - { - BulkOperationIdLookup = d => d.Url, - GetMapping = (inferenceId, _) => CreateMapping(inferenceId), - GetMappingSettings = (_, _) => CreateMappingSetting(), - IndexFormat = $"{Endpoint.IndexNamePrefix.ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", - ActiveSearchAlias = $"{Endpoint.IndexNamePrefix}-{indexNamespace.ToLowerInvariant()}", - IndexNumThreads = Endpoint.IndexNumThreads, - SearchNumThreads = Endpoint.SearchNumThreads, - InferenceCreateTimeout = TimeSpan.FromMinutes(Endpoint.BootstrapTimeout ?? 4), - }; - - /// - protected override SemanticIndexChannel NewChannel(SemanticIndexChannelOptions options) => new(options); -} - - -public abstract class ElasticsearchMarkdownExporterBase( - ILoggerFactory logFactory, - IDiagnosticsCollector collector, - DocumentationEndpoints endpoints -) - : IMarkdownExporter, IDisposable - where TChannelOptions : CatalogIndexChannelOptionsBase - where TChannel : CatalogIndexChannel -{ - private TChannel? 
_channel; - private readonly ILogger _logger = logFactory.CreateLogger(); - - protected abstract TChannelOptions NewOptions(DistributedTransport transport); - protected abstract TChannel NewChannel(TChannelOptions options); - - protected ElasticsearchEndpoint Endpoint { get; } = endpoints.Elasticsearch; - - protected static string CreateMappingSetting() => - // language=json - """ - { - "analysis": { - "analyzer": { - "synonyms_analyzer": { - "tokenizer": "whitespace", - "filter": [ - "lowercase", - "synonyms_filter" - ] - }, - "highlight_analyzer": { - "tokenizer": "standard", - "filter": [ - "lowercase", - "english_stop" - ] - }, - "hierarchy_analyzer": { "tokenizer": "path_tokenizer" } - }, - "filter": { - "synonyms_filter": { - "type": "synonym", - "synonyms_set": "docs", - "updateable": true - }, - "english_stop": { - "type": "stop", - "stopwords": "_english_" - } - }, - "tokenizer": { - "path_tokenizer": { - "type": "path_hierarchy", - "delimiter": "/" - } - } - } - } - """; - - protected static string CreateMapping(string? inferenceId) => - $$""" - { - "properties": { - "url" : { - "type": "keyword", - "fields": { - "match": { "type": "text" }, - "prefix": { "type": "text", "analyzer" : "hierarchy_analyzer" } - } - }, - "hash" : { "type" : "keyword" }, - "title": { - "type": "text", - "search_analyzer": "synonyms_analyzer", - "fields": { - "keyword": { - "type": "keyword" - } - {{(!string.IsNullOrWhiteSpace(inferenceId) ? $$""", "semantic_text": {{{InferenceMapping(inferenceId)}}}""" : "")}} - } - }, - "url_segment_count": { - "type": "integer" - }, - "body": { - "type": "text" - }, - "stripped_body": { - "type": "text", - "search_analyzer": "highlight_analyzer", - "term_vector": "with_positions_offsets" - } - {{(!string.IsNullOrWhiteSpace(inferenceId) ? AbstractInferenceMapping(inferenceId) : AbstractMapping())}} - } - } - """; - - private static string AbstractMapping() => - """ - , "abstract": { - "type": "text" - } - """; - - private static string InferenceMapping(string inferenceId) => - $""" - "type": "semantic_text", - "inference_id": "{inferenceId}" - """; - - private static string AbstractInferenceMapping(string inferenceId) => - // langugage=json - $$""" - , "abstract": { - {{InferenceMapping(inferenceId)}} - } - """; - - public async ValueTask StartAsync(Cancel ctx = default) - { - if (_channel != null) - return; - - var es = endpoints.Elasticsearch; - - var configuration = new ElasticsearchConfiguration(es.Uri) - { - Authentication = es.ApiKey is { } apiKey - ? new ApiKey(apiKey) - : es is { Username: { } username, Password: { } password } - ? new BasicAuthentication(username, password) - : null, - EnableHttpCompression = true, - DebugMode = Endpoint.DebugMode, - CertificateFingerprint = Endpoint.CertificateFingerprint, - ProxyAddress = Endpoint.ProxyAddress, - ProxyPassword = Endpoint.ProxyPassword, - ProxyUsername = Endpoint.ProxyUsername, - ServerCertificateValidationCallback = Endpoint.DisableSslVerification - ? CertificateValidations.AllowAll - : Endpoint.Certificate is { } cert - ? Endpoint.CertificateIsNotRoot - ? 
CertificateValidations.AuthorityPartOfChain(cert) - : CertificateValidations.AuthorityIsRoot(cert) - : null - }; - - var transport = new DistributedTransport(configuration); - - //The max num threads per allocated node, from testing its best to limit our max concurrency - //producing to this number as well - var options = NewOptions(transport); - var i = 0; - options.BufferOptions = new BufferOptions - { - OutboundBufferMaxSize = Endpoint.BufferSize, - ExportMaxConcurrency = Endpoint.IndexNumThreads, - ExportMaxRetries = Endpoint.MaxRetries, - }; - options.SerializerContext = SourceGenerationContext.Default; - options.ExportBufferCallback = () => - { - var count = Interlocked.Increment(ref i); - _logger.LogInformation("Exported {Count} documents to Elasticsearch index {Format}", - count * Endpoint.BufferSize, options.IndexFormat); - }; - options.ExportExceptionCallback = e => _logger.LogError(e, "Failed to export document"); - options.ServerRejectionCallback = items => _logger.LogInformation("Server rejection: {Rejection}", items.First().Item2); - _channel = NewChannel(options); - _logger.LogInformation($"Bootstrapping {nameof(SemanticIndexChannel)} Elasticsearch target for indexing"); - _ = await _channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); - } - - public async ValueTask StopAsync(Cancel ctx = default) - { - if (_channel is null) - return; - - _logger.LogInformation("Waiting to drain all inflight exports to Elasticsearch"); - var drained = await _channel.WaitForDrainAsync(null, ctx); - if (!drained) - collector.EmitGlobalError("Elasticsearch export: failed to complete indexing in a timely fashion while shutting down"); - - _logger.LogInformation("Refreshing target index {Index}", _channel.IndexName); - var refreshed = await _channel.RefreshAsync(ctx); - if (!refreshed) - _logger.LogError("Refreshing target index {Index} did not complete successfully", _channel.IndexName); - - _logger.LogInformation("Applying aliases to {Index}", _channel.IndexName); - var swapped = await _channel.ApplyAliasesAsync(ctx); - if (!swapped) - collector.EmitGlobalError($"${nameof(ElasticsearchMarkdownExporter)} failed to apply aliases to index {_channel.IndexName}"); - } - - public void Dispose() - { - if (_channel is not null) - { - _channel.Complete(); - _channel.Dispose(); - } - - GC.SuppressFinalize(this); - } - - private async ValueTask TryWrite(DocumentationDocument document, Cancel ctx = default) - { - if (_channel is null) - return false; - - if (_channel.TryWrite(document)) - return true; - - if (await _channel.WaitToWriteAsync(ctx)) - return _channel.TryWrite(document); - return false; - } - - public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, Cancel ctx) - { - var file = fileContext.SourceFile; - var url = file.Url; - - if (url is "/docs" or "/docs/404") - { - // Skip the root and 404 pages - _logger.LogInformation("Skipping export for {Url}", url); - return true; - } - - IPositionalNavigation navigation = fileContext.DocumentationSet; - - // Remove the first h1 because we already have the title - // and we don't want it to appear in the body - var h1 = fileContext.Document.Descendants().FirstOrDefault(h => h.Level == 1); - if (h1 is not null) - _ = fileContext.Document.Remove(h1); - - var body = LlmMarkdownExporter.ConvertToLlmMarkdown(fileContext.Document, fileContext.BuildContext); - - var headings = fileContext.Document.Descendants() - .Select(h => h.GetData("header") as string ?? 
string.Empty) // TODO: Confirm that 'header' data is correctly set for all HeadingBlock instances and that this extraction is reliable. - .Where(text => !string.IsNullOrEmpty(text)) - .ToArray(); - - var @abstract = !string.IsNullOrEmpty(body) - ? body[..Math.Min(body.Length, 400)] + " " + string.Join(" \n- ", headings) - : string.Empty; - - var doc = new DocumentationDocument - { - Url = url, - Hash = ShortId.Create(url, body), - Title = file.Title, - Body = body, - StrippedBody = body.StripMarkdown(), - Description = fileContext.SourceFile.YamlFrontMatter?.Description, - Abstract = @abstract, - Applies = fileContext.SourceFile.YamlFrontMatter?.AppliesTo, - UrlSegmentCount = url.Split('/', StringSplitOptions.RemoveEmptyEntries).Length, - Parents = navigation.GetParentsOfMarkdownFile(file).Select(i => new ParentDocument - { - Title = i.NavigationTitle, - Url = i.Url - }).Reverse().ToArray(), - Headings = headings, - }; - return await TryWrite(doc, ctx); - } - - /// - public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Cancel ctx) - { - if (_channel is null) - return false; - - return await _channel.RefreshAsync(ctx); - } -} diff --git a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs index c544d887a..298244330 100644 --- a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs +++ b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs @@ -5,6 +5,7 @@ using Elastic.Documentation; using Elastic.Documentation.Configuration; using Elastic.Documentation.Configuration.Assembler; +using Elastic.Markdown.Exporters.Elasticsearch; using Microsoft.Extensions.Logging; namespace Elastic.Markdown.Exporters; @@ -24,9 +25,7 @@ string indexNamespace if (exportOptions.Contains(Exporter.Configuration)) markdownExporters.Add(new ConfigurationExporter(logFactory, context.ConfigurationFileProvider, context)); if (exportOptions.Contains(Exporter.Elasticsearch)) - markdownExporters.Add(new ElasticsearchMarkdownSemanticExporter(logFactory, context.Collector, indexNamespace, context.Endpoints)); - if (exportOptions.Contains(Exporter.ElasticsearchNoSemantic)) - markdownExporters.Add(new ElasticsearchMarkdownExporter(logFactory, context.Collector, indexNamespace, context.Endpoints)); + markdownExporters.Add(new ElasticsearchMarkdownExporter(logFactory, context.Collector, context.Endpoints, indexNamespace)); return markdownExporters; } } diff --git a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs index a892816e1..42ff53ba2 100644 --- a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs +++ b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs @@ -131,7 +131,10 @@ public async Task Index(IDiagnosticsCollector collector, if (bootstrapTimeout.HasValue) cfg.BootstrapTimeout = bootstrapTimeout.Value; - var exporters = new HashSet { noSemantic.GetValueOrDefault(false) ? 
ElasticsearchNoSemantic : Elasticsearch }; + if (noSemantic.HasValue) + cfg.NoSemantic = noSemantic.Value; + + var exporters = new HashSet { Elasticsearch }; return await BuildAll(collector, strict: false, environment, metadataOnly: true, showHints: false, exporters, fileSystem, ctx); } diff --git a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs index b7ef38baf..b4a4ee656 100644 --- a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs +++ b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs @@ -128,7 +128,10 @@ public async Task Index(IDiagnosticsCollector collector, if (bootstrapTimeout.HasValue) cfg.BootstrapTimeout = bootstrapTimeout.Value; - var exporters = new HashSet { noSemantic.GetValueOrDefault(false) ? ElasticsearchNoSemantic : Elasticsearch }; + if (noSemantic.HasValue) + cfg.NoSemantic = noSemantic.Value; + + var exporters = new HashSet { Elasticsearch }; return await Build(collector, fileSystem, metadataOnly: true, strict: false, path: path, output: null, pathPrefix: null,