Skip to content

Commit 1ef45be

Browse files
Allows continuation of observations
without this change we're not taking into account a parent observation into account. Nor do we set in the reactor context the current observation for the users to use with this change whenever an observation is being created we put it into the reacto context under a well-known micrometer.observation key (for more information look at the ObservationThreadLocalAccessor class from micrometer-core)
1 parent 32da131 commit 1ef45be

File tree

4 files changed

+69
-69
lines changed

4 files changed

+69
-69
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.1.3
14+
version=1.1.4-SNAPSHOT
1515
perfBaselineVersion=1.1.2

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.reactivestreams.Publisher;
3333
import reactor.core.publisher.Flux;
3434
import reactor.core.publisher.Mono;
35+
import reactor.util.annotation.Nullable;
3536
import reactor.util.context.ContextView;
3637

3738
/**
@@ -43,13 +44,24 @@
4344
*/
4445
public class ObservationRequesterRSocketProxy extends RSocketProxy {
4546

47+
/** Aligned with ObservationThreadLocalAccessor#KEY */
48+
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
49+
4650
private final ObservationRegistry observationRegistry;
4751

48-
private RSocketRequesterObservationConvention observationConvention;
52+
@Nullable private final RSocketRequesterObservationConvention observationConvention;
4953

5054
public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
55+
this(source, observationRegistry, null);
56+
}
57+
58+
public ObservationRequesterRSocketProxy(
59+
RSocket source,
60+
ObservationRegistry observationRegistry,
61+
RSocketRequesterObservationConvention observationConvention) {
5162
super(source);
5263
this.observationRegistry = observationRegistry;
64+
this.observationConvention = observationConvention;
5365
}
5466

5567
@Override
@@ -76,15 +88,7 @@ <T> Mono<T> setObservation(
7688
FrameType frameType,
7789
ObservationDocumentation observation) {
7890
return Mono.deferContextual(
79-
contextView -> {
80-
if (contextView.hasKey(Observation.class)) {
81-
Observation parent = contextView.get(Observation.class);
82-
try (Observation.Scope scope = parent.openScope()) {
83-
return observe(input, payload, frameType, observation);
84-
}
85-
}
86-
return observe(input, payload, frameType, observation);
87-
});
91+
contextView -> observe(input, payload, frameType, observation, contextView));
8892
}
8993

9094
private String route(Payload payload) {
@@ -107,45 +111,40 @@ private <T> Mono<T> observe(
107111
Function<Payload, Mono<T>> input,
108112
Payload payload,
109113
FrameType frameType,
110-
ObservationDocumentation obs) {
114+
ObservationDocumentation obs,
115+
ContextView contextView) {
111116
String route = route(payload);
112117
RSocketContext rSocketContext =
113118
new RSocketContext(
114119
payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
120+
Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
115121
Observation observation =
116-
obs.start(
117-
this.observationConvention,
118-
new DefaultRSocketRequesterObservationConvention(rSocketContext),
119-
() -> rSocketContext,
120-
observationRegistry);
122+
obs.observation(
123+
this.observationConvention,
124+
new DefaultRSocketRequesterObservationConvention(rSocketContext),
125+
() -> rSocketContext,
126+
observationRegistry)
127+
.parentObservation(parentObservation);
121128
setContextualName(frameType, route, observation);
129+
observation.start();
122130
Payload newPayload = payload;
123131
if (rSocketContext.modifiedPayload != null) {
124132
newPayload = rSocketContext.modifiedPayload;
125133
}
126134
return input
127135
.apply(newPayload)
128136
.doOnError(observation::error)
129-
.doFinally(signalType -> observation.stop());
130-
}
131-
132-
private Observation observation(ContextView contextView) {
133-
if (contextView.hasKey(Observation.class)) {
134-
return contextView.get(Observation.class);
135-
}
136-
return null;
137+
.doFinally(signalType -> observation.stop())
138+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
137139
}
138140

139141
@Override
140142
public Flux<Payload> requestStream(Payload payload) {
141-
return Flux.deferContextual(
142-
contextView ->
143-
setObservation(
144-
super::requestStream,
145-
payload,
146-
contextView,
147-
FrameType.REQUEST_STREAM,
148-
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM));
143+
return observationFlux(
144+
super::requestStream,
145+
payload,
146+
FrameType.REQUEST_STREAM,
147+
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
149148
}
150149

151150
@Override
@@ -155,32 +154,16 @@ public Flux<Payload> requestChannel(Publisher<Payload> inbound) {
155154
(firstSignal, flux) -> {
156155
final Payload firstPayload = firstSignal.get();
157156
if (firstPayload != null) {
158-
return setObservation(
157+
return observationFlux(
159158
p -> super.requestChannel(flux.skip(1).startWith(p)),
160159
firstPayload,
161-
firstSignal.getContextView(),
162160
FrameType.REQUEST_CHANNEL,
163161
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
164162
}
165163
return flux;
166164
});
167165
}
168166

169-
private Flux<Payload> setObservation(
170-
Function<Payload, Flux<Payload>> input,
171-
Payload payload,
172-
ContextView contextView,
173-
FrameType frameType,
174-
ObservationDocumentation obs) {
175-
Observation parentObservation = observation(contextView);
176-
if (parentObservation == null) {
177-
return observationFlux(input, payload, frameType, obs);
178-
}
179-
try (Observation.Scope scope = parentObservation.openScope()) {
180-
return observationFlux(input, payload, frameType, obs);
181-
}
182-
}
183-
184167
private Flux<Payload> observationFlux(
185168
Function<Payload, Flux<Payload>> input,
186169
Payload payload,
@@ -196,17 +179,22 @@ private Flux<Payload> observationFlux(
196179
frameType,
197180
route,
198181
RSocketContext.Side.REQUESTER);
182+
Observation parentObservation =
183+
contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
199184
Observation newObservation =
200-
obs.start(
201-
this.observationConvention,
202-
new DefaultRSocketRequesterObservationConvention(rSocketContext),
203-
() -> rSocketContext,
204-
this.observationRegistry);
185+
obs.observation(
186+
this.observationConvention,
187+
new DefaultRSocketRequesterObservationConvention(rSocketContext),
188+
() -> rSocketContext,
189+
this.observationRegistry)
190+
.parentObservation(parentObservation);
205191
setContextualName(frameType, route, newObservation);
192+
newObservation.start();
206193
return input
207194
.apply(rSocketContext.modifiedPayload)
208195
.doOnError(newObservation::error)
209-
.doFinally(signalType -> newObservation.stop());
196+
.doFinally(signalType -> newObservation.stop())
197+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
210198
});
211199
}
212200

@@ -217,8 +205,4 @@ private void setContextualName(FrameType frameType, String route, Observation ne
217205
newObservation.contextualName(frameType.name());
218206
}
219207
}
220-
221-
public void setObservationConvention(RSocketRequesterObservationConvention convention) {
222-
this.observationConvention = convention;
223-
}
224208
}

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.reactivestreams.Publisher;
3131
import reactor.core.publisher.Flux;
3232
import reactor.core.publisher.Mono;
33+
import reactor.util.annotation.Nullable;
3334

