2323import com .google .cloud .pubsublite .SequencedMessage ;
2424import com .google .cloud .pubsublite .internal .CloseableMonitor ;
2525import com .google .cloud .pubsublite .internal .ExtractStatus ;
26+ import com .google .cloud .pubsublite .internal .ProxyService ;
2627import com .google .cloud .pubsublite .internal .wire .Committer ;
2728import com .google .common .collect .ImmutableMap ;
2829import com .google .errorprone .annotations .concurrent .GuardedBy ;
3334import java .io .IOException ;
3435import java .util .ArrayDeque ;
3536import java .util .ArrayList ;
37+ import java .util .Collection ;
3638import java .util .List ;
3739import java .util .Map ;
3840import java .util .NoSuchElementException ;
3941import java .util .Optional ;
4042import java .util .Queue ;
43+ import java .util .function .Consumer ;
44+ import java .util .stream .Collectors ;
4145import org .apache .beam .sdk .io .UnboundedSource ;
4246import org .apache .beam .sdk .io .UnboundedSource .CheckpointMark ;
4347import org .apache .beam .sdk .io .UnboundedSource .UnboundedReader ;
@@ -52,17 +56,51 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
5256 @ GuardedBy ("monitor.monitor" )
5357 private final ImmutableMap <Partition , SubscriberState > subscriberMap ;
5458
59+ private final CommitterProxy committerProxy ;
60+
5561 @ GuardedBy ("monitor.monitor" )
5662 private final Queue <PartitionedSequencedMessage > messages = new ArrayDeque <>();
5763
5864 @ GuardedBy ("monitor.monitor" )
5965 private Optional <StatusException > permanentError = Optional .empty ();
6066
67+ private static class CommitterProxy extends ProxyService {
68+ private final Consumer <StatusException > permanentErrorSetter ;
69+
70+ CommitterProxy (
71+ Collection <SubscriberState > states , Consumer <StatusException > permanentErrorSetter )
72+ throws StatusException {
73+ this .permanentErrorSetter = permanentErrorSetter ;
74+ addServices (states .stream ().map (state -> state .committer ).collect (Collectors .toList ()));
75+ }
76+
77+ @ Override
78+ protected void start () {}
79+
80+ @ Override
81+ protected void stop () {}
82+
83+ @ Override
84+ protected void handlePermanentError (StatusException error ) {
85+ permanentErrorSetter .accept (error );
86+ }
87+ }
88+
6189 public PubsubLiteUnboundedReader (
6290 UnboundedSource <SequencedMessage , ?> source ,
63- ImmutableMap <Partition , SubscriberState > subscriberMap ) {
91+ ImmutableMap <Partition , SubscriberState > subscriberMap )
92+ throws StatusException {
6493 this .source = source ;
6594 this .subscriberMap = subscriberMap ;
95+ this .committerProxy =
96+ new CommitterProxy (
97+ subscriberMap .values (),
98+ error -> {
99+ try (CloseableMonitor .Hold h = monitor .enter ()) {
100+ permanentError = Optional .of (permanentError .orElse (error ));
101+ }
102+ });
103+ this .committerProxy .startAsync ().awaitRunning ();
66104 }
67105
68106 @ Override
@@ -188,14 +226,14 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
188226 public void close () {
189227 try (CloseableMonitor .Hold h = monitor .enter ()) {
190228 for (SubscriberState state : subscriberMap .values ()) {
191- state .committer .stopAsync ().awaitTerminated ();
192229 try {
193230 state .subscriber .close ();
194231 } catch (Exception e ) {
195232 throw new IllegalStateException (e );
196233 }
197234 }
198235 }
236+ committerProxy .stopAsync ().awaitTerminated ();
199237 }
200238
201239 @ Override
0 commit comments