Skip to content

Commit 4e1d8b5

Browse files
allows continuation of observations (#1076)
1 parent 32da131 commit 4e1d8b5

File tree

4 files changed

+69
-69
lines changed

4 files changed

+69
-69
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.1.3
14+
version=1.1.4-SNAPSHOT
1515
perfBaselineVersion=1.1.2

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.reactivestreams.Publisher;
3333
import reactor.core.publisher.Flux;
3434
import reactor.core.publisher.Mono;
35+
import reactor.util.annotation.Nullable;
3536
import reactor.util.context.ContextView;
3637

3738
/**
@@ -43,13 +44,24 @@
4344
*/
4445
public class ObservationRequesterRSocketProxy extends RSocketProxy {
4546

47+
/** Aligned with ObservationThreadLocalAccessor#KEY */
48+
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
49+
4650
private final ObservationRegistry observationRegistry;
4751

48-
private RSocketRequesterObservationConvention observationConvention;
52+
@Nullable private final RSocketRequesterObservationConvention observationConvention;
4953

5054
public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
55+
this(source, observationRegistry, null);
56+
}
57+
58+
public ObservationRequesterRSocketProxy(
59+
RSocket source,
60+
ObservationRegistry observationRegistry,
61+
RSocketRequesterObservationConvention observationConvention) {
5162
super(source);
5263
this.observationRegistry = observationRegistry;
64+
this.observationConvention = observationConvention;
5365
}
5466

5567
@Override
@@ -76,15 +88,7 @@ <T> Mono<T> setObservation(
7688
FrameType frameType,
7789
ObservationDocumentation observation) {
7890
return Mono.deferContextual(
79-
contextView -> {
80-
if (contextView.hasKey(Observation.class)) {
81-
Observation parent = contextView.get(Observation.class);
82-
try (Observation.Scope scope = parent.openScope()) {
83-
return observe(input, payload, frameType, observation);
84-
}
85-
}
86-
return observe(input, payload, frameType, observation);
87-
});
91+
contextView -> observe(input, payload, frameType, observation, contextView));
8892
}
8993

9094
private String route(Payload payload) {
@@ -107,45 +111,40 @@ private <T> Mono<T> observe(
107111
Function<Payload, Mono<T>> input,
108112
Payload payload,
109113
FrameType frameType,
110-
ObservationDocumentation obs) {
114+
ObservationDocumentation obs,
115+
ContextView contextView) {
111116
String route = route(payload);
112117
RSocketContext rSocketContext =
113118
new RSocketContext(
114119
payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
120+
Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
115121
Observation observation =
116-
obs.start(
117-
this.observationConvention,
118-
new DefaultRSocketRequesterObservationConvention(rSocketContext),
119-
() -> rSocketContext,
120-
observationRegistry);
122+
obs.observation(
123+
this.observationConvention,
124+
new DefaultRSocketRequesterObservationConvention(rSocketContext),
125+
() -> rSocketContext,
126+
observationRegistry)
127+
.parentObservation(parentObservation);
121128
setContextualName(frameType, route, observation);
129+
observation.start();
122130
Payload newPayload = payload;
123131
if (rSocketContext.modifiedPayload != null) {
124132
newPayload = rSocketContext.modifiedPayload;
125133
}
126134
return input
127135
.apply(newPayload)
128136
.doOnError(observation::error)
129-
.doFinally(signalType -> observation.stop());
130-
}
131-
132-
private Observation observation(ContextView contextView) {
133-
if (contextView.hasKey(Observation.class)) {
134-
return contextView.get(Observation.class);
135-
}
136-
return null;
137+
.doFinally(signalType -> observation.stop())
138+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
137139
}
138140

139141
@Override
140142
public Flux<Payload> requestStream(Payload payload) {
141-
return Flux.deferContextual(
142-
contextView ->
143-
setObservation(
144-
super::requestStream,
145-
payload,
146-
contextView,
147-
FrameType.REQUEST_STREAM,
148-
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM));
143+
return observationFlux(
144+
super::requestStream,
145+
payload,
146+
FrameType.REQUEST_STREAM,
147+
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
149148
}
150149

151150
@Override
@@ -155,32 +154,16 @@ public Flux<Payload> requestChannel(Publisher<Payload> inbound) {
155154
(firstSignal, flux) -> {
156155
final Payload firstPayload = firstSignal.get();
157156
if (firstPayload != null) {
158-
return setObservation(
157+
return observationFlux(
159158
p -> super.requestChannel(flux.skip(1).startWith(p)),
160159
firstPayload,
161-
firstSignal.getContextView(),
162160
FrameType.REQUEST_CHANNEL,
163161
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
164162
}
165163
return flux;
166164
});
167165
}
168166

169-
private Flux<Payload> setObservation(
170-
Function<Payload, Flux<Payload>> input,
171-
Payload payload,
172-
ContextView contextView,
173-
FrameType frameType,
174-
ObservationDocumentation obs) {
175-
Observation parentObservation = observation(contextView);
176-
if (parentObservation == null) {
177-
return observationFlux(input, payload, frameType, obs);
178-
}
179-
try (Observation.Scope scope = parentObservation.openScope()) {
180-
return observationFlux(input, payload, frameType, obs);
181-
}
182-
}
183-
184167
private Flux<Payload> observationFlux(
185168
Function<Payload, Flux<Payload>> input,
186169
Payload payload,
@@ -196,17 +179,22 @@ private Flux<Payload> observationFlux(
196179
frameType,
197180
route,
198181
RSocketContext.Side.REQUESTER);
182+
Observation parentObservation =
183+
contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
199184
Observation newObservation =
200-
obs.start(
201-
this.observationConvention,
202-
new DefaultRSocketRequesterObservationConvention(rSocketContext),
203-
() -> rSocketContext,
204-
this.observationRegistry);
185+
obs.observation(
186+
this.observationConvention,
187+
new DefaultRSocketRequesterObservationConvention(rSocketContext),
188+
() -> rSocketContext,
189+
this.observationRegistry)
190+
.parentObservation(parentObservation);
205191
setContextualName(frameType, route, newObservation);
192+
newObservation.start();
206193
return input
207194
.apply(rSocketContext.modifiedPayload)
208195
.doOnError(newObservation::error)
209-
.doFinally(signalType -> newObservation.stop());
196+
.doFinally(signalType -> newObservation.stop())
197+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
210198
});
211199
}
212200

@@ -217,8 +205,4 @@ private void setContextualName(FrameType frameType, String route, Observation ne
217205
newObservation.contextualName(frameType.name());
218206
}
219207
}
220-
221-
public void setObservationConvention(RSocketRequesterObservationConvention convention) {
222-
this.observationConvention = convention;
223-
}
224208
}

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.reactivestreams.Publisher;
3131
import reactor.core.publisher.Flux;
3232
import reactor.core.publisher.Mono;
33+
import reactor.util.annotation.Nullable;
3334

