@@ -1345,317 +1345,97 @@ public void testNoopSupervisorSpecSerde() throws Exception
13451345 Assert .assertEquals (spec , specRoundTrip );
13461346 }
13471347
1348-
13491348 @ Test
13501349 public void testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed ()
13511350 {
1352- // Tests that when a new spec is submitted without taskCountStart, the merge function
1353- // follows the priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin.
1354-
1355- // Create an existing spec with taskCount=5 (simulating a scaled state)
1356- HashMap <String , Object > existingAutoScalerConfig = new HashMap <>();
1357- existingAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1358- existingAutoScalerConfig .put ("taskCountMax" , 7 );
1359- existingAutoScalerConfig .put ("taskCountMin" , 1 );
1360- // Note: taskCountStart is NOT set in the existing spec
1361-
1362- SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1363- EasyMock .expect (existingIoConfig .getAutoScalerConfig ())
1364- .andReturn (OBJECT_MAPPER .convertValue (existingAutoScalerConfig , AutoScalerConfig .class ))
1365- .anyTimes ();
1366- EasyMock .expect (existingIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1367- EasyMock .expect (existingIoConfig .getTaskCount ()).andReturn (5 ).anyTimes (); // existing taskCount is 5
1368- EasyMock .replay (existingIoConfig );
1369-
1370- DataSchema existingDataSchema = EasyMock .createMock (DataSchema .class );
1371- EasyMock .expect (existingDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1372- EasyMock .replay (existingDataSchema );
1373-
1374- SeekableStreamSupervisorIngestionSpec existingIngestionSchema =
1375- EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1376- EasyMock .expect (existingIngestionSchema .getIOConfig ()).andReturn (existingIoConfig ).anyTimes ();
1377- EasyMock .expect (existingIngestionSchema .getDataSchema ()).andReturn (existingDataSchema ).anyTimes ();
1378- EasyMock .replay (existingIngestionSchema );
1379-
1380- TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec (
1381- "my-id" ,
1382- existingIngestionSchema
1383- );
1384-
1385- // Create a new spec WITHOUT taskCountStart and WITHOUT taskCount in autoScalerConfig
1386- HashMap <String , Object > newAutoScalerConfig = new HashMap <>();
1387- newAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1388- newAutoScalerConfig .put ("taskCountMax" , 8 );
1389- newAutoScalerConfig .put ("taskCountMin" , 2 );
1390- // Note: taskCountStart is NOT set
1391-
1392- SeekableStreamSupervisorIOConfig newIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1393- EasyMock .expect (newIoConfig .getAutoScalerConfig ())
1394- .andReturn (OBJECT_MAPPER .convertValue (newAutoScalerConfig , AutoScalerConfig .class ))
1395- .anyTimes ();
1396- EasyMock .expect (newIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1397- EasyMock .expect (newIoConfig .getTaskCount ()).andReturn (null ).anyTimes (); // new spec has no taskCount
1398- newIoConfig .setTaskCount (5 ); // Expect merge to set taskCount to existing value (5)
1399- EasyMock .expectLastCall ().once ();
1400- EasyMock .replay (newIoConfig );
1401-
1402- DataSchema newDataSchema = EasyMock .createMock (DataSchema .class );
1403- EasyMock .expect (newDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1404- EasyMock .replay (newDataSchema );
1405-
1406- SeekableStreamSupervisorIngestionSpec newIngestionSchema =
1407- EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1408- EasyMock .expect (newIngestionSchema .getIOConfig ()).andReturn (newIoConfig ).anyTimes ();
1409- EasyMock .expect (newIngestionSchema .getDataSchema ()).andReturn (newDataSchema ).anyTimes ();
1410- EasyMock .replay (newIngestionSchema );
1351+ // New spec has no taskCount -> should use existing taskCount (5)
1352+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec (5 , 1 );
1353+ TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge (null , 2 , 5 );
14111354
1412- TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec (
1413- "my-id" ,
1414- newIngestionSchema
1415- )
1416- {
1417- @ Override
1418- public List <String > getDataSources ()
1419- {
1420- return Collections .singletonList ("datasource1" );
1421- }
1422- };
1423-
1424- // Set up mocks for SupervisorManager behavior
1425- EasyMock .expect (taskMaster .getSupervisorManager ()).andReturn (Optional .of (supervisorManager ));
1426-
1427- // Mock createOrUpdateAndStartSupervisor to call merge
1428- final SupervisorSpec existingSpecForMerge = existingSpec ;
1429- Capture <SupervisorSpec > capturedNewSpec = EasyMock .newCapture ();
1430- EasyMock .expect (supervisorManager .createOrUpdateAndStartSupervisor (EasyMock .capture (capturedNewSpec )))
1431- .andAnswer (() -> {
1432- SupervisorSpec arg = (SupervisorSpec ) EasyMock .getCurrentArguments ()[0 ];
1433- arg .merge (existingSpecForMerge );
1434- return true ;
1435- });
1436-
1437- // Mock getSupervisorSpec to return the existing spec (simulating an update scenario)
1438- EasyMock .expect (supervisorManager .getSupervisorSpec ("my-id" ))
1439- .andReturn (Optional .of (existingSpec ))
1440- .anyTimes ();
1441-
1442- setupMockRequest ();
1443- setupMockRequestForAudit ();
1444-
1445- EasyMock .expect (authConfig .isEnableInputSourceSecurity ()).andReturn (true );
1446- auditManager .doAudit (EasyMock .anyObject ());
1447- EasyMock .expectLastCall ().once ();
1448-
1449- replayAll ();
1450-
1451- // Before merge, taskCountStart should be null in new spec
1452- Assert .assertNull (newSpec .getIoConfig ().getAutoScalerConfig ().getTaskCountStart ());
1453-
1454- // When
1455- Response response = supervisorResource .specPost (newSpec , false , request );
1456- verifyAll ();
1457-
1458- Assert .assertEquals (200 , response .getStatus ());
1355+ newSpec .merge (existingSpec );
1356+ EasyMock .verify (newSpec .getIoConfig ());
14591357 }
14601358
14611359 @ Test
14621360 public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount ()
14631361 {
1464- // Create an existing spec with taskCount=5
1465- HashMap <String , Object > existingAutoScalerConfig = new HashMap <>();
1466- existingAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1467- existingAutoScalerConfig .put ("taskCountMax" , 7 );
1468- existingAutoScalerConfig .put ("taskCountMin" , 1 );
1469-
1470- SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1471- EasyMock .expect (existingIoConfig .getAutoScalerConfig ())
1472- .andReturn (OBJECT_MAPPER .convertValue (existingAutoScalerConfig , AutoScalerConfig .class ))
1473- .anyTimes ();
1474- EasyMock .expect (existingIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1475- EasyMock .expect (existingIoConfig .getTaskCount ()).andReturn (5 ).anyTimes ();
1476- EasyMock .replay (existingIoConfig );
1477-
1478- DataSchema existingDataSchema = EasyMock .createMock (DataSchema .class );
1479- EasyMock .expect (existingDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1480- EasyMock .replay (existingDataSchema );
1362+ // New spec has taskCount=3 -> should use provided taskCount over existing (5)
1363+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec (5 , 1 );
1364+ TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge (3 , 2 , 3 );
14811365
1482- SeekableStreamSupervisorIngestionSpec existingIngestionSchema =
1483- EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1484- EasyMock .expect (existingIngestionSchema .getIOConfig ()).andReturn (existingIoConfig ).anyTimes ();
1485- EasyMock .expect (existingIngestionSchema .getDataSchema ()).andReturn (existingDataSchema ).anyTimes ();
1486- EasyMock .replay (existingIngestionSchema );
1487-
1488- TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec (
1489- "my-id" ,
1490- existingIngestionSchema
1491- );
1492-
1493- // Create a new spec with taskCount=3 (provided taskCount should take precedence)
1494- HashMap <String , Object > newAutoScalerConfig = new HashMap <>();
1495- newAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1496- newAutoScalerConfig .put ("taskCountMax" , 8 );
1497- newAutoScalerConfig .put ("taskCountMin" , 2 );
1498-
1499- SeekableStreamSupervisorIOConfig newIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1500- EasyMock .expect (newIoConfig .getAutoScalerConfig ())
1501- .andReturn (OBJECT_MAPPER .convertValue (newAutoScalerConfig , AutoScalerConfig .class ))
1502- .anyTimes ();
1503- EasyMock .expect (newIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1504- EasyMock .expect (newIoConfig .getTaskCount ()).andReturn (3 ).anyTimes (); // provided taskCount=3
1505- newIoConfig .setTaskCount (3 ); // Expect merge to use provided taskCount (3)
1506- EasyMock .expectLastCall ().once ();
1507- EasyMock .replay (newIoConfig );
1508-
1509- DataSchema newDataSchema = EasyMock .createMock (DataSchema .class );
1510- EasyMock .expect (newDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1511- EasyMock .replay (newDataSchema );
1512-
1513- SeekableStreamSupervisorIngestionSpec newIngestionSchema =
1514- EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1515- EasyMock .expect (newIngestionSchema .getIOConfig ()).andReturn (newIoConfig ).anyTimes ();
1516- EasyMock .expect (newIngestionSchema .getDataSchema ()).andReturn (newDataSchema ).anyTimes ();
1517- EasyMock .replay (newIngestionSchema );
1518-
1519- TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec (
1520- "my-id" ,
1521- newIngestionSchema
1522- )
1523- {
1524- @ Override
1525- public List <String > getDataSources ()
1526- {
1527- return Collections .singletonList ("datasource1" );
1528- }
1529- };
1530-
1531- EasyMock .expect (taskMaster .getSupervisorManager ()).andReturn (Optional .of (supervisorManager ));
1532-
1533- final SupervisorSpec existingSpecForMerge = existingSpec ;
1534- Capture <SupervisorSpec > capturedNewSpec = EasyMock .newCapture ();
1535- EasyMock .expect (supervisorManager .createOrUpdateAndStartSupervisor (EasyMock .capture (capturedNewSpec )))
1536- .andAnswer (() -> {
1537- SupervisorSpec arg = (SupervisorSpec ) EasyMock .getCurrentArguments ()[0 ];
1538- arg .merge (existingSpecForMerge );
1539- return true ;
1540- });
1541-
1542- EasyMock .expect (supervisorManager .getSupervisorSpec ("my-id" ))
1543- .andReturn (Optional .of (existingSpec ))
1544- .anyTimes ();
1545-
1546- setupMockRequest ();
1547- setupMockRequestForAudit ();
1548-
1549- EasyMock .expect (authConfig .isEnableInputSourceSecurity ()).andReturn (true );
1550- auditManager .doAudit (EasyMock .anyObject ());
1551- EasyMock .expectLastCall ().once ();
1552-
1553- replayAll ();
1554-
1555- Response response = supervisorResource .specPost (newSpec , false , request );
1556- verifyAll ();
1557-
1558- Assert .assertEquals (200 , response .getStatus ());
1366+ newSpec .merge (existingSpec );
1367+ EasyMock .verify (newSpec .getIoConfig ());
15591368 }
15601369
15611370 @ Test
15621371 public void testSpecPostMergeFallsBackToProvidedTaskCountMin ()
15631372 {
1564- // Create an existing spec with no taskCount
1565- HashMap <String , Object > existingAutoScalerConfig = new HashMap <>();
1566- existingAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1567- existingAutoScalerConfig .put ("taskCountMax" , 7 );
1568- existingAutoScalerConfig .put ("taskCountMin" , 1 );
1569-
1570- SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1571- EasyMock .expect (existingIoConfig .getAutoScalerConfig ())
1572- .andReturn (OBJECT_MAPPER .convertValue (existingAutoScalerConfig , AutoScalerConfig .class ))
1573- .anyTimes ();
1574- EasyMock .expect (existingIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1575- EasyMock .expect (existingIoConfig .getTaskCount ()).andReturn (null ).anyTimes (); // existing has no taskCount
1576- EasyMock .replay (existingIoConfig );
1577-
1578- DataSchema existingDataSchema = EasyMock .createMock (DataSchema .class );
1579- EasyMock .expect (existingDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1580- EasyMock .replay (existingDataSchema );
1581-
1582- SeekableStreamSupervisorIngestionSpec existingIngestionSchema =
1583- EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1584- EasyMock .expect (existingIngestionSchema .getIOConfig ()).andReturn (existingIoConfig ).anyTimes ();
1585- EasyMock .expect (existingIngestionSchema .getDataSchema ()).andReturn (existingDataSchema ).anyTimes ();
1586- EasyMock .replay (existingIngestionSchema );
1373+ // Neither has taskCount -> should fall back to taskCountMin (4)
1374+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec (null , 1 );
1375+ TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge (null , 4 , 4 );
15871376
1588- TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec (
1589- "my-id" ,
1590- existingIngestionSchema
1591- );
1592-
1593- // Create a new spec with taskCountMin=4, no taskCount
1594- HashMap <String , Object > newAutoScalerConfig = new HashMap <>();
1595- newAutoScalerConfig .put ("enableTaskAutoScaler" , true );
1596- newAutoScalerConfig .put ("taskCountMax" , 8 );
1597- newAutoScalerConfig .put ("taskCountMin" , 4 ); // provided taskCountMin
1377+ newSpec .merge (existingSpec );
1378+ EasyMock .verify (newSpec .getIoConfig ());
1379+ }
15981380
1599- SeekableStreamSupervisorIOConfig newIoConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1600- EasyMock .expect (newIoConfig .getAutoScalerConfig ())
1601- .andReturn (OBJECT_MAPPER .convertValue (newAutoScalerConfig , AutoScalerConfig .class ))
1381+ private TestSeekableStreamSupervisorSpec createTestSpec (Integer taskCount , int taskCountMin )
1382+ {
1383+ HashMap <String , Object > autoScalerConfig = new HashMap <>();
1384+ autoScalerConfig .put ("enableTaskAutoScaler" , true );
1385+ autoScalerConfig .put ("taskCountMax" , 10 );
1386+ autoScalerConfig .put ("taskCountMin" , taskCountMin );
1387+
1388+ SeekableStreamSupervisorIOConfig ioConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1389+ EasyMock .expect (ioConfig .getAutoScalerConfig ())
1390+ .andReturn (OBJECT_MAPPER .convertValue (autoScalerConfig , AutoScalerConfig .class ))
16021391 .anyTimes ();
1603- EasyMock .expect (newIoConfig .getStream ()).andReturn ("test-stream" ).anyTimes ();
1604- EasyMock .expect (newIoConfig .getTaskCount ()).andReturn (null ).anyTimes (); // no provided taskCount
1605- newIoConfig .setTaskCount (4 ); // Expect merge to fall back to provided taskCountMin (4)
1606- EasyMock .expectLastCall ().once ();
1607- EasyMock .replay (newIoConfig );
1392+ EasyMock .expect (ioConfig .getTaskCount ()).andReturn (taskCount ).anyTimes ();
1393+ EasyMock .replay (ioConfig );
16081394
1609- DataSchema newDataSchema = EasyMock .createMock (DataSchema .class );
1610- EasyMock .expect (newDataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1611- EasyMock .replay (newDataSchema );
1395+ DataSchema dataSchema = EasyMock .createMock (DataSchema .class );
1396+ EasyMock .expect (dataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1397+ EasyMock .replay (dataSchema );
16121398
1613- SeekableStreamSupervisorIngestionSpec newIngestionSchema =
1399+ SeekableStreamSupervisorIngestionSpec ingestionSchema =
16141400 EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1615- EasyMock .expect (newIngestionSchema .getIOConfig ()).andReturn (newIoConfig ).anyTimes ();
1616- EasyMock .expect (newIngestionSchema .getDataSchema ()).andReturn (newDataSchema ).anyTimes ();
1617- EasyMock .replay (newIngestionSchema );
1618-
1619- TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec (
1620- "my-id" ,
1621- newIngestionSchema
1622- )
1623- {
1624- @ Override
1625- public List <String > getDataSources ()
1626- {
1627- return Collections .singletonList ("datasource1" );
1628- }
1629- };
1401+ EasyMock .expect (ingestionSchema .getIOConfig ()).andReturn (ioConfig ).anyTimes ();
1402+ EasyMock .expect (ingestionSchema .getDataSchema ()).andReturn (dataSchema ).anyTimes ();
1403+ EasyMock .replay (ingestionSchema );
16301404
1631- EasyMock .expect (taskMaster .getSupervisorManager ()).andReturn (Optional .of (supervisorManager ));
1405+ return new TestSeekableStreamSupervisorSpec ("my-id" , ingestionSchema );
1406+ }
16321407
1633- final SupervisorSpec existingSpecForMerge = existingSpec ;
1634- Capture <SupervisorSpec > capturedNewSpec = EasyMock .newCapture ();
1635- EasyMock .expect (supervisorManager .createOrUpdateAndStartSupervisor (EasyMock .capture (capturedNewSpec )))
1636- .andAnswer (() -> {
1637- SupervisorSpec arg = (SupervisorSpec ) EasyMock .getCurrentArguments ()[0 ];
1638- arg .merge (existingSpecForMerge );
1639- return true ;
1640- });
1641-
1642- EasyMock .expect (supervisorManager .getSupervisorSpec ("my-id" ))
1643- .andReturn (Optional .of (existingSpec ))
1408+ private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge (
1409+ Integer taskCount ,
1410+ int taskCountMin ,
1411+ int expectedTaskCount
1412+ )
1413+ {
1414+ HashMap <String , Object > autoScalerConfig = new HashMap <>();
1415+ autoScalerConfig .put ("enableTaskAutoScaler" , true );
1416+ autoScalerConfig .put ("taskCountMax" , 10 );
1417+ autoScalerConfig .put ("taskCountMin" , taskCountMin );
1418+
1419+ SeekableStreamSupervisorIOConfig ioConfig = EasyMock .createMock (SeekableStreamSupervisorIOConfig .class );
1420+ EasyMock .expect (ioConfig .getAutoScalerConfig ())
1421+ .andReturn (OBJECT_MAPPER .convertValue (autoScalerConfig , AutoScalerConfig .class ))
16441422 .anyTimes ();
1645-
1646- setupMockRequest ();
1647- setupMockRequestForAudit ();
1648-
1649- EasyMock .expect (authConfig .isEnableInputSourceSecurity ()).andReturn (true );
1650- auditManager .doAudit (EasyMock .anyObject ());
1423+ EasyMock .expect (ioConfig .getTaskCount ()).andReturn (taskCount ).anyTimes ();
1424+ ioConfig .setTaskCount (expectedTaskCount );
16511425 EasyMock .expectLastCall ().once ();
1426+ EasyMock .replay (ioConfig );
16521427
1653- replayAll ();
1428+ DataSchema dataSchema = EasyMock .createMock (DataSchema .class );
1429+ EasyMock .expect (dataSchema .getDataSource ()).andReturn ("datasource1" ).anyTimes ();
1430+ EasyMock .replay (dataSchema );
16541431
1655- Response response = supervisorResource .specPost (newSpec , false , request );
1656- verifyAll ();
1432+ SeekableStreamSupervisorIngestionSpec ingestionSchema =
1433+ EasyMock .createMock (SeekableStreamSupervisorIngestionSpec .class );
1434+ EasyMock .expect (ingestionSchema .getIOConfig ()).andReturn (ioConfig ).anyTimes ();
1435+ EasyMock .expect (ingestionSchema .getDataSchema ()).andReturn (dataSchema ).anyTimes ();
1436+ EasyMock .replay (ingestionSchema );
16571437
1658- Assert . assertEquals ( 200 , response . getStatus () );
1438+ return new TestSeekableStreamSupervisorSpec ( "my-id" , ingestionSchema );
16591439 }
16601440
16611441 private void setupMockRequest ()
0 commit comments