Skip to content

Commit a53f35b

Browse files
committed
pkg/util/log: reducing allocs in otlp sink
this aims to increase the performance and decrease the number of allocations made in the otlp sink. Benchmark results: ``` name old time/op new time/op delta OTLPSink-12 536µs ± 1% 535µs ± 0% ~ (p=0.156 n=10+9) name old alloc/op new alloc/op delta OTLPSink-12 18.5kB ±25% 15.7kB ±11% -14.97% (p=0.004 n=9+9) name old allocs/op new allocs/op delta OTLPSink-12 132 ± 0% 123 ± 0% -6.82% (p=0.000 n=10+10) ``` benchmark was done in a e2e environment (crdb->otel-collector->datadog) and with gzip compression turned on. part of: CRDB-51667 Release note: none
1 parent f17cf57 commit a53f35b

File tree

2 files changed

+121
-46
lines changed

2 files changed

+121
-46
lines changed

pkg/util/log/otlp_client.go

Lines changed: 74 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
package log
77

88
import (
9-
"bytes"
109
"context"
10+
"strings"
11+
"sync"
1112

1213
"github.com/cockroachdb/cockroach/pkg/cli/exit"
1314
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
@@ -31,17 +32,29 @@ const (
3132
logAttributeSinkKey = "sink.name"
3233
)
3334

35+
// pool for OTEL spec log record objects that we can reuse between requests
36+
var otlpLogRecordPool = sync.Pool{
37+
New: func() any {
38+
return &lpb.LogRecord{
39+
Body: &cpb.AnyValue{
40+
Value: &cpb.AnyValue_StringValue{StringValue: ""},
41+
},
42+
}
43+
},
44+
}
45+
3446
// OpenTelemetry log sink
3547
type otlpSink struct {
36-
conn *grpc.ClientConn
37-
lsc collpb.LogsServiceClient
38-
name string
39-
resource *rpb.Resource
48+
conn *grpc.ClientConn
49+
lsc collpb.LogsServiceClient
50+
51+
// requestObject should not be modified concurrently as it is reused
52+
// between requests
53+
requestObject *collpb.ExportLogsServiceRequest
4054
}
4155

4256
func newOTLPSink(config logconfig.OTLPSinkConfig) (*otlpSink, error) {
4357
dialOpts := []grpc.DialOption{
44-
grpc.WithUserAgent("CRDB OTLP over gRPC logs exporter"),
4558
grpc.WithTransportCredentials(insecure.NewCredentials()),
4659
}
4760

@@ -55,24 +68,35 @@ func newOTLPSink(config logconfig.OTLPSinkConfig) (*otlpSink, error) {
5568
}
5669

5770
lsc := collpb.NewLogsServiceClient(conn)
58-
name := config.SinkName
59-
return &otlpSink{
71+
sink := &otlpSink{
6072
conn: conn,
6173
lsc: lsc,
62-
name: name,
63-
resource: &rpb.Resource{
64-
Attributes: []*cpb.KeyValue{
74+
requestObject: &collpb.ExportLogsServiceRequest{
75+
ResourceLogs: []*lpb.ResourceLogs{
6576
{
66-
Key: logAttributeServiceKey,
67-
Value: &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: logAttributeServiceValue}},
68-
},
69-
{
70-
Key: logAttributeSinkKey,
71-
Value: &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: name}},
77+
Resource: &rpb.Resource{
78+
Attributes: []*cpb.KeyValue{
79+
{
80+
Key: logAttributeServiceKey,
81+
Value: &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: logAttributeServiceValue}},
82+
},
83+
{
84+
Key: logAttributeSinkKey,
85+
Value: &cpb.AnyValue{Value: &cpb.AnyValue_StringValue{StringValue: config.SinkName}},
86+
},
87+
},
88+
},
89+
InstrumentationLibraryLogs: []*lpb.InstrumentationLibraryLogs{
90+
{
91+
Logs: nil,
92+
},
93+
},
7294
},
7395
},
7496
},
75-
}, nil
97+
}
98+
99+
return sink, nil
76100
}
77101

