Skip to content

Commit a0d42aa

Browse files
Don't apply unnecessary otel config updates (#9985) (#10098)
* Don't apply unnecessary otel config updates * Add a comment explaining the confmap hashing logic * Add debug log when we skip config application * Use real otel config in hashing test * Remove outdated comment * Switch the hash function to FNV128 * Use bytes.Equal instead of bytes.Compare (cherry picked from commit d19d1b5) Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent e31e041 commit a0d42aa

File tree

2 files changed

+229
-7
lines changed

2 files changed

+229
-7
lines changed

internal/pkg/otel/manager/manager.go

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
package manager
66

77
import (
8+
"bytes"
89
"context"
10+
"encoding/json"
911
"errors"
1012
"fmt"
13+
"hash/fnv"
1114
"os"
1215
"sync"
1316
"sync/atomic"
@@ -70,7 +73,8 @@ type OTelManager struct {
7073

7174
// The current configuration that the OTel collector is using. In the case that
7275
// the mergedCollectorCfg is nil then the collector is not running.
73-
mergedCollectorCfg *confmap.Conf
76+
mergedCollectorCfg *confmap.Conf
77+
mergedCollectorCfgHash []byte
7478

7579
currentCollectorStatus *status.AggregateStatus
7680
currentComponentStates map[string]runtime.ComponentComponentState
@@ -282,14 +286,30 @@ func (m *OTelManager) Run(ctx context.Context) error {
282286

283287
// this is the only place where we mutate the internal config attributes, take a write lock for the duration
284288
m.mx.Lock()
285-
m.mergedCollectorCfg = mergedCfg
289+
previousConfigHash := m.mergedCollectorCfgHash
290+
configChanged, configUpdateErr := m.maybeUpdateMergedConfig(mergedCfg)
286291
m.collectorCfg = cfgUpdate.collectorCfg
287292
m.components = cfgUpdate.components
288293
m.mx.Unlock()
289294

290-
err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
291-
// report the error unconditionally to indicate that the config was applied
292-
reportErr(ctx, m.errCh, err)
295+
if configUpdateErr != nil {
296+
m.logger.Warn("failed to calculate hash of merged config, proceeding with update", zap.Error(configUpdateErr))
297+
}
298+
299+
if configChanged {
300+
m.logger.Debugf(
301+
"new config hash (%d) is different than the old config hash (%d), applying update",
302+
m.mergedCollectorCfgHash, previousConfigHash)
303+
applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
304+
// only report the error if we actually apply the update
305+
// otherwise, we could override an actual error with a nil in the channel when the collector
306+
// state doesn't actually change
307+
reportErr(ctx, m.errCh, applyErr)
308+
} else {
309+
m.logger.Debugf(
310+
"new config hash (%d) is identical to the old config hash (%d), skipping update",
311+
m.mergedCollectorCfgHash, previousConfigHash)
312+
}
293313

294314
case otelStatus := <-collectorStatusCh:
295315
err = m.reportOtelStatusUpdate(ctx, otelStatus)
@@ -513,6 +533,19 @@ func (m *OTelManager) processComponentStates(componentStates []runtime.Component
513533
return componentStates
514534
}
515535

536+
// maybeUpdateMergedConfig updates the merged config if it's different from the current value. It checks this by
537+
// calculating a hash and comparing. It returns a value indicating if the configuration was updated.
538+
// If an error is encountered when calculating the hash, this will always be true.
539+
func (m *OTelManager) maybeUpdateMergedConfig(mergedCfg *confmap.Conf) (updated bool, err error) {
540+
// if we get an error here, we just proceed with the update, worst that can happen is that we reload unnecessarily
541+
mergedCfgHash, err := calculateConfmapHash(mergedCfg)
542+
previousConfigHash := m.mergedCollectorCfgHash
543+
544+
m.mergedCollectorCfg = mergedCfg
545+
m.mergedCollectorCfgHash = mergedCfgHash
546+
return !bytes.Equal(mergedCfgHash, previousConfigHash) || err != nil, err
547+
}
548+
516549
// reportComponentStateUpdates sends component state updates to the component watch channel. It first drains
517550
// the channel to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded.
518551
// This ensures the receiver always observes the latest reported status.
@@ -532,3 +565,28 @@ func (m *OTelManager) reportComponentStateUpdates(ctx context.Context, component
532565
return
533566
}
534567
}
568+
569+
// calculateConfmapHash calculates a hash of a given configuration. It's optimized for speed, which is why it
570+
// json encodes the values directly into a xxhash instance, instead of converting to a map[string]any first.
571+
func calculateConfmapHash(conf *confmap.Conf) ([]byte, error) {
572+
if conf == nil {
573+
return nil, nil
574+
}
575+
576+
h := fnv.New128()
577+
// We encode the configuration to json instead of yaml, because it's simpler and more performant.
578+
// In general otel configuration can be marshalled to any format supported by koanf, but the confmap
579+
// API doesn't expose this. This is why the small workaround below to avoid converting to a Go map is necessary.
580+
encoder := json.NewEncoder(h)
581+
582+
for _, key := range conf.AllKeys() { // this is a sorted list, so the output is consistent
583+
if err := encoder.Encode(key); err != nil {
584+
return nil, err
585+
}
586+
if err := encoder.Encode(conf.Get(key)); err != nil {
587+
return nil, err
588+
}
589+
}
590+
591+
return h.Sum(nil), nil
592+
}

internal/pkg/otel/manager/manager_test.go

Lines changed: 166 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ var (
5353
"level": "none",
5454
"readers": []any{},
5555
},
56+
"logs": map[string]interface{}{
57+
"level": "info",
58+
},
5659
},
5760
"pipelines": map[string]interface{}{
5861
"traces": map[string]interface{}{
@@ -311,8 +314,10 @@ func TestOTelManager_Run(t *testing.T) {
311314
m.Update(cfg, nil)
312315
e.EnsureHealthy(t, updateTime)
313316

314-
// trigger update (no config compare is due externally to otel collector)
317+
// trigger update
315318
updateTime = time.Now()
319+
ok := cfg.Delete("service::telemetry::logs::level") // modify the config
320+
require.True(t, ok)
316321
m.Update(cfg, nil)
317322
e.EnsureHealthy(t, updateTime)
318323

@@ -336,8 +341,10 @@ func TestOTelManager_Run(t *testing.T) {
336341
m.Update(cfg, nil)
337342
e.EnsureHealthy(t, updateTime)
338343

339-
// trigger update (no config compare is due externally to otel collector)
344+
// trigger update
340345
updateTime = time.Now()
346+
ok := cfg.Delete("service::telemetry::logs::level") // modify the config
347+
require.True(t, ok)
341348
m.Update(cfg, nil)
342349
e.EnsureHealthy(t, updateTime)
343350

@@ -1361,3 +1368,160 @@ func assertOtelStatusesEqualIgnoringTimestamps(t require.TestingT, a, b *status.
13611368

13621369
return true
13631370
}
1371+
1372+
func TestCalculateConfmapHash(t *testing.T) {
1373+
t.Run("nil config returns zero", func(t *testing.T) {
1374+
hash, err := calculateConfmapHash(nil)
1375+
require.NoError(t, err)
1376+
assert.Equal(t, []byte(nil), hash)
1377+
})
1378+
1379+
t.Run("same value gives same result", func(t *testing.T) {
1380+
conf := confmap.NewFromStringMap(map[string]any{
1381+
"key1": "value1",
1382+
"key2": 123,
1383+
})
1384+
hash1, err := calculateConfmapHash(conf)
1385+
require.NoError(t, err)
1386+
hash2, err := calculateConfmapHash(conf)
1387+
require.NoError(t, err)
1388+
assert.Equal(t, hash1, hash2)
1389+
})
1390+
1391+
t.Run("different values give different results", func(t *testing.T) {
1392+
conf1 := confmap.NewFromStringMap(map[string]any{
1393+
"key1": "value1",
1394+
})
1395+
hash1, err := calculateConfmapHash(conf1)
1396+
require.NoError(t, err)
1397+
1398+
conf2 := confmap.NewFromStringMap(map[string]any{
1399+
"key1": "value2",
1400+
})
1401+
hash2, err := calculateConfmapHash(conf2)
1402+
require.NoError(t, err)
1403+
1404+
assert.NotEqual(t, hash1, hash2)
1405+
})
1406+
1407+
t.Run("list of maps is processed correctly", func(t *testing.T) {
1408+
conf1 := confmap.NewFromStringMap(map[string]any{
1409+
"items": []any{
1410+
map[string]any{"name": "A", "value": 1},
1411+
map[string]any{"name": "B", "value": 2},
1412+
},
1413+
})
1414+
hash1, err := calculateConfmapHash(conf1)
1415+
require.NoError(t, err)
1416+
1417+
t.Run("same list of maps gives same hash", func(t *testing.T) {
1418+
conf2 := confmap.NewFromStringMap(map[string]any{
1419+
"items": []any{
1420+
map[string]any{"name": "A", "value": 1},
1421+
map[string]any{"name": "B", "value": 2},
1422+
},
1423+
})
1424+
hash2, err := calculateConfmapHash(conf2)
1425+
require.NoError(t, err)
1426+
assert.Equal(t, hash1, hash2)
1427+
})
1428+
1429+
t.Run("different order in list gives different hash", func(t *testing.T) {
1430+
conf3 := confmap.NewFromStringMap(map[string]any{
1431+
"items": []any{
1432+
map[string]any{"name": "B", "value": 2},
1433+
map[string]any{"name": "A", "value": 1},
1434+
},
1435+
})
1436+
hash3, err := calculateConfmapHash(conf3)
1437+
require.NoError(t, err)
1438+
assert.NotEqual(t, hash1, hash3)
1439+
})
1440+
})
1441+
}
1442+
1443+
func TestOTelManager_maybeUpdateMergedConfig(t *testing.T) {
1444+
t.Run("initial config", func(t *testing.T) {
1445+
m := &OTelManager{}
1446+
conf := confmap.NewFromStringMap(testConfig)
1447+
1448+
updated, err := m.maybeUpdateMergedConfig(conf)
1449+
1450+
require.NoError(t, err)
1451+
assert.True(t, updated)
1452+
assert.Equal(t, conf, m.mergedCollectorCfg)
1453+
assert.NotEqual(t, uint64(0), m.mergedCollectorCfgHash)
1454+
})
1455+
1456+
t.Run("same config", func(t *testing.T) {
1457+
conf := confmap.NewFromStringMap(testConfig)
1458+
hash, err := calculateConfmapHash(conf)
1459+
require.NoError(t, err)
1460+
1461+
m := &OTelManager{
1462+
mergedCollectorCfg: conf,
1463+
mergedCollectorCfgHash: hash,
1464+
}
1465+
1466+
updated, err := m.maybeUpdateMergedConfig(conf)
1467+
1468+
require.NoError(t, err)
1469+
assert.False(t, updated)
1470+
assert.Equal(t, conf, m.mergedCollectorCfg)
1471+
assert.Equal(t, hash, m.mergedCollectorCfgHash)
1472+
})
1473+
1474+
t.Run("different config", func(t *testing.T) {
1475+
conf1 := confmap.NewFromStringMap(map[string]any{"key": "value1"})
1476+
hash1, err := calculateConfmapHash(conf1)
1477+
require.NoError(t, err)
1478+
1479+
m := &OTelManager{
1480+
mergedCollectorCfg: conf1,
1481+
mergedCollectorCfgHash: hash1,
1482+
}
1483+
1484+
conf2 := confmap.NewFromStringMap(map[string]any{"key": "value2"})
1485+
hash2, err := calculateConfmapHash(conf2)
1486+
require.NoError(t, err)
1487+
1488+
updated, err := m.maybeUpdateMergedConfig(conf2)
1489+
1490+
require.NoError(t, err)
1491+
assert.True(t, updated)
1492+
assert.Equal(t, conf2, m.mergedCollectorCfg)
1493+
assert.Equal(t, hash2, m.mergedCollectorCfgHash)
1494+
assert.NotEqual(t, hash1, m.mergedCollectorCfgHash)
1495+
})
1496+
1497+
t.Run("hashing error with previous config", func(t *testing.T) {
1498+
conf1 := confmap.NewFromStringMap(map[string]any{"key": "value1"})
1499+
hash1, err := calculateConfmapHash(conf1)
1500+
require.NoError(t, err)
1501+
1502+
m := &OTelManager{
1503+
mergedCollectorCfg: conf1,
1504+
mergedCollectorCfgHash: hash1,
1505+
}
1506+
1507+
badConf := confmap.NewFromStringMap(map[string]any{"bad": make(chan int)})
1508+
updated, err := m.maybeUpdateMergedConfig(badConf)
1509+
1510+
require.Error(t, err)
1511+
assert.True(t, updated, "update should proceed on hashing error")
1512+
assert.Equal(t, badConf, m.mergedCollectorCfg)
1513+
assert.Equal(t, []byte(nil), m.mergedCollectorCfgHash)
1514+
})
1515+
1516+
t.Run("hashing error with no previous config", func(t *testing.T) {
1517+
m := &OTelManager{}
1518+
1519+
badConf := confmap.NewFromStringMap(map[string]any{"bad": make(chan int)})
1520+
updated, err := m.maybeUpdateMergedConfig(badConf)
1521+
1522+
require.Error(t, err)
1523+
assert.True(t, updated, "update should proceed on hashing error, even with no previous config")
1524+
assert.Equal(t, badConf, m.mergedCollectorCfg)
1525+
assert.Equal(t, []byte(nil), m.mergedCollectorCfgHash)
1526+
})
1527+
}

0 commit comments

Comments
 (0)