Skip to content

Commit a67f278

Browse files
authored
Merge branch 'master' into invoke_no_typeref_master
2 parents f81e746 + 83e1753 commit a67f278

File tree

12 files changed

+423
-18
lines changed

12 files changed

+423
-18
lines changed

dapr-spring/dapr-spring-boot-autoconfigure/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
<optional>true</optional>
2929
</dependency>
3030
<dependency>
31-
<groupId>org.springframework.boot</groupId>
32-
<artifactId>spring-boot-starter</artifactId>
31+
<groupId>org.springframework.boot</groupId>
32+
<artifactId>spring-boot-starter</artifactId>
3333
</dependency>
3434
<dependency>
3535
<groupId>org.springframework.boot</groupId>

dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/pubsub/DaprPubSubProperties.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class DaprPubSubProperties {
2525
* Name of the PubSub Dapr component.
2626
*/
2727
private String name;
28+
private boolean observationEnabled;
2829

2930
public String getName() {
3031
return name;
@@ -34,4 +35,11 @@ public void setName(String name) {
3435
this.name = name;
3536
}
3637

38+
public boolean isObservationEnabled() {
39+
return observationEnabled;
40+
}
41+
42+
public void setObservationEnabled(boolean observationEnabled) {
43+
this.observationEnabled = observationEnabled;
44+
}
3745
}

dapr-spring/dapr-spring-messaging/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@
1212
<artifactId>dapr-spring-messaging</artifactId>
1313
<name>dapr-spring-messaging</name>
1414
<description>Dapr Spring Messaging</description>
15-
<packaging>jar</packaging>
15+
<packaging>jar</packaging>
1616

1717
</project>

dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java

Lines changed: 132 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,96 @@
1515

1616
import io.dapr.client.DaprClient;
1717
import io.dapr.client.domain.Metadata;
18+
import io.dapr.spring.messaging.observation.DaprMessagingObservationConvention;
19+
import io.dapr.spring.messaging.observation.DaprMessagingObservationDocumentation;
20+
import io.dapr.spring.messaging.observation.DaprMessagingSenderContext;
21+
import io.micrometer.observation.Observation;
22+
import io.micrometer.observation.ObservationRegistry;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
import org.springframework.beans.factory.BeanNameAware;
26+
import org.springframework.beans.factory.SmartInitializingSingleton;
27+
import org.springframework.context.ApplicationContext;
28+
import org.springframework.context.ApplicationContextAware;
1829
import reactor.core.publisher.Mono;
30+
import reactor.util.context.Context;
31+
32+
import javax.annotation.Nullable;
1933

2034
import java.util.Map;
2135

22-
public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T> {
36+
/**
37+
* Create a new DaprMessagingTemplate.
38+
* @param <T> templated message type
39+
*/
40+
public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T>, ApplicationContextAware, BeanNameAware,
41+
SmartInitializingSingleton {
2342

43+
private static final Logger LOGGER = LoggerFactory.getLogger(DaprMessagingTemplate.class);
2444
private static final String MESSAGE_TTL_IN_SECONDS = "10";
45+
private static final DaprMessagingObservationConvention DEFAULT_OBSERVATION_CONVENTION =
46+
DaprMessagingObservationConvention.getDefault();
2547

2648
private final DaprClient daprClient;
2749
private final String pubsubName;
50+
private final Map<String, String> metadata;
51+
private final boolean observationEnabled;
52+
53+
@Nullable
54+
private ApplicationContext applicationContext;
55+
56+
@Nullable
57+
private String beanName;
58+
59+
@Nullable
60+
private ObservationRegistry observationRegistry;
61+
62+
@Nullable
63+
private DaprMessagingObservationConvention observationConvention;
2864

29-
public DaprMessagingTemplate(DaprClient daprClient, String pubsubName) {
65+
/**
66+
* Constructs a new DaprMessagingTemplate.
67+
* @param daprClient Dapr client
68+
* @param pubsubName pubsub name
69+
* @param observationEnabled whether to enable observations
70+
*/
71+
public DaprMessagingTemplate(DaprClient daprClient, String pubsubName, boolean observationEnabled) {
3072
this.daprClient = daprClient;
3173
this.pubsubName = pubsubName;
74+
this.metadata = Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS);
75+
this.observationEnabled = observationEnabled;
76+
}
77+
78+
@Override
79+
public void setApplicationContext(ApplicationContext applicationContext) {
80+
this.applicationContext = applicationContext;
81+
}
82+
83+
@Override
84+
public void setBeanName(String beanName) {
85+
this.beanName = beanName;
86+
}
87+
88+
/**
89+
* If observations are enabled, attempt to obtain the Observation registry and
90+
* convention.
91+
*/
92+
@Override
93+
public void afterSingletonsInstantiated() {
94+
if (!observationEnabled) {
95+
LOGGER.debug("Observations are not enabled - not recording");
96+
return;
97+
}
98+
99+
if (applicationContext == null) {
100+
LOGGER.warn("Observations enabled but application context null - not recording");
101+
return;
102+
}
103+
104+
observationRegistry = applicationContext.getBeanProvider(ObservationRegistry.class)
105+
.getIfUnique(() -> observationRegistry);
106+
observationConvention = applicationContext.getBeanProvider(DaprMessagingObservationConvention.class)
107+
.getIfUnique(() -> observationConvention);
32108
}
33109

34110
@Override
@@ -38,29 +114,74 @@ public void send(String topic, T message) {
38114

39115
@Override
40116
public SendMessageBuilder<T> newMessage(T message) {
41-
return new SendMessageBuilderImpl<>(this, message);
117+
return new DefaultSendMessageBuilder<>(this, message);
42118
}
43119

44120
private void doSend(String topic, T message) {
45121
doSendAsync(topic, message).block();
46122
}
47123

48124
private Mono<Void> doSendAsync(String topic, T message) {
49-
return daprClient.publishEvent(pubsubName,
50-
topic,
51-
message,
52-
Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS));
125+
LOGGER.trace("Sending message to '{}' topic", topic);
126+
127+
if (canUseObservation()) {
128+
return publishEventWithObservation(pubsubName, topic, message);
129+
}
130+
131+
return publishEvent(pubsubName, topic, message);
132+
}
133+
134+
private boolean canUseObservation() {
135+
return observationEnabled && observationRegistry != null && beanName != null;
136+
}
137+
138+
private Mono<Void> publishEvent(String pubsubName, String topic, T message) {
139+
return daprClient.publishEvent(pubsubName, topic, message, metadata);
140+
}
141+
142+
private Mono<Void> publishEventWithObservation(String pubsubName, String topic, T message) {
143+
DaprMessagingSenderContext senderContext = DaprMessagingSenderContext.newContext(topic, this.beanName);
144+
Observation observation = createObservation(senderContext);
145+
146+
observation.start();
147+
148+
return publishEvent(pubsubName, topic, message)
149+
.contextWrite(getReactorContext(senderContext))
150+
.doOnError(err -> {
151+
LOGGER.error("Failed to send msg to '{}' topic", topic, err);
152+
153+
observation.error(err);
154+
observation.stop();
155+
})
156+
.doOnSuccess(ignore -> {
157+
LOGGER.trace("Sent msg to '{}' topic", topic);
158+
159+
observation.stop();
160+
});
161+
}
162+
163+
private Context getReactorContext(DaprMessagingSenderContext senderContext) {
164+
return Context.of(senderContext.properties());
165+
}
166+
167+
private Observation createObservation(DaprMessagingSenderContext senderContext) {
168+
return DaprMessagingObservationDocumentation.TEMPLATE_OBSERVATION.observation(
169+
observationConvention,
170+
DEFAULT_OBSERVATION_CONVENTION,
171+
() -> senderContext,
172+
observationRegistry
173+
);
53174
}
54175

55-
private static class SendMessageBuilderImpl<T> implements SendMessageBuilder<T> {
176+
private static class DefaultSendMessageBuilder<T> implements SendMessageBuilder<T> {
56177

57178
private final DaprMessagingTemplate<T> template;
58179

59180
private final T message;
60181

61182
private String topic;
62183

63-
SendMessageBuilderImpl(DaprMessagingTemplate<T> template, T message) {
184+
DefaultSendMessageBuilder(DaprMessagingTemplate<T> template, T message) {
64185
this.template = template;
65186
this.message = message;
66187
}
@@ -74,12 +195,12 @@ public SendMessageBuilder<T> withTopic(String topic) {
74195

75196
@Override
76197
public void send() {
77-
this.template.doSend(this.topic, this.message);
198+
template.doSend(topic, message);
78199
}
79200

80201
@Override
81202
public Mono<Void> sendAsync() {
82-
return this.template.doSendAsync(this.topic, this.message);
203+
return template.doSendAsync(topic, message);
83204
}
84205

85206
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2024 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.spring.messaging.observation;
15+
16+
import io.micrometer.observation.Observation.Context;
17+
import io.micrometer.observation.ObservationConvention;
18+
19+
/**
20+
* {@link ObservationConvention} for Dapr Messaging.
21+
*
22+
*/
23+
public interface DaprMessagingObservationConvention extends ObservationConvention<DaprMessagingSenderContext> {
24+
25+
@Override
26+
default boolean supportsContext(Context context) {
27+
return context instanceof DaprMessagingSenderContext;
28+
}
29+
30+
@Override
31+
default String getName() {
32+
return "spring.dapr.messaging.template";
33+
}
34+
35+
static DaprMessagingObservationConvention getDefault() {
36+
return DefaultDaprMessagingObservationConvention.INSTANCE;
37+
}
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2024 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.spring.messaging.observation;
15+
16+
import io.micrometer.common.docs.KeyName;
17+
import io.micrometer.observation.Observation;
18+
import io.micrometer.observation.Observation.Context;
19+
import io.micrometer.observation.ObservationConvention;
20+
import io.micrometer.observation.docs.ObservationDocumentation;
21+
22+
/**
23+
* An {@link Observation} for {@link io.dapr.spring.messaging.DaprMessagingTemplate}.
24+
*
25+
*/
26+
public enum DaprMessagingObservationDocumentation implements ObservationDocumentation {
27+
28+
/**
29+
* Observation created when a Dapr template sends a message.
30+
*/
31+
TEMPLATE_OBSERVATION {
32+
33+
@Override
34+
public Class<? extends ObservationConvention<? extends Context>> getDefaultConvention() {
35+
return DefaultDaprMessagingObservationConvention.class;
36+
}
37+
38+
@Override
39+
public String getPrefix() {
40+
return "spring.dapr.messaging.template";
41+
}
42+
43+
@Override
44+
public KeyName[] getLowCardinalityKeyNames() {
45+
return TemplateLowCardinalityTags.values();
46+
}
47+
};
48+
49+
/**
50+
* Low cardinality tags.
51+
*/
52+
public enum TemplateLowCardinalityTags implements KeyName {
53+
/**
54+
* Bean name of the template that sent the message.
55+
*/
56+
BEAN_NAME {
57+
58+
@Override
59+
public String asString() {
60+
return "spring.dapr.messaging.template.name";
61+
}
62+
}
63+
}
64+
}

0 commit comments

Comments
 (0)