Skip to content

Commit c91f386

Browse files
authored
Avoid extra copy of all data during routing (#44387)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 3cfc0c1 commit c91f386

File tree

4 files changed

+65
-59
lines changed

4 files changed

+65
-59
lines changed

.chloggen/avoid-extra-copy.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: connector/routing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Avoid extra copy of all data during routing
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [44387]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

connector/routingconnector/logs.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ func (*logsConnector) Capabilities() consumer.Capabilities {
6262
func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
6363
groups := make(map[consumer.Logs]plog.Logs)
6464
var errs error
65+
matched := plog.NewLogs()
6566
for i := 0; i < len(c.router.routeSlice) && ld.ResourceLogs().Len() > 0; i++ {
6667
route := c.router.routeSlice[i]
67-
matchedLogs := plog.NewLogs()
6868
switch route.statementContext {
6969
case "request":
7070
if route.requestCondition.matchRequest(ctx) {
71-
groupAllLogs(groups, route.consumer, ld)
72-
ld = plog.NewLogs() // all logs have been routed
71+
// all logs are routed
72+
ld.MoveTo(matched)
7373
}
7474
case "", "resource":
75-
plogutil.MoveResourcesIf(ld, matchedLogs,
75+
plogutil.MoveResourcesIf(ld, matched,
7676
func(rl plog.ResourceLogs) bool {
7777
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
7878
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
@@ -81,7 +81,7 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
8181
},
8282
)
8383
case "log":
84-
plogutil.MoveRecordsWithContextIf(ld, matchedLogs,
84+
plogutil.MoveRecordsWithContextIf(ld, matched,
8585
func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
8686
ltx := ottllog.NewTransformContext(lr, sl.Scope(), rl.Resource(), sl, rl)
8787
_, isMatch, err := route.logStatement.Execute(ctx, ltx)
@@ -94,9 +94,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
9494
if c.config.ErrorMode == ottl.PropagateError {
9595
return errs
9696
}
97-
groupAllLogs(groups, c.router.defaultConsumer, matchedLogs)
97+
groupAllLogs(groups, c.router.defaultConsumer, matched)
9898
}
99-
groupAllLogs(groups, route.consumer, matchedLogs)
99+
groupAllLogs(groups, route.consumer, matched)
100100
}
101101
// anything left wasn't matched by any route. Send to default consumer
102102
groupAllLogs(groups, c.router.defaultConsumer, ld)
@@ -110,24 +110,17 @@ func groupAllLogs(
110110
groups map[consumer.Logs]plog.Logs,
111111
cons consumer.Logs,
112112
logs plog.Logs,
113-
) {
114-
for i := 0; i < logs.ResourceLogs().Len(); i++ {
115-
groupLogs(groups, cons, logs.ResourceLogs().At(i))
116-
}
117-
}
118-
119-
func groupLogs(
120-
groups map[consumer.Logs]plog.Logs,
121-
cons consumer.Logs,
122-
logs plog.ResourceLogs,
123113
) {
124114
if cons == nil {
125115
return
126116
}
117+
if logs.ResourceLogs().Len() == 0 {
118+
return
119+
}
127120
group, ok := groups[cons]
128121
if !ok {
129122
group = plog.NewLogs()
123+
groups[cons] = group
130124
}
131-
logs.CopyTo(group.ResourceLogs().AppendEmpty())
132-
groups[cons] = group
125+
logs.ResourceLogs().MoveAndAppendTo(group.ResourceLogs())
133126
}

connector/routingconnector/metrics.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,17 @@ func (*metricsConnector) Capabilities() consumer.Capabilities {
6363
func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
6464
groups := make(map[consumer.Metrics]pmetric.Metrics)
6565
var errs error
66+
matched := pmetric.NewMetrics()
6667
for i := 0; i < len(c.router.routeSlice) && md.ResourceMetrics().Len() > 0; i++ {
6768
route := c.router.routeSlice[i]
68-
matchedMetrics := pmetric.NewMetrics()
6969
switch route.statementContext {
7070
case "request":
7171
if route.requestCondition.matchRequest(ctx) {
72-
groupAllMetrics(groups, route.consumer, md)
73-
md = pmetric.NewMetrics() // all metrics have been routed
72+
// all metrics are routed
73+
md.MoveTo(matched)
7474
}
7575
case "", "resource":
76-
pmetricutil.MoveResourcesIf(md, matchedMetrics,
76+
pmetricutil.MoveResourcesIf(md, matched,
7777
func(rs pmetric.ResourceMetrics) bool {
7878
rtx := ottlresource.NewTransformContext(rs.Resource(), rs)
7979
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
@@ -82,7 +82,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
8282
},
8383
)
8484
case "metric":
85-
pmetricutil.MoveMetricsWithContextIf(md, matchedMetrics,
85+
pmetricutil.MoveMetricsWithContextIf(md, matched,
8686
func(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) bool {
8787
mtx := ottlmetric.NewTransformContext(m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
8888
_, isMatch, err := route.metricStatement.Execute(ctx, mtx)
@@ -91,7 +91,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
9191
},
9292
)
9393
case "datapoint":
94-
pmetricutil.MoveDataPointsWithContextIf(md, matchedMetrics,
94+
pmetricutil.MoveDataPointsWithContextIf(md, matched,
9595
func(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric, dp any) bool {
9696
dptx := ottldatapoint.NewTransformContext(dp, m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
9797
_, isMatch, err := route.dataPointStatement.Execute(ctx, dptx)
@@ -104,9 +104,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
104104
if c.config.ErrorMode == ottl.PropagateError {
105105
return errs
106106
}
107-
groupAllMetrics(groups, c.router.defaultConsumer, matchedMetrics)
107+
groupAllMetrics(groups, c.router.defaultConsumer, matched)
108108
}
109-
groupAllMetrics(groups, route.consumer, matchedMetrics)
109+
groupAllMetrics(groups, route.consumer, matched)
110110
}
111111
// anything left wasn't matched by any route. Send to default consumer
112112
groupAllMetrics(groups, c.router.defaultConsumer, md)
@@ -121,23 +121,16 @@ func groupAllMetrics(
121121
cons consumer.Metrics,
122122
metrics pmetric.Metrics,
123123
) {
124-
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
125-
groupMetrics(groups, cons, metrics.ResourceMetrics().At(i))
124+
if cons == nil {
125+
return
126126
}
127-
}
128-
129-
func groupMetrics(
130-
groups map[consumer.Metrics]pmetric.Metrics,
131-
consumer consumer.Metrics,
132-
metrics pmetric.ResourceMetrics,
133-
) {
134-
if consumer == nil {
127+
if metrics.ResourceMetrics().Len() == 0 {
135128
return
136129
}
137-
group, ok := groups[consumer]
130+
group, ok := groups[cons]
138131
if !ok {
139132
group = pmetric.NewMetrics()
133+
groups[cons] = group
140134
}
141-
metrics.CopyTo(group.ResourceMetrics().AppendEmpty())
142-
groups[consumer] = group
135+
metrics.ResourceMetrics().MoveAndAppendTo(group.ResourceMetrics())
143136
}

connector/routingconnector/traces.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ func (*tracesConnector) Capabilities() consumer.Capabilities {
6262
func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
6363
groups := make(map[consumer.Traces]ptrace.Traces)
6464
var errs error
65+
matched := ptrace.NewTraces()
6566
for i := 0; i < len(c.router.routeSlice) && td.ResourceSpans().Len() > 0; i++ {
6667
route := c.router.routeSlice[i]
67-
matchedSpans := ptrace.NewTraces()
6868
switch route.statementContext {
6969
case "request":
7070
if route.requestCondition.matchRequest(ctx) {
71-
groupAllTraces(groups, route.consumer, td)
72-
td = ptrace.NewTraces() // all traces have been routed
71+
// all traces are routed
72+
td.MoveTo(matched)
7373
}
7474
case "", "resource":
75-
ptraceutil.MoveResourcesIf(td, matchedSpans,
75+
ptraceutil.MoveResourcesIf(td, matched,
7676
func(rs ptrace.ResourceSpans) bool {
7777
rtx := ottlresource.NewTransformContext(rs.Resource(), rs)
7878
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
@@ -81,7 +81,7 @@ func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) e
8181
},
8282
)
8383
case "span":
84-
ptraceutil.MoveSpansWithContextIf(td, matchedSpans,
84+
ptraceutil.MoveSpansWithContextIf(td, matched,
8585
func(rs ptrace.ResourceSpans, ss ptrace.ScopeSpans, s ptrace.Span) bool {
8686
mtx := ottlspan.NewTransformContext(s, ss.Scope(), rs.Resource(), ss, rs)
8787
_, isMatch, err := route.spanStatement.Execute(ctx, mtx)
@@ -94,9 +94,9 @@ func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) e
9494
if c.config.ErrorMode == ottl.PropagateError {
9595
return errs
9696
}
97-
groupAllTraces(groups, c.router.defaultConsumer, matchedSpans)
97+
groupAllTraces(groups, c.router.defaultConsumer, matched)
9898
}
99-
groupAllTraces(groups, route.consumer, matchedSpans)
99+
groupAllTraces(groups, route.consumer, matched)
100100
}
101101
// anything left wasn't matched by any route. Send to default consumer
102102
groupAllTraces(groups, c.router.defaultConsumer, td)
@@ -110,24 +110,17 @@ func groupAllTraces(
110110
groups map[consumer.Traces]ptrace.Traces,
111111
cons consumer.Traces,
112112
traces ptrace.Traces,
113-
) {
114-
for i := 0; i < traces.ResourceSpans().Len(); i++ {
115-
groupTraces(groups, cons, traces.ResourceSpans().At(i))
116-
}
117-
}
118-
119-
func groupTraces(
120-
groups map[consumer.Traces]ptrace.Traces,
121-
cons consumer.Traces,
122-
spans ptrace.ResourceSpans,
123113
) {
124114
if cons == nil {
125115
return
126116
}
117+
if traces.ResourceSpans().Len() == 0 {
118+
return
119+
}
127120
group, ok := groups[cons]
128121
if !ok {
129122
group = ptrace.NewTraces()
123+
groups[cons] = group
130124
}
131-
spans.CopyTo(group.ResourceSpans().AppendEmpty())
132-
groups[cons] = group
125+
traces.ResourceSpans().MoveAndAppendTo(group.ResourceSpans())
133126
}

0 commit comments

Comments
 (0)