Skip to content

Commit 0e68c58

Browse files
committed
Ensure we reuse and create new indices based on channel hashes (settings/mappings)
1 parent af01e01 commit 0e68c58

File tree

4 files changed

+83
-29
lines changed

4 files changed

+83
-29
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<PackageVersion Include="Elastic.Aspire.Hosting.Elasticsearch" Version="9.3.0" />
3030
<PackageVersion Include="Elastic.Clients.Elasticsearch" Version="9.1.4" />
3131
<PackageVersion Include="FakeItEasy" Version="8.3.0" />
32-
<PackageVersion Include="Elastic.Ingest.Elasticsearch" Version="0.16.0" />
32+
<PackageVersion Include="Elastic.Ingest.Elasticsearch" Version="0.16.3" />
3333
<PackageVersion Include="InMemoryLogger" Version="1.0.66" />
3434
<PackageVersion Include="MartinCostello.Logging.XUnit.v3" Version="0.6.0" />
3535
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.7" />

src/Elastic.Documentation/Search/DocumentationDocument.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System.Text.Json.Serialization;
66
using Elastic.Documentation.AppliesTo;
7-
using Elastic.Documentation.Extensions;
87

98
namespace Elastic.Documentation.Search;
109

src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchExporter.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ Func<DistributedTransport, TChannelOptions> createOptions
9393
options.ExportBufferCallback = () =>
9494
{
9595
var count = Interlocked.Increment(ref i);
96-
_logger.LogInformation("Exported {Count} documents to Elasticsearch index {Format}",
97-
count * endpoint.BufferSize, string.Format(options.IndexFormat, "latest"));
96+
_logger.LogInformation("Exported {Count} documents to Elasticsearch index {IndexName}",
97+
count * endpoint.BufferSize, Channel?.IndexName ?? string.Format(options.IndexFormat, "latest"));
9898
};
9999
options.ExportExceptionCallback = e =>
100100
{
@@ -103,7 +103,7 @@ Func<DistributedTransport, TChannelOptions> createOptions
103103
};
104104
options.ServerRejectionCallback = items => _logger.LogInformation("Server rejection: {Rejection}", items.First().Item2);
105105
Channel = createChannel(options);
106-
_logger.LogInformation($"Bootstrapping {nameof(SemanticIndexChannel<DocumentationDocument>)} Elasticsearch target for indexing");
106+
_logger.LogInformation("Created {Channel} Elasticsearch target for indexing", typeof(TChannel).Name);
107107
}
108108

109109
public async ValueTask<bool> StopAsync(Cancel ctx = default)
@@ -183,7 +183,7 @@ protected static string CreateMappingSetting() =>
183183
""";
184184

185185
protected static string CreateMapping(string? inferenceId) =>
186-
$$"""
186+
$$$$""""
187187
{
188188
"properties": {
189189
"url" : {
@@ -200,8 +200,27 @@ protected static string CreateMapping(string? inferenceId) =>
200200
"sub-type" : { "type" : "keyword" },
201201
"lifecycle" : { "type" : "keyword" },
202202
"version" : { "type" : "version" }
203-
}
204-
},
203+
}
204+
},
205+
"parents" : {
206+
"type" : "object",
207+
"properties" : {
208+
"url" : {
209+
"type": "keyword",
210+
"fields": {
211+
"match": { "type": "text" },
212+
"prefix": { "type": "text", "analyzer" : "hierarchy_analyzer" }
213+
}
214+
},
215+
"title": {
216+
"type": "text",
217+
"search_analyzer": "synonyms_analyzer",
218+
"fields": {
219+
"keyword": { "type": "keyword" }
220+
}
221+
}
222+
}
223+
},
205224
"hash" : { "type" : "keyword" },
206225
"title": {
207226
"type": "text",
@@ -210,7 +229,7 @@ protected static string CreateMapping(string? inferenceId) =>
210229
"keyword": {
211230
"type": "keyword"
212231
}
213-
{{(!string.IsNullOrWhiteSpace(inferenceId) ? $$""", "semantic_text": {{{InferenceMapping(inferenceId)}}}""" : "")}}
232+
{{{{(!string.IsNullOrWhiteSpace(inferenceId) ? $$""", "semantic_text": {{{InferenceMapping(inferenceId)}}}""" : "")}}}}
214233
}
215234
},
216235
"url_segment_count": {
@@ -224,10 +243,10 @@ protected static string CreateMapping(string? inferenceId) =>
224243
"search_analyzer": "highlight_analyzer",
225244
"term_vector": "with_positions_offsets"
226245
}
227-
{{(!string.IsNullOrWhiteSpace(inferenceId) ? AbstractInferenceMapping(inferenceId) : AbstractMapping())}}
246+
{{{{(!string.IsNullOrWhiteSpace(inferenceId) ? AbstractInferenceMapping(inferenceId) : AbstractMapping())}}}}
228247
}
229248
}
230-
""";
249+
"""";
231250

232251
private static string AbstractMapping() =>
233252
"""

