2424import com .google .cloud .pubsublite .SubscriptionPath ;
2525import com .google .cloud .pubsublite .SubscriptionPaths ;
2626import com .google .cloud .pubsublite .cloudpubsub .internal .AckSetTrackerImpl ;
27+ import com .google .cloud .pubsublite .cloudpubsub .internal .AssigningSubscriber ;
2728import com .google .cloud .pubsublite .cloudpubsub .internal .MultiPartitionSubscriber ;
29+ import com .google .cloud .pubsublite .cloudpubsub .internal .PartitionSubscriberFactory ;
2830import com .google .cloud .pubsublite .cloudpubsub .internal .SinglePartitionSubscriber ;
2931import com .google .cloud .pubsublite .internal .Preconditions ;
32+ import com .google .cloud .pubsublite .internal .wire .AssignerBuilder ;
33+ import com .google .cloud .pubsublite .internal .wire .AssignerFactory ;
3034import com .google .cloud .pubsublite .internal .wire .CommitterBuilder ;
3135import com .google .cloud .pubsublite .internal .wire .PubsubContext ;
3236import com .google .cloud .pubsublite .internal .wire .PubsubContext .Framework ;
3337import com .google .cloud .pubsublite .internal .wire .SubscriberBuilder ;
3438import com .google .cloud .pubsublite .proto .CursorServiceGrpc ;
39+ import com .google .cloud .pubsublite .proto .PartitionAssignmentServiceGrpc .PartitionAssignmentServiceStub ;
3540import com .google .cloud .pubsublite .proto .SubscriberServiceGrpc ;
36- import com .google .common .collect .ImmutableList ;
3741import com .google .pubsub .v1 .PubsubMessage ;
3842import io .grpc .StatusException ;
3943import java .util .ArrayList ;
@@ -51,17 +55,21 @@ public abstract class SubscriberSettings {
5155
5256 abstract SubscriptionPath subscriptionPath ();
5357
54- abstract ImmutableList <Partition > partitions ();
55-
5658 abstract FlowControlSettings perPartitionFlowControlSettings ();
5759
5860 // Optional parameters.
61+
62+ // If set, disables auto-assignment.
63+ abstract Optional <List <Partition >> partitions ();
64+
5965 abstract Optional <MessageTransformer <SequencedMessage , PubsubMessage >> transformer ();
6066
6167 abstract Optional <SubscriberServiceGrpc .SubscriberServiceStub > subscriberServiceStub ();
6268
6369 abstract Optional <CursorServiceGrpc .CursorServiceStub > cursorServiceStub ();
6470
71+ abstract Optional <PartitionAssignmentServiceStub > assignmentServiceStub ();
72+
6573 abstract Optional <NackHandler > nackHandler ();
6674
6775 public static Builder newBuilder () {
@@ -76,11 +84,12 @@ public abstract static class Builder {
7684
7785 public abstract Builder setSubscriptionPath (SubscriptionPath path );
7886
79- public abstract Builder setPartitions (List <Partition > partition );
80-
8187 public abstract Builder setPerPartitionFlowControlSettings (FlowControlSettings settings );
8288
8389 // Optional parameters.
90+ /** If set, disables auto-assignment. */
91+ public abstract Builder setPartitions (List <Partition > partition );
92+
8493 public abstract Builder setTransformer (
8594 MessageTransformer <SequencedMessage , PubsubMessage > transformer );
8695
@@ -89,14 +98,17 @@ public abstract Builder setSubscriberServiceStub(
8998
9099 public abstract Builder setCursorServiceStub (CursorServiceGrpc .CursorServiceStub stub );
91100
101+ public abstract Builder setAssignmentServiceStub (PartitionAssignmentServiceStub stub );
102+
92103 public abstract Builder setNackHandler (NackHandler nackHandler );
93104
94105 abstract SubscriberSettings autoBuild ();
95106
96107 public SubscriberSettings build () throws StatusException {
97108 SubscriberSettings settings = autoBuild ();
98109 Preconditions .checkArgument (
99- !settings .partitions ().isEmpty (), "Must provide at least one partition." );
110+ !settings .partitions ().isPresent () || !settings .partitions ().get ().isEmpty (),
111+ "Must provide at least one partition if setting partitions explicitly." );
100112 SubscriptionPaths .check (settings .subscriptionPath ());
101113 return settings ;
102114 }
@@ -113,18 +125,36 @@ Subscriber instantiate() throws StatusException {
113125 wireCommitterBuilder .setSubscriptionPath (subscriptionPath ());
114126 cursorServiceStub ().ifPresent (wireCommitterBuilder ::setCursorStub );
115127
116- List <Subscriber > perPartitionSubscribers = new ArrayList <>();
117- for (Partition partition : partitions ()) {
118- wireSubscriberBuilder .setPartition (partition );
119- wireCommitterBuilder .setPartition (partition );
120- perPartitionSubscribers .add (
121- new SinglePartitionSubscriber (
128+ PartitionSubscriberFactory partitionSubscriberFactory =
129+ partition -> {
130+ wireSubscriberBuilder .setPartition (partition );
131+ wireCommitterBuilder .setPartition (partition );
132+ return new SinglePartitionSubscriber (
122133 receiver (),
123134 transformer ().orElse (MessageTransforms .toCpsSubscribeTransformer ()),
124135 new AckSetTrackerImpl (wireCommitterBuilder .build ()),
125136 nackHandler ().orElse (new NackHandler () {}),
126137 messageConsumer -> wireSubscriberBuilder .setMessageConsumer (messageConsumer ).build (),
127- perPartitionFlowControlSettings ()));
138+ perPartitionFlowControlSettings ());
139+ };
140+
141+ if (!partitions ().isPresent ()) {
142+ AssignerBuilder .Builder assignerBuilder = AssignerBuilder .newBuilder ();
143+ assignerBuilder .setSubscriptionPath (subscriptionPath ());
144+ assignmentServiceStub ().ifPresent (assignerBuilder ::setAssignmentStub );
145+ AssignerFactory assignerFactory =
146+ receiver -> {
147+ assignerBuilder .setReceiver (receiver );
148+ return assignerBuilder .build ();
149+ };
150+ return new AssigningSubscriber (partitionSubscriberFactory , assignerFactory );
151+ }
152+
153+ List <Subscriber > perPartitionSubscribers = new ArrayList <>();
154+ for (Partition partition : partitions ().get ()) {
155+ wireSubscriberBuilder .setPartition (partition );
156+ wireCommitterBuilder .setPartition (partition );
157+ perPartitionSubscribers .add (partitionSubscriberFactory .New (partition ));
128158 }
129159 return MultiPartitionSubscriber .of (perPartitionSubscribers );
130160 }
0 commit comments