File tree Expand file tree Collapse file tree 4 files changed +37
-0
lines changed
main/java/com/rabbitmq/stream
test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 4 files changed +37
-0
lines changed Original file line number Diff line number Diff line change @@ -92,6 +92,16 @@ public interface StreamCreator {
9292 */
9393 StreamCreator filterSize (int size );
9494
95+ /**
96+ * Set the number of initial members the stream should have.
97+ *
98+ * @param initialMemberCount initial number of nodes
99+ * @return this creator instance
100+ * @see <a href="https://www.rabbitmq.com/docs/streams#replication-factor">Initial Replication
101+ * Factor</a>
102+ */
103+ StreamCreator initialMemberCount (int initialMemberCount );
104+
95105 /**
96106 * Configure the super stream to create.
97107 *
Original file line number Diff line number Diff line change @@ -2744,6 +2744,14 @@ public StreamParametersBuilder filterSize(int size) {
27442744 return this ;
27452745 }
27462746
2747+ public StreamParametersBuilder initialMemberCount (int initialMemberCount ) {
2748+ if (initialMemberCount <= 0 ) {
2749+ throw new IllegalArgumentException ("The initial member count must be greater than 0" );
2750+ }
2751+ this .parameters .put ("initial-cluster-size" , String .valueOf (initialMemberCount ));
2752+ return this ;
2753+ }
2754+
27472755 public StreamParametersBuilder put (String key , String value ) {
27482756 parameters .put (key , value );
27492757 return this ;
Original file line number Diff line number Diff line change @@ -86,6 +86,12 @@ public StreamCreator filterSize(int size) {
8686 return this ;
8787 }
8888
89+ @ Override
90+ public StreamCreator initialMemberCount (int initialMemberCount ) {
91+ streamParametersBuilder .initialMemberCount (initialMemberCount );
92+ return this ;
93+ }
94+
8995 @ Override
9096 public SuperStreamConfiguration superStream () {
9197 if (this .superStreamConfiguration == null ) {
Original file line number Diff line number Diff line change @@ -813,4 +813,17 @@ void enforceEntityPerConnectionLimits() {
813813 executor .shutdownNow ();
814814 }
815815 }
816+
817+ @ Test
818+ void brokerShouldAcceptInitialMemberCountArgument (TestInfo info ) {
819+ String s = streamName (info );
820+ Environment env = environmentBuilder .build ();
821+ try {
822+ env .streamCreator ().name (s ).initialMemberCount (1 ).create ();
823+ assertThat (env .streamExists (s )).isTrue ();
824+ } finally {
825+ env .deleteStream (s );
826+ env .close ();
827+ }
828+ }
816829}
You can’t perform that action at this time.
0 commit comments