1818
1919package org .apache .skywalking .oap .server .receiver .envoy .als .mx ;
2020
21- import com .google .protobuf .Any ;
22- import com .google .protobuf .TextFormat ;
23- import io .envoyproxy .envoy .data .accesslog .v3 .HTTPAccessLogEntry ;
24- import io .envoyproxy .envoy .service .accesslog .v3 .StreamAccessLogsMessage ;
25- import lombok .extern .slf4j .Slf4j ;
21+ import static org .apache .skywalking .oap .server .core .Const .TLS_MODE .NON_TLS ;
22+
23+ import java .util .Base64 ;
24+ import java .util .Map ;
25+ import java .util .concurrent .atomic .AtomicBoolean ;
26+
2627import org .apache .skywalking .apm .network .servicemesh .v3 .HTTPServiceMeshMetric ;
28+ import org .apache .skywalking .apm .network .servicemesh .v3 .HTTPServiceMeshMetrics .Builder ;
2729import org .apache .skywalking .oap .server .library .module .ModuleManager ;
2830import org .apache .skywalking .oap .server .library .module .ModuleStartException ;
2931import org .apache .skywalking .oap .server .library .util .FieldsHelper ;
3032import org .apache .skywalking .oap .server .receiver .envoy .EnvoyMetricReceiverConfig ;
3133import org .apache .skywalking .oap .server .receiver .envoy .als .AbstractALSAnalyzer ;
34+ import org .apache .skywalking .oap .server .receiver .envoy .als .AccessLogAnalyzer .Result .ResultBuilder ;
3235import org .apache .skywalking .oap .server .receiver .envoy .als .Role ;
3336import org .apache .skywalking .oap .server .receiver .envoy .als .ServiceMetaInfo ;
3437
35- import java .util .Base64 ;
36- import java .util .concurrent .atomic .AtomicBoolean ;
38+ import com .google .protobuf .Any ;
39+ import com .google .protobuf .Struct ;
40+ import com .google .protobuf .TextFormat ;
3741
38- import static org .apache .skywalking .oap .server .core .Const .TLS_MODE .NON_TLS ;
42+ import io .envoyproxy .envoy .data .accesslog .v3 .HTTPAccessLogEntry ;
43+ import io .envoyproxy .envoy .service .accesslog .v3 .StreamAccessLogsMessage ;
44+ import lombok .extern .slf4j .Slf4j ;
3945
4046@ Slf4j
4147public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
@@ -44,6 +50,10 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
4450
4551 public static final String DOWNSTREAM_KEY = "wasm.downstream_peer" ;
4652
53+ public static final String UPSTREAM_PEER = "upstream_peer" ;
54+
55+ public static final String DOWNSTREAM_PEER = "downstream_peer" ;
56+
4757 protected String fieldMappingFile = "metadata-service-mapping.yaml" ;
4858
4959 protected EnvoyMetricReceiverConfig config ;
@@ -86,13 +96,34 @@ public Result analysis(
8696 final var properties = entry .getCommonProperties ();
8797 final var stateMap = properties .getFilterStateObjectsMap ();
8898 final var result = previousResult .toBuilder ();
99+ if (log .isDebugEnabled ()) {
100+ log .debug ("Filter state object map: {}" , stateMap );
101+ }
89102 if (stateMap .isEmpty ()) {
90103 return result .service (currSvc ).build ();
91104 }
92105
93106 final var previousMetrics = previousResult .getMetrics ();
94107 final var httpMetrics = previousMetrics .getHttpMetricsBuilder ();
95108 final var downstreamExists = new AtomicBoolean ();
109+ parseFilterObject (previousResult , entry , role , currSvc , stateMap , result , httpMetrics , downstreamExists );
110+ parseFilterObjectPrior124 (previousResult , entry , role , currSvc , stateMap , result , httpMetrics , downstreamExists );
111+ if (role .equals (Role .PROXY ) && !downstreamExists .get ()) {
112+ final var metric = newAdapter (entry , config .serviceMetaInfoFactory ().unknown (), currSvc ).adaptToDownstreamMetrics ();
113+ if (log .isDebugEnabled ()) {
114+ log .debug ("Transformed a {} inbound mesh metric {}" , role , TextFormat .shortDebugString (metric ));
115+ }
116+ httpMetrics .addMetrics (metric );
117+ result .hasDownstreamMetrics (true );
118+ }
119+ return result .metrics (previousMetrics .setHttpMetrics (httpMetrics )).service (currSvc ).build ();
120+ }
121+
122+ // TODO: remove this when 1.24.0 is our minimum supported version.
123+ @ Deprecated (forRemoval = true )
124+ private void parseFilterObjectPrior124 (final Result previousResult , final HTTPAccessLogEntry entry , final Role role ,
125+ final ServiceMetaInfo currSvc , final Map <String , Any > stateMap , final ResultBuilder result ,
126+ final Builder httpMetrics , final AtomicBoolean downstreamExists ) {
96127 stateMap .forEach ((key , value ) -> {
97128 if (!key .equals (UPSTREAM_KEY ) && !key .equals (DOWNSTREAM_KEY )) {
98129 return ;
@@ -131,21 +162,63 @@ public Result analysis(
131162 break ;
132163 }
133164 });
134- if (role .equals (Role .PROXY ) && !downstreamExists .get ()) {
135- final var metric = newAdapter (entry , config .serviceMetaInfoFactory ().unknown (), currSvc ).adaptToDownstreamMetrics ();
165+ }
166+
167+ private void parseFilterObject (final Result previousResult , final HTTPAccessLogEntry entry , final Role role ,
168+ final ServiceMetaInfo currSvc , final Map <String , Any > stateMap , final ResultBuilder result ,
169+ final Builder httpMetrics , final AtomicBoolean downstreamExists ) {
170+ stateMap .forEach ((key , value ) -> {
171+ if (!key .equals (UPSTREAM_PEER ) && !key .equals (DOWNSTREAM_PEER )) {
172+ return ;
173+ }
136174 if (log .isDebugEnabled ()) {
137- log .debug ("Transformed a {} inbound mesh metric {}" , role , TextFormat . shortDebugString ( metric ) );
175+ log .debug ("Filter state object key: {}, value: {}" , key , value );
138176 }
139- httpMetrics .addMetrics (metric );
140- result .hasDownstreamMetrics (true );
141- }
142- return result .metrics (previousMetrics .setHttpMetrics (httpMetrics )).service (currSvc ).build ();
177+ final ServiceMetaInfo svc ;
178+ try {
179+ log .debug ("Filter state object value map: {}" , value .unpack (Struct .class ).getFieldsMap ());
180+ svc = adaptToServiceMetaInfo (value .unpack (Struct .class ));
181+ } catch (Exception e ) {
182+ log .error ("Fail to parse metadata {} to FlatNode" , Base64 .getEncoder ().encode (value .toByteArray ()));
183+ return ;
184+ }
185+ final HTTPServiceMeshMetric .Builder metrics ;
186+ switch (key ) {
187+ case UPSTREAM_PEER :
188+ if (previousResult .hasUpstreamMetrics ()) {
189+ break ;
190+ }
191+ metrics = newAdapter (entry , currSvc , svc ).adaptToUpstreamMetrics ().setTlsMode (NON_TLS );
192+ if (log .isDebugEnabled ()) {
193+ log .debug ("Transformed a {} outbound mesh metrics {}" , role , TextFormat .shortDebugString (metrics ));
194+ }
195+ httpMetrics .addMetrics (metrics );
196+ result .hasUpstreamMetrics (true );
197+ break ;
198+ case DOWNSTREAM_PEER :
199+ if (previousResult .hasDownstreamMetrics ()) {
200+ break ;
201+ }
202+ metrics = newAdapter (entry , svc , currSvc ).adaptToDownstreamMetrics ();
203+ if (log .isDebugEnabled ()) {
204+ log .debug ("Transformed a {} inbound mesh metrics {}" , role , TextFormat .shortDebugString (metrics ));
205+ }
206+ httpMetrics .addMetrics (metrics );
207+ downstreamExists .set (true );
208+ result .hasDownstreamMetrics (true );
209+ break ;
210+ }
211+ });
143212 }
144213
145214 protected ServiceMetaInfo adaptToServiceMetaInfo (final Any value ) throws Exception {
146215 return new ServiceMetaInfoAdapter (value );
147216 }
148217
218+ protected ServiceMetaInfo adaptToServiceMetaInfo (final Struct struct ) throws Exception {
219+ return config .serviceMetaInfoFactory ().fromStruct (struct );
220+ }
221+
149222 protected ServiceMetaInfo adaptToServiceMetaInfo (final StreamAccessLogsMessage .Identifier identifier ) throws Exception {
150223 return config .serviceMetaInfoFactory ().fromStruct (identifier .getNode ().getMetadata ());
151224 }
0 commit comments