@@ -214,6 +214,11 @@ public static class Settings {
214214 */
215215 public final boolean advancedViews ;
216216
217+ /**
218+ * Maximum number of joins allowed in view queries.
219+ */
220+ public final Optional <Integer > maxViewJoins ;
221+
217222 /**
218223 * To override workflow tick interval for integration tests
219224 */
@@ -236,7 +241,7 @@ public static class Settings {
236241 */
237242 @ Deprecated
238243 public Settings (final Duration stopTimeout ) {
239- this (stopTimeout , "self" , false , false , Optional .empty (), Collections .emptyMap (), TEST_BROKER , MockedEventing .EMPTY );
244+ this (stopTimeout , "self" , false , false , Optional .empty (), Optional . empty (), Collections .emptyMap (), TEST_BROKER , MockedEventing .EMPTY );
240245 }
241246
242247 public enum EventingSupport {
@@ -266,6 +271,7 @@ private Settings(
266271 final String serviceName ,
267272 final boolean aclEnabled ,
268273 final boolean advancedViews ,
274+ final Optional <Integer > maxViewJoins ,
269275 final Optional <Duration > workflowTickInterval ,
270276 final Map <String , String > servicePortMappings ,
271277 final EventingSupport eventingSupport ,
@@ -274,6 +280,7 @@ private Settings(
274280 this .serviceName = serviceName ;
275281 this .aclEnabled = aclEnabled ;
276282 this .advancedViews = advancedViews ;
283+ this .maxViewJoins = maxViewJoins ;
277284 this .workflowTickInterval = workflowTickInterval ;
278285 this .servicePortMappings = servicePortMappings ;
279286 this .eventingSupport = eventingSupport ;
@@ -287,7 +294,7 @@ private Settings(
287294 * @return updated Settings
288295 */
289296 public Settings withStopTimeout (final Duration stopTimeout ) {
290- return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
297+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
291298 }
292299
293300 /**
@@ -299,7 +306,7 @@ public Settings withStopTimeout(final Duration stopTimeout) {
299306 * @return The updated settings.
300307 */
301308 public Settings withServiceName (final String serviceName ) {
302- return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
309+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
303310 }
304311
305312 /**
@@ -308,7 +315,7 @@ public Settings withServiceName(final String serviceName) {
308315 * @return The updated settings.
309316 */
310317 public Settings withAclDisabled () {
311- return new Settings (stopTimeout , serviceName , false , advancedViews , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
318+ return new Settings (stopTimeout , serviceName , false , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
312319 }
313320
314321 /**
@@ -317,7 +324,7 @@ public Settings withAclDisabled() {
317324 * @return The updated settings.
318325 */
319326 public Settings withAclEnabled () {
320- return new Settings (stopTimeout , serviceName , true , advancedViews , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
327+ return new Settings (stopTimeout , serviceName , true , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
321328 }
322329
323330 /**
@@ -326,7 +333,17 @@ public Settings withAclEnabled() {
326333 * @return The updated settings.
327334 */
328335 public Settings withAdvancedViews () {
329- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
336+ return new Settings (stopTimeout , serviceName , aclEnabled , true , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
337+ }
338+
339+ /**
340+ * Sets the maximum number of joins allowed in view queries.
341+ *
342+ * @param maxJoins the maximum number of joins
343+ * @return The updated settings.
344+ */
345+ public Settings withMaxViewJoins (int maxJoins ) {
346+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , Optional .of (maxJoins ), workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
330347 }
331348
332349 /**
@@ -335,46 +352,46 @@ public Settings withAdvancedViews() {
335352 * @return The updated settings.
336353 */
337354 public Settings withWorkflowTickInterval (Duration tickInterval ) {
338- return new Settings (stopTimeout , serviceName , aclEnabled , true , Optional .of (tickInterval ), servicePortMappings , eventingSupport , mockedEventing );
355+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , Optional .of (tickInterval ), servicePortMappings , eventingSupport , mockedEventing );
339356 }
340357
341358 /**
342359 * Mock the incoming messages flow from a ValueEntity.
343360 */
344361 public Settings withValueEntityIncomingMessages (String typeId ) {
345- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport ,
362+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport ,
346363 mockedEventing .withValueEntityIncomingMessages (typeId ));
347364 }
348365
349366 /**
350367 * Mock the incoming events flow from an EventSourcedEntity.
351368 */
352369 public Settings withEventSourcedEntityIncomingMessages (String typeId ) {
353- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport ,
370+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport ,
354371 mockedEventing .withEventSourcedIncomingMessages (typeId ));
355372 }
356373
357374 /**
358375 * Mock the incoming messages flow from a Stream (eventing.in.direct in case of protobuf SDKs).
359376 */
360377 public Settings withStreamIncomingMessages (String service , String streamId ) {
361- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport ,
378+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport ,
362379 mockedEventing .withStreamIncomingMessages (service , streamId ));
363380 }
364381
365382 /**
366383 * Mock the incoming events flow from a Topic.
367384 */
368385 public Settings withTopicIncomingMessages (String topic ) {
369- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport ,
386+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport ,
370387 mockedEventing .withTopicIncomingMessages (topic ));
371388 }
372389
373390 /**
374391 * Mock the outgoing events flow for a Topic.
375392 */
376393 public Settings withTopicOutgoingMessages (String topic ) {
377- return new Settings (stopTimeout , serviceName , aclEnabled , true , workflowTickInterval , servicePortMappings , eventingSupport ,
394+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport ,
378395 mockedEventing .withTopicOutgoingMessages (topic ));
379396 }
380397
@@ -386,11 +403,11 @@ public Settings withTopicOutgoingMessages(String topic) {
386403 public Settings withServicePortMapping (String serviceName , String host , int port ) {
387404 var updatedMappings = new HashMap <>(servicePortMappings );
388405 updatedMappings .put (serviceName , host + ":" + port );
389- return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , workflowTickInterval , Map .copyOf (updatedMappings ), eventingSupport , mockedEventing );
406+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , Map .copyOf (updatedMappings ), eventingSupport , mockedEventing );
390407 }
391408
392409 public Settings withEventingSupport (EventingSupport eventingSupport ) {
393- return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
410+ return new Settings (stopTimeout , serviceName , aclEnabled , advancedViews , maxViewJoins , workflowTickInterval , servicePortMappings , eventingSupport , mockedEventing );
394411 }
395412
396413 @ Override
@@ -404,6 +421,7 @@ public String toString() {
404421 ", serviceName='" + serviceName + '\'' +
405422 ", aclEnabled=" + aclEnabled +
406423 ", advancedViews=" + advancedViews +
424+ ", maxViewJoins=" + maxViewJoins +
407425 ", workflowTickInterval=" + workflowTickInterval +
408426 ", servicePortMappings=[" + String .join (", " , portMappingsRendered ) + "]" +
409427 ", eventingSupport=" + eventingSupport +
@@ -534,6 +552,7 @@ private void runProxy(Boolean useTestContainers, int port, int grpcEventingBacke
534552
535553 List <String > javaOptions = new ArrayList <>();
536554 javaOptions .add ("-Dlogback.configurationFile=logback-dev-mode.xml" );
555+ settings .maxViewJoins .ifPresent (maxJoins -> javaOptions .add ("-Dkalix.proxy.view.features.max-joins=" + maxJoins ));
537556
538557 //always passing grpc params, in the case of e.g. testing pubsub with mocked incoming messages
539558 if (settings .mockedEventing .hasConfig ()) {
@@ -826,3 +845,4 @@ public EventingTestKit.MessageBuilder getMessageBuilder() {
826845 return messageBuilder ;
827846 }
828847}
848+
0 commit comments