Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apm-protocol/apm-network/src/main/proto
Submodule proto updated 1 files
+2 −0 ebpf/accesslog.proto
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Support analysis waypoint metrics in Envoy ALS receiver.
* Add Ztunnel component in the topology.
* [Break Change] Change `compomentId` to `componentIds` in the K8SServiceRelation Scope.
* Adapt the mesh metrics if detect the ambient mesh in the eBPF access log receiver.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-mesh-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>agent-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.AccessLogConnection;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.AccessLogConnectionTLSMode;
Expand All @@ -46,7 +47,15 @@
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.IPAddress;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.KubernetesProcessAddress;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.ZTunnelAttachmentEnvironment;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.ZTunnelAttachmentEnvironmentDetectBy;
import org.apache.skywalking.apm.network.ebpf.accesslog.v3.ZTunnelAttachmentSecurityPolicy;
import org.apache.skywalking.apm.network.servicemesh.v3.HTTPServiceMeshMetric;
import org.apache.skywalking.apm.network.servicemesh.v3.HTTPServiceMeshMetrics;
import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetrics;
import org.apache.skywalking.apm.network.servicemesh.v3.TCPServiceMeshMetric;
import org.apache.skywalking.apm.network.servicemesh.v3.TCPServiceMeshMetrics;
import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
import org.apache.skywalking.library.kubernetes.ObjectID;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.apache.skywalking.oap.server.core.Const;
Expand All @@ -60,6 +69,7 @@
import org.apache.skywalking.oap.server.core.source.K8SServiceInstance;
import org.apache.skywalking.oap.server.core.source.K8SServiceInstanceRelation;
import org.apache.skywalking.oap.server.core.source.K8SServiceRelation;
import org.apache.skywalking.oap.server.core.source.Service;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
Expand All @@ -74,6 +84,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -100,6 +111,7 @@ public AccessLogServiceHandler(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME).provider().getService(NamingControl.class);

TelemetryDataDispatcher.init(moduleManager);
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
Expand Down Expand Up @@ -150,6 +162,8 @@ public void onNext(EBPFAccessLogMessage logMessage) {
return;
}

prepareForDispatch(node, connection, logMessage);

for (AccessLogKernelLog accessLogKernelLog : logMessage.getKernelLogsList()) {
inCounter.inc();
dispatchKernelLog(node, connection, accessLogKernelLog);
Expand Down Expand Up @@ -178,6 +192,116 @@ public void onCompleted() {
};
}

protected void prepareForDispatch(NodeInfo node, ConnectionInfo connection, EBPFAccessLogMessage logMessage) {
// if the connection is communicated with ztunnel, then needs to generate mesh metrics
if (connection.getOriginalConnection().hasAttachment() && connection.getOriginalConnection().getAttachment().hasZTunnel()) {
prepareDispatchForZtunnelMesh(node, connection, logMessage);
}
}

protected void prepareDispatchForZtunnelMesh(NodeInfo node, ConnectionInfo connection, EBPFAccessLogMessage logMessage) {
// adapt to the mesh metrics
String tlsMode = Const.TLS_MODE.NON_TLS;
if (AccessLogConnectionTLSMode.TLS.equals(connection.getTlsMode())) {
tlsMode = Const.TLS_MODE.TLS;
}
if (ZTunnelAttachmentSecurityPolicy.MTLS.equals(connection.getOriginalConnection().getAttachment().getZTunnel().getSecurityPolicy())) {
tlsMode = Const.TLS_MODE.M_TLS;
}

if (logMessage.hasProtocolLog()) {
final AccessLogProtocolLogs protocolLog = logMessage.getProtocolLog();
final ServiceMeshMetrics.Builder serviceMeshMetrics = ServiceMeshMetrics.newBuilder();
switch (protocolLog.getProtocolCase()) {
case HTTP:
serviceMeshMetrics.setHttpMetrics(HTTPServiceMeshMetrics.newBuilder()
.addMetrics(generateHTTPServiceMeshMetrics(node, connection, protocolLog.getHttp(), tlsMode)));
break;
}
TelemetryDataDispatcher.process(serviceMeshMetrics.build());
}

final ServiceMeshMetrics.Builder serviceMeshMetrics = ServiceMeshMetrics.newBuilder();
final TCPServiceMeshMetrics.Builder builder = TCPServiceMeshMetrics.newBuilder();
for (AccessLogKernelLog accessLogKernelLog : logMessage.getKernelLogsList()) {
Optional.ofNullable(generateTCPServiceMeshMetrics(node, connection, accessLogKernelLog, tlsMode))
.ifPresent(builder::addMetrics);
}
serviceMeshMetrics.setTcpMetrics(builder.build());
TelemetryDataDispatcher.process(serviceMeshMetrics.build());
}

