77using Elastic . Documentation . Diagnostics ;
88using Elastic . Documentation . Search ;
99using Elastic . Ingest . Elasticsearch ;
10+ using Elastic . Ingest . Elasticsearch . Indices ;
1011using Elastic . Markdown . Helpers ;
1112using Elastic . Markdown . IO ;
1213using Elastic . Transport ;
1314using Elastic . Transport . Products . Elasticsearch ;
1415using Markdig . Syntax ;
1516using Microsoft . Extensions . Logging ;
17+ using NetEscapades . EnumGenerators ;
1618
1719namespace Elastic . Markdown . Exporters . Elasticsearch ;
1820
21+ [ EnumExtensions ]
22+ public enum IngestStrategy { Reindex , Multiplex }
23+
1924public class ElasticsearchMarkdownExporter : IMarkdownExporter , IDisposable
2025{
2126 private readonly IDiagnosticsCollector _collector ;
@@ -27,6 +32,7 @@ public class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposable
2732
2833 private readonly DateTimeOffset _batchIndexDate = DateTimeOffset . UtcNow ;
2934 private readonly DistributedTransport _transport ;
35+ private IngestStrategy _indexStrategy ;
3036
3137 public ElasticsearchMarkdownExporter (
3238 ILoggerFactory logFactory ,
@@ -38,6 +44,7 @@ string indexNamespace
3844 _collector = collector ;
3945 _logger = logFactory . CreateLogger < ElasticsearchMarkdownExporter > ( ) ;
4046 _endpoint = endpoints . Elasticsearch ;
47+ _indexStrategy = IngestStrategy . Reindex ;
4148
4249 var es = endpoints . Elasticsearch ;
4350
@@ -66,34 +73,76 @@ string indexNamespace
6673
6774 _transport = new DistributedTransport ( configuration ) ;
6875
69- _lexicalChannel = new ElasticsearchLexicalExporter ( logFactory , collector , es , indexNamespace , _transport , _batchIndexDate ) ;
76+ _lexicalChannel = new ElasticsearchLexicalExporter ( logFactory , collector , es , indexNamespace , _transport ) ;
7077 _semanticChannel = new ElasticsearchSemanticExporter ( logFactory , collector , es , indexNamespace , _transport ) ;
71-
7278 }
7379
7480 /// <inheritdoc />
75- public async ValueTask StartAsync ( Cancel ctx = default ) =>
76- await _lexicalChannel . Channel . BootstrapElasticsearchAsync ( BootstrapMethod . Failure , null , ctx ) ;
81+ public async ValueTask StartAsync ( Cancel ctx = default )
82+ {
83+ _ = await _lexicalChannel . Channel . BootstrapElasticsearchAsync ( BootstrapMethod . Failure , null , ctx ) ;
84+
85+ var semanticIndex = _semanticChannel . Channel . IndexName ;
86+ var semanticWriteAlias = string . Format ( _semanticChannel . Channel . Options . IndexFormat , "latest" ) ;
87+ var semanticIndexHead = await _transport . HeadAsync ( semanticWriteAlias , ctx ) ;
88+ if ( ! semanticIndexHead . ApiCallDetails . HasSuccessfulStatusCode )
89+ {
90+ _logger . LogInformation ( "No semantic index exists yet, creating index {Index} for semantic search" , semanticIndex ) ;
91+ _ = await _semanticChannel . Channel . BootstrapElasticsearchAsync ( BootstrapMethod . Failure , null , ctx ) ;
92+ var semanticIndexPut = await _transport . PutAsync < StringResponse > ( semanticIndex , PostData . String ( "{}" ) , ctx ) ;
93+ if ( ! semanticIndexPut . ApiCallDetails . HasSuccessfulStatusCode )
94+ throw new Exception ( $ "Failed to create index { semanticIndex } : { semanticIndexPut } ") ;
95+ _ = await _semanticChannel . Channel . ApplyAliasesAsync ( ctx ) ;
96+ if ( ! _endpoint . ForceReindex )
97+ {
98+ _indexStrategy = IngestStrategy . Multiplex ;
99+ _logger . LogInformation ( "Index strategy set to multiplex because {SemanticIndex} does not exist, pass --force-reindex to always use reindex" , semanticIndex ) ;
100+ }
101+ }
102+ _logger . LogInformation ( "Using {IndexStrategy} to sync lexical index to semantic index" , _indexStrategy . ToStringFast ( true ) ) ;
103+ }
104+
105+ private async ValueTask < long > CountAsync ( string index , string body , Cancel ctx = default )
106+ {
107+ var countResponse = await _transport . PostAsync < DynamicResponse > ( $ "/{ index } /_count", PostData . String ( body ) , ctx ) ;
108+ return countResponse . Body . Get < long > ( "count" ) ;
109+ }
77110
78111 /// <inheritdoc />
79112 public async ValueTask StopAsync ( Cancel ctx = default )
80113 {
81114 var semanticWriteAlias = string . Format ( _semanticChannel . Channel . Options . IndexFormat , "latest" ) ;
82115 var lexicalWriteAlias = string . Format ( _lexicalChannel . Channel . Options . IndexFormat , "latest" ) ;
83-
84116 var semanticIndex = _semanticChannel . Channel . IndexName ;
85- var semanticIndexHead = await _transport . HeadAsync ( semanticWriteAlias , ctx ) ;
86117
87- if ( _endpoint . NoSemantic )
118+ var stopped = await _lexicalChannel . StopAsync ( ctx ) ;
119+ if ( ! stopped )
120+ throw new Exception ( $ "Failed to stop { _lexicalChannel . GetType ( ) . Name } ") ;
121+
122+ await QueryIngestStatistics ( lexicalWriteAlias , ctx ) ;
123+
124+ if ( _indexStrategy == IngestStrategy . Multiplex )
88125 {
89- _logger . LogInformation ( "--no-semantic was specified so exiting early before syncing to {Index}" , semanticIndex ) ;
126+ if ( ! _endpoint . NoSemantic )
127+ _ = await _semanticChannel . StopAsync ( ctx ) ;
128+ else
129+ _logger . LogInformation ( "--no-semantic was specified when doing multiplex writes, not rolling over {SemanticIndex}" , semanticIndex ) ;
130+
131+ // cleanup lexical index of old data
132+ await DoDeleteByQuery ( lexicalWriteAlias , ctx ) ;
133+ _ = await _lexicalChannel . RefreshAsync ( ctx ) ;
134+ _logger . LogInformation ( "Finish sync to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
135+ await QueryDocumentCounts ( ctx ) ;
90136 return ;
91137 }
92138
93- var stopped = await _lexicalChannel . StopAsync ( ctx ) ;
94- if ( ! stopped )
95- throw new Exception ( $ "Failed to stop { _lexicalChannel . GetType ( ) . Name } ") ;
139+ if ( _endpoint . NoSemantic )
140+ {
141+ _logger . LogInformation ( "--no-semantic was specified so exiting early before reindexing to {Index}" , semanticIndex ) ;
142+ return ;
143+ }
96144
145+ var semanticIndexHead = await _transport . HeadAsync ( semanticWriteAlias , ctx ) ;
97146 if ( ! semanticIndexHead . ApiCallDetails . HasSuccessfulStatusCode )
98147 {
99148 _logger . LogInformation ( "No semantic index exists yet, creating index {Index} for semantic search" , semanticIndex ) ;
@@ -148,6 +197,35 @@ public async ValueTask StopAsync(Cancel ctx = default)
148197 await DoReindex ( request , lexicalWriteAlias , semanticWriteAlias , "deletions" , ctx ) ;
149198
150199 await DoDeleteByQuery ( lexicalWriteAlias , ctx ) ;
200+
201+ _ = await _lexicalChannel . RefreshAsync ( ctx ) ;
202+ _ = await _semanticChannel . RefreshAsync ( ctx ) ;
203+
204+ _logger . LogInformation ( "Finish sync to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
205+ await QueryDocumentCounts ( ctx ) ;
206+ }
207+
208+ private async ValueTask QueryIngestStatistics ( string lexicalWriteAlias , Cancel ctx )
209+ {
210+ var lexicalSearchAlias = _lexicalChannel . Channel . Options . ActiveSearchAlias ;
211+ var updated = await CountAsync ( lexicalSearchAlias , $$ """ { "query": { "range": { "last_updated": { "gte": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
212+ var total = await CountAsync ( lexicalSearchAlias , $$ """ { "query": { "range": { "batch_index_date": { "gte": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
213+ var deleted = await CountAsync ( lexicalSearchAlias , $$ """ { "query": { "range": { "batch_index_date": { "lt": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
214+
215+ // TODO emit these as metrics
216+ _logger . LogInformation ( "Exported {Total}, Updated {Updated}, Deleted, {Deleted} documents to {LexicalIndex}" , total , updated , deleted , lexicalWriteAlias ) ;
217+ _logger . LogInformation ( "Syncing to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
218+ }
219+
220+ private async ValueTask QueryDocumentCounts ( Cancel ctx )
221+ {
222+ var semanticWriteAlias = string . Format ( _semanticChannel . Channel . Options . IndexFormat , "latest" ) ;
223+ var lexicalWriteAlias = string . Format ( _lexicalChannel . Channel . Options . IndexFormat , "latest" ) ;
224+ var totalLexical = await CountAsync ( lexicalWriteAlias , "{}" , ctx ) ;
225+ var totalSemantic = await CountAsync ( semanticWriteAlias , "{}" , ctx ) ;
226+
227+ // TODO emit these as metrics
228+ _logger . LogInformation ( "Document counts -> Semantic Index: {TotalSemantic}, Lexical Index: {TotalLexical}" , totalSemantic , totalLexical ) ;
151229 }
152230
153231 private async ValueTask DoDeleteByQuery ( string lexicalWriteAlias , Cancel ctx )
@@ -275,6 +353,18 @@ public async ValueTask<bool> ExportAsync(MarkdownExportFileContext fileContext,
275353 } ) . Reverse ( ) . ToArray ( ) ,
276354 Headings = headings
277355 } ;
356+
357+ var semanticHash = _semanticChannel . Channel . Options . ChannelHash ;
358+ var lexicalHash = _lexicalChannel . Channel . Options . ChannelHash ;
359+ var hash = HashedBulkUpdate . CreateHash ( semanticHash , lexicalHash ,
360+ doc . Url , doc . Body ?? string . Empty , string . Join ( "," , doc . Headings . OrderBy ( h => h ) )
361+ ) ;
362+ doc . Hash = hash ;
363+ doc . LastUpdated = _batchIndexDate ;
364+ doc . BatchIndexDate = _batchIndexDate ;
365+
366+ if ( _indexStrategy == IngestStrategy . Multiplex )
367+ return await _lexicalChannel . TryWrite ( doc , ctx ) && await _semanticChannel . TryWrite ( doc , ctx ) ;
278368 return await _lexicalChannel . TryWrite ( doc , ctx ) ;
279369 }
280370
0 commit comments