78102
func (sink *otlpSink) isNotShutdown() bool {
@@ -91,43 +115,47 @@ func (sink *otlpSink) exitCode() exit.Code {
91115
return exit.LoggingNetCollectorUnavailable()
92116
}
93117

94-
// converts the raw bytes into OTEL log records
95-
func extractRecordsToOTLP(b []byte) []*lpb.LogRecord {
96-
bodies := bytes.Split(b, []byte("\n"))
97-
records := make([]*lpb.LogRecord, 0, len(bodies))
98-
99-
for _, body := range bodies {
100-
body = bytes.TrimSpace(body)
101-
if len(body) > 0 {
102-
records = append(records, &lpb.LogRecord{
103-
Body: &cpb.AnyValue{
104-
Value: &cpb.AnyValue_StringValue{
105-
StringValue: string(body),
106-
},
107-
},
108-
})
118+
// converts the raw bytes into OTEL log records using the otlpLogRecordPool
119+
func otlpExtractRecords(b []byte) []*lpb.LogRecord {
120+
body := string(b)
121+
records := make([]*lpb.LogRecord, 0, strings.Count(body, "\n")+1)
122+
123+
start := 0
124+
for i, ch := range body {
125+
if ch == '\n' {
126+
if i > start {
127+
record := otlpLogRecordPool.Get().(*lpb.LogRecord)
128+
record.Body.Value.(*cpb.AnyValue_StringValue).StringValue = body[start:i]
129+
records = append(records, record)
130+
}
131+
start = i + 1
109132
}
110133
}
111134

135+
// check at the very end to ensure entire buffer is processed
136+
if start < len(body) {
137+
record := otlpLogRecordPool.Get().(*lpb.LogRecord)
138+
record.Body.Value.(*cpb.AnyValue_StringValue).StringValue = body[start:]
139+
records = append(records, record)
140+
}
141+
112142
return records
113143
}
114144

115145
func (sink *otlpSink) output(b []byte, opts sinkOutputOptions) error {
116-
records := extractRecordsToOTLP(b)
117146
ctx := context.Background()
118147

119-
_, err := sink.lsc.Export(ctx, &collpb.ExportLogsServiceRequest{
120-
ResourceLogs: []*lpb.ResourceLogs{
121-
{
122-
Resource: sink.resource,
123-
InstrumentationLibraryLogs: []*lpb.InstrumentationLibraryLogs{
124-
{
125-
Logs: records,
126-
},
127-
},
128-
},
129-
},
130-
})
148+
records := otlpExtractRecords(b)
149+
sink.requestObject.ResourceLogs[0].InstrumentationLibraryLogs[0].Logs = records
150+
151+
// transmit the log over the network
152+
_, err := sink.lsc.Export(ctx, sink.requestObject)
153+
154+
// put the records back into the pool
155+
for _, record := range records {
156+
record.Body.Value.(*cpb.AnyValue_StringValue).StringValue = ""
157+
otlpLogRecordPool.Put(record)
158+
}
131159

132160
if status.Code(err) == codes.OK {
133161
return nil

pkg/util/log/otlp_client_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,50 @@ func TestOTLPClientSeverity(t *testing.T) {
219219
return strings.Join(output, "\n")
220220
})
221221
}
222+
223+
func TestOTLPExtractRecords(t *testing.T) {
224+
tests := map[string]struct {
225+
input string
226+
result []string
227+
}{
228+
"single_line": {
229+
input: "Hello World",
230+
result: []string{"Hello World"},
231+
},
232+
"multiple_lines": {
233+
input: "Message 1\nMessage 2\n\nMessage 3",
234+
result: []string{"Message 1", "Message 2", "Message 3"},
235+
},
236+
"trailing_newline": {
237+
input: "Message 1\nMessage 2\n",
238+
result: []string{"Message 1", "Message 2"},
239+
},
240+
"leading_newline": {
241+
input: "\nMessage 1\nMessage 2",
242+
result: []string{"Message 1", "Message 2"},
243+
},
244+
"redaction_markers": {
245+
input: "Message 1\n‹Message 2›\nMessage 3",
246+
result: []string{"Message 1", "‹Message 2›", "Message 3"},
247+
},
248+
"empty_string": {
249+
input: "",
250+
result: []string{},
251+
},
252+
"newline_only": {
253+
input: "\n\n",
254+
result: []string{},
255+
},
256+
}
257+
258+
for name, test := range tests {
259+
t.Run(name, func(t *testing.T) {
260+
body := []byte(test.input)
261+
records := otlpExtractRecords(body)
262+
require.Len(t, records, len(test.result))
263+
for i, record := range records {
264+
require.Equal(t, test.result[i], record.Body.GetStringValue())
265+
}
266+
})
267+
}
268+
}

0 commit comments

Comments
 (0)