Skip to content

Commit 29ffb6d

Browse files
committed
Introduce sync strategy to Elasticsearch exporter
1 parent 97de90d commit 29ffb6d

File tree

9 files changed

+108
-30
lines changed

9 files changed

+108
-30
lines changed

docs/cli/assembler/assembler-index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ docs-builder assembler index [options...] [-h|--help] [--version]
4444
`--index-name-prefix` `<string>`
4545
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)
4646

47+
`--force-reindex` `<bool?>`
48+
: Force reindex strategy to semantic index, by default, we multiplex writes if the semantic index does not exist yet (optional)
49+
4750
`--buffer-size` `<int?>`
4851
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)
4952

docs/cli/docset/index-command.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ docs-builder index [options...] [-h|--help] [--version]
4040
`--index-name-prefix` `<string>`
4141
: The prefix for the computed index/alias names. Defaults: semantic-docs (optional)
4242

43+
`--force-reindex` `<bool?>`
44+
: Force reindex strategy to semantic index, by default, we multiplex writes if the semantic index does not exist yet (optional)
45+
4346
`--buffer-size` `<int?>`
4447
: The number of documents to send to ES as part of the bulk. Defaults: 100 (optional)
4548

src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ public class ElasticsearchEndpoint
4444
public bool CertificateIsNotRoot { get; set; }
4545
public int? BootstrapTimeout { get; set; }
4646
public bool NoSemantic { get; set; }
47+
public bool ForceReindex { get; set; }
4748
}

src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,18 @@ public class ElasticsearchLexicalExporter(
2020
IDiagnosticsCollector collector,
2121
ElasticsearchEndpoint endpoint,
2222
string indexNamespace,
23-
DistributedTransport transport,
24-
DateTimeOffset batchIndexDate
23+
DistributedTransport transport
2524
)
2625
: ElasticsearchExporter<CatalogIndexChannelOptions<DocumentationDocument>, CatalogIndexChannel<DocumentationDocument>>
2726
(logFactory, collector, endpoint, transport, o => new(o), t => new(t)
2827
{
2928
BulkOperationIdLookup = d => d.Url,
30-
ScriptedHashBulkUpsertLookup = (d, channelHash) =>
31-
{
32-
var rand = string.Empty;
33-
//if (d.Url.StartsWith("/docs/reference/search-connectors"))
34-
// rand = Guid.NewGuid().ToString("N");
35-
d.Hash = HashedBulkUpdate.CreateHash(channelHash, rand, d.Url, d.Body ?? string.Empty, string.Join(",", d.Headings.OrderBy(h => h)));
36-
d.LastUpdated = batchIndexDate;
37-
d.BatchIndexDate = batchIndexDate;
38-
return new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
39-
new Dictionary<string, string>
40-
{
41-
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
42-
});
43-
},
29+
// hash, last_updated and batch_index_date are all set before the docs are written to the channel
30+
ScriptedHashBulkUpsertLookup = (d, _) => new HashedBulkUpdate("hash", d.Hash, "ctx._source.batch_index_date = params.batch_index_date",
31+
new Dictionary<string, string>
32+
{
33+
{ "batch_index_date", d.BatchIndexDate.ToString("o") }
34+
}),
4435
GetMapping = () => CreateMapping(null),
4536
GetMappingSettings = CreateMappingSetting,
4637
IndexFormat =

src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,20 @@
77
using Elastic.Documentation.Diagnostics;
88
using Elastic.Documentation.Search;
99
using Elastic.Ingest.Elasticsearch;
10+
using Elastic.Ingest.Elasticsearch.Indices;
1011
using Elastic.Markdown.Helpers;
1112
using Elastic.Markdown.IO;
1213
using Elastic.Transport;
1314
using Elastic.Transport.Products.Elasticsearch;
1415
using Markdig.Syntax;
1516
using Microsoft.Extensions.Logging;
17+
using NetEscapades.EnumGenerators;
1618

1719
namespace Elastic.Markdown.Exporters.Elasticsearch;
1820

21+
[EnumExtensions]
22+
public enum IngestStrategy { Reindex, Multiplex }
23+
1924
public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
2025
{
2126
private readonly IDiagnosticsCollector _collector;
@@ -27,6 +32,7 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
2732

2833
private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow;
2934
private readonly DistributedTransport _transport;
35+
private IngestStrategy _indexStrategy;
3036

3137
public ElasticsearchMarkdownExporter(
3238
ILoggerFactory logFactory,
@@ -38,6 +44,7 @@ string indexNamespace
3844
_collector = collector;
3945
_logger = logFactory.CreateLogger<ElasticsearchMarkdownExporter>();
4046
_endpoint = endpoints.Elasticsearch;
47+
_indexStrategy = IngestStrategy.Reindex;
4148

4249
var es = endpoints.Elasticsearch;
4350

@@ -66,14 +73,41 @@ string indexNamespace
6673

6774
_transport = new DistributedTransport(configuration);
6875

69-
_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport, _batchIndexDate);
76+
_lexicalChannel = new ElasticsearchLexicalExporter(logFactory, collector, es, indexNamespace, _transport);
7077
_semanticChannel = new ElasticsearchSemanticExporter(logFactory, collector, es, indexNamespace, _transport);
71-
7278
}
7379

7480
/// <inheritdoc />
75-
public async ValueTask StartAsync(Cancel ctx = default) =>
76-
await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
81+
public async ValueTask StartAsync(Cancel ctx = default)
82+
{
83+
_ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
84+
85+
var semanticIndex = _semanticChannel.Channel.IndexName;
86+
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
87+
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
88+
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
89+
{
90+
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
91+
_ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
92+
var semanticIndexPut = await _transport.PutAsync<StringResponse>(semanticIndex, PostData.String("{}"), ctx);
93+
if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode)
94+
throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}");
95+
_ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
96+
if (!_endpoint.ForceReindex)
97+
{
98+
_indexStrategy = IngestStrategy.Multiplex;
99+
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex", semanticIndex);
100+
}
101+
}
102+
_logger.LogInformation("Using {IndexStrategy} to sync lexical index to semantic index", _indexStrategy.ToStringFast(true));
103+
}
104+
105+
public async ValueTask<long> CountAsync(string body, Cancel ctx = default)
106+
{
107+
var lexicalSearchAlias = _lexicalChannel.Channel.Options.ActiveSearchAlias;
108+
var countResponse = await _transport.PostAsync<DynamicResponse>($"/{lexicalSearchAlias}/_count", PostData.String(body), ctx);
109+
return countResponse.Body.Get<long>("count");
110+
}
77111