protected HTTPServiceMeshMetric generateHTTPServiceMeshMetrics(NodeInfo node, ConnectionInfo connection,
AccessLogHTTPProtocol http, String tlsMode) {
KubernetesProcessAddress source, dest;
if (DetectPoint.client.equals(connection.getRole())) {
source = connection.getLocal();
dest = connection.getRemote();
} else {
source = connection.getRemote();
dest = connection.getLocal();
}
final long startTime = node.parseTimestamp(http.getStartTime());
final long endTime = node.parseTimestamp(http.getEndTime());
return HTTPServiceMeshMetric.newBuilder()
.setStartTime(startTime)
.setEndTime(endTime)
.setSourceServiceName(buildServiceNameByAddress(node, source))
.setSourceServiceInstance(buildServiceInstanceName(source))
.setDestServiceName(buildServiceNameByAddress(node, dest))
.setDestServiceInstance(buildServiceInstanceName(dest))
.setEndpoint(buildHTTPProtocolEndpointName(connection, http))
.setLatency((int) (endTime - startTime))
.setResponseCode(http.getResponse().getStatusCode())
.setStatus(http.getResponse().getStatusCode() < 500)
.setProtocol(Protocol.HTTP)
.setDetectPoint(connection.getRole())
.setTlsMode(tlsMode).build();
}

protected TCPServiceMeshMetric generateTCPServiceMeshMetrics(NodeInfo node, ConnectionInfo connection,
AccessLogKernelLog kernelLog, String tlsMode) {
long receivedBytes = 0, sentBytes = 0;
long startTime = 0, endTime = 0;
switch (kernelLog.getOperationCase()) {
case CONNECT:
case ACCEPT:
case CLOSE:
return null;
case WRITE:
final AccessLogKernelWriteOperation write = kernelLog.getWrite();
sentBytes = write.getL4Metrics().getTotalPackageSize();
startTime = node.parseTimestamp(write.getStartTime());
endTime = node.parseTimestamp(write.getEndTime());
break;
case READ:
final AccessLogKernelReadOperation read = kernelLog.getRead();
receivedBytes = read.getL2Metrics().getTotalPackageSize();
startTime = node.parseTimestamp(read.getStartTime());
endTime = node.parseTimestamp(read.getEndTime());
break;
}
KubernetesProcessAddress source, dest;
if (DetectPoint.client.equals(connection.getRole())) {
source = connection.getLocal();
dest = connection.getRemote();
} else {
source = connection.getRemote();
dest = connection.getLocal();
}
return TCPServiceMeshMetric.newBuilder()
.setStartTime(startTime)
.setEndTime(endTime)
.setSourceServiceName(buildServiceNameByAddress(node, source))
.setSourceServiceInstance(buildServiceInstanceName(source))
.setDestServiceName(buildServiceNameByAddress(node, dest))
.setDestServiceInstance(buildServiceInstanceName(dest))
.setDetectPoint(connection.getRole())
.setTlsMode(tlsMode)
.setReceivedBytes(receivedBytes)
.setSentBytes(sentBytes).build();
}

