Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/otelcol-telemetryfactory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: all

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable injecting a telemetry factory through otelcol.Factories

# One or more tracking issues or pull requests related to the change
issues: [4970]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
16 changes: 11 additions & 5 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s State) String() string {

// CollectorSettings holds configuration for creating a new Collector.
type CollectorSettings struct {
// Factories service factories.
// Factories returns component factories for the collector.
Factories func() (Factories, error)

// BuildInfo provides collector start information.
Expand Down Expand Up @@ -176,6 +176,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize factories: %w", err)
}
if factories.Telemetry == nil {
factories.Telemetry = otelconftelemetry.NewFactory()
}

cfg, err := col.configProvider.Get(ctx, factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
Expand Down Expand Up @@ -217,10 +221,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
},
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,

// TODO: inject the telemetry factory through factories.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
TelemetryFactory: otelconftelemetry.NewFactory(),
TelemetryFactory: factories.Telemetry,
}, cfg.Service)
if err != nil {
return err
Expand Down Expand Up @@ -270,6 +271,10 @@ func (col *Collector) DryRun(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize factories: %w", err)
}
if factories.Telemetry == nil {
factories.Telemetry = otelconftelemetry.NewFactory()
}

cfg, err := col.configProvider.Get(ctx, factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
Expand All @@ -289,6 +294,7 @@ func (col *Collector) DryRun(ctx context.Context) error {
ExportersFactories: factories.Exporters,
ConnectorsConfigs: cfg.Connectors,
ConnectorsFactories: factories.Connectors,
TelemetryFactory: factories.Telemetry,
}, service.Config{
Pipelines: cfg.Service.Pipelines,
})
Expand Down
199 changes: 103 additions & 96 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package otelcol
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sync"
Expand All @@ -26,6 +24,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/telemetrytest"
)

func TestStateString(t *testing.T) {
Expand Down Expand Up @@ -79,47 +79,33 @@ func TestCollectorCancelContext(t *testing.T) {
}

func TestCollectorStateAfterConfigChange(t *testing.T) {
metricsPushRequests := make(chan chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
var watcher confmap.WatcherFunc
fileProvider := newFakeProvider("file", func(_ context.Context, uri string, w confmap.WatcherFunc) (*confmap.Retrieved, error) {
watcher = w
conf := newConfFromFile(t, uri[5:])
return confmap.NewRetrieved(conf)
})

shutdownRequests := make(chan chan struct{})
shutdown := func(ctx context.Context) error {
unblock := make(chan struct{})
select {
case <-r.Context().Done():
case metricsPushRequests <- unblock:
case <-ctx.Done():
case shutdownRequests <- unblock:
select {
case <-unblock:
case <-r.Context().Done():
case <-ctx.Done():
}
}
}))
defer srv.Close()
return nil
}
factories, err := nopFactories()
require.NoError(t, err)
factories.Telemetry = telemetry.NewFactory(
func() component.Config { return fakeTelemetryConfig{} },
telemetrytest.WithLogger(zap.NewNop(), shutdown),
)

