Skip to content

Commit 47ed16c

Browse files
committed
Adding event subscriber stream observer to simplify subscription logic
Signed-off-by: Artur Ciocanu <[email protected]>
1 parent 305baee commit 47ed16c

File tree

2 files changed

+201
-68
lines changed

2 files changed

+201
-68
lines changed

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 10 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
9292
import io.dapr.internal.resiliency.RetryPolicy;
9393
import io.dapr.internal.resiliency.TimeoutPolicy;
94+
import io.dapr.internal.subscription.EventSubscriberStreamObserver;
9495
import io.dapr.serializer.DaprObjectSerializer;
9596
import io.dapr.serializer.DefaultObjectSerializer;
9697
import io.dapr.utils.DefaultContentTypeConverter;
@@ -495,82 +496,23 @@ public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T>
495496
return Flux.create(sink -> {
496497
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
497498

498-
// We need AtomicReference because we're accessing the stream reference from within the anonymous
499-
// StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous
499+
// We need AtomicReference because we're accessing the stream reference from within the
500+
// EventSubscriberStreamObserver (to send acks). Java requires variables used in lambdas/anonymous
500501
// classes to be effectively final, so we can't use a plain variable. AtomicReference provides
501502
// the mutable container we need while keeping the reference itself final.
502503
AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> streamRef =
503504
new AtomicReference<>();
504505

505506
// Create the gRPC bidirectional streaming observer
506507
// Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
507-
streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
508-
@Override
509-
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
510-
try {
511-
if (response.getEventMessage() == null) {
512-
return;
513-
}
514-
515-
DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
516-
String pubsubName = message.getPubsubName();
517-
518-
if (pubsubName == null || pubsubName.isEmpty()) {
519-
return;
520-
}
521-
522-
var id = message.getId();
523-
524-
if (id == null || id.isEmpty()) {
525-
return;
526-
}
527-
528-
// Deserialize the event data
529-
T data = null;
530-
531-
if (type != null) {
532-
data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
533-
}
534-
535-
// Emit the data to the Flux (only if not null)
536-
if (data != null) {
537-
sink.next(data);
538-
}
539-
540-
// Send SUCCESS acknowledgment directly
541-
var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
542-
543-
streamRef.get().onNext(ack);
544-
} catch (Exception e) {
545-
// On error during processing, send RETRY acknowledgment
546-
try {
547-
var id = response.getEventMessage().getId();
548-
549-
if (id != null && !id.isEmpty()) {
550-
var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
551-
552-
streamRef.get().onNext(ack);
553-
}
554-
} catch (Exception ex) {
555-
// If we can't send ack, propagate the error
556-
sink.error(DaprException.propagate(ex));
557-
return;
558-
}
559-
560-
sink.error(DaprException.propagate(e));
561-
}
562-
}
563-
564-
@Override
565-
public void onError(Throwable throwable) {
566-
sink.error(DaprException.propagate(throwable));
567-
}
508+
EventSubscriberStreamObserver<T> observer = new EventSubscriberStreamObserver<>(
509+
sink,
510+
type,
511+
this.objectSerializer,
512+
streamRef
513+
);
568514

569-
@Override
570-
public void onCompleted() {
571-
sink.complete();
572-
}
573-
}));
515+
streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(observer));
574516