src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
3434
private readonly DateTimeOffset _batchIndexDate = DateTimeOffset.UtcNow;
3535
private readonly DistributedTransport _transport;
3636
private IngestStrategy _indexStrategy;
37+
private string _currentLexicalHash = string.Empty;
38+
private string _currentSemanticHash = string.Empty;
3739

3840
public ElasticsearchMarkdownExporter(
3941
ILoggerFactory logFactory,
@@ -81,25 +83,53 @@ string indexNamespace
8183
/// <inheritdoc />
8284
public async ValueTask StartAsync(Cancel ctx = default)
8385
{
86+
_currentLexicalHash = await _lexicalChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty;
87+
_currentSemanticHash = await _semanticChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty;
88+
8489
_ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
8590

86-
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
87-
var semanticIndexAvailable = await _transport.HeadAsync(semanticWriteAlias, ctx);
88-
_ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
89-
var semanticIndex = _semanticChannel.Channel.IndexName;
90-
if (!semanticIndexAvailable.ApiCallDetails.HasSuccessfulStatusCode)
91+
// if the previous hash does not match the current hash, we know already we want to multiplex to a new index
92+
if (_currentLexicalHash != _lexicalChannel.Channel.ChannelHash)
93+
_indexStrategy = IngestStrategy.Multiplex;
94+
95+
if (!_endpoint.NoSemantic)
9196
{
92-
_logger.LogInformation("No semantic index existed yet, creating index {Index} for semantic search", semanticIndex);
93-
var semanticIndexPut = await _transport.PutAsync<StringResponse>(semanticIndex, PostData.String("{}"), ctx);
94-
if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode)
95-
throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}");
96-
if (!_endpoint.ForceReindex)
97+
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
98+
var semanticIndexAvailable = await _transport.HeadAsync(semanticWriteAlias, ctx);
99+
if (!semanticIndexAvailable.ApiCallDetails.HasSuccessfulStatusCode && _endpoint is { ForceReindex: false, NoSemantic: false })
97100
{
98101
_indexStrategy = IngestStrategy.Multiplex;
99-
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex", semanticIndex);
102+
_logger.LogInformation("Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex", semanticWriteAlias);
100103
}
104+
105+
//try re-use index if we are re-indexing. Multiplex should always go to a new index
106+
_semanticChannel.Channel.Options.TryReuseIndex = _indexStrategy == IngestStrategy.Reindex;
107+
_ = await _semanticChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx);
108+
}
109+
110+
var lexicalIndexExists = await IndexExists(_lexicalChannel.Channel.IndexName) ? "existing" : "new";
111+
var semanticIndexExists = await IndexExists(_semanticChannel.Channel.IndexName) ? "existing" : "new";
112+
if (_currentLexicalHash != _lexicalChannel.Channel.ChannelHash)
113+
{
114+
_indexStrategy = IngestStrategy.Multiplex;
115+
_logger.LogInformation("Multiplexing lexical new index: '{Index}' since current hash on server '{HashCurrent}' does not match new '{HashNew}'",
116+
_lexicalChannel.Channel.IndexName, _currentLexicalHash, _lexicalChannel.Channel.ChannelHash);
117+
}
118+
else
119+
_logger.LogInformation("Targeting {State} lexical: '{Index}'", lexicalIndexExists, _lexicalChannel.Channel.IndexName);
120+
121+
if (!_endpoint.NoSemantic && _currentSemanticHash != _semanticChannel.Channel.ChannelHash)
122+
{
123+
_indexStrategy = IngestStrategy.Multiplex;
124+
_logger.LogInformation("Multiplexing new index '{Index}' since current hash on server '{HashCurrent}' does not match new '{HashNew}'",
125+
_semanticChannel.Channel.IndexName, _currentSemanticHash, _semanticChannel.Channel.ChannelHash);
101126
}
127+
else if (!_endpoint.NoSemantic)
128+
_logger.LogInformation("Targeting {State} semantical: '{Index}'", semanticIndexExists, _semanticChannel.Channel.IndexName);
129+
102130
_logger.LogInformation("Using {IndexStrategy} to sync lexical index to semantic index", _indexStrategy.ToStringFast(true));
131+
132+
async ValueTask<bool> IndexExists(string name) => (await _transport.HeadAsync(name, ctx)).ApiCallDetails.HasSuccessfulStatusCode;
103133
}
104134

