5
5
using System . IO . Abstractions ;
6
6
using Elastic . Channels ;
7
7
using Elastic . Documentation . Configuration ;
8
- using Elastic . Documentation . Configuration . Assembler ;
9
8
using Elastic . Documentation . Diagnostics ;
10
9
using Elastic . Documentation . Search ;
11
10
using Elastic . Documentation . Serialization ;
20
19
21
20
namespace Elastic . Markdown . Exporters ;
22
21
23
- public class ElasticsearchMarkdownExporter ( ILoggerFactory logFactory , IDiagnosticsCollector collector , DocumentationEndpoints endpoints )
22
+ public class ElasticsearchMarkdownExporter ( ILoggerFactory logFactory , IDiagnosticsCollector collector , string indexNamespace , DocumentationEndpoints endpoints )
24
23
: ElasticsearchMarkdownExporterBase < CatalogIndexChannelOptions < DocumentationDocument > , CatalogIndexChannel < DocumentationDocument > >
25
24
( logFactory , collector , endpoints )
26
25
{
27
26
/// <inheritdoc />
28
27
protected override CatalogIndexChannelOptions < DocumentationDocument > NewOptions ( DistributedTransport transport ) => new ( transport )
29
28
{
30
29
GetMapping = ( ) => CreateMapping ( null ) ,
31
- IndexFormat = "documentation{0:yyyy.MM.dd.HHmmss}" ,
32
- ActiveSearchAlias = "documentation"
30
+ GetMappingSettings = ( ) => CreateMappingSetting ( ) ,
31
+ IndexFormat = $ "{ Endpoint . IndexNamePrefix . ToLowerInvariant ( ) } -{ indexNamespace . ToLowerInvariant ( ) } -{{0:yyyy.MM.dd.HHmmss}}",
32
+ ActiveSearchAlias = $ "{ Endpoint . IndexNamePrefix } -{ indexNamespace . ToLowerInvariant ( ) } ",
33
33
} ;
34
34
35
35
/// <inheritdoc />
36
36
protected override CatalogIndexChannel < DocumentationDocument > NewChannel ( CatalogIndexChannelOptions < DocumentationDocument > options ) => new ( options ) ;
37
37
}
38
38
39
- public class ElasticsearchMarkdownSemanticExporter ( PublishEnvironment environment , ILoggerFactory logFactory , IDiagnosticsCollector collector , DocumentationEndpoints endpoints )
39
+ public class ElasticsearchMarkdownSemanticExporter ( ILoggerFactory logFactory , IDiagnosticsCollector collector , string indexNamespace , DocumentationEndpoints endpoints )
40
40
: ElasticsearchMarkdownExporterBase < SemanticIndexChannelOptions < DocumentationDocument > , SemanticIndexChannel < DocumentationDocument > >
41
41
( logFactory , collector , endpoints )
42
42
{
@@ -45,20 +45,23 @@ public class ElasticsearchMarkdownSemanticExporter(PublishEnvironment environmen
45
45
{
46
46
GetMapping = ( inferenceId , _ ) => CreateMapping ( inferenceId ) ,
47
47
GetMappingSettings = ( _ , _ ) => CreateMappingSetting ( ) ,
48
- IndexFormat = $ "semantic-docs-{ environment . Name } -{{0:yyyy.MM.dd.HHmmss}}",
49
- ActiveSearchAlias = $ "semantic-docs-{ environment . Name } ",
50
- IndexNumThreads = IndexNumThreads ,
51
- InferenceCreateTimeout = TimeSpan . FromMinutes ( 4 )
48
+ IndexFormat = $ "{ Endpoint . IndexNamePrefix . ToLowerInvariant ( ) } -{ indexNamespace . ToLowerInvariant ( ) } -{{0:yyyy.MM.dd.HHmmss}}",
49
+ ActiveSearchAlias = $ "{ Endpoint . IndexNamePrefix } -{ indexNamespace . ToLowerInvariant ( ) } ",
50
+ IndexNumThreads = Endpoint . IndexNumThreads ,
51
+ SearchNumThreads = Endpoint . SearchNumThreads ,
52
+ InferenceCreateTimeout = TimeSpan . FromMinutes ( Endpoint . BootstrapTimeout ?? 4 )
52
53
} ;
53
54
54
55
/// <inheritdoc />
55
56
protected override SemanticIndexChannel < DocumentationDocument > NewChannel ( SemanticIndexChannelOptions < DocumentationDocument > options ) => new ( options ) ;
56
57
}
57
58
59
+
58
60
public abstract class ElasticsearchMarkdownExporterBase < TChannelOptions , TChannel > (
59
61
ILoggerFactory logFactory ,
60
62
IDiagnosticsCollector collector ,
61
- DocumentationEndpoints endpoints )
63
+ DocumentationEndpoints endpoints
64
+ )
62
65
: IMarkdownExporter , IDisposable
63
66
where TChannelOptions : CatalogIndexChannelOptionsBase < DocumentationDocument >
64
67
where TChannel : CatalogIndexChannel < DocumentationDocument , TChannelOptions >
@@ -69,7 +72,7 @@ public abstract class ElasticsearchMarkdownExporterBase<TChannelOptions, TChanne
69
72
protected abstract TChannelOptions NewOptions ( DistributedTransport transport ) ;
70
73
protected abstract TChannel NewChannel ( TChannelOptions options ) ;
71
74
72
- protected int IndexNumThreads => 8 ;
75
+ protected ElasticsearchEndpoint Endpoint { get ; } = endpoints . Elasticsearch ;
73
76
74
77
protected static string CreateMappingSetting ( ) =>
75
78
// language=json
@@ -97,7 +100,6 @@ protected static string CreateMappingSetting() =>
97
100
""" ;
98
101
99
102
protected static string CreateMapping ( string ? inferenceId ) =>
100
- // langugage=json
101
103
$$ """
102
104
{
103
105
"properties": {
@@ -131,15 +133,13 @@ protected static string CreateMapping(string? inferenceId) =>
131
133
""" ;
132
134
133
135
private static string AbstractMapping ( ) =>
134
- // langugage=json
135
136
"""
136
137
, "abstract": {
137
138
"type": "text"
138
139
}
139
140
""" ;
140
141
141
142
private static string InferenceMapping ( string inferenceId ) =>
142
- // langugage=json
143
143
$ """
144
144
"type": "semantic_text",
145
145
"inference_id": "{ inferenceId } "
@@ -159,12 +159,26 @@ public async ValueTask StartAsync(Cancel ctx = default)
159
159
return ;
160
160
161
161
var es = endpoints . Elasticsearch ;
162
+
162
163
var configuration = new ElasticsearchConfiguration ( es . Uri )
163
164
{
164
165
Authentication = es . ApiKey is { } apiKey
165
166
? new ApiKey ( apiKey )
166
- : es . Username is { } username && es . Password is { } password
167
+ : es is { Username : { } username , Password : { } password }
167
168
? new BasicAuthentication ( username , password )
169
+ : null ,
170
+ EnableHttpCompression = true ,
171
+ DebugMode = Endpoint . DebugMode ,
172
+ CertificateFingerprint = Endpoint . CertificateFingerprint ,
173
+ ProxyAddress = Endpoint . ProxyAddress ,
174
+ ProxyPassword = Endpoint . ProxyPassword ,
175
+ ProxyUsername = Endpoint . ProxyUsername ,
176
+ ServerCertificateValidationCallback = Endpoint . DisableSslVerification
177
+ ? CertificateValidations . AllowAll
178
+ : Endpoint . Certificate is { } cert
179
+ ? Endpoint . CertificateIsNotRoot
180
+ ? CertificateValidations . AuthorityPartOfChain ( cert )
181
+ : CertificateValidations . AuthorityIsRoot ( cert )
168
182
: null
169
183
} ;
170
184
@@ -173,14 +187,20 @@ public async ValueTask StartAsync(Cancel ctx = default)
173
187
//The max num threads per allocated node, from testing its best to limit our max concurrency
174
188
//producing to this number as well
175
189
var options = NewOptions ( transport ) ;
190
+ var i = 0 ;
176
191
options . BufferOptions = new BufferOptions
177
192
{
178
- OutboundBufferMaxSize = 100 ,
179
- ExportMaxConcurrency = IndexNumThreads ,
180
- ExportMaxRetries = 3
193
+ OutboundBufferMaxSize = Endpoint . BufferSize ,
194
+ ExportMaxConcurrency = Endpoint . IndexNumThreads ,
195
+ ExportMaxRetries = Endpoint . MaxRetries ,
181
196
} ;
182
197
options . SerializerContext = SourceGenerationContext . Default ;
183
- options . ExportBufferCallback = ( ) => _logger . LogInformation ( "Exported buffer to Elasticsearch" ) ;
198
+ options . ExportBufferCallback = ( ) =>
199
+ {
200
+ var count = Interlocked . Increment ( ref i ) ;
201
+ _logger . LogInformation ( "Exported {Count} documents to Elasticsearch index {Format}" ,
202
+ count * Endpoint . BufferSize , options . IndexFormat ) ;
203
+ } ;
184
204
options . ExportExceptionCallback = e => _logger . LogError ( e , "Failed to export document" ) ;
185
205
options . ServerRejectionCallback = items => _logger . LogInformation ( "Server rejection: {Rejection}" , items . First ( ) . Item2 ) ;
186
206
_channel = NewChannel ( options ) ;
@@ -206,7 +226,7 @@ public async ValueTask StopAsync(Cancel ctx = default)
206
226
_logger . LogInformation ( "Applying aliases to {Index}" , _channel . IndexName ) ;
207
227
var swapped = await _channel . ApplyAliasesAsync ( ctx ) ;
208
228
if ( ! swapped )
209
- collector . EmitGlobalError ( $ "{ nameof ( ElasticsearchMarkdownExporter ) } failed to apply aliases to index { _channel . IndexName } ") ;
229
+ collector . EmitGlobalError ( $ "$ { nameof ( ElasticsearchMarkdownExporter ) } failed to apply aliases to index { _channel . IndexName } ") ;
210
230
}
211
231
212
232
public void Dispose ( )
0 commit comments