Skip to content

Commit ee02ca6

Browse files
craig[bot]TheComputerM
andcommitted
Merge #148525
148525: pkg/util/log: add otlp log sink r=TheComputerM a=TheComputerM OpenTelemetry is now an industry standard for o11y and CRDB should support its logging protocol so that it works seamlessly with OTEL compatible tooling. This commit implements the log sink functionality, it uses gRPC to transmit logs in the OTEL-standard protobuf format. This sink can be used to ship logs on OTEL compatible targets like otel-collector, datadog-agent or even directly to platforms like datadog and Loki. Parsing the config for the otlp log sink was done in a previous commit. Resolves: #143049 Release note (general change): Added otlp log sink that can export logs to OpenTelemetry-compatible targets over gRPC. Co-authored-by: Mudit Somani <[email protected]>
2 parents f51a6aa + 6668c5a commit ee02ca6

File tree

12 files changed

+440
-39
lines changed

12 files changed

+440
-39
lines changed

docs/generated/logsinks.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ Type-specific configuration options:
289289
|--|--|
290290
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
291291
| `address` | the network address of the gRPC endpoint for ingestion of logs on your OpenTelemetry Collector/Platform. The host/address and port parts are separated with a colon. |
292-
| `insecure` | Disables transport security for the underlying gRPC connection. Inherited from `otlp-defaults.insecure` if not specified. |
293292
| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `otlp-defaults.compression` if not specified. |
294293

295294

pkg/cli/log_flags_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ func TestSetupLogging(t *testing.T) {
5757
`flush-trigger-size: 1.0MiB, ` +
5858
`max-buffer-size: 50MiB, ` +
5959
`format: newline}}`
60-
const defaultOtlpConfig = `otlp-defaults: {` +
61-
`insecure: false, ` +
60+
const defaultOTLPConfig = `otlp-defaults: {` +
6261
`compression: gzip, ` +
6362
`filter: INFO, ` +
6463
`format: json, ` +
@@ -200,7 +199,7 @@ func TestSetupLogging(t *testing.T) {
200199
// Shorten the configuration for legibility during reviews of test changes.
201200
actual = strings.ReplaceAll(actual, defaultFluentConfig, "<fluentDefaults>")
202201
actual = strings.ReplaceAll(actual, defaultHTTPConfig, "<httpDefaults>")
203-
actual = strings.ReplaceAll(actual, defaultOtlpConfig, "<otlpDefaults>")
202+
actual = strings.ReplaceAll(actual, defaultOTLPConfig, "<otlpDefaults>")
204203
actual = stdFileDefaultsRe.ReplaceAllString(actual, "<stdFileDefaults($path)>")
205204
actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "<fileDefaultsNoMaxSize($path)>")
206205
actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "<fileDefaultsNoDir>")

