Skip to content

Commit c2d1060

Browse files
craig[bot]TheComputerM
andcommitted
Merge #150655
150655: pkg/util/log: add http mode to otlp sink r=TheComputerM a=TheComputerM Added HTTP support for OTLP sink as many platforms support directly ingesting logs through otlp+http but not gRPC, it enables users to perform agentless deployments (no intermediate service like otel-collector or datadog-agent) where the logs are directly ingested by the target app (i.e datadog or grafana) Had to use proto.Marshal instead of protoutil, because the .proto definitions for OpenTelemetry are not generated by gogoproto and therefore don't have the additional methods that protoutil requires Also refactored tests to be table driven and make them run on both modes, merged the datadriven tests into the table driven ones during the process. Epic: none Release note (general change): Added HTTP mode to OTLP log sink that can export logs to OpenTelemetry-compatible targets over HTTP. Co-authored-by: Mudit Somani <[email protected]>
2 parents b124454 + dd9c774 commit c2d1060

File tree

10 files changed

+312
-177
lines changed

10 files changed

+312
-177
lines changed

docs/generated/logsinks.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ Type-specific configuration options:
289289
|--|--|
290290
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
291291
| `address` | the network address of the gRPC endpoint for ingestion of logs on your OpenTelemetry Collector/Platform. The host/address and port parts are separated with a colon. |
292+
| `mode` | decides which protocol to use for exporting logs, can be "grpc" or "http". Set to "grpc" by default. Inherited from `otlp-defaults.mode` if not specified. |
292293
| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `otlp-defaults.compression` if not specified. |
293294

294295

