@@ -31,10 +31,10 @@ type elasticsearchExporter struct {
3131 component.TelemetrySettings
3232 userAgent string
3333
34- config * Config
35- index string
36- dynamicIndex bool
37- model mappingModel
34+ config * Config
35+ index string
36+ dynamicIndex bool
37+ logstashFormat LogstashFormatSettings
3838
3939 wg sync.WaitGroup // active sessions
4040 bulkIndexer bulkIndexer
@@ -58,10 +58,6 @@ func newExporter(
5858 index string ,
5959 dynamicIndex bool ,
6060) * elasticsearchExporter {
61- model := & encodeModel {
62- mode : cfg .MappingMode (),
63- }
64-
6561 userAgent := fmt .Sprintf (
6662 "%s/%s (%s/%s)" ,
6763 set .BuildInfo .Description ,
@@ -74,11 +70,11 @@ func newExporter(
7470 TelemetrySettings : set .TelemetrySettings ,
7571 userAgent : userAgent ,
7672
77- config : cfg ,
78- index : index ,
79- dynamicIndex : dynamicIndex ,
80- model : model ,
81- bufferPool : pool .NewBufferPool (),
73+ config : cfg ,
74+ index : index ,
75+ dynamicIndex : dynamicIndex ,
76+ logstashFormat : cfg . LogstashFormat ,
77+ bufferPool : pool .NewBufferPool (),
8278 }
8379}
8480
@@ -159,6 +155,10 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
159155func (e * elasticsearchExporter ) pushLogsData (ctx context.Context , ld plog.Logs ) error {
160156 mappingMode := e .config .MappingMode ()
161157 router := newDocumentRouter (mappingMode , e .dynamicIndex , e .index , e .config )
158+ encoder , err := newEncoder (mappingMode )
159+ if err != nil {
160+ return err
161+ }
162162
163163 e .wg .Add (1 )
164164 defer e .wg .Done ()
@@ -178,9 +178,16 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
178178 for j := 0 ; j < ills .Len (); j ++ {
179179 ill := ills .At (j )
180180 scope := ill .Scope ()
181+ ec := encodingContext {
182+ resource : resource ,
183+ resourceSchemaURL : rl .SchemaUrl (),
184+ scope : scope ,
185+ scopeSchemaURL : ill .SchemaUrl (),
186+ }
187+
181188 logs := ill .LogRecords ()
182189 for k := 0 ; k < logs .Len (); k ++ {
183- if err := e .pushLogRecord (ctx , router , resource , rl . SchemaUrl () , logs .At (k ), scope , ill . SchemaUrl ( ), session ); err != nil {
190+ if err := e .pushLogRecord (ctx , router , encoder , ec , logs .At (k ), session ); err != nil {
184191 if cerr := ctx .Err (); cerr != nil {
185192 return cerr
186193 }
@@ -208,21 +215,19 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
208215func (e * elasticsearchExporter ) pushLogRecord (
209216 ctx context.Context ,
210217 router documentRouter ,
211- resource pcommon. Resource ,
212- resourceSchemaURL string ,
218+ encoder documentEncoder ,
219+ ec encodingContext ,
213220 record plog.LogRecord ,
214- scope pcommon.InstrumentationScope ,
215- scopeSchemaURL string ,
216221 bulkIndexerSession bulkIndexerSession ,
217222) error {
218- index , err := router .routeLogRecord (resource , scope , record .Attributes ())
223+ index , err := router .routeLogRecord (ec . resource , ec . scope , record .Attributes ())
219224 if err != nil {
220225 return err
221226 }
222227
223228 buf := e .bufferPool .NewPooledBuffer ()
224229 docID := e .extractDocumentIDAttribute (record .Attributes ())
225- if err := e . model . encodeLog (resource , resourceSchemaURL , record , scope , scopeSchemaURL , index , buf .Buffer ); err != nil {
230+ if err := encoder . encodeLog (ec , record , index , buf .Buffer ); err != nil {
226231 buf .Recycle ()
227232 return fmt .Errorf ("failed to encode log event: %w" , err )
228233 }
@@ -250,6 +255,10 @@ func (e *elasticsearchExporter) pushMetricsData(
250255 mappingMode := e .config .MappingMode ()
251256 router := newDocumentRouter (mappingMode , e .dynamicIndex , e .index , e .config )
252257 hasher := newDataPointHasher (mappingMode )
258+ encoder , err := newEncoder (mappingMode )
259+ if err != nil {
260+ return err
261+ }
253262
254263 e .wg .Add (1 )
255264 defer e .wg .Done ()
@@ -363,10 +372,17 @@ func (e *elasticsearchExporter) pushMetricsData(
363372 for index , groupedDataPoints := range groupedDataPointsByIndex {
364373 for _ , dpGroup := range groupedDataPoints {
365374 buf := e .bufferPool .NewPooledBuffer ()
366- dynamicTemplates , err := e .model .encodeMetrics (
367- dpGroup .resource , dpGroup .resourceSchemaURL ,
368- dpGroup .scope , dpGroup .scopeSchemaURL ,
369- dpGroup .dataPoints , & validationErrs , index , buf .Buffer ,
375+ dynamicTemplates , err := encoder .encodeMetrics (
376+ encodingContext {
377+ resource : dpGroup .resource ,
378+ resourceSchemaURL : dpGroup .resourceSchemaURL ,
379+ scope : dpGroup .scope ,
380+ scopeSchemaURL : dpGroup .scopeSchemaURL ,
381+ },
382+ dpGroup .dataPoints ,
383+ & validationErrs ,
384+ index ,
385+ buf .Buffer ,
370386 )
371387 if err != nil {
372388 buf .Recycle ()
@@ -401,6 +417,10 @@ func (e *elasticsearchExporter) pushTraceData(
401417) error {
402418 mappingMode := e .config .MappingMode ()
403419 router := newDocumentRouter (mappingMode , e .dynamicIndex , e .index , e .config )
420+ encoder , err := newEncoder (mappingMode )
421+ if err != nil {
422+ return err
423+ }
404424
405425 e .wg .Add (1 )
406426 defer e .wg .Done ()
@@ -420,18 +440,25 @@ func (e *elasticsearchExporter) pushTraceData(
420440 for j := 0 ; j < scopeSpans .Len (); j ++ {
421441 scopeSpan := scopeSpans .At (j )
422442 scope := scopeSpan .Scope ()
443+ ec := encodingContext {
444+ resource : resource ,
445+ resourceSchemaURL : il .SchemaUrl (),
446+ scope : scope ,
447+ scopeSchemaURL : scopeSpan .SchemaUrl (),
448+ }
449+
423450 spans := scopeSpan .Spans ()
424451 for k := 0 ; k < spans .Len (); k ++ {
425452 span := spans .At (k )
426- if err := e .pushTraceRecord (ctx , router , resource , il . SchemaUrl () , span , scope , scopeSpan . SchemaUrl () , session ); err != nil {
453+ if err := e .pushTraceRecord (ctx , router , encoder , ec , span , session ); err != nil {
427454 if cerr := ctx .Err (); cerr != nil {
428455 return cerr
429456 }
430457 errs = append (errs , err )
431458 }
432459 for ii := 0 ; ii < span .Events ().Len (); ii ++ {
433460 spanEvent := span .Events ().At (ii )
434- if err := e .pushSpanEvent (ctx , router , resource , il . SchemaUrl () , span , spanEvent , scope , scopeSpan . SchemaUrl () , session ); err != nil {
461+ if err := e .pushSpanEvent (ctx , router , encoder , ec , span , spanEvent , session ); err != nil {
435462 errs = append (errs , err )
436463 }
437464 }
@@ -451,20 +478,18 @@ func (e *elasticsearchExporter) pushTraceData(
451478func (e * elasticsearchExporter ) pushTraceRecord (
452479 ctx context.Context ,
453480 router documentRouter ,
454- resource pcommon. Resource ,
455- resourceSchemaURL string ,
481+ encoder documentEncoder ,
482+ ec encodingContext ,
456483 span ptrace.Span ,
457- scope pcommon.InstrumentationScope ,
458- scopeSchemaURL string ,
459484 bulkIndexerSession bulkIndexerSession ,
460485) error {
461- index , err := router .routeSpan (resource , scope , span .Attributes ())
486+ index , err := router .routeSpan (ec . resource , ec . scope , span .Attributes ())
462487 if err != nil {
463488 return err
464489 }
465490
466491 buf := e .bufferPool .NewPooledBuffer ()
467- if err := e . model . encodeSpan (resource , resourceSchemaURL , span , scope , scopeSchemaURL , index , buf .Buffer ); err != nil {
492+ if err := encoder . encodeSpan (ec , span , index , buf .Buffer ); err != nil {
468493 buf .Recycle ()
469494 return fmt .Errorf ("failed to encode trace record: %w" , err )
470495 }
@@ -475,24 +500,21 @@ func (e *elasticsearchExporter) pushTraceRecord(
475500func (e * elasticsearchExporter ) pushSpanEvent (
476501 ctx context.Context ,
477502 router documentRouter ,
478- resource pcommon. Resource ,
479- resourceSchemaURL string ,
503+ encoder documentEncoder ,
504+ ec encodingContext ,
480505 span ptrace.Span ,
481506 spanEvent ptrace.SpanEvent ,
482- scope pcommon.InstrumentationScope ,
483- scopeSchemaURL string ,
484507 bulkIndexerSession bulkIndexerSession ,
485508) error {
486- index , err := router .routeSpanEvent (resource , scope , spanEvent .Attributes ())
509+ index , err := router .routeSpanEvent (ec . resource , ec . scope , spanEvent .Attributes ())
487510 if err != nil {
488511 return err
489512 }
490513
491514 buf := e .bufferPool .NewPooledBuffer ()
492- e .model .encodeSpanEvent (resource , resourceSchemaURL , span , spanEvent , scope , scopeSchemaURL , index , buf .Buffer )
493- if buf .Buffer .Len () == 0 {
515+ if err := encoder .encodeSpanEvent (ec , span , spanEvent , index , buf .Buffer ); err != nil || buf .Buffer .Len () == 0 {
494516 buf .Recycle ()
495- return nil
517+ return err
496518 }
497519 // not recycling after Add returns an error as we don't know if it's already recycled
498520 return bulkIndexerSession .Add (ctx , index .Index , "" , buf , nil , docappender .ActionCreate )
@@ -511,6 +533,13 @@ func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string
511533}
512534
513535func (e * elasticsearchExporter ) pushProfilesData (ctx context.Context , pd pprofile.Profiles ) error {
536+ // TODO add support for routing profiles to different data_stream.namespaces?
537+ mappingMode := e .config .MappingMode ()
538+ encoder , err := newEncoder (mappingMode )
539+ if err != nil {
540+ return err
541+ }
542+
514543 e .wg .Add (1 )
515544 defer e .wg .Done ()
516545
@@ -551,7 +580,16 @@ func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofil
551580 scope := sp .Scope ()
552581 p := sp .Profiles ()
553582 for k := 0 ; k < p .Len (); k ++ {
554- if err := e .pushProfileRecord (ctx , resource , p .At (k ), scope , defaultSession , eventsSession , stackTracesSession , stackFramesSession , executablesSession ); err != nil {
583+ ec := encodingContext {
584+ resource : resource ,
585+ resourceSchemaURL : rp .SchemaUrl (),
586+ scope : scope ,
587+ scopeSchemaURL : sp .SchemaUrl (),
588+ }
589+ if err := e .pushProfileRecord (
590+ ctx , encoder , ec , p .At (k ), defaultSession , eventsSession ,
591+ stackTracesSession , stackFramesSession , executablesSession ,
592+ ); err != nil {
555593 if cerr := ctx .Err (); cerr != nil {
556594 return cerr
557595 }
@@ -603,12 +641,12 @@ func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofil
603641
604642func (e * elasticsearchExporter ) pushProfileRecord (
605643 ctx context.Context ,
606- resource pcommon. Resource ,
607- record pprofile. Profile ,
608- scope pcommon. InstrumentationScope ,
644+ encoder documentEncoder ,
645+ ec encodingContext ,
646+ profile pprofile. Profile ,
609647 defaultSession , eventsSession , stackTracesSession , stackFramesSession , executablesSession bulkIndexerSession ,
610648) error {
611- return e . model . encodeProfile (resource , scope , record , func (buf * bytes.Buffer , docID , index string ) error {
649+ return encoder . encodeProfile (ec , profile , func (buf * bytes.Buffer , docID , index string ) error {
612650 switch index {
613651 case otelserializer .StackTraceIndex :
614652 return stackTracesSession .Add (ctx , index , docID , buf , nil , docappender .ActionCreate )
0 commit comments