pkg/testutils/lint/lint_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,6 +1242,7 @@ func TestLint(t *testing.T) {
12421242
":!rpc/context.go",
12431243
":!rpc/nodedialer/nodedialer_test.go",
12441244
":!util/grpcutil/grpc_util_test.go",
1245+
":!util/log/otlp_client_test.go",
12451246
":!server/server_obs_service.go",
12461247
":!server/testserver.go",
12471248
":!util/tracing/*_test.go",

pkg/util/log/BUILD.bazel

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"log_entry.go",
3535
"log_flush.go",
3636
"metric.go",
37+
"otlp_client.go",
3738
"redact.go",
3839
"registry.go",
3940
"report.go",
@@ -86,6 +87,16 @@ go_library(
8687
"@com_github_cockroachdb_redact//interfaces",
8788
"@com_github_cockroachdb_ttycolor//:ttycolor",
8889
"@com_github_petermattis_goid//:goid",
90+
"@io_opentelemetry_go_proto_otlp//collector/logs/v1:logs",
91+
"@io_opentelemetry_go_proto_otlp//common/v1:common",
92+
"@io_opentelemetry_go_proto_otlp//logs/v1:logs",
93+
"@io_opentelemetry_go_proto_otlp//resource/v1:resource",
94+
"@org_golang_google_grpc//:grpc",
95+
"@org_golang_google_grpc//codes",
96+
"@org_golang_google_grpc//connectivity",
97+
"@org_golang_google_grpc//credentials/insecure",
98+
"@org_golang_google_grpc//encoding/gzip",
99+
"@org_golang_google_grpc//status",
89100
] + select({
90101
"@io_bazel_rules_go//go/platform:aix": [
91102
"@org_golang_x_sys//unix",
@@ -164,6 +175,7 @@ go_test(
164175
"intercept_test.go",
165176
"log_decoder_test.go",
166177
"main_test.go",
178+
"otlp_client_test.go",
167179
"redact_test.go",
168180
"registry_test.go",
169181
"secondary_log_test.go",
@@ -202,6 +214,9 @@ go_test(
202214
"@com_github_pmezard_go_difflib//difflib",
203215
"@com_github_stretchr_testify//assert",
204216
"@com_github_stretchr_testify//require",
217+
"@io_opentelemetry_go_proto_otlp//collector/logs/v1:logs",
218+
"@io_opentelemetry_go_proto_otlp//common/v1:common",
219+
"@org_golang_google_grpc//:grpc",
205220
"@org_golang_x_sys//unix",
206221
],
207222
)

pkg/util/log/flags.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,21 @@ func ApplyConfig(
119119
fd2CaptureCleanupFn := func() {}
120120

121121
closer := newBufferedSinkCloser()
122+
123+
// closes the underlying gRPC connection of OTLP sinks.
124+
closeOTLPSinks := func() {
125+
for _, fc := range sinkInfos {
126+
if sink, ok := fc.sink.(*otlpSink); ok && sink.isNotShutdown() {
127+
// The reason for nolint:grpcconnclose is that we are not using *rpc.Context
128+
// as it is primarily used for communication between crdb nodes, and doesn't
129+
// fit this usecase.
130+
if err := sink.conn.Close(); err != nil { // nolint:grpcconnclose
131+
fmt.Fprintf(OrigStderr, "# OTLP Sink Cleanup Warning: %s\n", err.Error())
132+
}
133+
}
134+
}
135+
}
136+
122137
// logShutdownFn is the returned cleanup function, whose purpose
123138
// is to tear down the work we are doing here.
124139
logShutdownFn = func() {
@@ -127,6 +142,7 @@ func ApplyConfig(
127142
logging.setChannelLoggers(make(map[Channel]*loggerT), &si)
128143
fd2CaptureCleanupFn()
129144
secLoggersCancel()
145+
closeOTLPSinks()
130146
if err := closer.Close(defaultCloserTimeout); err != nil {
131147
fmt.Printf("# WARNING: %s\n", err.Error())
132148
}
@@ -367,16 +383,16 @@ func ApplyConfig(
367383
}
368384

369385
// Create the OpenTelemetry sinks.
370-
for _, fc := range config.Sinks.OtlpServers {
386+
for _, fc := range config.Sinks.OTLPServers {
371387
if fc.Filter == severity.NONE {
372388
continue
373389
}
374-
optlSinkInfo, err := newOtlpSinkInfo(*fc)
390+
otplSinkInfo, err := newOTLPSinkInfo(*fc)
375391
if err != nil {
376392
return nil, err
377393
}
378-
attachBufferWrapper(optlSinkInfo, fc.CommonSinkConfig.Buffering, closer)
379-
attachSinkInfo(optlSinkInfo, &fc.Channels)
394+
attachBufferWrapper(otplSinkInfo, fc.CommonSinkConfig.Buffering, closer)
395+
attachSinkInfo(otplSinkInfo, &fc.Channels)
380396
}
381397

382398
// Prepend the interceptor sink to all channels.
@@ -446,9 +462,20 @@ func newHTTPSinkInfo(c logconfig.HTTPSinkConfig) (*sinkInfo, error) {
446462
return info, nil
447463
}
448464

449-
func newOtlpSinkInfo(_ logconfig.OtlpSinkConfig) (*sinkInfo, error) {
450-
// TODO(mudit): Implement newOtlpSink
451-
return nil, nil
465+
func newOTLPSinkInfo(c logconfig.OTLPSinkConfig) (*sinkInfo, error) {
466+
info := &sinkInfo{}
467+
468+
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
469+
return nil, err
470+
}
471+
info.applyFilters(c.Channels)
472+
473+
otlpSink, err := newOTLPSink(c)
474+
if err != nil {
475+
return nil, err
476+
}
477+
info.sink = otlpSink
478+
return info, nil
452479
}
453480

454481
// applyFilters applies the channel filters to a sinkInfo.

pkg/util/log/logconfig/config.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ const DefaultFluentFormat = `json-fluent-compact`
3535
// when not specified in a configuration.
3636
const DefaultHTTPFormat = `json-compact`
3737

38-
// DefaultOtlpFormat is the entry format for OpenTelemetry sinks
38+
// DefaultOTLPFormat is the entry format for OpenTelemetry sinks
3939
// when not specified in a configuration.
40-
const DefaultOtlpFormat = `json`
40+
const DefaultOTLPFormat = `json`
4141

4242
// DefaultFilePerms is the default permissions used in file-defaults. It
4343
// is applied literally via os.Chmod, without considering the umask.
@@ -78,7 +78,7 @@ http-defaults:
7878
max-buffer-size: 50mib
7979
otlp-defaults:
8080
filter: INFO
81-
format: ` + DefaultOtlpFormat + `
81+
format: ` + DefaultOTLPFormat + `
8282
redactable: true
8383
exit-on-error: false
8484
buffering:
@@ -138,10 +138,10 @@ type Config struct {
138138
// configuration value.
139139
HTTPDefaults HTTPDefaults `yaml:"http-defaults,omitempty"`
140140

141-
// OtlpDefaults represents the default configuration for OpenTelemetry sinks,
141+
// OTLPDefaults represents the default configuration for OpenTelemetry sinks,
142142
// inherited when a specific OpenTelemetry sink config does not provide a
143143
// configuration value.
144-
OtlpDefaults OtlpDefaults `yaml:"otlp-defaults,omitempty"`
144+
OTLPDefaults OTLPDefaults `yaml:"otlp-defaults,omitempty"`
145145

146146
// Sinks represents the sink configurations.
147147
Sinks SinkConfig `yaml:",omitempty"`
@@ -267,8 +267,8 @@ type SinkConfig struct {
267267
FluentServers map[string]*FluentSinkConfig `yaml:"fluent-servers,omitempty"`
268268
// HTTPServers represents the list of configured http sinks.
269269
HTTPServers map[string]*HTTPSinkConfig `yaml:"http-servers,omitempty"`
270-
// OtlpServers represents the list of configured opentelemetry sinks.
271-
OtlpServers map[string]*OtlpSinkConfig `yaml:"otlp-servers,omitempty"`
270+
// OTLPServers represents the list of configured opentelemetry sinks.
271+
OTLPServers map[string]*OTLPSinkConfig `yaml:"otlp-servers,omitempty"`
272272
// Stderr represents the configuration for the stderr sink.
273273
Stderr StderrSinkConfig `yaml:",omitempty"`
274274
}
@@ -589,18 +589,15 @@ type HTTPSinkConfig struct {
589589
sinkName string
590590
}
591591

592-
type OtlpDefaults struct {
593-
// Disables transport security for the underlying gRPC connection.
594-
Insecure *bool `yaml:",omitempty"`
595-
592+
type OTLPDefaults struct {
596593
// Compression can be "none" or "gzip" to enable gzip compression.
597594
// Set to "gzip" by default.
598595
Compression *string `yaml:",omitempty"`
599596

600597
CommonSinkConfig `yaml:",inline"`
601598
}
602599

603-
// OtlpSinkConfig represents the configuration for one OpenTelemetry sink.
600+
// OTLPSinkConfig represents the configuration for one OpenTelemetry sink.
604601
//
605602
// User-facing documentation follows.
606603
// TITLE: Output to OpenTelemetry compatible collectors.
@@ -637,7 +634,7 @@ type OtlpDefaults struct {
637634
// {{site.data.alerts.callout_info}}
638635
// Run `cockroach debug check-log-config` to verify the effect of defaults inheritance.
639636
// {{site.data.alerts.end}}
640-
type OtlpSinkConfig struct {
637+
type OTLPSinkConfig struct {
641638
// Channels is the list of logging channels that use this sink.
642639
Channels ChannelFilters `yaml:",omitempty,flow"`
643640

@@ -646,7 +643,7 @@ type OtlpSinkConfig struct {
646643
// The host/address and port parts are separated with a colon.
647644
Address string `yaml:""`
648645

649-
OtlpDefaults `yaml:",inline"`
646+
OTLPDefaults `yaml:",inline"`
650647

651648
// SinkName is populated during validation.
652649
SinkName string

pkg/util/log/logconfig/validate.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
111111
}(),
112112
Compression: &GzipCompression,
113113
}
114-
baseOtlpDefaults := OtlpDefaults{
114+
baseOTLPDefaults := OTLPDefaults{
115115
CommonSinkConfig: CommonSinkConfig{
116-
Format: func() *string { s := DefaultOtlpFormat; return &s }(),
116+
Format: func() *string { s := DefaultOTLPFormat; return &s }(),
117117
Buffering: CommonBufferSinkConfigWrapper{
118118
CommonBufferSinkConfig: CommonBufferSinkConfig{
119119
MaxStaleness: &defaultBufferedStaleness,
@@ -123,19 +123,18 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
123123
},
124124
},
125125
},
126-
Insecure: &bf,
127126
Compression: &GzipCompression,
128127
}
129128

130129
propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig)
131130
propagateCommonDefaults(&baseFluentDefaults.CommonSinkConfig, baseCommonSinkConfig)
132131
propagateCommonDefaults(&baseHTTPDefaults.CommonSinkConfig, baseCommonSinkConfig)
133-
propagateCommonDefaults(&baseOtlpDefaults.CommonSinkConfig, baseCommonSinkConfig)
132+
propagateCommonDefaults(&baseOTLPDefaults.CommonSinkConfig, baseCommonSinkConfig)
134133

135134
propagateFileDefaults(&c.FileDefaults, baseFileDefaults)
136135
propagateFluentDefaults(&c.FluentDefaults, baseFluentDefaults)
137136
propagateHTTPDefaults(&c.HTTPDefaults, baseHTTPDefaults)
138-
propagateOtlpDefaults(&c.OtlpDefaults, baseOtlpDefaults)
137+
propagateOTLPDefaults(&c.OTLPDefaults, baseOTLPDefaults)
139138

140139
// Normalize the directory.
141140
if err := normalizeDir(&c.FileDefaults.Dir); err != nil {
@@ -177,13 +176,13 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
177176
}
178177
}
179178

180-
for sinkName, fc := range c.Sinks.OtlpServers {
179+
for sinkName, fc := range c.Sinks.OTLPServers {
181180
if fc == nil {
182-
fc = &OtlpSinkConfig{Channels: SelectChannels()}
183-
c.Sinks.OtlpServers[sinkName] = fc
181+
fc = &OTLPSinkConfig{Channels: SelectChannels()}
182+
c.Sinks.OTLPServers[sinkName] = fc
184183
}
185184
fc.SinkName = sinkName
186-
if err := c.validateOtlpSinkConfig(fc); err != nil {
185+
if err := c.validateOTLPSinkConfig(fc); err != nil {
187186
fmt.Fprintf(&errBuf, "otlp server %q: %v\n", sinkName, err)
188187
}
189188
}
@@ -273,7 +272,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
273272
}
274273
}
275274

276-
for serverName, fc := range c.Sinks.OtlpServers {
275+
for serverName, fc := range c.Sinks.OTLPServers {
277276
if len(fc.Channels.Filters) == 0 {
278277
fmt.Fprintf(&errBuf, "otlp server %q: no channel selected\n", serverName)
279278
}
@@ -518,8 +517,8 @@ func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error {
518517
return c.ValidateCommonSinkConfig(hsc.CommonSinkConfig)
519518
}
520519

521-
func (c *Config) validateOtlpSinkConfig(otsc *OtlpSinkConfig) error {
522-
propagateOtlpDefaults(&otsc.OtlpDefaults, c.OtlpDefaults)
520+
func (c *Config) validateOTLPSinkConfig(otsc *OTLPSinkConfig) error {
521+
propagateOTLPDefaults(&otsc.OTLPDefaults, c.OTLPDefaults)
523522
otsc.Address = strings.TrimSpace(otsc.Address)
524523
if len(otsc.Address) == 0 {
525524
return errors.New("address cannot be empty")
@@ -566,7 +565,7 @@ func propagateHTTPDefaults(target *HTTPDefaults, source HTTPDefaults) {
566565
propagateDefaults(target, source)
567566
}
568567

569-
func propagateOtlpDefaults(target *OtlpDefaults, source OtlpDefaults) {
568+
func propagateOTLPDefaults(target *OTLPDefaults, source OTLPDefaults) {
570569
propagateDefaults(target, source)
571570
}
572571

pkg/util/log/logconfig/validate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func clearExpectedValues(c *Config) {
5858
c.FileDefaults = FileDefaults{}
5959
c.FluentDefaults = FluentDefaults{}
6060
c.HTTPDefaults = HTTPDefaults{}
61-
c.OtlpDefaults = OtlpDefaults{}
61+
c.OTLPDefaults = OTLPDefaults{}
6262

6363
for _, f := range c.Sinks.FileGroups {
6464
if *f.Dir == "/default-dir" {

0 commit comments

Comments
 (0)