diff --git a/docs/cli/assembler/assembler-index.md b/docs/cli/assembler/assembler-index.md index ea8d6947c..5d551e4b4 100644 --- a/docs/cli/assembler/assembler-index.md +++ b/docs/cli/assembler/assembler-index.md @@ -44,6 +44,9 @@ docs-builder assembler index [options...] [-h|--help] [--version] `--index-name-prefix` `` : The prefix for the computed index/alias names. Defaults: semantic-docs (optional) +`--force-reindex` `` +: Force reindex strategy to semantic index, by default, we multiplex writes if the semantic index does not exist yet (optional) + `--buffer-size` `` : The number of documents to send to ES as part of the bulk. Defaults: 100 (optional) diff --git a/docs/cli/docset/index-command.md b/docs/cli/docset/index-command.md index 833e6a8a6..32aa3a25b 100644 --- a/docs/cli/docset/index-command.md +++ b/docs/cli/docset/index-command.md @@ -40,6 +40,9 @@ docs-builder index [options...] [-h|--help] [--version] `--index-name-prefix` `` : The prefix for the computed index/alias names. Defaults: semantic-docs (optional) +`--force-reindex` `` +: Force reindex strategy to semantic index, by default, we multiplex writes if the semantic index does not exist yet (optional) + `--buffer-size` `` : The number of documents to send to ES as part of the bulk. Defaults: 100 (optional) diff --git a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs index c3fc345f1..c9a0b9361 100644 --- a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs +++ b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs @@ -44,4 +44,5 @@ public class ElasticsearchEndpoint public bool CertificateIsNotRoot { get; set; } public int? 
BootstrapTimeout { get; set; } public bool NoSemantic { get; set; } + public bool ForceReindex { get; set; } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs index f731b9b10..199724929 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs @@ -20,27 +20,18 @@ public class ElasticsearchLexicalExporter( IDiagnosticsCollector collector, ElasticsearchEndpoint endpoint, string indexNamespace, - DistributedTransport transport, - DateTimeOffset batchIndexDate + DistributedTransport transport ) : ElasticsearchExporter, CatalogIndexChannel> (logFactory, collector, endpoint, transport, o => new(o), t => new(t) { BulkOperationIdLookup = d => d.Url, - ScriptedHashBulkUpsertLookup = (d, channelHash) => - { - var rand = string.Empty; - //if (d.Url.StartsWith("/docs/reference/search-connectors")) - // rand = Guid.NewGuid().ToString("N"); - d.Hash = HashedBulkUpdate.CreateHash(channelHash, rand, d.Url, d.Body ?? 
string.Empty, string.Join(",", d.Headings.OrderBy(h => h))); - d.LastUpdated = batchIndexDate; - d.BatchIndexDate = batchIndexDate; - return new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date", - new Dictionary - { - { "batch_index_date", d.BatchIndexDate.ToString("o") } - }); - }, + // hash, last_updated and batch_index_date are all set before the docs are written to the channel + ScriptedHashBulkUpsertLookup = (d, _) => new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date", + new Dictionary + { + { "batch_index_date", d.BatchIndexDate.ToString("o") } + }), GetMapping = () => CreateMapping(null), GetMappingSettings = CreateMappingSetting, IndexFormat = diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index 7cdebf662..fb03b113c 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -7,15 +7,20 @@ using Elastic.Documentation.Diagnostics; using Elastic.Documentation.Search; using Elastic.Ingest.Elasticsearch; +using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Markdown.Helpers; using Elastic.Markdown.IO; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Markdig.Syntax; using Microsoft.Extensions.Logging; +using NetEscapades.EnumGenerators; namespace Elastic.Markdown.Exporters.Elasticsearch; +[EnumExtensions] +public enum IngestStrategy { Reindex, Multiplex } + public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable { private readonly IDiagnosticsCollector _collector; @@ -27,6 +32,7 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow; private readonly DistributedTransport _transport; + private 
IngestStrategy _indexStrategy; public ElasticsearchMarkdownExporter( ILoggerFactory logFactory, @@ -38,6 +44,7 @@ string indexNamespace _collector = collector; _logger = logFactory.CreateLogger(); _endpoint = endpoints.Elasticsearch; + _indexStrategy = IngestStrategy.Reindex; var es = endpoints.Elasticsearch; @@ -66,34 +73,76 @@ string indexNamespace _transport = new DistributedTransport(configuration); - _lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport, _batchIndexDate); + _lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport); _semanticChannel = new ElasticsearchSemanticExporter(logFactory, collector, es, indexNamespace, _transport); - } /// - public async ValueTask StartAsync(Cancel ctx = default) => - await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); + public async ValueTask StartAsync(Cancel ctx = default) + { + _ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); + + var semanticIndex = _semanticChannel.Channel.IndexName; + var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); + var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx); + if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex); + _ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); + var semanticIndexPut = await _transport.PutAsync(semanticIndex, PostData.String("{}"), ctx); + if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode) + throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}"); + _ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx); + if (!_endpoint.ForceReindex) + { + _indexStrategy = IngestStrategy.Multiplex; + 
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex", semanticIndex); + } + } + _logger.LogInformation("Using {IndexStrategy} to sync lexical index to semantic index", _indexStrategy.ToStringFast(true)); + } + + private async ValueTask CountAsync(string index, string body, Cancel ctx = default) + { + var countResponse = await _transport.PostAsync($"/{index}/_count", PostData.String(body), ctx); + return countResponse.Body.Get("count"); + } /// public async ValueTask StopAsync(Cancel ctx = default) { var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest"); - var semanticIndex = _semanticChannel.Channel.IndexName; - var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx); - if (_endpoint.NoSemantic) + var stopped = await _lexicalChannel.StopAsync(ctx); + if (!stopped) + throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}"); + + await QueryIngestStatistics(lexicalWriteAlias, ctx); + + if (_indexStrategy == IngestStrategy.Multiplex) { - _logger.LogInformation("--no-semantic was specified so exiting early before syncing to {Index}", semanticIndex); + if (!_endpoint.NoSemantic) + _ = await _semanticChannel.StopAsync(ctx); + else + _logger.LogInformation("--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}", semanticIndex); + + // cleanup lexical index of old data + await DoDeleteByQuery(lexicalWriteAlias, ctx); + _ = await _lexicalChannel.RefreshAsync(ctx); + _logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true)); + await QueryDocumentCounts(ctx); return; } - var stopped = await _lexicalChannel.StopAsync(ctx); - if (!stopped) - throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}"); + if 
(_endpoint.NoSemantic) + { + _logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", semanticIndex); + return; + } + var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx); if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode) { _logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex); @@ -148,6 +197,35 @@ public async ValueTask StopAsync(Cancel ctx = default) await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx); await DoDeleteByQuery(lexicalWriteAlias, ctx); + + _ = await _lexicalChannel.RefreshAsync(ctx); + _ = await _semanticChannel.RefreshAsync(ctx); + + _logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true)); + await QueryDocumentCounts(ctx); + } + + private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx) + { + var lexicalSearchAlias = _lexicalChannel.Channel.Options.ActiveSearchAlias; + var updated = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "last_updated": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx); + var total = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "batch_index_date": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx); + var deleted = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "batch_index_date": { "lt": "{{_batchIndexDate:o}}" } } } }""", ctx); + + // TODO emit these as metrics + _logger.LogInformation("Exported {Total}, Updated {Updated}, Deleted {Deleted} documents to {LexicalIndex}", total, updated, deleted, lexicalWriteAlias); + _logger.LogInformation("Syncing to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true)); + } + + private async ValueTask QueryDocumentCounts(Cancel ctx) + { + var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); + var 
lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest"); + var totalLexical = await CountAsync(lexicalWriteAlias, "{}", ctx); + var totalSemantic = await CountAsync(semanticWriteAlias, "{}", ctx); + + // TODO emit these as metrics + _logger.LogInformation("Document counts -> Semantic Index: {TotalSemantic}, Lexical Index: {TotalLexical}", totalSemantic, totalLexical); } private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) @@ -275,6 +353,18 @@ public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, }).Reverse().ToArray(), Headings = headings }; + + var semanticHash = _semanticChannel.Channel.Options.ChannelHash; + var lexicalHash = _lexicalChannel.Channel.Options.ChannelHash; + var hash = HashedBulkUpdate.CreateHash(semanticHash, lexicalHash, + doc.Url, doc.Body ?? string.Empty, string.Join(",", doc.Headings.OrderBy(h => h)) + ); + doc.Hash = hash; + doc.LastUpdated = _batchIndexDate; + doc.BatchIndexDate = _batchIndexDate; + + if (_indexStrategy == IngestStrategy.Multiplex) + return await _lexicalChannel.TryWrite(doc, ctx) && await _semanticChannel.TryWrite(doc, ctx); return await _lexicalChannel.TryWrite(doc, ctx); } diff --git a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs index 42ff53ba2..363660069 100644 --- a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs +++ b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs @@ -38,6 +38,7 @@ ICoreService githubActionsService /// The number of index threads the inference endpoint should use. Defaults: 8 /// Timeout in minutes for the inference endpoint creation. Defaults: 4 /// The prefix for the computed index/alias names. Defaults: semantic-docs + /// Force reindex strategy to semantic index /// The number of documents to send to ES as part of the bulk. 
Defaults: 100 /// The number of times failed bulk items should be retried. Defaults: 3 /// Buffer ES request/responses for better error messages and pass ?pretty to all requests @@ -64,6 +65,7 @@ public async Task Index(IDiagnosticsCollector collector, int? bootstrapTimeout = null, // index options string? indexNamePrefix = null, + bool? forceReindex = null, // channel buffer options int? bufferSize = null, int? maxRetries = null, @@ -133,6 +135,8 @@ public async Task Index(IDiagnosticsCollector collector, if (noSemantic.HasValue) cfg.NoSemantic = noSemantic.Value; + if (forceReindex.HasValue) + cfg.ForceReindex = forceReindex.Value; var exporters = new HashSet { Elasticsearch }; diff --git a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs index b4a4ee656..f26ba4092 100644 --- a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs +++ b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs @@ -35,6 +35,7 @@ ICoreService githubActionsService /// The number of index threads the inference endpoint should use. Defaults: 8 /// Timeout in minutes for the inference endpoint creation. Defaults: 4 /// The prefix for the computed index/alias names. Defaults: semantic-docs + /// Force reindex strategy to semantic index /// The number of documents to send to ES as part of the bulk. Defaults: 100 /// The number of times failed bulk items should be retried. Defaults: 3 /// Buffer ES request/responses for better error messages and pass ?pretty to all requests @@ -61,6 +62,7 @@ public async Task Index(IDiagnosticsCollector collector, int? bootstrapTimeout = null, // index options string? indexNamePrefix = null, + bool? forceReindex = null, // channel buffer options int? bufferSize = null, int? 
maxRetries = null, @@ -130,6 +132,8 @@ public async Task Index(IDiagnosticsCollector collector, if (noSemantic.HasValue) cfg.NoSemantic = noSemantic.Value; + if (forceReindex.HasValue) + cfg.ForceReindex = forceReindex.Value; var exporters = new HashSet { Elasticsearch }; diff --git a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs index 9086a0d79..2dfcc5c89 100644 --- a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs +++ b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs @@ -34,6 +34,7 @@ ICoreService githubActionsService /// The number of search threads the inference endpoint should use. Defaults: 8 /// The number of index threads the inference endpoint should use. Defaults: 8 /// The prefix for the computed index/alias names. Defaults: semantic-docs + /// Force reindex strategy to semantic index /// Timeout in minutes for the inference endpoint creation. Defaults: 4 /// The number of documents to send to ES as part of the bulk. Defaults: 100 /// The number of times failed bulk items should be retried. Defaults: 3 @@ -63,6 +64,7 @@ public async Task Index( // index options string? indexNamePrefix = null, + bool? forceReindex = null, // channel buffer options int? 
bufferSize = null, @@ -93,7 +95,7 @@ public async Task Index( // inference options noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout, // channel and connection options - indexNamePrefix, bufferSize, maxRetries, debugMode, + indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode, // proxy options proxyAddress, proxyPassword, proxyUsername, // certificate options @@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, // inference options state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout, // channel and connection options - state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode, + state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode, // proxy options state.proxyAddress, state.proxyPassword, state.proxyUsername, // certificate options diff --git a/src/tooling/docs-builder/Commands/IndexCommand.cs b/src/tooling/docs-builder/Commands/IndexCommand.cs index e0ec4dd80..48f2f1f3a 100644 --- a/src/tooling/docs-builder/Commands/IndexCommand.cs +++ b/src/tooling/docs-builder/Commands/IndexCommand.cs @@ -32,6 +32,7 @@ ICoreService githubActionsService /// The number of search threads the inference endpoint should use. Defaults: 8 /// The number of index threads the inference endpoint should use. Defaults: 8 /// The prefix for the computed index/alias names. Defaults: semantic-docs + /// Force reindex strategy to semantic index /// Timeout in minutes for the inference endpoint creation. Defaults: 4 /// The number of documents to send to ES as part of the bulk. Defaults: 100 /// The number of times failed bulk items should be retried. Defaults: 3 @@ -61,6 +62,7 @@ public async Task Index( // index options string? indexNamePrefix = null, + bool? forceReindex = null, // channel buffer options int? 
bufferSize = null, @@ -91,7 +93,7 @@ public async Task Index( // inference options noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout, // channel and connection options - indexNamePrefix, bufferSize, maxRetries, debugMode, + indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode, // proxy options proxyAddress, proxyPassword, proxyUsername, // certificate options @@ -104,7 +106,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st // inference options state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout, // channel and connection options - state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode, + state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode, // proxy options state.proxyAddress, state.proxyPassword, state.proxyUsername, // certificate options