Skip to content

Commit 8eb0c9d

Browse files
craig[bot]TheComputerM
andcommitted
Merge #150696
150696: pkg/util/log: add headers to otlp sink r=arjunmahishi a=TheComputerM Headers can now be attached to requests made through the OTLP sink. In HTTP mode, these are sent as standard HTTP headers. In gRPC mode, they are added to the request context and sent as gRPC metadata. Epic: none Release note (general change): Added headers config option to OTLP log sink. Co-authored-by: Mudit Somani <[email protected]>
2 parents ed79da5 + 54f2570 commit 8eb0c9d

File tree

8 files changed

+289
-16
lines changed

8 files changed

+289
-16
lines changed

docs/generated/logsinks.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ Type-specific configuration options:
290290
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
291291
| `address` | the network address of the gRPC endpoint for ingestion of logs on your OpenTelemetry Collector/Platform. The host/address and port parts are separated with a colon. |
292292
| `mode` | decides which protocol to use for exporting logs, can be "grpc" or "http". Set to "grpc" by default. Inherited from `otlp-defaults.mode` if not specified. |
293+
| `headers` | a list of headers to attach to each gRPC or HTTP request depending on the mode selected, the sink adds the standard headers so this option should only be used for specific headers like api keys which the downstream app expects. Inherited from `otlp-defaults.headers` if not specified. |
293294
| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `otlp-defaults.compression` if not specified. |
294295

295296

