@@ -102,10 +102,9 @@ public async ValueTask StartAsync(Cancel ctx = default)
102102 _logger . LogInformation ( "Using {IndexStrategy} to sync lexical index to semantic index" , _indexStrategy . ToStringFast ( true ) ) ;
103103 }
104104
/// <summary>
/// Posts <paramref name="body"/> to the <c>_count</c> endpoint of <paramref name="index"/>
/// and returns the <c>count</c> value read from the response body.
/// </summary>
/// <param name="index">Index or alias name placed in the request path.</param>
/// <param name="body">Request body, sent as-is.</param>
/// <param name="ctx">Cancellation token forwarded to the transport call.</param>
private async ValueTask<long> CountAsync(string index, string body, Cancel ctx = default)
{
    var path = $"/{index}/_count";
    var payload = PostData.String(body);
    var response = await _transport.PostAsync<DynamicResponse>(path, payload, ctx);
    return response.Body.Get<long>("count");
}
111110
@@ -114,20 +113,13 @@ public async ValueTask StopAsync(Cancel ctx = default)
114113 {
115114 var semanticWriteAlias = string . Format ( _semanticChannel . Channel . Options . IndexFormat , "latest" ) ;
116115 var lexicalWriteAlias = string . Format ( _lexicalChannel . Channel . Options . IndexFormat , "latest" ) ;
117-
118116 var semanticIndex = _semanticChannel . Channel . IndexName ;
119117
120118 var stopped = await _lexicalChannel . StopAsync ( ctx ) ;
121119 if ( ! stopped )
122120 throw new Exception ( $ "Failed to stop { _lexicalChannel . GetType ( ) . Name } ") ;
123121
124- var updated = await CountAsync ( $$ """ { "query": { "range": { "last_updated": { "gte": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
125- var total = await CountAsync ( $$ """ { "query": { "range": { "batch_index_date": { "gte": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
126- var deleted = await CountAsync ( $$ """ { "query": { "range": { "batch_index_date": { "lt": "{{ _batchIndexDate : o}} " } } } }""" , ctx ) ;
127-
128- // TODO emit these as metrics
129- _logger . LogInformation ( "Exported {Total}, Updated {Updated}, Deleted, {Deleted} documents to {LexicalIndex}" , total , updated , deleted , lexicalWriteAlias ) ;
130- _logger . LogInformation ( "Syncing to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
122+ await QueryIngestStatistics ( lexicalWriteAlias , ctx ) ;
131123
132124 if ( _indexStrategy == IngestStrategy . Multiplex )
133125 {
@@ -138,6 +130,9 @@ public async ValueTask StopAsync(Cancel ctx = default)
138130
139131 // cleanup lexical index of old data
140132 await DoDeleteByQuery ( lexicalWriteAlias , ctx ) ;
133+ _ = await _lexicalChannel . RefreshAsync ( ctx ) ;
134+ _logger . LogInformation ( "Finish sync to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
135+ await QueryDocumentCounts ( ctx ) ;
141136 return ;
142137 }
143138
@@ -203,7 +198,34 @@ public async ValueTask StopAsync(Cancel ctx = default)
203198
204199 await DoDeleteByQuery ( lexicalWriteAlias , ctx ) ;
205200
201+ _ = await _lexicalChannel . RefreshAsync ( ctx ) ;
202+ _ = await _semanticChannel . RefreshAsync ( ctx ) ;
203+
206204 _logger . LogInformation ( "Finish sync to semantic index using {IndexStrategy} strategy" , _indexStrategy . ToStringFast ( true ) ) ;
205+ await QueryDocumentCounts ( ctx ) ;
206+ }
207+
/// <summary>
/// Counts documents on the lexical channel's active search alias relative to
/// <c>_batchIndexDate</c> and logs the results, followed by the sync strategy in use.
/// </summary>
/// <param name="lexicalWriteAlias">Alias name reported in the log line as the export target.</param>
/// <param name="ctx">Cancellation token forwarded to every count request.</param>
private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx)
{
    var lexicalSearchAlias = _lexicalChannel.Channel.Options.ActiveSearchAlias;

    // Builds the range query body comparing a date field against the batch date (round-trip "o" format).
    string RangeQuery(string field, string comparison) =>
        $$""" { "query": { "range": { "{{field}}": { "{{comparison}}": "{{_batchIndexDate:o}}" } } } }""";

    // last_updated >= batch date
    var updated = await CountAsync(lexicalSearchAlias, RangeQuery("last_updated", "gte"), ctx);
    // batch_index_date >= batch date
    var total = await CountAsync(lexicalSearchAlias, RangeQuery("batch_index_date", "gte"), ctx);
    // batch_index_date < batch date; reported under the "Deleted" label
    var deleted = await CountAsync(lexicalSearchAlias, RangeQuery("batch_index_date", "lt"), ctx);

    // TODO emit these as metrics
    _logger.LogInformation("Exported {Total}, Updated {Updated}, Deleted {Deleted} documents to {LexicalIndex}", total, updated, deleted, lexicalWriteAlias);
    _logger.LogInformation("Syncing to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true));
}
219+
/// <summary>
/// Issues a count request with an empty JSON object body against the "latest"
/// lexical and semantic index names and logs both totals.
/// </summary>
/// <param name="ctx">Cancellation token forwarded to both count requests.</param>
private async ValueTask QueryDocumentCounts(Cancel ctx)
{
    const string emptyBody = "{}";

    // Both names are derived from each channel's IndexFormat with "latest" substituted in.
    var semanticTarget = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest");
    var lexicalTarget = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest");

    var lexicalCount = await CountAsync(lexicalTarget, emptyBody, ctx);
    var semanticCount = await CountAsync(semanticTarget, emptyBody, ctx);

    // TODO emit these as metrics
    _logger.LogInformation(
        "Document counts -> Semantic Index: {TotalSemantic}, Lexical Index: {TotalLexical}",
        semanticCount,
        lexicalCount);
}
208230
209231 private async ValueTask DoDeleteByQuery ( string lexicalWriteAlias , Cancel ctx )
0 commit comments