Skip to content

Commit 9e49c51

Browse files
authored
Merge branch 'main' into cre-1323
2 parents 1cdac83 + 3b1ede2 commit 9e49c51

File tree

5 files changed

+51
-2
lines changed

5 files changed

+51
-2
lines changed

pkg/beholder/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
2929
)
3030

31+
const defaultGRPCCompressor = "gzip"
32+
3133
type Emitter interface {
3234
// Sends message with bytes and attributes to OTel Collector
3335
Emit(ctx context.Context, body []byte, attrKVs ...any) error
@@ -362,6 +364,13 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, auth Auth,
362364
// No auth
363365
default:
364366
}
367+
switch compressor := config.TraceCompressor; compressor {
368+
case "none":
369+
case "":
370+
exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(defaultGRPCCompressor))
371+
default:
372+
exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(compressor))
373+
}
365374
if config.TraceRetryConfig != nil {
366375
// NOTE: By default, the retry is enabled in the OTel SDK
367376
exporterOpts = append(exporterOpts, otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
@@ -401,6 +410,13 @@ func newMeterProvider(cfg Config, resource *sdkresource.Resource, auth Auth, cre
401410
otlpmetricgrpc.WithTLSCredentials(creds),
402411
otlpmetricgrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
403412
}
413+
switch compressor := cfg.MetricCompressor; compressor {
414+
case "none":
415+
case "":
416+
opts = append(opts, otlpmetricgrpc.WithCompressor(defaultGRPCCompressor))
417+
default:
418+
opts = append(opts, otlpmetricgrpc.WithCompressor(compressor))
419+
}
404420

405421
switch {
406422
// Rotating auth
@@ -454,6 +470,13 @@ func newLoggerOpts(cfg Config, auth Auth, creds credentials.TransportCredentials
454470
otlploggrpc.WithTLSCredentials(creds),
455471
otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
456472
}
473+
switch compressor := cfg.LogCompressor; compressor {
474+
case "none":
475+
case "":
476+
opts = append(opts, otlploggrpc.WithCompressor(defaultGRPCCompressor))
477+
default:
478+
opts = append(opts, otlploggrpc.WithCompressor(compressor))
479+
}
457480
// Log exporter auth
458481
switch {
459482
// Rotating auth

pkg/beholder/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ type Config struct {
2929
TraceBatchTimeout time.Duration
3030
TraceSpanExporter trace.SpanExporter // optional additional exporter
3131
TraceRetryConfig *RetryConfig
32+
// TraceCompressor sets the gRPC compressor for traces. Valid values: "gzip" (default), "none".
33+
TraceCompressor string
3234

3335
// OTel Metric
3436
MetricReaderInterval time.Duration
3537
MetricRetryConfig *RetryConfig
3638
MetricViews []metric.View
39+
// MetricCompressor sets the gRPC compressor for metrics. Valid values: "gzip" (default), "none".
40+
MetricCompressor string
3741

3842
// Custom Events via Chip Ingress Emitter
3943
ChipIngressEmitterEnabled bool
@@ -50,6 +54,8 @@ type Config struct {
5054
LogRetryConfig *RetryConfig
5155
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
5256
LogLevel zapcore.Level // Log level for telemetry streaming
57+
// LogCompressor sets the gRPC compressor for logs. Valid values: "gzip" (default), "none".
58+
LogCompressor string
5359

5460
// Auth
5561
// AuthHeaders serves two purposes:
@@ -110,10 +116,12 @@ func DefaultConfig() Config {
110116
// Trace
111117
TraceSampleRatio: 1,
112118
TraceBatchTimeout: 1 * time.Second,
119+
TraceCompressor: "gzip",
113120
// OTel trace exporter retry config
114121
TraceRetryConfig: defaultRetryConfig.Copy(),
115122
// Metric
116123
MetricReaderInterval: 1 * time.Second,
124+
MetricCompressor: "gzip",
117125
// OTel metric exporter retry config
118126
MetricRetryConfig: defaultRetryConfig.Copy(),
119127
// Log
@@ -124,6 +132,7 @@ func DefaultConfig() Config {
124132
LogBatchProcessor: true,
125133
LogStreamingEnabled: true, // Enable logs streaming by default
126134
LogLevel: zapcore.InfoLevel,
135+
LogCompressor: "gzip",
127136
// Auth (defaults to static auth mode with TTL=0)
128137
AuthHeadersTTL: 0,
129138
}

pkg/beholder/config_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ func ExampleConfig() {
3636
// Trace
3737
TraceSampleRatio: 1,
3838
TraceBatchTimeout: 1 * time.Second,
39+
TraceCompressor: "gzip",
3940
// OTel trace exporter retry config
4041
TraceRetryConfig: nil,
4142
// Metric
4243
MetricReaderInterval: 1 * time.Second,
44+
MetricCompressor: "gzip",
4345
// OTel metric exporter retry config
4446
MetricRetryConfig: nil,
4547
// Log
@@ -50,6 +52,7 @@ func ExampleConfig() {
5052
LogBatchProcessor: true,
5153
LogStreamingEnabled: false, // Disable streaming logs by default
5254
LogLevel: zapcore.InfoLevel, // Default log level
55+
LogCompressor: "gzip",
5356
// Auth
5457
AuthPublicKeyHex: "",
5558
AuthHeaders: map[string]string{},
@@ -64,6 +67,6 @@ func ExampleConfig() {
6467
}
6568
fmt.Printf("%+v\n", *config.LogRetryConfig)
6669
// Output:
67-
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
70+
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
6871
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
6972
}

pkg/loop/config.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ const (
7070
envTelemetryLogExportMaxBatchSize = "CL_TELEMETRY_LOG_EXPORT_MAX_BATCH_SIZE"
7171
envTelemetryLogExportInterval = "CL_TELEMETRY_LOG_EXPORT_INTERVAL"
7272
envTelemetryLogMaxQueueSize = "CL_TELEMETRY_LOG_MAX_QUEUE_SIZE"
73+
envTelemetryTraceCompressor = "CL_TELEMETRY_TRACE_COMPRESSOR"
74+
envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR"
75+
envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR"
7376

7477
envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
7578
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
@@ -133,6 +136,9 @@ type EnvConfig struct {
133136
TelemetryLogExportMaxBatchSize int
134137
TelemetryLogExportInterval time.Duration
135138
TelemetryLogMaxQueueSize int
139+
TelemetryTraceCompressor string
140+
TelemetryMetricCompressor string
141+
TelemetryLogCompressor string
136142

137143
ChipIngressEndpoint string
138144
ChipIngressInsecureConnection bool
@@ -209,6 +215,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
209215
add(envTelemetryLogExportMaxBatchSize, strconv.Itoa(e.TelemetryLogExportMaxBatchSize))
210216
add(envTelemetryLogExportInterval, e.TelemetryLogExportInterval.String())
211217
add(envTelemetryLogMaxQueueSize, strconv.Itoa(e.TelemetryLogMaxQueueSize))
218+
add(envTelemetryTraceCompressor, e.TelemetryTraceCompressor)
219+
add(envTelemetryMetricCompressor, e.TelemetryMetricCompressor)
220+
add(envTelemetryLogCompressor, e.TelemetryLogCompressor)
212221

213222
add(envChipIngressEndpoint, e.ChipIngressEndpoint)
214223
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
@@ -406,7 +415,9 @@ func (e *EnvConfig) parse() error {
406415
if err != nil {
407416
return fmt.Errorf("failed to parse %s: %w", envTelemetryLogMaxQueueSize, err)
408417
}
409-
418+
e.TelemetryTraceCompressor = os.Getenv(envTelemetryTraceCompressor)
419+
e.TelemetryMetricCompressor = os.Getenv(envTelemetryMetricCompressor)
420+
e.TelemetryLogCompressor = os.Getenv(envTelemetryLogCompressor)
410421
// Optional
411422
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
412423
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)

pkg/loop/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (s *Server) start(opts ...ServerOpt) error {
155155
OtelExporterGRPCEndpoint: s.EnvConfig.TelemetryEndpoint,
156156
ResourceAttributes: append(attributes, s.EnvConfig.TelemetryAttributes.AsStringAttributes()...),
157157
TraceSampleRatio: s.EnvConfig.TelemetryTraceSampleRatio,
158+
TraceCompressor: s.EnvConfig.TelemetryTraceCompressor,
158159
EmitterBatchProcessor: s.EnvConfig.TelemetryEmitterBatchProcessor,
159160
EmitterExportTimeout: s.EnvConfig.TelemetryEmitterExportTimeout,
160161
EmitterExportInterval: s.EnvConfig.TelemetryEmitterExportInterval,
@@ -167,9 +168,11 @@ func (s *Server) start(opts ...ServerOpt) error {
167168
LogExportMaxBatchSize: s.EnvConfig.TelemetryLogExportMaxBatchSize,
168169
LogExportInterval: s.EnvConfig.TelemetryLogExportInterval,
169170
LogMaxQueueSize: s.EnvConfig.TelemetryLogMaxQueueSize,
171+
LogCompressor: s.EnvConfig.TelemetryLogCompressor,
170172
ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
171173
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
172174
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
175+
MetricCompressor: s.EnvConfig.TelemetryMetricCompressor,
173176
}
174177

175178
// Configure beholder auth - the client will determine rotating vs static mode

0 commit comments

Comments
 (0)