105135
private async ValueTask<long> CountAsync(string index, string body, Cancel ctx = default)
@@ -113,7 +143,6 @@ public async ValueTask StopAsync(Cancel ctx = default)
113143
{
114144
var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
115145
var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");
116-
var semanticIndex = _semanticChannel.Channel.IndexName;
117146

118147
var stopped = await _lexicalChannel.StopAsync(ctx);
119148
if (!stopped)
@@ -125,23 +154,28 @@ public async ValueTask StopAsync(Cancel ctx = default)
125154
{
126155
if (!_endpoint.NoSemantic)
127156
_ = await _semanticChannel.StopAsync(ctx);
128-
else
129-
_logger.LogInformation("--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}", semanticIndex);
130157

131158
// cleanup lexical index of old data
132159
await DoDeleteByQuery(lexicalWriteAlias, ctx);
160+
// need to refresh the lexical index to ensure that the delete by query is available
133161
_ = await _lexicalChannel.RefreshAsync(ctx);
134-
_logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
135162
await QueryDocumentCounts(ctx);
163+
// ReSharper disable once ConvertIfStatementToConditionalTernaryExpression
164+
if (_endpoint.NoSemantic)
165+
_logger.LogInformation("Finish indexing {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
166+
else
167+
_logger.LogInformation("Finish syncing to semantic in {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
136168
return;
137169
}
138170

139171
if (_endpoint.NoSemantic)
140172
{
141-
_logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", semanticIndex);
173+
_logger.LogInformation("--no-semantic was specified so exiting early before reindexing to {Index}", lexicalWriteAlias);
142174
return;
143175
}
144176

177+
var semanticIndex = _semanticChannel.Channel.IndexName;
178+
// check if the alias exists
145179
var semanticIndexHead = await _transport.HeadAsync(semanticWriteAlias, ctx);
146180
if (!semanticIndexHead.ApiCallDetails.HasSuccessfulStatusCode)
147181
{
@@ -150,7 +184,6 @@ public async ValueTask StopAsync(Cancel ctx = default)
150184
var semanticIndexPut = await _transport.PutAsync<StringResponse>(semanticIndex, PostData.String("{}"), ctx);
151185
if (!semanticIndexPut.ApiCallDetails.HasSuccessfulStatusCode)
152186
throw new Exception($"Failed to create index {semanticIndex}: {semanticIndexPut}");
153-
_ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
154187
}
155188
var destinationIndex = _semanticChannel.Channel.IndexName;
156189

@@ -199,6 +232,9 @@ public async ValueTask StopAsync(Cancel ctx = default)
199232

200233
await DoDeleteByQuery(lexicalWriteAlias, ctx);
201234

235+
_ = await _lexicalChannel.Channel.ApplyLatestAliasAsync(ctx);
236+
_ = await _semanticChannel.Channel.ApplyAliasesAsync(ctx);
237+
202238
_ = await _lexicalChannel.RefreshAsync(ctx);
203239
_ = await _semanticChannel.RefreshAsync(ctx);
204240

@@ -276,7 +312,7 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx)
276312

277313
private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, string semanticWriteAlias, string typeOfSync, Cancel ctx)
278314
{
279-
var reindexUrl = "/_reindex?wait_for_completion=false&require_alias=true&scroll=10m";
315+
var reindexUrl = "/_reindex?wait_for_completion=false&scroll=10m";
280316
var reindexNewChanges = await _transport.PostAsync<DynamicResponse>(reindexUrl, request, ctx);
281317
var taskId = reindexNewChanges.Body.Get<string>("task");
282318
if (string.IsNullOrWhiteSpace(taskId))

0 commit comments

Comments
 (0)