|
134 | 134 | import java.util.Iterator; |
135 | 135 | import java.util.List; |
136 | 136 | import java.util.Map; |
137 | | -import java.util.concurrent.BlockingQueue; |
138 | | -import java.util.concurrent.LinkedBlockingQueue; |
139 | | -import java.util.concurrent.atomic.AtomicBoolean; |
140 | | -import java.util.concurrent.atomic.AtomicReference; |
141 | 137 | import java.util.function.Consumer; |
142 | 138 | import java.util.function.Function; |
143 | 139 | import java.util.stream.Collectors; |
@@ -497,43 +493,15 @@ public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> |
497 | 493 |
|
498 | 494 | return Flux.create(sink -> { |
499 | 495 | var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); |
500 | | - BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50); |
501 | | - AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> streamRef = |
502 | | - new AtomicReference<>(); |
503 | | - AtomicBoolean running = new AtomicBoolean(true); |
504 | | - |
505 | | - // Thread to send acknowledgments back to Dapr |
506 | | - Thread acker = new Thread(() -> { |
507 | | - while (running.get()) { |
508 | | - try { |
509 | | - var ackResponse = ackQueue.take(); |
510 | | - if (ackResponse == null) { |
511 | | - continue; |
512 | | - } |
513 | 496 |
|
514 | | - var stream = streamRef.get(); |
515 | | - if (stream == null) { |
516 | | - Thread.sleep(100); |
517 | | - continue; |
518 | | - } |
| 497 | + // Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement) |
| 498 | + // This is simpler than AtomicReference since we don't need atomicity - just mutability |
| 499 | + @SuppressWarnings("unchecked") |
| 500 | + StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>[] streamHolder = new StreamObserver[1]; |
519 | 501 |
|
520 | | - stream.onNext(ackResponse); |
521 | | - } catch (InterruptedException e) { |
522 | | - Thread.currentThread().interrupt(); |
523 | | - return; |
524 | | - } catch (Exception e) { |
525 | | - try { |
526 | | - Thread.sleep(100); |
527 | | - } catch (InterruptedException ex) { |
528 | | - Thread.currentThread().interrupt(); |
529 | | - return; |
530 | | - } |
531 | | - } |
532 | | - } |
533 | | - }); |
534 | | - |
535 | | - // Create the gRPC streaming observer |
536 | | - var stream = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { |
| 502 | + // Create the gRPC bidirectional streaming observer |
| 503 | + // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly |
| 504 | + streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { |
537 | 505 | @Override |
538 | 506 | public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { |
539 | 507 | try { |
@@ -562,50 +530,45 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { |
562 | 530 | sink.next(data); |
563 | 531 | } |
564 | 532 |
|
565 | | - // Send SUCCESS acknowledgment |
| 533 | + // Send SUCCESS acknowledgment directly (no blocking queue or thread needed) |
566 | 534 | var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS); |
567 | | - ackQueue.put(ack); |
| 535 | + streamHolder[0].onNext(ack); |
568 | 536 |
|
569 | 537 | } catch (Exception e) { |
570 | 538 | // On error during processing, send RETRY acknowledgment |
571 | 539 | try { |
572 | 540 | var id = response.getEventMessage().getId(); |
573 | 541 | if (id != null && !id.isEmpty()) { |
574 | 542 | var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY); |
575 | | - ackQueue.put(ack); |
| 543 | + streamHolder[0].onNext(ack); |
576 | 544 | } |
577 | | - } catch (InterruptedException ex) { |
578 | | - Thread.currentThread().interrupt(); |
| 545 | + } catch (Exception ex) { |
| 546 | + // If we can't send ack, propagate the error |
| 547 | + sink.error(DaprException.propagate(ex)); |
| 548 | + return; |
579 | 549 | } |
580 | 550 | sink.error(DaprException.propagate(e)); |
581 | 551 | } |
582 | 552 | } |
583 | 553 |
|
584 | 554 | @Override |
585 | 555 | public void onError(Throwable throwable) { |
586 | | - running.set(false); |
587 | 556 | sink.error(DaprException.propagate(throwable)); |
588 | 557 | } |
589 | 558 |
|
590 | 559 | @Override |
591 | 560 | public void onCompleted() { |
592 | | - running.set(false); |
593 | 561 | sink.complete(); |
594 | 562 | } |
595 | 563 | }); |
596 | 564 |
|
597 | | - streamRef.set(stream); |
598 | | - acker.start(); |
599 | | - |
600 | 565 | // Send initial request to start receiving events |
601 | | - stream.onNext(request); |
| 566 | + streamHolder[0].onNext(request); |
602 | 567 |
|
603 | 568 | // Cleanup when Flux is cancelled or completed |
604 | 569 | sink.onDispose(() -> { |
605 | | - running.set(false); |
606 | | - acker.interrupt(); |
607 | 570 | try { |
608 | | - stream.onCompleted(); |
| 571 | + streamHolder[0].onCompleted(); |
609 | 572 | } catch (Exception e) { |
610 | 573 | // Ignore cleanup errors |
611 | 574 | } |
|
0 commit comments