Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/cli/assembler/assembler-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ docs-builder assembler index [options...] [-h|--help] [--version]
`--index-name-prefix` `<string>`
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)

`--force-reindex` `<bool?>`
: Force the reindex strategy for syncing to the semantic index; by default, writes are multiplexed when the semantic index does not exist yet (optional)

`--buffer-size` `<int?>`
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)

Expand Down
3 changes: 3 additions & 0 deletions docs/cli/docset/index-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ docs-builder index [options...] [-h|--help] [--version]
`--index-name-prefix` `<string>`
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)

`--force-reindex` `<bool?>`
: Force the reindex strategy for syncing to the semantic index; by default, writes are multiplexed when the semantic index does not exist yet (optional)

`--buffer-size` `<int?>`
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ public class ElasticsearchEndpoint
public bool CertificateIsNotRoot { get; set; }
public int? BootstrapTimeout { get; set; }
public bool NoSemantic { get; set; }
public bool ForceReindex { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,18 @@ public class ElasticsearchLexicalExporter(
IDiagnosticsCollector collector,
ElasticsearchEndpoint endpoint,
string indexNamespace,
DistributedTransport transport,
DateTimeOffset batchIndexDate
DistributedTransport transport
)
: ElasticsearchExporter<CatalogIndexChannelOptions<DocumentationDocument>, CatalogIndexChannel<DocumentationDocument>>
(logFactory, collector, endpoint, transport, o => new(o), t => new(t)
{
BulkOperationIdLookup = d => d.Url,
ScriptedHashBulkUpsertLookup = (d, channelHash) =>
{
var rand = string.Empty;
//if (d.Url.StartsWith("/docs/reference/search-connectors"))
// rand = Guid.NewGuid().ToString("N");
d.Hash = HashedBulkUpdate.CreateHash(channelHash, rand, d.Url, d.Body ?? string.Empty, string.Join(",", d.Headings.OrderBy(h => h)));
d.LastUpdated = batchIndexDate;
d.BatchIndexDate = batchIndexDate;
return new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
new Dictionary<string, string>
{
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
});
},
// hash, last_updated and batch_index_date are all set before the docs are written to the channel
ScriptedHashBulkUpsertLookup = (d, _) => new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
new Dictionary<string, string>
{
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
}),
GetMapping = () => CreateMapping(null),
GetMappingSettings = CreateMappingSetting,
IndexFormat =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
using Elastic.Documentation.Diagnostics;
using Elastic.Documentation.Search;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Markdown.Helpers;
using Elastic.Markdown.IO;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Markdig.Syntax;
using Microsoft.Extensions.Logging;
using NetEscapades.EnumGenerators;

namespace Elastic.Markdown.Exporters.Elasticsearch;

[EnumExtensions]
public enum IngestStrategy { Reindex, Multiplex }

public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
{
private readonly IDiagnosticsCollector _collector;
Expand All @@ -27,6 +32,7 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable

private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow;
private readonly DistributedTransport _transport;
private IngestStrategy _indexStrategy;

public ElasticsearchMarkdownExporter(
ILoggerFactory logFactory,
Expand All @@ -38,6 +44,7 @@ string indexNamespace
_collector = collector;
_logger = logFactory.CreateLogger<ElasticsearchMarkdownExporter>();
_endpoint = endpoints.Elasticsearch;
_indexStrategy = IngestStrategy.Reindex;

var es = endpoints.Elasticsearch;

Expand Down Expand Up @@ -66,14 +73,41 @@ string indexNamespace

_transport = new DistributedTransport(configuration);

_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport, _batchIndexDate);
_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport);
_semanticChannel = new ElasticsearchSemanticExporter(logFactory, collector, es, indexNamespace, _transport);

}

/// <inheritdoc />
public async ValueTask StartAsync(Cancel ctx = default) =>
await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
public async ValueTask StartAsync(Cancel ctx = default)
{
// Bootstrap the lexical index first; BootstrapMethod.Failure makes a failed bootstrap fatal.
_ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);

// Probe the semantic "latest" write alias: a failed HEAD means no semantic index exists yet.
var semanticIndex = _semanticChannel.Channel.IndexName;
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
{
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
// Bootstrap channel mappings/settings, create the concrete index, then point aliases at it.
_ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
var semanticIndexPut = await _transport.PutAsync<StringResponse>(semanticIndex, PostData.String("{}"), ctx);
if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode)
throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}");
_ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
// A freshly created semantic index defaults to multiplexed writes (see --force-reindex docs);
// --force-reindex keeps the Reindex strategy set in the constructor.
if (!_endpoint.ForceReindex)
{
_indexStrategy = IngestStrategy.Multiplex;
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex", semanticIndex);
}
}
_logger.LogInformation("Using {IndexStrategy} to sync lexical index to semantic index", _indexStrategy.ToStringFast(true));
}

/// <summary>
/// Runs a _count query against the lexical search alias and returns the number of matching documents.
/// </summary>
/// <param name="body">Raw JSON query body posted to the Elasticsearch _count endpoint.</param>
/// <param name="ctx">Token used to cancel the request.</param>
/// <returns>The document count reported by Elasticsearch.</returns>
public async ValueTask<long> CountAsync(string body, Cancel ctx = default)
{
var searchAlias = _lexicalChannel.Channel.Options.ActiveSearchAlias;
var countPath = $"/{searchAlias}/_count";
var response = await _transport.PostAsync<DynamicResponse>(countPath, PostData.String(body), ctx);
return response.Body.Get<long>("count");
}