pkg/util/log/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ go_library(
9696
"@org_golang_google_grpc//connectivity",
9797
"@org_golang_google_grpc//credentials/insecure",
9898
"@org_golang_google_grpc//encoding/gzip",
99+
"@org_golang_google_grpc//metadata",
99100
"@org_golang_google_grpc//stats",
100101
"@org_golang_google_grpc//status",
101102
"@org_golang_google_protobuf//proto",
@@ -219,6 +220,7 @@ go_test(
219220
"@com_github_stretchr_testify//require",
220221
"@io_opentelemetry_go_proto_otlp//collector/logs/v1:logs",
221222
"@org_golang_google_grpc//:grpc",
223+
"@org_golang_google_grpc//metadata",
222224
"@org_golang_google_protobuf//proto",
223225
"@org_golang_x_sys//unix",
224226
],

pkg/util/log/logconfig/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ go_library(
2424
importpath = "github.com/cockroachdb/cockroach/pkg/util/log/logconfig",
2525
visibility = ["//visibility:public"],
2626
deps = [
27+
"//pkg/util/httputil",
2728
"//pkg/util/humanizeutil",
2829
"//pkg/util/log/logpb",
2930
"@com_github_cockroachdb_errors//:errors",

pkg/util/log/logconfig/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,11 @@ type OTLPDefaults struct {
597597
// Set to "grpc" by default.
598598
Mode *string `yaml:",omitempty"`
599599

600+
// Headers is a list of headers to attach to each gRPC or HTTP request depending
601+
// on the mode selected, the sink adds the standard headers so this option should
602+
// only be used for specific headers like api keys which the downstream app expects.
603+
Headers map[string]string `yaml:",omitempty,flow"`
604+
600605
// Compression can be "none" or "gzip" to enable gzip compression.
601606
// Set to "gzip" by default.
602607
Compression *string `yaml:",omitempty"`

pkg/util/log/logconfig/testdata/validate

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -889,13 +889,161 @@ sinks:
889889
----
890890
ERROR: otlp server "custom": address cannot be empty
891891

892-
# Check that invalid proto is rejected.
892+
# Check that invalid compression is rejected.
893893
yaml
894894
sinks:
895895
otlp-servers:
896896
custom:
897897
address: localhost:4317
898+
channels: all
898899
compression: ''
899900
----
900901
ERROR: otlp server "custom": compression must be 'gzip' or 'none'
901-
otlp server "custom": no channel selected
902+
903+
# Check that invalid mode is rejected.
904+
yaml
905+
sinks:
906+
otlp-servers:
907+
custom:
908+
address: localhost:4317
909+
channels: all
910+
mode: ''
911+
----
912+
ERROR: otlp server "custom": mode must be 'grpc' or 'http'
913+
914+
# Check that address is valid in http mode
915+
yaml
916+
sinks:
917+
otlp-servers:
918+
custom:
919+
address: localhost:4317
920+
channels: all
921+
mode: http
922+
----
923+
ERROR: otlp server "custom": address must start with 'http://' or 'https://'
924+
925+
# Check that invalid content type header is rejected.
926+
yaml
927+
sinks:
928+
otlp-servers:
929+
custom:
930+
address: http://localhost:4317
931+
channels: all
932+
mode: http
933+
headers:
934+
Content-Type: "protobuf"
935+
----
936+
ERROR: otlp server "custom": header Content-Type is not allowed
937+
938+
# Check that invalid content type header with grpc mode is rejected.
939+
yaml
940+
sinks:
941+
otlp-servers:
942+
custom:
943+
address: localhost:4317
944+
channels: all
945+
mode: grpc
946+
headers:
947+
Content-Type: "protobuf"
948+
----
949+
ERROR: otlp server "custom": header Content-Type is not allowed
950+
951+
# Check that invalid content encoding header is rejected.
952+
yaml
953+
sinks:
954+
otlp-servers:
955+
custom:
956+
address: http://localhost:4317
957+
channels: all
958+
mode: http
959+
headers:
960+
Content-Encoding: "snappy"
961+
----
962+
ERROR: otlp server "custom": header Content-Encoding is not allowed
963+
964+
# Basic OTLP sink with HTTP mode
965+
yaml
966+
sinks:
967+
otlp-servers:
968+
custom:
969+
address: https://http-dd-log-intake.com
970+
channels: all
971+
mode: http
972+
compression: none
973+
headers:
974+
X-DD-API: "xxx"
975+
X-DD-MODE: "otlp"
976+
----
977+
sinks:
978+
file-groups:
979+
default:
980+
channels: {INFO: all}
981+
filter: INFO
982+
otlp-servers:
983+
custom:
984+
channels: {INFO: all}
985+
address: https://http-dd-log-intake.com
986+
mode: http
987+
headers: {X-DD-API: xxx, X-DD-MODE: otlp}
988+
compression: none
989+
filter: INFO
990+
format: json
991+
redact: false
992+
redactable: true
993+
exit-on-error: false
994+
auditable: false
995+
buffering:
996+
max-staleness: 5s
997+
flush-trigger-size: 1.0MiB
998+
max-buffer-size: 50MiB
999+
format: newline
1000+
sinkname: custom
1001+
stderr:
1002+
filter: NONE
1003+
capture-stray-errors:
1004+
enable: true
1005+
dir: /default-dir
1006+
max-group-size: 100MiB
1007+
1008+
# Basic OTLP sink with gRPC mode
1009+
yaml
1010+
sinks:
1011+
otlp-servers:
1012+
custom:
1013+
address: localhost:4317
1014+
channels: all
1015+
mode: grpc
1016+
compression: gzip
1017+
headers:
1018+
dd-api: "xxx"
1019+
----
1020+
sinks:
1021+
file-groups:
1022+
default:
1023+
channels: {INFO: all}
1024+
filter: INFO
1025+
otlp-servers:
1026+
custom:
1027+
channels: {INFO: all}
1028+
address: localhost:4317
1029+
mode: grpc
1030+
headers: {dd-api: xxx}
1031+
compression: gzip
1032+
filter: INFO
1033+
format: json
1034+
redact: false
1035+
redactable: true
1036+
exit-on-error: false
1037+
auditable: false
1038+
buffering:
1039+
max-staleness: 5s
1040+
flush-trigger-size: 1.0MiB
1041+
max-buffer-size: 50MiB
1042+
format: newline
1043+
sinkname: custom
1044+
stderr:
1045+
filter: NONE
1046+
capture-stray-errors:
1047+
enable: true
1048+
dir: /default-dir
1049+
max-group-size: 100MiB

pkg/util/log/logconfig/validate.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import (
1111
"net/http"
1212
"path/filepath"
1313
"reflect"
14+
"slices"
1415
"sort"
1516
"strings"
1617
"time"
1718

19+
"github.com/cockroachdb/cockroach/pkg/util/httputil"
1820
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
1921
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
2022
"github.com/cockroachdb/errors"
@@ -533,6 +535,32 @@ func (c *Config) validateOTLPSinkConfig(otsc *OTLPSinkConfig) error {
533535
return errors.New("mode must be 'grpc' or 'http'")
534536
}
535537

538+
if *otsc.Mode == OTLPModeHTTP {
539+
if !(strings.HasPrefix(otsc.Address, "http://") || strings.HasPrefix(otsc.Address, "https://")) {
540+
return errors.New("address must start with 'http://' or 'https://'")
541+
}
542+
}
543+
544+
if otsc.Headers != nil {
545+
invalidHeaders := []string{
546+
httputil.ContentTypeHeader,
547+
}
548+
549+
if *otsc.Mode == OTLPModeHTTP {
550+
// gRPC has custom metadata where it tells if payload is encoded or not
551+
invalidHeaders = append(invalidHeaders, httputil.ContentEncodingHeader)
552+
}
553+
554+
for i, value := range invalidHeaders {
555+
invalidHeaders[i] = strings.ToLower(value)
556+
}
557+
for key := range otsc.Headers {
558+
if slices.Contains(invalidHeaders, strings.ToLower(key)) {
559+
return errors.Newf("header %s is not allowed", key)
560+
}
561+
}
562+
}
563+
536564
return c.ValidateCommonSinkConfig(otsc.CommonSinkConfig)
537565
}
538566

pkg/util/log/otlp_client.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"google.golang.org/grpc/connectivity"
2727
"google.golang.org/grpc/credentials/insecure"
2828
grpc_gzip "google.golang.org/grpc/encoding/gzip"
29+
"google.golang.org/grpc/metadata"
2930
"google.golang.org/grpc/stats"
3031
"google.golang.org/grpc/status"
3132
"google.golang.org/protobuf/proto"
@@ -199,8 +200,9 @@ var _ stats.Handler = (*otlpStatsHandler)(nil)
199200

200201
// client used when sink is using gRPC for exporting logs
201202
type otlpGRPCClient struct {
202-
conn *grpc.ClientConn
203-
lsc collpb.LogsServiceClient
203+
conn *grpc.ClientConn
204+
lsc collpb.LogsServiceClient
205+
metadata metadata.MD
204206
}
205207

206208
func (c *otlpGRPCClient) Close() error {
@@ -216,6 +218,9 @@ func (c *otlpGRPCClient) Close() error {
216218
func (c *otlpGRPCClient) Export(
217219
ctx context.Context, in *collpb.ExportLogsServiceRequest,
218220
) (*collpb.ExportLogsServiceResponse, error) {
221+
if c.metadata.Len() > 0 {
222+
ctx = metadata.NewOutgoingContext(ctx, c.metadata)
223+
}
219224
return c.lsc.Export(ctx, in)
220225
}
221226

@@ -240,8 +245,9 @@ func (sink *otlpSink) setGRPCClient(config *logconfig.OTLPSinkConfig) error {
240245
lsc := collpb.NewLogsServiceClient(conn)
241246

242247
sink.client = &otlpGRPCClient{
243-
conn: conn,
244-
lsc: lsc,
248+
conn: conn,
249+
lsc: lsc,
250+
metadata: metadata.New(config.Headers),
245251
}
246252

247253
return nil
@@ -267,7 +273,7 @@ func (c *otlpHTTPClient) Export(
267273
return nil, err
268274
}
269275

270-
request := c.request.Clone(context.Background())
276+
request := c.request.Clone(ctx)
271277
switch c.compression {
272278
case logconfig.NoneCompression:
273279
request.Body = io.NopCloser(bytes.NewReader(body))
@@ -303,6 +309,9 @@ func (sink *otlpSink) setHTTPClient(config *logconfig.OTLPSinkConfig) error {
303309
return err
304310
}
305311
request.Header.Set(httputil.ContentTypeHeader, httputil.ProtoContentType)
312+
for k, v := range config.Headers {
313+
request.Header.Add(k, v)
314+
}
306315

307316
compression := *config.Compression
308317
if compression == logconfig.GzipCompression {

0 commit comments

Comments
 (0)