Skip to content

Commit 185cda0

Browse files
committed
qq
1 parent 5690269 commit 185cda0

File tree

4 files changed

+72
-35
lines changed

4 files changed

+72
-35
lines changed

builder-config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ dist:
66

77
processors:
88
- gomod: github.com/koponk04/otel-custom-components v0.1.0
9+
import: github.com/koponk04/otel-custom-components/processor/httpprocessor
10+
path: ./

processor/httpprocessor/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package httpprocessor
22

33
import (
4-
"go.opentelemetry.io/collector/component"
4+
"fmt"
55
)
66

77
// Config defines the configuration for the HTTP processor
@@ -19,7 +19,7 @@ type Config struct {
1919
// Validate checks if the processor configuration is valid
2020
func (cfg *Config) Validate() error {
2121
if cfg.Endpoint == "" {
22-
return component.ErrNilNextConsumer
22+
return fmt.Errorf("endpoint cannot be empty")
2323
}
2424
return nil
2525
}

processor/httpprocessor/factory.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"go.opentelemetry.io/collector/component"
77
"go.opentelemetry.io/collector/consumer"
88
"go.opentelemetry.io/collector/processor"
9-
"go.opentelemetry.io/collector/processor/processorhelper"
109
)
1110

1211
const (
@@ -42,16 +41,10 @@ func createTracesProcessor(
4241
proc := &httpProcessor{
4342
config: processorCfg,
4443
logger: set.Logger,
44+
next: nextConsumer,
4545
}
4646

47-
return processorhelper.NewTracesProcessor(
48-
ctx,
49-
set,
50-
cfg,
51-
nextConsumer,
52-
proc.processTraces,
53-
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
54-
)
47+
return proc, nil
5548
}
5649

5750
func createMetricsProcessor(
@@ -61,19 +54,13 @@ func createMetricsProcessor(
6154
nextConsumer consumer.Metrics,
6255
) (processor.Metrics, error) {
6356
processorCfg := cfg.(*Config)
64-
proc := &httpProcessor{
57+
proc := &httpMetricsProcessor{
6558
config: processorCfg,
6659
logger: set.Logger,
60+
next: nextConsumer,
6761
}
6862

69-
return processorhelper.NewMetricsProcessor(
70-
ctx,
71-
set,
72-
cfg,
73-
nextConsumer,
74-
proc.processMetrics,
75-
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
76-
)
63+
return proc, nil
7764
}
7865

7966
func createLogsProcessor(
@@ -83,17 +70,11 @@ func createLogsProcessor(
8370
nextConsumer consumer.Logs,
8471
) (processor.Logs, error) {
8572
processorCfg := cfg.(*Config)
86-
proc := &httpProcessor{
73+
proc := &httpLogsProcessor{
8774
config: processorCfg,
8875
logger: set.Logger,
76+
next: nextConsumer,
8977
}
9078

91-
return processorhelper.NewLogsProcessor(
92-
ctx,
93-
set,
94-
cfg,
95-
nextConsumer,
96-
proc.processLogs,
97-
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
98-
)
79+
return proc, nil
9980
}

processor/httpprocessor/processor.go

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,34 @@ package httpprocessor
33
import (
44
"context"
55

6+
"go.opentelemetry.io/collector/component"
7+
"go.opentelemetry.io/collector/consumer"
68
"go.opentelemetry.io/collector/pdata/plog"
79
"go.opentelemetry.io/collector/pdata/pmetric"
810
"go.opentelemetry.io/collector/pdata/ptrace"
911
"go.uber.org/zap"
1012
)
1113

14+
// httpProcessor handles traces
1215
type httpProcessor struct {
1316
config *Config
1417
logger *zap.Logger
18+
next consumer.Traces
1519
}
1620

17-
func (p *httpProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
21+
func (p *httpProcessor) Capabilities() consumer.Capabilities {
22+
return consumer.Capabilities{MutatesData: true}
23+
}
24+
25+
func (p *httpProcessor) Start(ctx context.Context, host component.Host) error {
26+
return nil
27+
}
28+
29+
func (p *httpProcessor) Shutdown(ctx context.Context) error {
30+
return nil
31+
}
32+
33+
func (p *httpProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
1834
// Implement your trace processing logic here
1935
p.logger.Info("Processing traces", zap.Int("span_count", td.SpanCount()))
2036

@@ -33,17 +49,55 @@ func (p *httpProcessor) processTraces(ctx context.Context, td ptrace.Traces) (pt
3349
}
3450
}
3551

36-
return td, nil
52+
return p.next.ConsumeTraces(ctx, td)
53+
}
54+
55+
// httpMetricsProcessor handles metrics
56+
type httpMetricsProcessor struct {
57+
config *Config
58+
logger *zap.Logger
59+
next consumer.Metrics
60+
}
61+
62+
func (p *httpMetricsProcessor) Capabilities() consumer.Capabilities {
63+
return consumer.Capabilities{MutatesData: true}
64+
}
65+
66+
func (p *httpMetricsProcessor) Start(ctx context.Context, host component.Host) error {
67+
return nil
68+
}
69+
70+
func (p *httpMetricsProcessor) Shutdown(ctx context.Context) error {
71+
return nil
3772
}
3873

39-
func (p *httpProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
74+
func (p *httpMetricsProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
4075
// Implement your metric processing logic here
4176
p.logger.Info("Processing metrics", zap.Int("metric_count", md.MetricCount()))
4277

43-
return md, nil
78+
return p.next.ConsumeMetrics(ctx, md)
79+
}
80+
81+
// httpLogsProcessor handles logs
82+
type httpLogsProcessor struct {
83+
config *Config
84+
logger *zap.Logger
85+
next consumer.Logs
86+
}
87+
88+
func (p *httpLogsProcessor) Capabilities() consumer.Capabilities {
89+
return consumer.Capabilities{MutatesData: true}
90+
}
91+
92+
func (p *httpLogsProcessor) Start(ctx context.Context, host component.Host) error {
93+
return nil
94+
}
95+
96+
func (p *httpLogsProcessor) Shutdown(ctx context.Context) error {
97+
return nil
4498
}
4599

46-
func (p *httpProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
100+
func (p *httpLogsProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
47101
// Implement your log processing logic here
48102
p.logger.Info("Processing logs", zap.Int("log_count", ld.LogRecordCount()))
49103

@@ -62,5 +116,5 @@ func (p *httpProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Log
62116
}
63117
}
64118

65-
return ld, nil
119+
return p.next.ConsumeLogs(ctx, ld)
66120
}

0 commit comments

Comments
 (0)