3 changes: 3 additions & 0 deletions docs/cli/assembler/assembler-index.md
@@ -44,6 +44,9 @@ docs-builder assembler index [options...] [-h|--help] [--version]
`--index-name-prefix` `<string>`
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)

`--force-reindex` `<bool?>`
: Force the reindex strategy for syncing to the semantic index. By default, writes are multiplexed when the semantic index does not exist yet (optional)

`--buffer-size` `<int?>`
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)

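For example, a hypothetical invocation that forces the reindex strategy (connection and authentication options omitted; the boolean value syntax is assumed):

```sh
docs-builder assembler index --force-reindex true
```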
3 changes: 3 additions & 0 deletions docs/cli/docset/index-command.md
@@ -40,6 +40,9 @@ docs-builder index [options...] [-h|--help] [--version]
`--index-name-prefix` `<string>`
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)

`--force-reindex` `<bool?>`
: Force the reindex strategy for syncing to the semantic index. By default, writes are multiplexed when the semantic index does not exist yet (optional)

`--buffer-size` `<int?>`
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)

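The docset-level command accepts the same flag; again a hypothetical invocation with connection options omitted:

```sh
docs-builder index --force-reindex true
```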
@@ -44,4 +44,5 @@ public class ElasticsearchEndpoint
public bool CertificateIsNotRoot { get; set; }
public int? BootstrapTimeout { get; set; }
public bool NoSemantic { get; set; }
public bool ForceReindex { get; set; }
}
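A minimal sketch of setting the new flag programmatically; the property names come from the class above, and all other endpoint members (some of which may be required in practice) are omitted:

```csharp
// Hypothetical: always use the reindex strategy, even on the first bootstrap
// of the semantic index (which would otherwise select multiplex writes).
var endpoint = new ElasticsearchEndpoint { ForceReindex = true, NoSemantic = false };
```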
@@ -20,27 +20,18 @@ public class ElasticsearchLexicalExporter(
IDiagnosticsCollector collector,
ElasticsearchEndpoint endpoint,
string indexNamespace,
DistributedTransport transport,
DateTimeOffset batchIndexDate
DistributedTransport transport
)
: ElasticsearchExporter<CatalogIndexChannelOptions<DocumentationDocument>, CatalogIndexChannel<DocumentationDocument>>
(logFactory, collector, endpoint, transport, o => new(o), t => new(t)
{
BulkOperationIdLookup = d => d.Url,
ScriptedHashBulkUpsertLookup = (d, channelHash) =>
{
var rand = string.Empty;
//if (d.Url.StartsWith("/docs/reference/search-connectors"))
// rand = Guid.NewGuid().ToString("N");
d.Hash = HashedBulkUpdate.CreateHash(channelHash, rand, d.Url, d.Body ?? string.Empty, string.Join(",", d.Headings.OrderBy(h => h)));
d.LastUpdated = batchIndexDate;
d.BatchIndexDate = batchIndexDate;
return new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
new Dictionary<string, string>
{
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
});
},
// hash, last_updated and batch_index_date are all set before the docs are written to the channel
ScriptedHashBulkUpsertLookup = (d, _) => new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
new Dictionary<string, string>
{
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
}),
GetMapping = () => CreateMapping(null),
GetMappingSettings = CreateMappingSetting,
IndexFormat =
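The rest of the channel options is collapsed above. Note that `Hash`, `LastUpdated`, and `BatchIndexDate` now arrive pre-populated (see `ExportAsync` below), so the lexical and semantic channels observe identical values. A sketch of the assumed `HashedBulkUpdate` semantics, inferred from its usage here rather than from the library source:

```csharp
// Assumption: the emitted bulk operation is an upsert keyed on the "hash" field.
// When the stored hash equals d.Hash, only the painless script runs, refreshing
// batch_index_date; when it differs, the full document is replaced, which also
// carries the pre-populated last_updated forward.
ScriptedHashBulkUpsertLookup = (d, _) => new HashedBulkUpdate(
    "hash",                                                   // field compared server-side
    d.Hash,                                                   // computed once per document in ExportAsync
    "ctx._source.batch_index_date = params.batch_index_date", // touch only the batch stamp
    new Dictionary<string, string> { { "batch_index_date", d.BatchIndexDate.ToString("o") } });
```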
@@ -7,15 +7,20 @@
using Elastic.Documentation.Diagnostics;
using Elastic.Documentation.Search;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Markdown.Helpers;
using Elastic.Markdown.IO;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Markdig.Syntax;
using Microsoft.Extensions.Logging;
using NetEscapades.EnumGenerators;

namespace Elastic.Markdown.Exporters.Elasticsearch;

[EnumExtensions]
public enum IngestStrategy { Reindex, Multiplex }

public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
{
private readonly IDiagnosticsCollector _collector;
@@ -27,6 +32,7 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable

private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow;
private readonly DistributedTransport _transport;
private IngestStrategy _indexStrategy;

public ElasticsearchMarkdownExporter(
ILoggerFactory logFactory,
@@ -38,6 +44,7 @@ string indexNamespace
_collector = collector;
_logger = logFactory.CreateLogger<ElasticsearchMarkdownExporter>();
_endpoint = endpoints.Elasticsearch;
_indexStrategy = IngestStrategy.Reindex;

var es = endpoints.Elasticsearch;

@@ -66,34 +73,76 @@ string indexNamespace

_transport = new DistributedTransport(configuration);

_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport, _batchIndexDate);
_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport);
_semanticChannel = new ElasticsearchSemanticExporter(logFactory, collector, es, indexNamespace, _transport);

}

/// <inheritdoc />
public async ValueTask StartAsync(Cancel ctx = default) =>
await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
public async ValueTask StartAsync(Cancel ctx = default)
{
_ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);

var semanticIndex = _semanticChannel.Channel.IndexName;
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
{
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
_ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
var semanticIndexPut = await _transport.PutAsync<StringResponse>(semanticIndex, PostData.String("{}"), ctx);
if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode)
throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}");
_ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
if (!_endpoint.ForceReindex)
{
_indexStrategy = IngestStrategy.Multiplex;
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist; pass --force-reindex to always use reindex", semanticIndex);
}
}
_logger.LogInformation("Using {IndexStrategy} to sync lexical index to semantic index", _indexStrategy.ToStringFast(true));
}
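The strategy selection above reduces to a small decision table; a hypothetical helper (not part of the PR) that captures the same logic:

```csharp
// Reindex is the default. Multiplex is chosen only when the semantic index is
// bootstrapped for the first time, and --force-reindex overrides even that.
static IngestStrategy ChooseStrategy(bool semanticIndexExists, bool forceReindex) =>
    !semanticIndexExists && !forceReindex
        ? IngestStrategy.Multiplex // write both indices directly during this run
        : IngestStrategy.Reindex;  // write lexical only, then sync via _reindex
```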

private async ValueTask<long> CountAsync(string index, string body, Cancel ctx = default)
{
var countResponse = await _transport.PostAsync<DynamicResponse>($"/{index}/_count", PostData.String(body), ctx);
return countResponse.Body.Get<long>("count");
}
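`CountAsync` is a thin wrapper over the Elasticsearch `_count` API; a hypothetical call (the alias name is illustrative):

```csharp
// Count every document behind the lexical search alias.
var totalLexical = await CountAsync("semantic-docs-lexical-latest", "{}", ctx);
```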

/// <inheritdoc />
public async ValueTask StopAsync(Cancel ctx = default)
{
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");

var semanticIndex = _semanticChannel.Channel.IndexName;
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);

if (_endpoint.NoSemantic)
var stopped = await _lexicalChannel.StopAsync(ctx);
if (!stopped)
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");

await QueryIngestStatistics(lexicalWriteAlias, ctx);

if (_indexStrategy == IngestStrategy.Multiplex)
{
_logger.LogInformation("--no-semantic was specified so exiting early before syncing to {Index}", semanticIndex);
if (!_endpoint.NoSemantic)
_ = await _semanticChannel.StopAsync(ctx);
else
_logger.LogInformation("--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}", semanticIndex);

// clean up old data in the lexical index
await DoDeleteByQuery(lexicalWriteAlias, ctx);
_ = await _lexicalChannel.RefreshAsync(ctx);
_logger.LogInformation("Finished sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
await QueryDocumentCounts(ctx);
return;
}

var stopped = await _lexicalChannel.StopAsync(ctx);
if (!stopped)
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");
if (_endpoint.NoSemantic)
{
_logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", semanticIndex);
return;
}

var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
{
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
@@ -148,6 +197,35 @@ public async ValueTask StopAsync(Cancel ctx = default)
await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx);

await DoDeleteByQuery(lexicalWriteAlias, ctx);

_ = await _lexicalChannel.RefreshAsync(ctx);
_ = await _semanticChannel.RefreshAsync(ctx);

_logger.LogInformation("Finished sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
await QueryDocumentCounts(ctx);
}

private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx)
{
var lexicalSearchAlias = _lexicalChannel.Channel.Options.ActiveSearchAlias;
var updated = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "last_updated": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
var total = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "batch_index_date": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
var deleted = await CountAsync(lexicalSearchAlias, $$""" { "query": { "range": { "batch_index_date": { "lt": "{{_batchIndexDate:o}}" } } } }""", ctx);

// TODO emit these as metrics
_logger.LogInformation("Exported {Total}, Updated {Updated}, Deleted {Deleted} documents to {LexicalIndex}", total, updated, deleted, lexicalWriteAlias);
_logger.LogInformation("Syncing to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
}

private async ValueTask QueryDocumentCounts(Cancel ctx)
{
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");
var totalLexical = await CountAsync(lexicalWriteAlias, "{}", ctx);
var totalSemantic = await CountAsync(semanticWriteAlias, "{}", ctx);

// TODO emit these as metrics
_logger.LogInformation("Document counts -> Semantic Index: {TotalSemantic}, Lexical Index: {TotalLexical}", totalSemantic, totalLexical);
}

private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx)
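The method body is collapsed in this view. Judging from the `batch_index_date` probes in `QueryIngestStatistics`, it presumably issues a `_delete_by_query` along these lines (a sketch, not the PR's actual implementation):

```csharp
// Hypothetical: remove documents that were not touched by the current batch.
var body = $$"""{ "query": { "range": { "batch_index_date": { "lt": "{{_batchIndexDate:o}}" } } } }""";
var response = await _transport.PostAsync<DynamicResponse>(
    $"/{lexicalWriteAlias}/_delete_by_query", PostData.String(body), ctx);
```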
@@ -275,6 +353,18 @@ public async ValueTask<bool> ExportAsync(MarkdownExportFileContext fileContext,
}).Reverse().ToArray(),
Headings = headings
};

var semanticHash = _semanticChannel.Channel.Options.ChannelHash;
var lexicalHash = _lexicalChannel.Channel.Options.ChannelHash;
var hash = HashedBulkUpdate.CreateHash(semanticHash, lexicalHash,
doc.Url, doc.Body ?? string.Empty, string.Join(",", doc.Headings.OrderBy(h => h))
);
doc.Hash = hash;
doc.LastUpdated = _batchIndexDate;
doc.BatchIndexDate = _batchIndexDate;

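// Note: the hash above folds in BOTH channel hashes, so changing either
// channel's configuration (e.g. a mapping bump) invalidates every stored
// hash and forces a full rewrite on the next run. The multiplex branch below
// writes the same pre-hashed document to both channels; && short-circuits,
// so a failed lexical write skips the semantic write.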
if (_indexStrategy == IngestStrategy.Multiplex)
return await _lexicalChannel.TryWrite(doc, ctx) && await _semanticChannel.TryWrite(doc, ctx);
return await _lexicalChannel.TryWrite(doc, ctx);
}

@@ -38,6 +38,7 @@ ICoreService githubActionsService
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force the reindex strategy when syncing to the semantic index</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
@@ -64,6 +65,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
int? bootstrapTimeout = null,
// index options
string? indexNamePrefix = null,
bool? forceReindex = null,
// channel buffer options
int? bufferSize = null,
int? maxRetries = null,
@@ -133,6 +135,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,

if (noSemantic.HasValue)
cfg.NoSemantic = noSemantic.Value;
if (forceReindex.HasValue)
cfg.ForceReindex = forceReindex.Value;

var exporters = new HashSet<Exporter> { Elasticsearch };

@@ -35,6 +35,7 @@ ICoreService githubActionsService
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force the reindex strategy when syncing to the semantic index</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
@@ -61,6 +62,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
int? bootstrapTimeout = null,
// index options
string? indexNamePrefix = null,
bool? forceReindex = null,
// channel buffer options
int? bufferSize = null,
int? maxRetries = null,
@@ -130,6 +132,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,

if (noSemantic.HasValue)
cfg.NoSemantic = noSemantic.Value;
if (forceReindex.HasValue)
cfg.ForceReindex = forceReindex.Value;

var exporters = new HashSet<Exporter> { Elasticsearch };

@@ -34,6 +34,7 @@ ICoreService githubActionsService
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force the reindex strategy when syncing to the semantic index</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
@@ -63,6 +64,7 @@ public async Task<int> Index(

// index options
string? indexNamePrefix = null,
bool? forceReindex = null,

// channel buffer options
int? bufferSize = null,
@@ -93,7 +95,7 @@
// inference options
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
// channel and connection options
indexNamePrefix, bufferSize, maxRetries, debugMode,
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
// proxy options
proxyAddress, proxyPassword, proxyUsername,
// certificate options
@@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs,
// inference options
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
// channel and connection options
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
// proxy options
state.proxyAddress, state.proxyPassword, state.proxyUsername,
// certificate options
6 changes: 4 additions & 2 deletions src/tooling/docs-builder/Commands/IndexCommand.cs
@@ -32,6 +32,7 @@ ICoreService githubActionsService
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force the reindex strategy when syncing to the semantic index</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
@@ -61,6 +62,7 @@ public async Task<int> Index(

// index options
string? indexNamePrefix = null,
bool? forceReindex = null,

// channel buffer options
int? bufferSize = null,
@@ -91,7 +93,7 @@
// inference options
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
// channel and connection options
indexNamePrefix, bufferSize, maxRetries, debugMode,
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
// proxy options
proxyAddress, proxyPassword, proxyUsername,
// certificate options
@@ -104,7 +106,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st
// inference options
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
// channel and connection options
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
// proxy options
state.proxyAddress, state.proxyPassword, state.proxyUsername,
// certificate options