pkg/cli/log_flags_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func TestSetupLogging(t *testing.T) {
5858
`max-buffer-size: 50MiB, ` +
5959
`format: newline}}`
6060
const defaultOTLPConfig = `otlp-defaults: {` +
61+
`mode: grpc, ` +
6162
`compression: gzip, ` +
6263
`filter: INFO, ` +
6364
`format: json, ` +

pkg/testutils/lint/lint_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,7 @@ func TestLint(t *testing.T) {
14441444
":!sql/types/types_jsonpb.go",
14451445
":!sql/schemachanger/scplan/scviz/maps.go",
14461446
":!workload/schemachange/tracing.go",
1447+
":!util/log/otlp_client.go",
14471448
)
14481449
if err != nil {
14491450
t.Fatal(err)
@@ -1492,6 +1493,7 @@ func TestLint(t *testing.T) {
14921493
":!storage/mvcc_value.go",
14931494
":!roachpb/data.go",
14941495
":!sql/types/types_jsonpb.go",
1496+
":!util/log/otlp_client_test.go",
14951497
)
14961498
if err != nil {
14971499
t.Fatal(err)

pkg/util/log/BUILD.bazel

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ go_library(
9898
"@org_golang_google_grpc//encoding/gzip",
9999
"@org_golang_google_grpc//stats",
100100
"@org_golang_google_grpc//status",
101+
"@org_golang_google_protobuf//proto",
101102
] + select({
102103
"@io_bazel_rules_go//go/platform:aix": [
103104
"@org_golang_x_sys//unix",
@@ -195,6 +196,7 @@ go_test(
195196
"//pkg/util/caller",
196197
"//pkg/util/ctxgroup",
197198
"//pkg/util/encoding",
199+
"//pkg/util/httputil",
198200
"//pkg/util/leaktest",
199201
"//pkg/util/log/channel",
200202
"//pkg/util/log/logconfig",
@@ -216,8 +218,8 @@ go_test(
216218
"@com_github_stretchr_testify//assert",
217219
"@com_github_stretchr_testify//require",
218220
"@io_opentelemetry_go_proto_otlp//collector/logs/v1:logs",
219-
"@io_opentelemetry_go_proto_otlp//common/v1:common",
220221
"@org_golang_google_grpc//:grpc",
222+
"@org_golang_google_protobuf//proto",
221223
"@org_golang_x_sys//unix",
222224
],
223225
)

pkg/util/log/flags.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,8 @@ func ApplyConfig(
123123
// closes the underlying gRPC connection of OTLP sinks.
124124
closeOTLPSinks := func() {
125125
for _, fc := range sinkInfos {
126-
if sink, ok := fc.sink.(*otlpSink); ok && sink.isNotShutdown() {
127-
// The reason for nolint:grpcconnclose is that we are not using *rpc.Context
128-
// as it is primarily used for communication between crdb nodes, and doesn't
129-
// fit this usecase.
130-
if err := sink.conn.Close(); err != nil { // nolint:grpcconnclose
126+
if sink, ok := fc.sink.(*otlpSink); ok {
127+
if err := sink.client.Close(); err != nil {
131128
fmt.Fprintf(OrigStderr, "# OTLP Sink Cleanup Warning: %s\n", err.Error())
132129
}
133130
}

pkg/util/log/logconfig/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,14 @@ type HTTPSinkConfig struct {
589589
sinkName string
590590
}
591591

592+
var OTLPModeGRPC = "grpc"
593+
var OTLPModeHTTP = "http"
594+
592595
type OTLPDefaults struct {
596+
// Mode decides which protocol to use for exporting logs, can be "grpc" or "http".
597+
// Set to "grpc" by default.
598+
Mode *string `yaml:",omitempty"`
599+
593600
// Compression can be "none" or "gzip" to enable gzip compression.
594601
// Set to "gzip" by default.
595602
Compression *string `yaml:",omitempty"`

pkg/util/log/logconfig/validate.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
124124
},
125125
},
126126
Compression: &GzipCompression,
127+
Mode: &OTLPModeGRPC,
127128
}
128129

129130
propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig)
@@ -528,6 +529,10 @@ func (c *Config) validateOTLPSinkConfig(otsc *OTLPSinkConfig) error {
528529
return errors.New("compression must be 'gzip' or 'none'")
529530
}
530531

532+
if *otsc.Mode != OTLPModeGRPC && *otsc.Mode != OTLPModeHTTP {
533+
return errors.New("mode must be 'grpc' or 'http'")
534+
}
535+
531536
return c.ValidateCommonSinkConfig(otsc.CommonSinkConfig)
532537
}
533538

pkg/util/log/otlp_client.go

Lines changed: 145 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@
66
package log
77

88
import (
9+
"bytes"
10+
"compress/gzip"
911
"context"
12+
"io"
13+
"net/http"
1014
"strings"
1115
"sync"
1216

1317
"github.com/cockroachdb/cockroach/pkg/cli/exit"
18+
"github.com/cockroachdb/cockroach/pkg/util/httputil"
1419
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
1520
collpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
1621
cpb "go.opentelemetry.io/proto/otlp/common/v1"
@@ -20,9 +25,10 @@ import (
2025
"google.golang.org/grpc/codes"
2126
"google.golang.org/grpc/connectivity"
2227
"google.golang.org/grpc/credentials/insecure"
23-
"google.golang.org/grpc/encoding/gzip"
28+
grpc_gzip "google.golang.org/grpc/encoding/gzip"
2429
"google.golang.org/grpc/stats"
2530
"google.golang.org/grpc/status"
31+
"google.golang.org/protobuf/proto"
2632
)
2733

2834
const (
@@ -44,37 +50,21 @@ var otlpLogRecordPool = sync.Pool{
4450
},
4551
}
4652

53+
type otlpSinkClient interface {
54+
Export(ctx context.Context, in *collpb.ExportLogsServiceRequest) (*collpb.ExportLogsServiceResponse, error)
55+
Close() error
56+
}
57+
4758
// OpenTelemetry log sink
4859
type otlpSink struct {
49-
conn *grpc.ClientConn
50-
lsc collpb.LogsServiceClient
51-
60+
client otlpSinkClient
5261
// requestObject should not be modified concurrently as it is reused
5362
// between requests
5463
requestObject *collpb.ExportLogsServiceRequest
5564
}
5665

57-
var statsHandlerOption = &otlpStatsHandler{}
58-
5966
func newOTLPSink(config logconfig.OTLPSinkConfig) (*otlpSink, error) {
60-
dialOpts := []grpc.DialOption{
61-
grpc.WithTransportCredentials(insecure.NewCredentials()),
62-
grpc.WithStatsHandler(statsHandlerOption),
63-
}
64-
65-
if *config.Compression == logconfig.GzipCompression {
66-
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
67-
}
68-
69-
conn, err := grpc.Dial(config.Address, dialOpts...)
70-
if err != nil {
71-
return nil, err
72-
}
73-
74-
lsc := collpb.NewLogsServiceClient(conn)
7567
sink := &otlpSink{
76-
conn: conn,
77-
lsc: lsc,
7868
requestObject: &collpb.ExportLogsServiceRequest{
7969
ResourceLogs: []*lpb.ResourceLogs{
8070
{
@@ -100,11 +90,16 @@ func newOTLPSink(config logconfig.OTLPSinkConfig) (*otlpSink, error) {
10090
},
10191
}
10292

103-
return sink, nil
104-
}
93+
setClient := sink.setHTTPClient
94+
if *config.Mode == logconfig.OTLPModeGRPC {
95+
setClient = sink.setGRPCClient
96+
}
97+
98+
if err := setClient(&config); err != nil {
99+
return nil, err
100+
}
105101

106-
func (sink *otlpSink) isNotShutdown() bool {
107-
return sink.conn.GetState() != connectivity.Shutdown
102+
return sink, nil
108103
}
109104

110105
func (sink *otlpSink) active() bool {
@@ -154,7 +149,7 @@ func (sink *otlpSink) output(b []byte, opts sinkOutputOptions) error {
154149
sink.requestObject.ResourceLogs[0].InstrumentationLibraryLogs[0].Logs = records
155150

156151
// transmit the log over the network
157-
_, err := sink.lsc.Export(ctx, sink.requestObject)
152+
_, err := sink.client.Export(ctx, sink.requestObject)
158153

159154
// put the records back into the pool
160155
for _, record := range records {
@@ -201,3 +196,125 @@ func (h *otlpStatsHandler) HandleRPC(ctx context.Context, rpcInfo stats.RPCStats
201196
}
202197

203198
var _ stats.Handler = (*otlpStatsHandler)(nil)
199+
200+
// client used when sink is using gRPC for exporting logs
201+
type otlpGRPCClient struct {
202+
conn *grpc.ClientConn
203+
lsc collpb.LogsServiceClient
204+
}
205+
206+
func (c *otlpGRPCClient) Close() error {
207+
if c.conn.GetState() == connectivity.Shutdown {
208+
return nil
209+
}
210+
// The reason for nolint:grpcconnclose is that we are not using *rpc.Context
211+
// as it is primarily used for communication between crdb nodes, and doesn't
212+
// fit this usecase.
213+
return c.conn.Close() // nolint:grpcconnclose
214+
}
215+
216+
func (c *otlpGRPCClient) Export(
217+
ctx context.Context, in *collpb.ExportLogsServiceRequest,
218+
) (*collpb.ExportLogsServiceResponse, error) {
219+
return c.lsc.Export(ctx, in)
220+
}
221+
222+
var _ otlpSinkClient = (*otlpGRPCClient)(nil)
223+
224+
var statsHandlerOption = &otlpStatsHandler{}
225+
226+
func (sink *otlpSink) setGRPCClient(config *logconfig.OTLPSinkConfig) error {
227+
dialOpts := []grpc.DialOption{
228+
grpc.WithTransportCredentials(insecure.NewCredentials()),
229+
grpc.WithStatsHandler(statsHandlerOption),
230+
}
231+
232+
if *config.Compression == logconfig.GzipCompression {
233+
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpc_gzip.Name)))
234+
}
235+
236+
conn, err := grpc.Dial(config.Address, dialOpts...)
237+
if err != nil {
238+
return err
239+
}
240+
lsc := collpb.NewLogsServiceClient(conn)
241+
242+
sink.client = &otlpGRPCClient{
243+
conn: conn,
244+
lsc: lsc,
245+
}
246+
247+
return nil
248+
}
249+
250+
// client used when sink is using HTTP for exporting logs
251+
type otlpHTTPClient struct {
252+
client *http.Client
253+
request *http.Request
254+
compression string
255+
gzipWriter *gzip.Writer
256+
}
257+
258+
func (c *otlpHTTPClient) Close() error {
259+
return nil
260+
}
261+
262+
func (c *otlpHTTPClient) Export(
263+
ctx context.Context, in *collpb.ExportLogsServiceRequest,
264+
) (*collpb.ExportLogsServiceResponse, error) {
265+
body, err := proto.Marshal(in)
266+
if err != nil {
267+
return nil, err
268+
}
269+
270+
request := c.request.Clone(context.Background())
271+
switch c.compression {
272+
case logconfig.NoneCompression:
273+
request.Body = io.NopCloser(bytes.NewReader(body))
274+
case logconfig.GzipCompression:
275+
// Content-Encoding header is set when the sink is initialized
276+
// so no need to set it here
277+
var buf bytes.Buffer
278+
c.gzipWriter.Reset(&buf)
279+
if _, err := c.gzipWriter.Write(body); err != nil {
280+
return nil, err
281+
}
282+
if err := c.gzipWriter.Close(); err != nil {
283+
return nil, err
284+
}
285+
request.Body = io.NopCloser(bytes.NewReader(buf.Bytes()))
286+
}
287+
288+
resp, err := c.client.Do(request)
289+
resp.Body.Close()
290+
return nil, err
291+
}
292+
293+
var _ otlpSinkClient = (*otlpHTTPClient)(nil)
294+
295+
func (sink *otlpSink) setHTTPClient(config *logconfig.OTLPSinkConfig) error {
296+
hc := &http.Client{
297+
Transport: &http.Transport{
298+
ForceAttemptHTTP2: true,
299+
},
300+
}
301+
request, err := http.NewRequest(http.MethodPost, config.Address, http.NoBody)
302+
if err != nil {
303+
return err
304+
}
305+
request.Header.Set(httputil.ContentTypeHeader, httputil.ProtoContentType)
306+
307+
compression := *config.Compression
308+
if compression == logconfig.GzipCompression {
309+
request.Header.Set(httputil.ContentEncodingHeader, httputil.GzipEncoding)
310+
}
311+
312+
sink.client = &otlpHTTPClient{
313+
client: hc,
314+
request: request,
315+
compression: compression,
316+
gzipWriter: gzip.NewWriter(io.Discard),
317+
}
318+
319+
return nil
320+
}

0 commit comments

Comments
 (0)