Skip to content

Commit c854264

Browse files
authored
Avoid unnecessary mutex in collector logs, replace by atomic pointer (#14008)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 0627444 commit c854264

File tree

4 files changed

+62
-42
lines changed

4 files changed

+62
-42
lines changed

.chloggen/avoid-mutex.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: pkg/otelcol
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Avoid unnecessary mutex in collector logs, replace by atomic pointer
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [14008]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

otelcol/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ type Collector struct {
122122
// NewCollector creates and returns a new instance of Collector.
123123
func NewCollector(set CollectorSettings) (*Collector, error) {
124124
bc := newBufferedCore(zapcore.DebugLevel)
125-
cc := &collectorCore{core: bc}
125+
cc := newCollectorCore(bc)
126126
options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...)
127127
logger := zap.New(cc, options...)
128128
set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: logger}

otelcol/collector_core.go

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,52 @@
44
package otelcol // import "go.opentelemetry.io/collector/otelcol"
55

66
import (
7-
"sync"
7+
"sync/atomic"
88

99
"go.uber.org/zap/zapcore"
1010
)
1111

1212
var _ zapcore.Core = (*collectorCore)(nil)
1313

1414
type collectorCore struct {
15-
core zapcore.Core
16-
rw sync.RWMutex
15+
delegate atomic.Pointer[zapcore.Core]
16+
}
17+
18+
func newCollectorCore(core zapcore.Core) *collectorCore {
19+
cc := &collectorCore{}
20+
cc.SetCore(core)
21+
return cc
1722
}
1823

1924
func (c *collectorCore) Enabled(l zapcore.Level) bool {
20-
c.rw.RLock()
21-
defer c.rw.RUnlock()
22-
return c.core.Enabled(l)
25+
return c.loadDelegate().Enabled(l)
2326
}
2427

2528
func (c *collectorCore) With(f []zapcore.Field) zapcore.Core {
26-
c.rw.RLock()
27-
defer c.rw.RUnlock()
28-
return &collectorCore{
29-
core: c.core.With(f),
30-
}
29+
return newCollectorCore(c.loadDelegate().With(f))
3130
}
3231

3332
func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
34-
c.rw.RLock()
35-
defer c.rw.RUnlock()
36-
if c.core.Enabled(e.Level) {
37-
return ce.AddCore(e, c)
33+
core := c.loadDelegate()
34+
if core.Enabled(e.Level) {
35+
return ce.AddCore(e, core)
3836
}
3937
return ce
4038
}
4139

4240
func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error {
43-
c.rw.RLock()
44-
defer c.rw.RUnlock()
45-
return c.core.Write(e, f)
41+
return c.loadDelegate().Write(e, f)
4642
}
4743

4844
func (c *collectorCore) Sync() error {
49-
c.rw.RLock()
50-
defer c.rw.RUnlock()
51-
return c.core.Sync()
45+
return c.loadDelegate().Sync()
5246
}
5347

5448
func (c *collectorCore) SetCore(core zapcore.Core) {
55-
c.rw.Lock()
56-
defer c.rw.Unlock()
57-
c.core = core
49+
c.delegate.Store(&core)
50+
}
51+
52+
// loadDelegate returns the delegate.
53+
func (c *collectorCore) loadDelegate() zapcore.Core {
54+
return *c.delegate.Load()
5855
}

otelcol/collector_core_test.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,35 @@ import (
1212
)
1313

1414
func Test_collectorCore_Enabled(t *testing.T) {
15-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
15+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
1616
assert.True(t, cc.Enabled(zapcore.ErrorLevel))
1717
assert.False(t, cc.Enabled(zapcore.DebugLevel))
1818
}
1919

2020
func Test_collectorCore_Check(t *testing.T) {
2121
t.Run("check passed", func(t *testing.T) {
2222
bc := newBufferedCore(zapcore.InfoLevel)
23-
cc := collectorCore{core: bc}
23+
cc := newCollectorCore(bc)
2424
e := zapcore.Entry{
2525
Level: zapcore.InfoLevel,
2626
}
2727
expected := &zapcore.CheckedEntry{}
28-
expected = expected.AddCore(e, &cc)
29-
ce := cc.Check(e, nil)
30-
assert.Equal(t, expected, ce)
28+
expected = expected.AddCore(e, bc)
29+
assert.Equal(t, expected, cc.Check(e, nil))
3130
})
3231

3332
t.Run("check did not pass", func(t *testing.T) {
34-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
33+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
3534
e := zapcore.Entry{
3635
Level: zapcore.DebugLevel,
3736
}
38-
ce := cc.Check(e, nil)
39-
assert.Nil(t, ce)
37+
assert.Nil(t, cc.Check(e, nil))
4038
})
4139
}
4240

4341
func Test_collectorCore_With(t *testing.T) {
44-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
45-
cc.core.(*bufferedCore).context = []zapcore.Field{
42+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
43+
cc.loadDelegate().(*bufferedCore).context = []zapcore.Field{
4644
{Key: "original", String: "context"},
4745
}
4846
inputs := []zapcore.Field{
@@ -53,11 +51,11 @@ func Test_collectorCore_With(t *testing.T) {
5351
{Key: "test", String: "passed"},
5452
}
5553
newCC := cc.With(inputs)
56-
assert.Equal(t, expected, newCC.(*collectorCore).core.(*bufferedCore).context)
54+
assert.Equal(t, expected, newCC.(*collectorCore).loadDelegate().(*bufferedCore).context)
5755
}
5856

5957
func Test_collectorCore_Write(t *testing.T) {
60-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
58+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
6159
e := zapcore.Entry{
6260
Level: zapcore.DebugLevel,
6361
Message: "test",
@@ -72,18 +70,18 @@ func Test_collectorCore_Write(t *testing.T) {
7270
e,
7371
fields,
7472
}
75-
require.Len(t, cc.core.(*bufferedCore).logs, 1)
76-
require.Equal(t, expected, cc.core.(*bufferedCore).logs[0])
73+
require.Len(t, cc.loadDelegate().(*bufferedCore).logs, 1)
74+
require.Equal(t, expected, cc.loadDelegate().(*bufferedCore).logs[0])
7775
}
7876

7977
func Test_collectorCore_Sync(t *testing.T) {
80-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
78+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
8179
assert.NoError(t, cc.Sync())
8280
}
8381

8482
func Test_collectorCore_SetCore(t *testing.T) {
85-
cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)}
83+
cc := newCollectorCore(newBufferedCore(zapcore.InfoLevel))
8684
newCore := newBufferedCore(zapcore.DebugLevel)
8785
cc.SetCore(newCore)
88-
assert.Equal(t, zapcore.DebugLevel, cc.core.(*bufferedCore).LevelEnabler)
86+
assert.Equal(t, zapcore.DebugLevel, cc.loadDelegate().(*bufferedCore).LevelEnabler)
8987
}

0 commit comments

Comments
 (0)