78112
/// <inheritdoc />
79113
public async ValueTask StopAsync(Cancel ctx = default)
@@ -82,18 +116,38 @@ public async ValueTask StopAsync(Cancel ctx = default)
82116
var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");
83117

84118
var semanticIndex = _semanticChannel.Channel.IndexName;
85-
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
86119

87-
if (_endpoint.NoSemantic)
120+
var stopped = await _lexicalChannel.StopAsync(ctx);
121+
if (!stopped)
122+
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");
123+
124+
var updated = await CountAsync($$""" { "query": { "range": { "last_updated": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
125+
var total = await CountAsync($$""" { "query": { "range": { "batch_index_date": { "gte": "{{_batchIndexDate:o}}" } } } }""", ctx);
126+
var deleted = await CountAsync($$""" { "query": { "range": { "batch_index_date": { "lt": "{{_batchIndexDate:o}}" } } } }""", ctx);
127+
128+
// TODO emit these as metrics
129+
_logger.LogInformation("Exported {Total}, Updated {Updated}, Deleted, {Deleted} documents to {LexicalIndex}", total, updated, deleted, lexicalWriteAlias);
130+
_logger.LogInformation("Syncing to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
131+
132+
if (_indexStrategy == IngestStrategy.Multiplex)
88133
{
89-
_logger.LogInformation("--no-semantic was specified so exiting early before syncing to {Index}", semanticIndex);
134+
if (!_endpoint.NoSemantic)
135+
_ = await _semanticChannel.StopAsync(ctx);
136+
else
137+
_logger.LogInformation("--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}", semanticIndex);
138+
139+
// cleanup lexical index of old data
140+
await DoDeleteByQuery(lexicalWriteAlias, ctx);
90141
return;
91142
}
92143

93-
var stopped = await _lexicalChannel.StopAsync(ctx);
94-
if (!stopped)
95-
throw new Exception($"Failed to stop {_lexicalChannel.GetType().Name}");
144+
if (_endpoint.NoSemantic)
145+
{
146+
_logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", semanticIndex);
147+
return;
148+
}
96149

150+
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
97151
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
98152
{
99153
_logger.LogInformation("No semantic index exists yet, creating index {Index} for semantic search", semanticIndex);
@@ -148,6 +202,8 @@ public async ValueTask StopAsync(Cancel ctx = default)
148202
await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx);
149203

150204
await DoDeleteByQuery(lexicalWriteAlias, ctx);
205+
206+
_logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
151207
}
152208

153209
private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx)
@@ -275,6 +331,18 @@ public async ValueTask<bool> ExportAsync(MarkdownExportFileContext fileContext,
275331
}).Reverse().ToArray(),
276332
Headings = headings
277333
};
334+
335+
var semanticHash = _semanticChannel.Channel.Options.ChannelHash;
336+
var lexicalHash = _lexicalChannel.Channel.Options.ChannelHash;
337+
var hash = HashedBulkUpdate.CreateHash(semanticHash, lexicalHash,
338+
doc.Url, doc.Body ?? string.Empty, string.Join(",", doc.Headings.OrderBy(h => h))
339+
);
340+
doc.Hash = hash;
341+
doc.LastUpdated = _batchIndexDate;
342+
doc.BatchIndexDate = _batchIndexDate;
343+
344+
if (_indexStrategy == IngestStrategy.Multiplex)
345+
return await _lexicalChannel.TryWrite(doc, ctx) && await _semanticChannel.TryWrite(doc, ctx);
278346
return await _lexicalChannel.TryWrite(doc, ctx);
279347
}
280348

src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ ICoreService githubActionsService
3838
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
3939
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
4040
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
41+
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
4142
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
4243
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
4344
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
@@ -64,6 +65,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
6465
int? bootstrapTimeout = null,
6566
// index options
6667
string? indexNamePrefix = null,
68+
bool? forceReindex = null,
6769
// channel buffer options
6870
int? bufferSize = null,
6971
int? maxRetries = null,
@@ -133,6 +135,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
133135

134136
if (noSemantic.HasValue)
135137
cfg.NoSemantic = noSemantic.Value;
138+
if (forceReindex.HasValue)
139+
cfg.ForceReindex = forceReindex.Value;
136140

137141
var exporters = new HashSet<Exporter> { Elasticsearch };
138142

src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ ICoreService githubActionsService
3535
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
3636
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
3737
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
38+
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
3839
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
3940
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
4041
/// <param name="debugMode">Buffer ES request/responses for better error messages and pass ?pretty to all requests</param>
@@ -61,6 +62,7 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
6162
int? bootstrapTimeout = null,
6263
// index options
6364
string? indexNamePrefix = null,
65+
bool? forceReindex = null,
6466
// channel buffer options
6567
int? bufferSize = null,
6668
int? maxRetries = null,
@@ -130,6 +132,8 @@ public async Task<bool> Index(IDiagnosticsCollector collector,
130132

131133
if (noSemantic.HasValue)
132134
cfg.NoSemantic = noSemantic.Value;
135+
if (forceReindex.HasValue)
136+
cfg.ForceReindex = forceReindex.Value;
133137

134138
var exporters = new HashSet<Exporter> { Elasticsearch };
135139

src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ ICoreService githubActionsService
3434
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
3535
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
3636
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
37+
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
3738
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
3839
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
3940
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
@@ -63,6 +64,7 @@ public async Task<int> Index(
6364

6465
// index options
6566
string? indexNamePrefix = null,
67+
bool? forceReindex = null,
6668

6769
// channel buffer options
6870
int? bufferSize = null,
@@ -93,7 +95,7 @@ public async Task<int> Index(
9395
// inference options
9496
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
9597
// channel and connection options
96-
indexNamePrefix, bufferSize, maxRetries, debugMode,
98+
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
9799
// proxy options
98100
proxyAddress, proxyPassword, proxyUsername,
99101
// certificate options
@@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs,
106108
// inference options
107109
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
108110
// channel and connection options
109-
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
111+
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
110112
// proxy options
111113
state.proxyAddress, state.proxyPassword, state.proxyUsername,
112114
// certificate options

src/tooling/docs-builder/Commands/IndexCommand.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ ICoreService githubActionsService
3232
/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
3333
/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
3434
/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
35+
/// <param name="forceReindex">Force reindex strategy to semantic index</param>
3536
/// <param name="bootstrapTimeout">Timeout in minutes for the inference endpoint creation. Defaults: 4</param>
3637
/// <param name="bufferSize">The number of documents to send to ES as part of the bulk. Defaults: 100</param>
3738
/// <param name="maxRetries">The number of times failed bulk items should be retried. Defaults: 3</param>
@@ -61,6 +62,7 @@ public async Task<int> Index(
6162

6263
// index options
6364
string? indexNamePrefix = null,
65+
bool? forceReindex = null,
6466

6567
// channel buffer options
6668
int? bufferSize = null,
@@ -91,7 +93,7 @@ public async Task<int> Index(
9193
// inference options
9294
noSemantic, indexNumThreads, searchNumThreads, bootstrapTimeout,
9395
// channel and connection options
94-
indexNamePrefix, bufferSize, maxRetries, debugMode,
96+
indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
9597
// proxy options
9698
proxyAddress, proxyPassword, proxyUsername,
9799
// certificate options
@@ -104,7 +106,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st
104106
// inference options
105107
state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.bootstrapTimeout,
106108
// channel and connection options
107-
state.indexNamePrefix, state.bufferSize, state.maxRetries, state.debugMode,
109+
state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
108110
// proxy options
109111
state.proxyAddress, state.proxyPassword, state.proxyUsername,
110112
// certificate options

0 commit comments

Comments
 (0)