Skip to content

Commit 1a11873

Browse files
committed
dpdk: add packet distribution analysis for ethdev
Introduce packet distribution analysis for PMD threads based on ethdev library. This analysis computes the distribution of packets retrieved in a single rte_eth_rx_burst() call, on a per-queue basis. Signed-off-by: Adel Belkhiri <adel.belkhiri@gmail.com>
1 parent 759d826 commit 1a11873

File tree

5 files changed

+292
-1
lines changed

5 files changed

+292
-1
lines changed

analyses/org.eclipse.tracecompass.incubator.dpdk.core/META-INF/MANIFEST.MF

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ Require-Bundle: org.eclipse.ui,
1616
org.eclipse.tracecompass.tmf.ctf.core,
1717
org.eclipse.tracecompass.analysis.os.linux.core,
1818
org.eclipse.jdt.annotation;bundle-version="2.2.400",
19-
com.google.guava
19+
com.google.guava,
20+
org.eclipse.tracecompass.analysis.lami.core
2021
Export-Package: org.eclipse.tracecompass.incubator.dpdk.core.trace,
2122
org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.spin.analysis,
2223
org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.throughput.analysis,

analyses/org.eclipse.tracecompass.incubator.dpdk.core/plugin.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@
3131
</tracetype>
3232
</module>
3333
</extension>
34+
<extension
35+
point="org.eclipse.tracecompass.tmf.core.analysis.ondemand">
36+
<analysis
37+
class="org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis.DpdkPollDistributionAnalysis"
38+
id="org.eclipse.tracecompass.incubator.dpdk.core.ethdev.poll.distribution">
39+
</analysis>
40+
</extension>
3441
<extension
3542
point="org.eclipse.tracecompass.tmf.core.dataprovider">
3643
<dataProviderFactory
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 École Polytechnique de Montréal
3+
*
4+
* All rights reserved. This program and the accompanying materials are
5+
* made available under the terms of the Eclipse Public License 2.0 which
6+
* accompanies this distribution, and is available at
7+
* https://www.eclipse.org/legal/epl-2.0/
8+
*
9+
* SPDX-License-Identifier: EPL-2.0
10+
*******************************************************************************/
11+
package org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis;
12+
13+
import java.text.NumberFormat;
14+
import java.util.ArrayList;
15+
import java.util.Arrays;
16+
import java.util.Collections;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.TreeMap;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import java.util.function.Predicate;
23+
import java.util.stream.Collectors;
24+
25+
import org.eclipse.core.runtime.CoreException;
26+
import org.eclipse.core.runtime.IProgressMonitor;
27+
import org.eclipse.core.runtime.SubMonitor;
28+
import org.eclipse.jdt.annotation.NonNull;
29+
import org.eclipse.jdt.annotation.Nullable;
30+
import org.eclipse.tracecompass.incubator.dpdk.core.trace.DpdkTrace;
31+
import org.eclipse.tracecompass.incubator.internal.dpdk.core.analysis.DpdkEthdevEventLayout;
32+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.aspect.LamiGenericAspect;
33+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.aspect.LamiTableEntryAspect;
34+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiAnalysis;
35+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiResultTable;
36+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiTableClass;
37+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiTableEntry;
38+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiData;
39+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiLongNumber;
40+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiTimeRange;
41+
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiTimestamp;
42+
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
43+
import org.eclipse.tracecompass.tmf.core.event.aspect.TmfContentFieldAspect;
44+
import org.eclipse.tracecompass.tmf.core.filter.model.TmfFilterMatchesNode;
45+
import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
46+
import org.eclipse.tracecompass.tmf.core.request.TmfEventRequest;
47+
import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
48+
import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
49+
50+
/**
51+
* Dpdk polls distribution analysis is an on-demand analysis that calculates the
52+
* number of packets retrieved in a single call to rte_eth_rx_burst(). The poll
53+
* distribution is calculated per port queue only.
54+
*
55+
* @author Adel Belkhiri
56+
*
57+
*/
58+
public class DpdkPollDistributionAnalysis extends LamiAnalysis {
59+
60+
private static final long PROGRESS_INTERVAL = (1 << 10) - 1L;
61+
private static final int MEMORY_SANITY_LIMIT = 40000;
62+
63+
/**
64+
* Constructor
65+
*/
66+
public DpdkPollDistributionAnalysis() {
67+
super(Messages.getMessage(Messages.EthdevPollDistribution_AnalysisName), false, trace -> true, Collections.emptyList());
68+
}
69+
70+
@Override
71+
protected synchronized void initialize() {
72+
// do nothing
73+
}
74+
75+
@Override
76+
public boolean canExecute(ITmfTrace trace) {
77+
if (trace instanceof DpdkTrace) {
78+
return ((DpdkTrace) trace).validate(null, trace.getPath()).isOK() ? true : false;
79+
}
80+
return false;
81+
}
82+
83+
private static int workRemaining(ITmfTrace trace) {
84+
return (int) Math.min(trace.getNbEvents() / (PROGRESS_INTERVAL + 1), Integer.MAX_VALUE);
85+
}
86+
87+
@Override
88+
public List<LamiResultTable> execute(ITmfTrace trace, @Nullable TmfTimeRange timeRange, String extraParamsString, IProgressMonitor monitor) throws CoreException {
89+
AtomicLong done = new AtomicLong();
90+
Map<String, Map<Integer, Long>> pollCountPerQueue = new TreeMap<>();
91+
TmfTimeRange adjustedTimeRange = timeRange == null ? TmfTimeRange.ETERNITY : timeRange;
92+
SubMonitor subMonitor = SubMonitor.convert(monitor, Messages.getMessage(Messages.EthdevPollDistribution_AnalysisName), workRemaining(trace));
93+
94+
/*
95+
* Handle the filter in case the user indicates a specific port to
96+
* process its events
97+
*/
98+
TmfFilterMatchesNode filter = new TmfFilterMatchesNode(null);
99+
filter.setEventAspect(new TmfContentFieldAspect(Messages.getMessage(Messages.EthdevPollDistribution_CountLabel), DpdkEthdevEventLayout.fieldPortId()));
100+
filter.setRegex(extraParamsString);
101+
Predicate<ITmfEvent> filterPred = (event -> extraParamsString.isEmpty() || filter.matches(event));
102+
103+
// create and send the event request
104+
TmfEventRequest eventRequest = createEventRequest(trace, adjustedTimeRange, filterPred,
105+
pollCountPerQueue, subMonitor, done);
106+
trace.sendRequest(eventRequest);
107+
108+
try {
109+
eventRequest.waitForCompletion();
110+
return convertToLamiTables(adjustedTimeRange, pollCountPerQueue);
111+
112+
} catch (InterruptedException e) {
113+
Thread.currentThread().interrupt();
114+
return Collections.emptyList();
115+
}
116+
}
117+
118+
private static TmfEventRequest createEventRequest(ITmfTrace trace, TmfTimeRange timeRange, Predicate<ITmfEvent> filterPredicate, Map<String, Map<Integer, Long>> pollAspectCounts, SubMonitor monitor, AtomicLong nbProcessevents) {
119+
return new TmfEventRequest(ITmfEvent.class, timeRange, 0, Integer.MAX_VALUE, ExecutionType.BACKGROUND) {
120+
@Override
121+
public void handleData(ITmfEvent event) {
122+
if (monitor.isCanceled()) {
123+
cancel();
124+
return;
125+
}
126+
127+
// process events to compute RX polls distribution
128+
processEvent(event, filterPredicate, pollAspectCounts);
129+
130+
if ((nbProcessevents.incrementAndGet() & PROGRESS_INTERVAL) == 0) {
131+
monitor.setWorkRemaining(workRemaining(trace));
132+
monitor.worked(1);
133+
monitor.setTaskName(String.format("DPDK Polls Distribution Analysis (%s events processed)", //$NON-NLS-1$
134+
NumberFormat.getInstance().format(nbProcessevents.get())));
135+
}
136+
}
137+
};
138+
}
139+
140+
private static void processEvent(ITmfEvent event, Predicate<ITmfEvent> filterPredicate,
141+
Map<String, Map<Integer, Long>> pollAspectCounts) {
142+
143+
if (event.getName().equals(DpdkEthdevEventLayout.eventEthdevRxBurstNonEmpty())
144+
&& filterPredicate.test(event)) {
145+
Integer nbRxPkts = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldNbRxPkts());
146+
Integer portId = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldPortId());
147+
Integer queueId = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldQueueId());
148+
149+
if (nbRxPkts != null && portId != null && queueId != null) {
150+
String queueName = "P" + portId + "/Q" + queueId; //$NON-NLS-1$ //$NON-NLS-2$
151+
Map<Integer, Long> dataSet = pollAspectCounts.computeIfAbsent(queueName, k -> new HashMap<>());
152+
if (dataSet.size() < MEMORY_SANITY_LIMIT) {
153+
dataSet.merge(nbRxPkts, 1L, Long::sum);
154+
}
155+
}
156+
}
157+
}
158+
159+
private @NonNull List<LamiResultTable> convertToLamiTables(TmfTimeRange timeRange,
160+
Map<String, Map<Integer, Long>> pollCountPerQueue) {
161+
List<LamiResultTable> results = new ArrayList<>();
162+
for (Map.Entry<String, Map<Integer, Long>> entry : pollCountPerQueue.entrySet()) {
163+
String queueName = entry.getKey();
164+
Map<Integer, Long> dataSet = entry.getValue();
165+
166+
List<LamiTableEntry> tableEntries = dataSet.entrySet().stream()
167+
.map(e -> new LamiTableEntry(Arrays.asList(
168+
new LamiString(e.getKey().toString()),
169+
new LamiLongNumber(e.getValue()))))
170+
.collect(Collectors.toList());
171+
172+
List<@NonNull LamiTableEntryAspect> tableAspects = Arrays.asList(
173+
new LamiCategoryAspect(Messages.EthdevPollDistribution_NumberOfPacketsLabel, 0),
174+
new LamiCountAspect(Messages.EthdevPollDistribution_CountLabel, 1));
175+
176+
LamiTableClass tableClass = new LamiTableClass(queueName, queueName, tableAspects, Collections.emptySet());
177+
results.add(new LamiResultTable(createTimeRange(timeRange), tableClass, tableEntries));
178+
}
179+
return results;
180+
}
181+
182+
/**
183+
* Count aspect, generic
184+
*
185+
*/
186+
private final class LamiCountAspect extends LamiGenericAspect {
187+
private LamiCountAspect(String name, int column) {
188+
super(name, null, column, true, false);
189+
}
190+
}
191+
192+
/**
193+
* Category aspect, generic
194+
*
195+
*/
196+
private final class LamiCategoryAspect extends LamiGenericAspect {
197+
private LamiCategoryAspect(String name, int column) {
198+
super(name, null, column, false, false);
199+
}
200+
}
201+
202+
/**
203+
* TODO: move to LAMI
204+
*/
205+
private static LamiTimeRange createTimeRange(TmfTimeRange timeRange) {
206+
return new LamiTimeRange(new LamiTimestamp(timeRange.getStartTime().toNanos()), new LamiTimestamp(timeRange.getEndTime().toNanos()));
207+
}
208+
209+
/**
210+
* TODO: LamiString in LAMI is private
211+
*/
212+
private final class LamiString extends LamiData {
213+
private final String fElement;
214+
215+
private LamiString(String element) {
216+
fElement = element;
217+
}
218+
219+
@Override
220+
public @NonNull String toString() {
221+
return fElement;
222+
}
223+
}
224+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 École Polytechnique de Montréal
3+
*
4+
* All rights reserved. This program and the accompanying materials are
5+
* made available under the terms of the Eclipse Public License 2.0 which
6+
* accompanies this distribution, and is available at
7+
* https://www.eclipse.org/legal/epl-2.0/
8+
*
9+
* SPDX-License-Identifier: EPL-2.0
10+
*******************************************************************************/
11+
12+
package org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis;
13+
14+
import org.eclipse.jdt.annotation.NonNull;
15+
import org.eclipse.jdt.annotation.Nullable;
16+
import org.eclipse.osgi.util.NLS;
17+
18+
/**
19+
* Messages for the {@link DpdkPollDistributionAnalysis} on-demand analysis
20+
*
21+
* @author Adel Belkhiri
22+
*/
23+
@SuppressWarnings("javadoc")
24+
public class Messages extends NLS {
25+
private static final String BUNDLE_NAME = "org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis.messages"; //$NON-NLS-1$
26+
27+
public static @Nullable String EthdevPollDistribution_AnalysisName;
28+
public static @Nullable String EthdevPollDistribution_NumberOfPacketsLabel;
29+
public static @Nullable String EthdevPollDistribution_CountLabel;
30+
31+
static @NonNull String getMessage(@Nullable String msg) {
32+
if (msg == null) {
33+
return ""; //$NON-NLS-1$
34+
}
35+
return msg;
36+
}
37+
38+
static {
39+
// initialize resource bundle
40+
NLS.initializeMessages(BUNDLE_NAME, Messages.class);
41+
}
42+
43+
private Messages() {
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
###############################################################################
2+
# Copyright (c) 2024 École Polytechnique de Montréal
3+
#
4+
# All rights reserved. This program and the accompanying materials
5+
# are made available under the terms of the Eclipse Public License 2.0
6+
# which accompanies this distribution, and is available at
7+
# https://www.eclipse.org/legal/epl-2.0
8+
#
9+
# SPDX-License-Identifier: EPL-2.0
10+
###############################################################################
11+
12+
EthdevPollDistribution_AnalysisName=DPDK Polls Distribution (ethdev)
13+
EthdevPollDistribution_NumberOfPacketsLabel=Number of retrieved packets
14+
EthdevPollDistribution_CountLabel=Count

0 commit comments

Comments
 (0)