Skip to content

Commit 75d2589

Browse files
authored
logging and metrics for otel when the export chan gets full (#871)
1 parent 9c3f0ac commit 75d2589

File tree

4 files changed

+67
-21
lines changed

4 files changed

+67
-21
lines changed

cmd/ktranslate/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,13 @@ func applyFlags(cfg *ktranslate.Config) error {
455455
cfg.OtelFormat.ClientKey = val
456456
case "otel.root_ca":
457457
cfg.OtelFormat.RootCA = val
458+
case "otel.no_block":
459+
v, err := strconv.ParseBool(val)
460+
if err != nil {
461+
errCh <- err
462+
return
463+
}
464+
cfg.OtelFormat.NoBlockExport = v
458465
// pkg/formats/elasticsearch
459466
case "elastic.action":
460467
cfg.ElasticFormat.Action = val

config.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ type ElasticFormatConfig struct {
4242

4343
// OtelFormatConfig is the config for the otel format
4444
type OtelFormatConfig struct {
45-
Endpoint string
46-
Protocol string
47-
ClientCert string
48-
ClientKey string
49-
RootCA string
45+
Endpoint string
46+
Protocol string
47+
ClientCert string
48+
ClientKey string
49+
RootCA string
50+
NoBlockExport bool
5051
}
5152

5253
// SnmpFormatConfig is the config for the snmp format
@@ -402,11 +403,12 @@ func DefaultConfig() *Config {
402403
FlowsNeeded: 10,
403404
},
404405
OtelFormat: &OtelFormatConfig{
405-
Endpoint: "",
406-
Protocol: "stdout",
407-
ClientKey: "",
408-
ClientCert: "",
409-
RootCA: "",
406+
Endpoint: "",
407+
Protocol: "stdout",
408+
ClientKey: "",
409+
ClientCert: "",
410+
RootCA: "",
411+
NoBlockExport: false,
410412
},
411413
ElasticFormat: &ElasticFormatConfig{
412414
Action: "index",

pkg/formats/format.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func NewFormat(ctx context.Context, format Format, log logger.Underlying, regist
8484
case FORMAT_PROM_REMOTE:
8585
return prom.NewRemoteFormat(log, compression, cfg.PrometheusFormat)
8686
case FORMAT_OTEL:
87-
return otel.NewFormat(ctx, log, cfg.OtelFormat, logTee)
87+
return otel.NewFormat(ctx, log, cfg.OtelFormat, logTee, registry)
8888
case FORMAT_SNMP:
8989
return snmp.NewFormat(log, cfg.SnmpFormat)
9090
case FORMAT_PARQUET:

pkg/formats/otel/otel.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"sync"
1313

1414
"github.com/go-logr/stdr"
15+
go_metrics "github.com/kentik/go-metrics"
1516
"github.com/kentik/ktranslate"
1617
"github.com/kentik/ktranslate/pkg/eggs/logger"
1718
"github.com/kentik/ktranslate/pkg/formats/util"
@@ -40,19 +41,21 @@ type OtelFormat struct {
4041
inputs map[string]chan OtelData
4142
trapLog *OtelLogger
4243
logTee chan string
44+
metrics *OtelMetrics
4345
}
4446

4547
const (
4648
CHAN_SLACK = 10000
4749
)
4850

4951
var (
50-
endpoint string
51-
protocol string
52-
otelm metric.Meter
53-
clientCert string
54-
clientKey string
55-
rootCA string
52+
endpoint string
53+
protocol string
54+
otelm metric.Meter
55+
clientCert string
56+
clientKey string
57+
rootCA string
58+
noBlockExport bool
5659
)
5760

5861
func init() {
@@ -61,6 +64,11 @@ func init() {
6164
flag.StringVar(&clientCert, "otel.tls_cert", "", "Load TLS client cert from file.")
6265
flag.StringVar(&clientKey, "otel.tls_key", "", "Load TLS client key from file.")
6366
flag.StringVar(&rootCA, "otel.root_ca", "", "Load TLS root CA from file.")
67+
flag.BoolVar(&noBlockExport, "otel.no_block", false, "If set, drop metrics when the sending chan is full.")
68+
}
69+
70+
type OtelMetrics struct {
71+
ExportDrops go_metrics.Counter
6472
}
6573

6674
/*
@@ -70,7 +78,7 @@ Some useful env vars to think about setting:
7078
* OTEL_METRIC_EXPORT_INTERVAL=30000 -- time in ms to export. Default 60,000 (1 min).
7179
* OTEL_EXPORTER_OTLP_COMPRESSION=gzip -- turn on gzip compression.
7280
*/
73-
func NewFormat(ctx context.Context, log logger.Underlying, cfg *ktranslate.OtelFormatConfig, logTee chan string) (*OtelFormat, error) {
81+
func NewFormat(ctx context.Context, log logger.Underlying, cfg *ktranslate.OtelFormatConfig, logTee chan string, registry go_metrics.Registry) (*OtelFormat, error) {
7482
jf := &OtelFormat{
7583
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "otel"}, log),
7684
lastMetadata: map[string]*kt.LastMetadata{},
@@ -80,6 +88,9 @@ func NewFormat(ctx context.Context, log logger.Underlying, cfg *ktranslate.OtelF
8088
config: cfg,
8189
inputs: map[string]chan OtelData{},
8290
logTee: logTee,
91+
metrics: &OtelMetrics{
92+
ExportDrops: go_metrics.GetOrRegisterCounter("otel_export_drops", registry),
93+
},
8394
}
8495

8596
var tlsC *tls.Config = nil
@@ -165,7 +176,7 @@ func NewFormat(ctx context.Context, log logger.Underlying, cfg *ktranslate.OtelF
165176
jf.trapLog = ol
166177

167178
otelm = otel.Meter("ktranslate")
168-
jf.Infof("Running exporting via %s to %s", cfg.Protocol, cfg.Endpoint)
179+
jf.Infof("Running exporting via %s to %s. Blocking: %v", cfg.Protocol, cfg.Endpoint, cfg.NoBlockExport)
169180

170181
return jf, nil
171182
}
@@ -211,7 +222,20 @@ func (f *OtelFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) {
211222
f.Infof("Creating otel export for %s", m.Name)
212223
}
213224
// Save this for later, for the next time the async callback is run.
214-
f.inputs[m.Name] <- m
225+
ch := f.inputs[m.Name]
226+
queueDepth := len(ch)
227+
if queueDepth >= CHAN_SLACK {
228+
f.Debugf("Channel queue at CHAN_SLACK limit for %s: %d/%d (100%%)", m.Name, queueDepth, CHAN_SLACK)
229+
}
230+
if f.config.NoBlockExport {
231+
select {
232+
case ch <- m:
233+
default:
234+
f.metrics.ExportDrops.Inc(1)
235+
}
236+
} else {
237+
ch <- m
238+
}
215239
}
216240

217241
f.mux.RUnlock()
@@ -259,7 +283,20 @@ func (f *OtelFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) {
259283
f.mux.RLock()
260284
}
261285
// Save this for later, for the next time the async callback is run.
262-
f.inputs[m.Name] <- m
286+
ch := f.inputs[m.Name]
287+
queueDepth := len(ch)
288+
if queueDepth >= CHAN_SLACK {
289+
f.Debugf("Channel queue at CHAN_SLACK limit for %s: %d/%d (100%%)", m.Name, queueDepth, CHAN_SLACK)
290+
}
291+
if f.config.NoBlockExport {
292+
select {
293+
case ch <- m:
294+
default:
295+
f.metrics.ExportDrops.Inc(1)
296+
}
297+
} else {
298+
ch <- m
299+
}
263300
}
264301

265302
f.mux.RUnlock()

0 commit comments

Comments
 (0)