3435
/**
3536
* Tracing representation of a {@link RSocketProxy} for the responder.
@@ -39,14 +40,24 @@
3940
* @since 1.1.4
4041
*/
4142
public class ObservationResponderRSocketProxy extends RSocketProxy {
43+
/** Aligned with ObservationThreadLocalAccessor#KEY */
44+
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
4245

4346
private final ObservationRegistry observationRegistry;
4447

45-
private RSocketResponderObservationConvention observationConvention;
48+
@Nullable private final RSocketResponderObservationConvention observationConvention;
4649

4750
public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
51+
this(source, observationRegistry, null);
52+
}
53+
54+
public ObservationResponderRSocketProxy(
55+
RSocket source,
56+
ObservationRegistry observationRegistry,
57+
RSocketResponderObservationConvention observationConvention) {
4858
super(source);
4959
this.observationRegistry = observationRegistry;
60+
this.observationConvention = observationConvention;
5061
}
5162

5263
@Override
@@ -66,7 +77,8 @@ public Mono<Void> fireAndForget(Payload payload) {
6677
startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
6778
return super.fireAndForget(rSocketContext.modifiedPayload)
6879
.doOnError(newObservation::error)
69-
.doFinally(signalType -> newObservation.stop());
80+
.doFinally(signalType -> newObservation.stop())
81+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
7082
}
7183

7284
private Observation startObservation(
@@ -94,7 +106,8 @@ public Mono<Payload> requestResponse(Payload payload) {
94106
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
95107
return super.requestResponse(rSocketContext.modifiedPayload)
96108
.doOnError(newObservation::error)
97-
.doFinally(signalType -> newObservation.stop());
109+
.doFinally(signalType -> newObservation.stop())
110+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
98111
}
99112

100113
@Override
@@ -109,7 +122,8 @@ public Flux<Payload> requestStream(Payload payload) {
109122
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
110123
return super.requestStream(rSocketContext.modifiedPayload)
111124
.doOnError(newObservation::error)
112-
.doFinally(signalType -> newObservation.stop());
125+
.doFinally(signalType -> newObservation.stop())
126+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
113127
}
114128

115129
@Override
@@ -137,7 +151,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
137151
}
138152
return super.requestChannel(flux.skip(1).startWith(rSocketContext.modifiedPayload))
139153
.doOnError(newObservation::error)
140-
.doFinally(signalType -> newObservation.stop());
154+
.doFinally(signalType -> newObservation.stop())
155+
.contextWrite(
156+
context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
141157
}
142158
return flux;
143159
});
@@ -160,8 +176,4 @@ private String route(Payload payload, ByteBuf headers) {
160176
}
161177
return null;
162178
}
163-
164-
public void setObservationConvention(RSocketResponderObservationConvention convention) {
165-
this.observationConvention = convention;
166-
}
167179
}

rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ public Tracer getTracer() {
6969
public void onStart(RSocketContext context) {
7070
Payload payload = context.payload;
7171
Span.Builder spanBuilder = this.tracer.spanBuilder();
72+
Span parentSpan = getParentSpan(context);
73+
if (parentSpan != null) {
74+
spanBuilder.setParent(parentSpan.context());
75+
}
7276
Span span = spanBuilder.kind(Span.Kind.PRODUCER).start();
7377
log.debug("Extracted result from context or thread local {}", span);
7478
// TODO: newmetadata returns an empty composite byte buf

0 commit comments

Comments
 (0)