/// <inheritdoc />
public async ValueTask StopAsync(Cancel ctx = default)
Expand All @@ -82,18 +116,38 @@ public async ValueTask StopAsync(Cancel ctx = default)
var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");

var semanticIndex = _semanticChannel.Channel.IndexName;
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);

if (_endpoint.NoSemantic)
var stopped = await _lexicalChannel.StopAsync(ctx);
if (!stopped)
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");

var updated = await CountAsync($$""" { "query": { "range": { "last_updated": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
var total = await CountAsync($$""" { "query": { "range": { "batch_index_date": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
var deleted = await CountAsync($$""" { "query": { "range": { "batch_index_date": { "lt": "{{_batchIndexDate:o}}" } } } }""", ctx);

// TODO emit these as metrics
_logger.LogInformation("Exported {Total}, Updated {Updated}, Deleted, {Deleted} documents to {LexicalIndex}", total, updated, deleted, lexicalWriteAlias);
_logger.LogInformation("Syncing to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));

if (_indexStrategy == IngestStrategy.Multiplex)
{
_logger.LogInformation("--no-semantic was specified so exiting early before syncing to {Index}", semanticIndex);
if (!_endpoint.NoSemantic)
_ = await _semanticChannel.StopAsync(ctx);
else
_logger.LogInformation("--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}", semanticIndex);

// cleanup lexical index of old data
await DoDeleteByQuery(lexicalWriteAlias, ctx);
return;
}

var stopped = await _lexicalChannel.StopAsync(ctx);
if (!stopped)
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");
if (_endpoint.NoSemantic)
{
_logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", semanticIndex);
return;
}

var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
{
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
Expand Down Expand Up @@ -148,6 +202,8 @@ public async ValueTask StopAsync(Cancel ctx = default)
await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx);

await DoDeleteByQuery(lexicalWriteAlias, ctx);

_logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
}

private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx)
Expand Down Expand Up @@ -275,6 +331,18 @@ public async ValueTask<bool> ExportAsync(MarkdownExportFileContext fileContext,
}).Reverse().ToArray(),
Headings = headings
};

var semanticHash = _semanticChannel.Channel.Options.ChannelHash;
var lexicalHash = _lexicalChannel.Channel.Options.ChannelHash;
var hash = HashedBulkUpdate.CreateHash(semanticHash, lexicalHash,
doc.Url, doc.Body ?? string.Empty, string.Join(",", doc.Headings.OrderBy(h => h))
);
doc.Hash = hash;
doc.LastUpdated = _batchIndexDate;
doc.BatchIndexDate = _batchIndexDate;

if (_indexStrategy == IngestStrategy.Multiplex)
return await _lexicalChannel.TryWrite(doc, ctx) && await _semanticChannel.TryWrite(doc, ctx);
return await _lexicalChannel.TryWrite(doc, ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ICoreService githubActionsService
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
Expand All @@ -64,6 +65,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
int? bootstrapTimeout = null,
// index options
string? indexNamePrefix = null,
bool? forceReindex = null,
// channel buffer options
int? bufferSize = null,
int? maxRetries = null,
Expand Down Expand Up @@ -133,6 +135,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,

if (noSemantic.HasValue)
cfg.NoSemantic = noSemantic.Value;
if (forceReindex.HasValue)
cfg.ForceReindex = forceReindex.Value;

var exporters = new HashSet<Exporter> { Elasticsearch };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ ICoreService githubActionsService
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
Expand All @@ -61,6 +62,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
int? bootstrapTimeout = null,
// index options
string? indexNamePrefix = null,
bool? forceReindex = null,
// channel buffer options
int? bufferSize = null,
int? maxRetries = null,
Expand Down Expand Up @@ -130,6 +132,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,

if (noSemantic.HasValue)
cfg.NoSemantic = noSemantic.Value;
if (forceReindex.HasValue)
cfg.ForceReindex = forceReindex.Value;

var exporters = new HashSet<Exporter> { Elasticsearch };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ICoreService githubActionsService
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
Expand Down Expand Up @@ -63,6 +64,7 @@ public async Task<int> Index(

// index options
string? indexNamePrefix = null,
bool? forceReindex = null,

// channel buffer options
int? bufferSize = null,
Expand Down Expand Up @@ -93,7 +95,7 @@ public async Task<int> Index(
// inference options
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
// channel and connection options
indexNamePrefix, bufferSize, maxRetries, debugMode,
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
// proxy options
proxyAddress, proxyPassword, proxyUsername,
// certificate options
Expand All @@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs,
// inference options
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
// channel and connection options
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
// proxy options
state.proxyAddress, state.proxyPassword, state.proxyUsername,
// certificate options
Expand Down
6 changes: 4 additions & 2 deletions src/tooling/docs-builder/Commands/IndexCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ICoreService githubActionsService
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
Expand Down Expand Up @@ -61,6 +62,7 @@ public async Task<int> Index(

// index options
string? indexNamePrefix = null,
bool? forceReindex = null,

// channel buffer options
int? bufferSize = null,
Expand Down Expand Up @@ -91,7 +93,7 @@ public async Task<int> Index(
// inference options
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
// channel and connection options
indexNamePrefix, bufferSize, maxRetries, debugMode,
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
// proxy options
proxyAddress, proxyPassword, proxyUsername,
// certificate options
Expand All @@ -104,7 +106,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st
// inference options
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
// channel and connection options
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
// proxy options
state.proxyAddress, state.proxyPassword, state.proxyUsername,
// certificate options
Expand Down
Loading