Skip to content

Commit 305baee

Browse files
committed
Fix unit tests and simplify implementation
Signed-off-by: Artur Ciocanu <[email protected]>
1 parent b363e49 commit 305baee

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
import java.util.Iterator;
135135
import java.util.List;
136136
import java.util.Map;
137+
import java.util.concurrent.atomic.AtomicReference;
137138
import java.util.function.Consumer;
138139
import java.util.function.Function;
139140
import java.util.stream.Collectors;
@@ -494,33 +495,39 @@ public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T>
494495
return Flux.create(sink -> {
495496
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
496497

497-
// Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement)
498-
// This is simpler than AtomicReference since we don't need atomicity - just mutability
499-
@SuppressWarnings("unchecked")
500-
StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>[] streamHolder = new StreamObserver[1];
498+
// We need AtomicReference because we're accessing the stream reference from within the anonymous
499+
// StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous
500+
// classes to be effectively final, so we can't use a plain variable. AtomicReference provides
501+
// the mutable container we need while keeping the reference itself final.
502+
AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> streamRef =
503+
new AtomicReference<>();
501504

502505
// Create the gRPC bidirectional streaming observer
503506
// Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
504-
streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
507+
streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
505508
@Override
506509
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
507510
try {
508511
if (response.getEventMessage() == null) {
509512
return;
510513
}
511514

512-
var message = response.getEventMessage();
513-
if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) {
515+
DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
516+
String pubsubName = message.getPubsubName();
517+
518+
if (pubsubName == null || pubsubName.isEmpty()) {
514519
return;
515520
}
516521

517522
var id = message.getId();
518-
if ((id == null) || id.isEmpty()) {
523+
524+
if (id == null || id.isEmpty()) {
519525
return;
520526
}
521527

522528
// Deserialize the event data
523529
T data = null;
530+
524531
if (type != null) {
525532
data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
526533
}
@@ -530,23 +537,26 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
530537
sink.next(data);
531538
}
532539

533-
// Send SUCCESS acknowledgment directly (no blocking queue or thread needed)
540+
// Send SUCCESS acknowledgment directly
534541
var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
535-
streamHolder[0].onNext(ack);
536542

543+
streamRef.get().onNext(ack);
537544
} catch (Exception e) {
538545
// On error during processing, send RETRY acknowledgment
539546
try {
540547
var id = response.getEventMessage().getId();
548+
541549
if (id != null && !id.isEmpty()) {
542550
var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
543-
streamHolder[0].onNext(ack);
551+
552+
streamRef.get().onNext(ack);
544553
}
545554
} catch (Exception ex) {
546555
// If we can't send ack, propagate the error
547556
sink.error(DaprException.propagate(ex));
548557
return;
549558
}
559+
550560
sink.error(DaprException.propagate(e));
551561
}
552562
}
@@ -560,15 +570,15 @@ public void onError(Throwable throwable) {
560570
public void onCompleted() {
561571
sink.complete();
562572
}
563-
});
573+
}));
564574

565575
// Send initial request to start receiving events
566-
streamHolder[0].onNext(request);
576+
streamRef.get().onNext(request);
567577

568578
// Cleanup when Flux is cancelled or completed
569579
sink.onDispose(() -> {
570580
try {
571-
streamHolder[0].onCompleted();
581+
streamRef.get().onCompleted();
572582
} catch (Exception e) {
573583
// Ignore cleanup errors
574584
}

sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -636,14 +636,14 @@ public void onCompleted() {
636636
};
637637
}).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
638638

639-
final Set<String> receivedEvents = Collections.synchronizedSet(new HashSet<>());
639+
final AtomicInteger eventCount = new AtomicInteger(0);
640640
final Semaphore gotAll = new Semaphore(0);
641641

642642
var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING)
643643
.doOnNext(eventData -> {
644644
assertEquals(data, eventData);
645-
receivedEvents.add(eventData);
646-
if (receivedEvents.size() >= numEvents) {
645+
int count = eventCount.incrementAndGet();
646+
if (count >= numEvents) {
647647
gotAll.release();
648648
}
649649
})
@@ -652,7 +652,7 @@ public void onCompleted() {
652652
gotAll.acquire();
653653
disposable.dispose();
654654

655-
assertEquals(numEvents, receivedEvents.size());
655+
assertEquals(numEvents, eventCount.get());
656656
}
657657

658658
@Test

0 commit comments

Comments
 (0)