Skip to content

Commit b4965d2

Browse files
Frapschenatoulme
andauthored
[chore] move kafka exporter to generated lifecycle tests (#30531)
Relates to #27849 --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 9db1c15 commit b4965d2

File tree

6 files changed

+175
-29
lines changed

6 files changed

+175
-29
lines changed

exporter/kafkaexporter/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
148148
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
149149
exporterhelper.WithRetry(oCfg.BackOffConfig),
150150
exporterhelper.WithQueue(oCfg.QueueSettings),
151+
exporterhelper.WithStart(exp.start),
151152
exporterhelper.WithShutdown(exp.Close))
152153
}
153154

@@ -178,6 +179,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
178179
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
179180
exporterhelper.WithRetry(oCfg.BackOffConfig),
180181
exporterhelper.WithQueue(oCfg.QueueSettings),
182+
exporterhelper.WithStart(exp.start),
181183
exporterhelper.WithShutdown(exp.Close))
182184
}
183185

@@ -208,5 +210,6 @@ func (f *kafkaExporterFactory) createLogsExporter(
208210
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
209211
exporterhelper.WithRetry(oCfg.BackOffConfig),
210212
exporterhelper.WithQueue(oCfg.QueueSettings),
213+
exporterhelper.WithStart(exp.start),
211214
exporterhelper.WithShutdown(exp.Close))
212215
}

exporter/kafkaexporter/factory_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/IBM/sarama"
1313
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
1415
"go.opentelemetry.io/collector/component/componenttest"
1516
"go.opentelemetry.io/collector/exporter/exportertest"
1617
"go.opentelemetry.io/collector/pdata/plog"
@@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) {
124125
exportertest.NewNopCreateSettings(),
125126
tc.conf,
126127
)
128+
require.NoError(t, err)
129+
assert.NotNil(t, exporter, "Must return valid exporter")
130+
err = exporter.Start(context.Background(), componenttest.NewNopHost())
127131
if tc.err != nil {
128132
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
129-
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
130133
return
131134
}
132135
assert.NoError(t, err, "Must not error")
@@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) {
199202
exportertest.NewNopCreateSettings(),
200203
tc.conf,
201204
)
205+
require.NoError(t, err)
206+
assert.NotNil(t, exporter, "Must return valid exporter")
207+
err = exporter.Start(context.Background(), componenttest.NewNopHost())
202208
if tc.err != nil {
203209
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
204-
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
205210
return
206211
}
207212
assert.NoError(t, err, "Must not error")
@@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) {
274279
exportertest.NewNopCreateSettings(),
275280
tc.conf,
276281
)
282+
require.NoError(t, err)
283+
assert.NotNil(t, exporter, "Must return valid exporter")
284+
err = exporter.Start(context.Background(), componenttest.NewNopHost())
277285
if tc.err != nil {
278286
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
279-
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
280287
return
281288
}
282289
assert.NoError(t, err, "Must not error")

exporter/kafkaexporter/generated_component_test.go

Lines changed: 101 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/component"
1213
"go.opentelemetry.io/collector/consumer/consumererror"
1314
"go.opentelemetry.io/collector/exporter"
1415
"go.opentelemetry.io/collector/pdata/plog"
@@ -23,6 +24,7 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
2324

2425
// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
2526
type kafkaTracesProducer struct {
27+
cfg Config
2628
producer sarama.SyncProducer
2729
topic string
2830
marshaler TracesMarshaler
@@ -57,11 +59,24 @@ func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces)
5759
}
5860

5961
func (e *kafkaTracesProducer) Close(context.Context) error {
62+
if e.producer == nil {
63+
return nil
64+
}
6065
return e.producer.Close()
6166
}
6267

68+
func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
69+
producer, err := newSaramaProducer(e.cfg)
70+
if err != nil {
71+
return err
72+
}
73+
e.producer = producer
74+
return nil
75+
}
76+
6377
// kafkaMetricsProducer uses sarama to produce metrics messages to kafka
6478
type kafkaMetricsProducer struct {
79+
cfg Config
6580
producer sarama.SyncProducer
6681
topic string
6782
marshaler MetricsMarshaler
@@ -87,11 +102,24 @@ func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.M
87102
}
88103

89104
func (e *kafkaMetricsProducer) Close(context.Context) error {
105+
if e.producer == nil {
106+
return nil
107+
}
90108
return e.producer.Close()
91109
}
92110

111+
func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
112+
producer, err := newSaramaProducer(e.cfg)
113+
if err != nil {
114+
return err
115+
}
116+
e.producer = producer
117+
return nil
118+
}
119+
93120
// kafkaLogsProducer uses sarama to produce logs messages to kafka
94121
type kafkaLogsProducer struct {
122+
cfg Config
95123
producer sarama.SyncProducer
96124
topic string
97125
marshaler LogsMarshaler
@@ -117,9 +145,21 @@ func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) erro
117145
}
118146

119147
func (e *kafkaLogsProducer) Close(context.Context) error {
148+
if e.producer == nil {
149+
return nil
150+
}
120151
return e.producer.Close()
121152
}
122153