3435
/**
3536
* Tracing representation of a {@link RSocketProxy} for the responder.
@@ -39,14 +40,24 @@
3940
* @since 1.1.4
4041
*/
4142
public class ObservationResponderRSocketProxy extends RSocketProxy {
43+
/** Aligned with ObservationThreadLocalAccessor#KEY */
44+
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
4245

4346
private final ObservationRegistry observationRegistry;
4447

45-
private RSocketResponderObservationConvention observationConvention;
48+
@Nullable private final RSocketResponderObservationConvention observationConvention;
4649

4750
public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
51+
this(source, observationRegistry, null);
52+
}
53+
54+
public ObservationResponderRSocketProxy(
55+
RSocket source,
56+
ObservationRegistry observationRegistry,
57+
RSocketResponderObservationConvention observationConvention) {
4858
super(source);
4959
this.observationRegistry = observationRegistry;
60+
this.observationConvention = observationConvention;
5061
}
5162

5263
@Override
@@ -66,7 +77,8 @@ public Mono<Void> fireAndForget(Payload payload) {
6677
startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
6778
return super.fireAndForget(rSocketContext.modifiedPayload)
6879
.doOnError(newObservation::error)
69-
.doFinally(signalType -> newObservation.stop());
80+
.doFinally(signalType -> newObservation.stop())
81+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
7082
}
7183

7284
private Observation startObservation(
@@ -94,7 +106,8 @@ public Mono<Payload> requestResponse(Payload payload) {
94106
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
95107
return super.requestResponse(rSocketContext.modifiedPayload)
96108
.doOnError(newObservation::error)
97-
.doFinally(signalType -> newObservation.stop());
109+
.doFinally(signalType -> newObservation.stop())
110+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
98111
}
99112

100113
@Override
@@ -109,7 +122,8 @@ public Flux<Payload> requestStream(Payload payload) {
109122
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
110123
return super.requestStream(rSocketContext.modifiedPayload)
111124
.doOnError(newObservation::error)
112-
.doFinally(signalType -> newObservation.stop());
125+
.doFinally(signalType -> newObservation.stop())
126+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
113127
}
114128

115129
@Override
@@ -137,7 +151,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
137151
}
138152
return super.requestChannel(flux.skip(1).startWith(rSocketContext.modifiedPayload))
139153
.doOnError(newObservation::error)
140-
.doFinally(signalType -> newObservation.stop());
154+
.doFinally(signalType -> newObservation.stop())
155+
.contextWrite(
156+
context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
141157
}
142158
return flux;
143159
});
@@ -160,8 +176,4 @@ private String route(Payload payload, ByteBuf headers) {
160176
}
161177
return null;
162178
}
163-
164-
public void setObservationConvention(RSocketResponderObservationConvention convention) {
165-
this.observationConvention = convention;
166-
}
167179
}

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ public Tracer getTracer() {
6969
public void onStart(RSocketContext context) {
7070
Payload payload = context.payload;
7171
Span.Builder spanBuilder = this.tracer.spanBuilder();
72+
Span parentSpan = getParentSpan(context);
73+
if (parentSpan != null) {
74+
spanBuilder.setParent(parentSpan.context());
75+
}
7276
Span span = spanBuilder.kind(Span.Kind.PRODUCER).start();
7377
log.debug("Extracted result from context or thread local {}", span);
7478
// TODO: newmetadata returns an empty composite byte buf

0 commit comments

Comments
 (0)