1515
1616import io .dapr .client .DaprClient ;
1717import io .dapr .client .domain .Metadata ;
18+ import io .dapr .spring .messaging .observation .DaprMessagingObservationConvention ;
19+ import io .dapr .spring .messaging .observation .DaprMessagingObservationDocumentation ;
20+ import io .dapr .spring .messaging .observation .DaprMessagingSenderContext ;
21+ import io .dapr .spring .messaging .observation .DefaultDaprMessagingObservationConvention ;
22+ import io .micrometer .observation .Observation ;
23+ import io .micrometer .observation .ObservationRegistry ;
24+ import org .slf4j .Logger ;
25+ import org .slf4j .LoggerFactory ;
26+ import org .springframework .beans .factory .BeanNameAware ;
27+ import org .springframework .beans .factory .SmartInitializingSingleton ;
28+ import org .springframework .context .ApplicationContext ;
29+ import org .springframework .context .ApplicationContextAware ;
1830import reactor .core .publisher .Mono ;
1931
32+ import javax .annotation .Nullable ;
33+
2034import java .util .Map ;
2135
22- public class DaprMessagingTemplate <T > implements DaprMessagingOperations <T > {
36+ /**
37+ * Create a new DaprMessagingTemplate.
38+ * @param <T> templated message type
39+ */
40+ public class DaprMessagingTemplate <T > implements DaprMessagingOperations <T >, ApplicationContextAware , BeanNameAware ,
41+ SmartInitializingSingleton {
2342
43+ private static final Logger LOGGER = LoggerFactory .getLogger (DaprMessagingTemplate .class );
2444 private static final String MESSAGE_TTL_IN_SECONDS = "10" ;
45+ private static final DaprMessagingObservationConvention DEFAULT_OBSERVATION_CONVENTION =
46+ DefaultDaprMessagingObservationConvention .INSTANCE ;
2547
2648 private final DaprClient daprClient ;
2749 private final String pubsubName ;
50+ private final Map <String , String > metadata ;
51+ private final boolean observationEnabled ;
52+
53+ @ Nullable
54+ private ApplicationContext applicationContext ;
55+
56+ @ Nullable
57+ private String beanName ;
58+
59+ @ Nullable
60+ private ObservationRegistry observationRegistry ;
2861
29- public DaprMessagingTemplate (DaprClient daprClient , String pubsubName ) {
62+ @ Nullable
63+ private DaprMessagingObservationConvention observationConvention ;
64+
65+ /**
66+ * Constructs a new DaprMessagingTemplate.
67+ * @param daprClient Dapr client
68+ * @param pubsubName pubsub name
69+ * @param observationEnabled whether to enable observations
70+ */
71+ public DaprMessagingTemplate (DaprClient daprClient , String pubsubName , boolean observationEnabled ) {
3072 this .daprClient = daprClient ;
3173 this .pubsubName = pubsubName ;
74+ this .metadata = Map .of (Metadata .TTL_IN_SECONDS , MESSAGE_TTL_IN_SECONDS );
75+ this .observationEnabled = observationEnabled ;
76+ }
77+
78+ @ Override
79+ public void setApplicationContext (ApplicationContext applicationContext ) {
80+ this .applicationContext = applicationContext ;
81+ }
82+
83+ @ Override
84+ public void setBeanName (String beanName ) {
85+ this .beanName = beanName ;
86+ }
87+
88+ /**
89+ * If observations are enabled, attempt to obtain the Observation registry and
90+ * convention.
91+ */
92+ @ Override
93+ public void afterSingletonsInstantiated () {
94+ if (!observationEnabled ) {
95+ LOGGER .debug ("Observations are not enabled - not recording" );
96+ return ;
97+ }
98+
99+ if (applicationContext == null ) {
100+ LOGGER .warn ("Observations enabled but application context null - not recording" );
101+ return ;
102+ }
103+
104+ observationRegistry = applicationContext .getBeanProvider (ObservationRegistry .class )
105+ .getIfUnique (() -> observationRegistry );
106+ observationConvention = applicationContext .getBeanProvider (DaprMessagingObservationConvention .class )
107+ .getIfUnique (() -> observationConvention );
32108 }
33109
34110 @ Override
@@ -38,29 +114,72 @@ public void send(String topic, T message) {
38114
39115 @ Override
40116 public SendMessageBuilder <T > newMessage (T message ) {
41- return new SendMessageBuilderImpl <>(this , message );
117+ return new DeefaultSendMessageBuilder <>(this , message );
42118 }
43119
44120 private void doSend (String topic , T message ) {
45121 doSendAsync (topic , message ).block ();
46122 }
47123
48124 private Mono <Void > doSendAsync (String topic , T message ) {
49- return daprClient .publishEvent (pubsubName ,
50- topic ,
51- message ,
52- Map .of (Metadata .TTL_IN_SECONDS , MESSAGE_TTL_IN_SECONDS ));
125+ LOGGER .trace ("Sending message to '{}' topic" , topic );
126+
127+ if (canUseObservation ()) {
128+ return publishEventWithObservation (pubsubName , topic , message );
129+ }
130+
131+ return publishEvent (pubsubName , topic , message );
132+ }
133+
134+ private boolean canUseObservation () {
135+ return observationEnabled
136+ && observationRegistry != null
137+ && observationConvention != null
138+ && beanName != null ;
139+ }
140+
141+ private Mono <Void > publishEvent (String pubsubName , String topic , T message ) {
142+ return daprClient .publishEvent (pubsubName , topic , message , metadata );
143+ }
144+
145+ private Mono <Void > publishEventWithObservation (String pubsubName , String topic , T message ) {
146+ DaprMessagingSenderContext senderContext = DaprMessagingSenderContext .newContext (topic , this .beanName );
147+ Observation observation = createObservation (senderContext );
148+
149+ return observation .observe (() ->
150+ publishEvent (pubsubName , topic , message )
151+ .doOnError (err -> {
152+ LOGGER .error ("Failed to send msg to '{}' topic" , topic , err );
153+
154+ observation .error (err );
155+ observation .stop ();
156+ })
157+ .doOnSuccess (ignore -> {
158+ LOGGER .trace ("Sent msg to '{}' topic" , topic );
159+
160+ observation .stop ();
161+ })
162+ );
163+ }
164+
165+ private Observation createObservation (DaprMessagingSenderContext senderContext ) {
166+ return DaprMessagingObservationDocumentation .TEMPLATE_OBSERVATION .observation (
167+ observationConvention ,
168+ DEFAULT_OBSERVATION_CONVENTION ,
169+ () -> senderContext ,
170+ observationRegistry
171+ );
53172 }
54173
55- private static class SendMessageBuilderImpl <T > implements SendMessageBuilder <T > {
174+ private static class DeefaultSendMessageBuilder <T > implements SendMessageBuilder <T > {
56175
57176 private final DaprMessagingTemplate <T > template ;
58177
59178 private final T message ;
60179
61180 private String topic ;
62181
63- SendMessageBuilderImpl (DaprMessagingTemplate <T > template , T message ) {
182+ DeefaultSendMessageBuilder (DaprMessagingTemplate <T > template , T message ) {
64183 this .template = template ;
65184 this .message = message ;
66185 }
@@ -74,12 +193,12 @@ public SendMessageBuilder<T> withTopic(String topic) {
74193
75194 @ Override
76195 public void send () {
77- this . template .doSend (this . topic , this . message );
196+ template .doSend (topic , message );
78197 }
79198
80199 @ Override
81200 public Mono <Void > sendAsync () {
82- return this . template .doSendAsync (this . topic , this . message );
201+ return template .doSendAsync (topic , message );
83202 }
84203
85204 }
0 commit comments