575517
// Send initial request to start receiving events
576518
streamRef.get().onNext(request);
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright 2024 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.internal.subscription;
15+
16+
import io.dapr.exceptions.DaprException;
17+
import io.dapr.serializer.DaprObjectSerializer;
18+
import io.dapr.utils.TypeRef;
19+
import io.dapr.v1.DaprAppCallbackProtos;
20+
import io.dapr.v1.DaprProtos;
21+
import io.grpc.stub.StreamObserver;
22+
import reactor.core.publisher.FluxSink;
23+
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
/**
27+
* StreamObserver implementation for subscribing to Dapr pub/sub events.
28+
* <p>
29+
* This class handles the bidirectional gRPC streaming for event subscriptions, including:
30+
* <ul>
31+
* <li>Receiving events from Dapr</li>
32+
* <li>Deserializing event payloads</li>
33+
* <li>Emitting deserialized data to a Reactor Flux</li>
34+
* <li>Sending acknowledgments (SUCCESS/RETRY) back to Dapr</li>
35+
* </ul>
36+
* </p>
37+
*
38+
* @param <T> The type of the event payload
39+
*/
40+
public class EventSubscriberStreamObserver<T> implements StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1> {
41+
42+
private final FluxSink<T> sink;
43+
private final TypeRef<T> type;
44+
private final DaprObjectSerializer objectSerializer;
45+
private final AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> requestStreamRef;
46+
47+
/**
48+
* Creates a new EventSubscriberStreamObserver.
49+
*
50+
* @param sink The FluxSink to emit deserialized events to
51+
* @param type The TypeRef for deserializing event payloads
52+
* @param objectSerializer The serializer to use for deserialization
53+
* @param requestStreamRef Reference to the request stream for sending acknowledgments
54+
*/
55+
public EventSubscriberStreamObserver(
56+
FluxSink<T> sink,
57+
TypeRef<T> type,
58+
DaprObjectSerializer objectSerializer,
59+
AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> requestStreamRef) {
60+
this.sink = sink;
61+
this.type = type;
62+
this.objectSerializer = objectSerializer;
63+
this.requestStreamRef = requestStreamRef;
64+
}
65+
66+
@Override
67+
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
68+
try {
69+
if (response.getEventMessage() == null) {
70+
return;
71+
}
72+
73+
DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
74+
String pubsubName = message.getPubsubName();
75+
76+
if (pubsubName == null || pubsubName.isEmpty()) {
77+
return;
78+
}
79+
80+
String id = message.getId();
81+
82+
if (id == null || id.isEmpty()) {
83+
return;
84+
}
85+
86+
// Deserialize the event data
87+
T data = null;
88+
89+
if (type != null) {
90+
data = objectSerializer.deserialize(message.getData().toByteArray(), type);
91+
}
92+
93+
// Emit the data to the Flux (only if not null)
94+
if (data != null) {
95+
sink.next(data);
96+
}
97+
98+
// Send SUCCESS acknowledgment directly
99+
var ack = buildSuccessAck(id);
100+
101+
requestStreamRef.get().onNext(ack);
102+
} catch (Exception e) {
103+
// On error during processing, send RETRY acknowledgment
104+
try {
105+
var id = response.getEventMessage().getId();
106+
107+
if (id != null && !id.isEmpty()) {
108+
var ack = buildRetryAck(id);
109+
110+
requestStreamRef.get().onNext(ack);
111+
}
112+
} catch (Exception ex) {
113+
// If we can't send ack, propagate the error
114+
sink.error(DaprException.propagate(ex));
115+
return;
116+
}
117+
118+
sink.error(DaprException.propagate(e));
119+
}
120+
}
121+
122+
@Override
123+
public void onError(Throwable throwable) {
124+
sink.error(DaprException.propagate(throwable));
125+
}
126+
127+
@Override
128+
public void onCompleted() {
129+
sink.complete();
130+
}
131+
132+
/**
133+
* Builds a SUCCESS acknowledgment request.
134+
*
135+
* @param eventId The ID of the event to acknowledge
136+
* @return The acknowledgment request
137+
*/
138+
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) {
139+
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS);
140+
}
141+
142+
/**
143+
* Builds a RETRY acknowledgment request.
144+
*
145+
* @param eventId The ID of the event to acknowledge
146+
* @return The acknowledgment request
147+
*/
148+
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) {
149+
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY);
150+
}
151+
152+
/**
153+
* Builds a DROP acknowledgment request.
154+
*
155+
* @param eventId The ID of the event to acknowledge
156+
* @return The acknowledgment request
157+
*/
158+
@SuppressWarnings("unused") // May be used in the future
159+
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) {
160+
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP);
161+
}
162+
163+
/**
164+
* Builds an acknowledgment request with the specified status.
165+
* <p>
166+
* This method directly uses the protobuf enum instead of depending on
167+
* {@code SubscriptionListener.Status} to keep this class independent
168+
* of the older callback-based API.
169+
* </p>
170+
*
171+
* @param eventId The ID of the event to acknowledge
172+
* @param status The acknowledgment status (SUCCESS, RETRY, or DROP)
173+
* @return The acknowledgment request
174+
*/
175+
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
176+
String eventId,
177+
DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) {
178+
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
179+
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
180+
.setId(eventId)
181+
.setStatus(
182+
DaprAppCallbackProtos.TopicEventResponse.newBuilder()
183+
.setStatus(status)
184+
.build())
185+
.build();
186+
187+
return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
188+
.setEventProcessed(eventProcessed)
189+
.build();
190+
}
191+
}

0 commit comments

Comments
 (0)