154+
func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
155+
producer, err := newSaramaProducer(e.cfg)
156+
if err != nil {
157+
return err
158+
}
159+
e.producer = producer
160+
return nil
161+
}
162+
123163
func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
124164
c := sarama.NewConfig()
125165

@@ -171,13 +211,8 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
171211
if marshaler == nil {
172212
return nil, errUnrecognizedEncoding
173213
}
174-
producer, err := newSaramaProducer(config)
175-
if err != nil {
176-
return nil, err
177-
}
178-
179214
return &kafkaMetricsProducer{
180-
producer: producer,
215+
cfg: config,
181216
topic: config.Topic,
182217
marshaler: marshaler,
183218
logger: set.Logger,
@@ -196,12 +231,9 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma
196231
keyableMarshaler.Key()
197232
}
198233
}
199-
producer, err := newSaramaProducer(config)
200-
if err != nil {
201-
return nil, err
202-
}
234+
203235
return &kafkaTracesProducer{
204-
producer: producer,
236+
cfg: config,
205237
topic: config.Topic,
206238
marshaler: marshaler,
207239
logger: set.Logger,
@@ -213,13 +245,9 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[
213245
if marshaler == nil {
214246
return nil, errUnrecognizedEncoding
215247
}
216-
producer, err := newSaramaProducer(config)
217-
if err != nil {
218-
return nil, err
219-
}
220248

221249
return &kafkaLogsProducer{
222-
producer: producer,
250+
cfg: config,
223251
topic: config.Topic,
224252
marshaler: marshaler,
225253
logger: set.Logger,

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/IBM/sarama/mocks"
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
15+
"go.opentelemetry.io/collector/component/componenttest"
1516
"go.opentelemetry.io/collector/config/configtls"
1617
"go.opentelemetry.io/collector/exporter/exportertest"
1718
"go.opentelemetry.io/collector/pdata/plog"
@@ -26,8 +27,9 @@ import (
2627
func TestNewExporter_err_version(t *testing.T) {
2728
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
2829
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
30+
require.NoError(t, err)
31+
err = texp.start(context.Background(), componenttest.NewNopHost())
2932
assert.Error(t, err)
30-
assert.Nil(t, texp)
3133
}
3234

3335
func TestNewExporter_err_encoding(t *testing.T) {
@@ -40,8 +42,9 @@ func TestNewExporter_err_encoding(t *testing.T) {
4042
func TestNewMetricsExporter_err_version(t *testing.T) {
4143
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
4244
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
45+
require.NoError(t, err)
46+
err = mexp.start(context.Background(), componenttest.NewNopHost())
4347
assert.Error(t, err)
44-
assert.Nil(t, mexp)
4548
}
4649

4750
func TestNewMetricsExporter_err_encoding(t *testing.T) {
@@ -60,9 +63,10 @@ func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {
6063

6164
func TestNewLogsExporter_err_version(t *testing.T) {
6265
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
63-
mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
66+
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
67+
require.NoError(t, err)
68+
err = lexp.start(context.Background(), componenttest.NewNopHost())
6469
assert.Error(t, err)
65-
assert.Nil(t, mexp)
6670
}
6771

6872
func TestNewLogsExporter_err_encoding(t *testing.T) {
@@ -98,17 +102,20 @@ func TestNewExporter_err_auth_type(t *testing.T) {
98102
},
99103
}
100104
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
105+
require.NoError(t, err)
106+
err = texp.start(context.Background(), componenttest.NewNopHost())
101107
assert.Error(t, err)
102108
assert.Contains(t, err.Error(), "failed to load TLS config")
103-
assert.Nil(t, texp)
104109
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
110+
require.NoError(t, err)
111+
err = mexp.start(context.Background(), componenttest.NewNopHost())
105112
assert.Error(t, err)
106113
assert.Contains(t, err.Error(), "failed to load TLS config")
107-
assert.Nil(t, mexp)
108114
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
115+
require.NoError(t, err)
116+
err = lexp.start(context.Background(), componenttest.NewNopHost())
109117
assert.Error(t, err)
110118
assert.Contains(t, err.Error(), "failed to load TLS config")
111-
assert.Nil(t, lexp)
112119

113120
}
114121

@@ -120,9 +127,10 @@ func TestNewExporter_err_compression(t *testing.T) {
120127
},
121128
}
122129
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
130+
require.NoError(t, err)
131+
err = texp.start(context.Background(), componenttest.NewNopHost())
123132
assert.Error(t, err)
124133
assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
125-
assert.Nil(t, texp)
126134
}
127135

128136
func TestTracesPusher(t *testing.T) {

exporter/kafkaexporter/metadata.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ status:
99
codeowners:
1010
active: [pavolloffay, MovieStoreGuy]
1111

12-
# TODO: Update the exporter to pass the tests
1312
tests:
13+
config:
1414
skip_lifecycle: true
15-
skip_shutdown: true

0 commit comments

Comments
 (0)