protected List<K8SMetrics> buildKernelLogMetrics(NodeInfo node, ConnectionInfo connection, AccessLogKernelLog kernelLog) {
return Arrays.asList(connection.toService(), connection.toServiceInstance(),
connection.toServiceRelation(), connection.toServiceInstanceRelation());
Expand Down Expand Up @@ -395,6 +519,11 @@ public long parseMinuteTimeBucket(EBPFTimestamp timestamp) {
return TimeBucket.getMinuteTimeBucket(seconds * 1000);
}

public long parseTimestamp(EBPFTimestamp timestamp) {
return TimeUnit.SECONDS.toMillis(bootTime.getSeconds() +
TimeUnit.NANOSECONDS.toSeconds(timestamp.getOffset().getOffset()));
}

public boolean shouldExcludeNamespace(String namespace) {
return excludeNamespaces.contains(namespace);
}
Expand Down Expand Up @@ -422,17 +551,19 @@ protected String buildServiceInstanceName(KubernetesProcessAddress address) {
}

protected String buildProtocolEndpointName(ConnectionInfo connectionInfo, AccessLogProtocolLogs protocol) {
final String serviceName = connectionInfo.buildLocalServiceName();
switch (protocol.getProtocolCase()) {
case HTTP:
final AccessLogHTTPProtocol http = protocol.getHttp();
return namingControl.formatEndpointName(serviceName,
StringUtils.upperCase(http.getRequest().getMethod().name()) + ":" + http.getRequest().getPath());
return buildHTTPProtocolEndpointName(connectionInfo, protocol.getHttp());
default:
return null;
}
}

protected String buildHTTPProtocolEndpointName(ConnectionInfo connectionInfo, AccessLogHTTPProtocol http) {
return namingControl.formatEndpointName(connectionInfo.buildLocalServiceName(),
StringUtils.upperCase(http.getRequest().getMethod().name()) + ":" + http.getRequest().getPath());
}

protected void recordIgnoreSameService(String sourceService) {
final DropDataReason dropDataReason = dropReasons.computeIfAbsent(sourceService,
key -> DropDataReason.buildWhenSameService(sourceService));
Expand Down Expand Up @@ -491,11 +622,13 @@ protected void printDropReasons() {

protected KubernetesProcessAddress buildKubernetesAddressByIP(NodeInfo nodeInfo, AccessLogConnection connection, boolean isLocal, IPAddress ipAddress) {
String host = ipAddress.getHost();
// if the resolving address is not local, and have attached ztunnel info, then using the ztunnel mapped host
if (!isLocal && connection.hasAttachment() && connection.getAttachment().hasZTunnel()) {
// if the resolving address is not local, have attached ztunnel info,
// and must is detected by outbound, then using the ztunnel mapped host
if (!isLocal && connection.hasAttachment() && connection.getAttachment().hasZTunnel() &&
ZTunnelAttachmentEnvironmentDetectBy.ZTUNNEL_OUTBOUND_FUNC.equals(connection.getAttachment().getZTunnel().getBy())) {
final ZTunnelAttachmentEnvironment ztunnel = connection.getAttachment().getZTunnel();
host = ztunnel.getRealDestinationIp();
log.debug("detected the ztunnel connection, so update the remote IP address as: {}, detect by: {}", host,
log.debug("detected the ztunnel outbound connection, so update the remote IP address as: {}, detect by: {}", host,
ztunnel.getBy());
}
final ObjectID service = K8sInfoRegistry.getInstance().findServiceByIP(host);
Expand Down Expand Up @@ -535,9 +668,11 @@ protected KubernetesProcessAddress buildRemoteAddress(NodeInfo nodeInfo, ObjectI

protected List<Integer> buildConnectionComponentId(ConnectionInfo connectionInfo) {
final AccessLogConnection originalConnection = connectionInfo.getOriginalConnection();
if (originalConnection.hasAttachment() && originalConnection.getAttachment().hasZTunnel() &&
ZTunnelAttachmentSecurityPolicy.MTLS.equals(originalConnection.getAttachment().getZTunnel().getSecurityPolicy())) {
return Arrays.asList(142, 162); // mTLS, ztunnel
if (originalConnection.hasAttachment() && originalConnection.getAttachment().hasZTunnel()) {
if (ZTunnelAttachmentSecurityPolicy.MTLS.equals(originalConnection.getAttachment().getZTunnel().getSecurityPolicy())) {
return Arrays.asList(142, 162); // mTLS, ztunnel
}
return Arrays.asList(162); // ztunnel
}
return Arrays.asList(buildProtocolComponentID(connectionInfo));
}
Expand Down Expand Up @@ -725,6 +860,10 @@ public org.apache.skywalking.oap.server.core.source.DetectPoint parseToSourceRol
}
}

public AccessLogConnection getOriginal() {
return originalConnection;
}

public String toString() {
return String.format("local: %s, remote: %s, role: %s, tlsMode: %s, protocolType: %s, valid: %b",
buildConnectionAddressString(originalConnection.getLocal()),
Expand Down Expand Up @@ -766,4 +905,21 @@ public void increaseCount() {
protected long convertNsToMs(long latency) {
return TimeUnit.NANOSECONDS.toMillis(latency);
}

protected List<Service> buildBaseServiceFromRelation(K8SServiceRelation relation, Layer layer) {
if (relation == null) {
return Collections.emptyList();
}
Service localService = new Service();
localService.setLayer(layer);
localService.setName(relation.getSourceServiceName());
localService.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
Service remoteService = new Service();
remoteService.setLayer(layer);
remoteService.setName(relation.getDestServiceName());
remoteService.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
log.warn("generate the mesh layer service local service: {}, remote service: {}",
relation.getSourceServiceName(), relation.getDestServiceName());
return Arrays.asList(localService, remoteService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ ThreadPerTask-executor:
ztunnel:
id: 162
languages: ebpf, mesh
priority: 10

# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
Expand Down
Loading
Loading