var watcher confmap.WatcherFunc
fileProvider := newFakeProvider("file", func(_ context.Context, uri string, w confmap.WatcherFunc) (*confmap.Retrieved, error) {
watcher = w
conf := newConfFromFile(t, uri[5:])
conf["service"].(map[string]any)["telemetry"] = map[string]any{
"metrics": map[string]any{
"level": "basic",
"readers": []any{
map[string]any{
// Use a periodic metric reader with a long period,
// so we should only see an export on shutdown.
"periodic": map[string]any{
"interval": 30000, // 30s
"exporter": map[string]any{
"otlp": map[string]any{
"endpoint": srv.URL,
"insecure": true,
"protocol": "http/protobuf",
},
},
},
},
},
},
}
return confmap.NewRetrieved(conf)
})
set := ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-nop.yaml")},
Expand All @@ -130,7 +116,7 @@ func TestCollectorStateAfterConfigChange(t *testing.T) {
}
col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
Factories: func() (Factories, error) { return factories, nil },
ConfigProviderSettings: set,
})
require.NoError(t, err)
Expand All @@ -145,7 +131,7 @@ func TestCollectorStateAfterConfigChange(t *testing.T) {
// push to the OTLP endpoint. We block the request to check
// the state of the collector during the config change event.
watcher(&confmap.ChangeEvent{})
unblock := <-metricsPushRequests
unblock := <-shutdownRequests
assert.Equal(t, StateClosing, col.GetState())
close(unblock)
assert.Eventually(t, func() bool {
Expand All @@ -156,13 +142,13 @@ func TestCollectorStateAfterConfigChange(t *testing.T) {
// config change to make sure the internal service shutdown
// does not influence collector shutdown.
watcher(&confmap.ChangeEvent{})
unblock = <-metricsPushRequests
unblock = <-shutdownRequests
assert.Equal(t, StateClosing, col.GetState())
col.Shutdown()
close(unblock)

// After the config reload, the final shutdown should occur.
close(<-metricsPushRequests)
close(<-shutdownRequests)
wg.Wait()
assert.Equal(t, StateClosed, col.GetState())
}
Expand Down Expand Up @@ -402,55 +388,34 @@ func TestNewCollectorValidatesResolverSettings(t *testing.T) {
require.Error(t, err)
}

func TestCollectorStartWithTraceContextPropagation(t *testing.T) {
tests := []struct {
file string
errExpected bool
}{
{file: "otelcol-invalidprop.yaml", errExpected: true},
{file: "otelcol-nop.yaml", errExpected: false},
{file: "otelcol-validprop.yaml", errExpected: false},
}

for _, tt := range tests {
t.Run(tt.file, func(t *testing.T) {
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", tt.file)}),
}

col, err := NewCollector(set)
require.NoError(t, err)

if tt.errExpected {
require.Error(t, col.Run(context.Background()))
assert.Equal(t, StateClosed, col.GetState())
} else {
wg := startCollector(context.Background(), t, col)
col.Shutdown()
wg.Wait()
assert.Equal(t, StateClosed, col.GetState())
}
})
}
}

func TestCollectorRun(t *testing.T) {
tests := []struct {
file string
tests := map[string]struct {
factories func() (Factories, error)
configFile string
}{
{file: "otelcol-noreaders.yaml"},
{file: "otelcol-emptyreaders.yaml"},
{file: "otelcol-multipleheaders.yaml"},
"nop": {
factories: nopFactories,
configFile: "otelcol-nop.yaml",
},
"defaul_telemetry_factory": {
factories: func() (Factories, error) {
factories, err := nopFactories()
if err != nil {
return Factories{}, err
}
factories.Telemetry = nil
return factories, nil
},
configFile: "otelcol-otelconftelemetry.yaml",
},
}

for _, tt := range tests {
t.Run(tt.file, func(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", tt.file)}),
Factories: test.factories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", test.configFile)}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -464,7 +429,7 @@ func TestCollectorRun(t *testing.T) {
}
}

func TestCollectorShutdownBeforeRun(t *testing.T) {
func TestCollectorRun_AfterShutdown(t *testing.T) {
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
Expand All @@ -483,21 +448,41 @@ func TestCollectorShutdownBeforeRun(t *testing.T) {
assert.Equal(t, StateClosed, col.GetState())
}

func TestCollectorClosedStateOnStartUpError(t *testing.T) {
// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
func TestCollectorRun_Errors(t *testing.T) {
tests := map[string]struct {
settings CollectorSettings
expectedErr string
}{
"invalid_processor": {
settings: CollectorSettings{
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
},
expectedErr: `invalid configuration: service::pipelines::traces: references processor "invalid" which is not configured`,
},
"invalid_telemetry_config": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-otelconftelemetry.yaml")}),
},
expectedErr: "failed to get config: cannot unmarshal the configuration: decoding failed due to the following error(s):\n\n'service.telemetry' has invalid keys: metrics",
},
}
col, err := NewCollector(set)
require.NoError(t, err)

// Expect run to error
require.Error(t, col.Run(context.Background()))
for name, test := range tests {
t.Run(name, func(t *testing.T) {
col, err := NewCollector(test.settings)
require.NoError(t, err)

// Expect state to be closed
assert.Equal(t, StateClosed, col.GetState())
// Expect run to error
err = col.Run(context.Background())
require.EqualError(t, err, test.expectedErr)

// Expect state to be closed
assert.Equal(t, StateClosed, col.GetState())
})
}
}

func TestCollectorDryRun(t *testing.T) {
Expand Down Expand Up @@ -537,6 +522,28 @@ func TestCollectorDryRun(t *testing.T) {
},
expectedErr: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
},
"invalid_telemetry_config": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-otelconftelemetry.yaml")}),
},
expectedErr: "failed to get config: cannot unmarshal the configuration: decoding failed due to the following error(s):\n\n'service.telemetry' has invalid keys: metrics",
},
"default_telemetry_factory": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: func() (Factories, error) {
factories, err := nopFactories()
if err != nil {
return Factories{}, err
}
factories.Telemetry = nil
return factories, nil
},
ConfigProviderSettings: newDefaultConfigProviderSettings(t, []string{filepath.Join("testdata", "otelcol-otelconftelemetry.yaml")}),
},
},
}

for name, test := range tests {
Expand Down
6 changes: 6 additions & 0 deletions otelcol/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/telemetry"
)

// Factories struct holds in a single type all component factories that
Expand All @@ -32,6 +33,11 @@ type Factories struct {
// Connectors maps connector type names in the config to the respective factory.
Connectors map[component.Type]connector.Factory

// Telemetry is the factory to create the telemetry providers for the service.
//
// If Telemetry is nil, otelconftelemetry will be used by default.
Telemetry telemetry.Factory

// ReceiverModules maps receiver types to their respective go modules.
ReceiverModules map[component.Type]string

Expand Down
Loading
Loading