diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index d712cfeb020..219baf2a2bc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -45,6 +45,7 @@ import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -57,6 +58,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pushmonitor.ExecutionStatus; @@ -308,7 +310,8 @@ public DaVinciBackend( ingestionService.getVeniceWriterFactory(), instanceName, valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + (this::getRealTimeTopicName)); } ingestionService.start(); @@ -560,6 +563,28 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() { return storeRepository; } + /** + * Resolves and returns the store object for the given store name. + * For user system stores, it finds the RT name from the SystemStoreAttributes of the parent user store. + * For regular user stores and top level system stores, it finds the RT name from the Store object directly. + */ + private String getRealTimeTopicName(String storeName) { + if (VeniceSystemStoreUtils.isUserSystemStore(storeName)) { + // it is a user system store + String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName); + Store userStore = storeRepository.getStore(userStoreName); + Map systemStores = userStore.getSystemStores(); + for (Map.Entry systemStoreEntries: systemStores.entrySet()) { + if (storeName.startsWith(systemStoreEntries.getKey())) { + return Utils.getRealTimeTopicName(systemStoreEntries.getValue()); + } + } + return null; + } else { + return Utils.getRealTimeTopicName(storeRepository.getStore(storeName)); + } + } + public ObjectCacheBackend getObjectCache() { return cacheBackend.get(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java index 685282b32eb..b70762c2de6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java @@ -361,7 +361,8 @@ private void asyncStart() { ingestionService.getVeniceWriterFactory(), instance.getNodeId(), valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + storeName -> Utils.getRealTimeTopicName(helixReadOnlyStoreRepository.getStore(storeName))); // Record replica status in Zookeeper. // Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 3dba837d2af..79b387d2385 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -393,7 +393,8 @@ public KafkaStoreIngestionService( zkSharedSchemaRepository.get(), pubSubTopicRepository, serverConfig.getMetaStoreWriterCloseTimeoutInMS(), - serverConfig.getMetaStoreWriterCloseConcurrency()); + serverConfig.getMetaStoreWriterCloseConcurrency(), + storeName -> Utils.getRealTimeTopicName(metadataRepo.getStore(storeName))); metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() { @Override public void handleStoreDeleted(Store store) { @@ -1486,7 +1487,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig return storeNameToInternalRecordTransformerConfig.get(storeName); } - public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) { + public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) { + String storeName = store.getName(); try { PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version)); StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName()); @@ -1508,7 +1510,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n"; if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) { - ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName)); + ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); } PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 15a910d3254..7e7baa6dc5d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -4355,11 +4355,15 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep availableSchemaIds.set(schemaId, new Object()); // TODO: Query metastore for existence of value schema id before doing an update. During bounce of large // cluster these metastore writes could be spiky - if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) { + if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName) + && !VeniceSystemStoreUtils.isSystemStore(storeName)) { String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); - PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); - if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { - metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); + String metaStoreRealTimeTopicName = metaStoreWriter.realTimeTopicNameResolver.apply(metaStoreName); + if (metaStoreRealTimeTopicName != null) { + PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName); + if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { + metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); + } } } return; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index a691c091291..dab5020fabd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -647,7 +647,7 @@ void checkAndMaybeLogHeartbeatDelayMap( heartbeatTs, currentTimestamp); getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor( - storeName.getKey(), + metadataRepository.getStore(storeName.getKey()), version.getKey(), partition.getKey(), region.getKey()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index f80e3722447..30d5f968897 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -11,6 +11,7 @@ import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.helix.ZkClientFactory; import com.linkedin.venice.meta.ClusterInfoProvider; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; @@ -43,6 +45,7 @@ import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.avro.SchemaPresenceChecker; import com.linkedin.venice.service.ICProvider; +import com.linkedin.venice.stats.ZkClientStatusStats; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.metrics.MetricsRepository; import java.util.Optional; @@ -50,9 +53,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.helix.zookeeper.impl.client.ZkClient; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -66,6 +71,15 @@ public class DaVinciBackendTest { private MockedStatic mockClientFactory; private MockedConstruction mockMetadataBuilder; private MockedConstruction mockSchemaPresenceChecker; + private MockedStatic mockZkFactory; + + @BeforeClass + public void init() { + mockZkFactory = mockStatic(ZkClientFactory.class); + ZkClient mockZkClient = mock(ZkClient.class); + mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient); + doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class)); + } @BeforeMethod public void setUp() throws Exception { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 09b556c27f8..7e0459380f0 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -113,6 +113,7 @@ import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestStringRecordTransformer; import com.linkedin.davinci.validation.DataIntegrityValidator; +import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceIngestionTaskKilledException; @@ -203,6 +204,7 @@ import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.unit.matchers.ExceptionClassMatcher; import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher; @@ -464,6 +466,11 @@ public static Object[][] sortedInputAndAAConfigProvider() { private Supplier storeVersionStateSupplier = () -> new StoreVersionState(); private MockStoreVersionConfigs storeAndVersionConfigsUnderTest; + private MetaStoreWriter mockMetaStoreWriter; + private Function mockStoreRealTimeTopicNameResolver; + private Store mockMetaStore; + private String mockMetaStoreRealTimeTopicName; + private static byte[] getRandomKey(Integer partition) { String randomString = Utils.getUniqueString("KeyForPartition" + partition); return ByteBuffer.allocate(randomString.length() + 1) @@ -584,6 +591,14 @@ public void methodSetUp() throws Exception { kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); + mockMetaStoreWriter = mock(MetaStoreWriter.class); + mockStoreRealTimeTopicNameResolver = mock(Function.class); + mockMetaStore = mock(Store.class); + when(mockMetaStore.getName()).thenReturn("metaStoreName"); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + mockMetaStoreRealTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; + mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); doReturn(mockTopicManager).when(mockTopicManagerRepository).getLocalTopicManager(); @@ -1468,6 +1483,240 @@ private MockInMemoryPartitionPosition getTopicPartitionOffsetPair( offset); } + @Test + public void testMetaStoreWriterIntegration() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int versionNumber = 1; + int schemaId = 1; + + Store mockMetaStore = mock(Store.class); + when(mockMetaStore.getName()).thenReturn(metaStoreName); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreRealTimeTopicNameResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; + + // Meta store exists + when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName); + + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); + + // Setup StoreIngestionTask with real metaStoreWriter + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was resolved + verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify that writeInUseValueSchema was called + verify(mockMetaStoreWriter, timeout(TEST_TIMEOUT_MS)) + .writeInUseValueSchema(storeNameWithoutVersionInfo, versionNumber, schemaId); + }, AA_OFF); + + // Override to inject our mock metaStoreWriter + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterWhenMetaStoreDoesNotExist() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int schemaId = 1; + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver; + + // Meta store does NOT exist + when(mockStoreResolver.apply(metaStoreName)).thenReturn(null); + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was checked + verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify that writeInUseValueSchema was NOT called (meta store doesn't exist) + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterWhenRTTopicDoesNotExist() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int schemaId = 1; + + Store mockMetaStore = mock(Store.class); + when(mockMetaStore.getName()).thenReturn(metaStoreName); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreRealTimeTopicNameResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; + + // Meta store exists + when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName); + + // But RT topic does NOT exist + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(false); + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was resolved + verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify RT topic check was made + verify(mockTopicManager, timeout(TEST_TIMEOUT_MS).atLeastOnce()).containsTopicWithRetries(metaStoreRTTopic, 5); + + // Verify that writeInUseValueSchema was NOT called (RT topic doesn't exist) + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterSkippedForSystemStore() throws Exception { + // When the store itself IS a system store, it should skip meta store writing + String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver; + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that storeResolver was NEVER called (system store should be skipped) + verify(mockStoreResolver, never()).apply(anyString()); + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + // Override store name to be a system store + Field storeNameField = StoreIngestionTask.class.getDeclaredField("storeName"); + storeNameField.setAccessible(true); + storeNameField.set(storeIngestionTaskUnderTest, systemStoreName); + + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreResolutionWhenMetaStoreExists() { + // Setup + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + + when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStoreRealTimeTopicName); + when(mockMetaStore.getName()).thenReturn(META_STORE_NAME); + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(mockMetaStoreRealTimeTopicName); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); + + // Execute: Call the method that uses metaStoreWriter.storeResolver + String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName); + + // Verify + assertNotNull(realTimeTopicName); + assertEquals(realTimeTopicName, mockMetaStoreRealTimeTopicName); + verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME); + } + + @Test + public void testMetaStoreResolutionWhenMetaStoreDoesNotExist() { + // Setup - metaStore is null + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(null); + + // Execute + String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName); + + // Verify - should handle null gracefully + assertNull(realTimeTopicName); + verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME); + } + + @Test + public void testMetaStoreNameDerivation() { + // Test that we're deriving the meta store name correctly + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + assertEquals(metaStoreName, META_STORE_NAME); + + // Verify the pattern + assertTrue(metaStoreName.startsWith(VeniceSystemStoreType.META_STORE.getPrefix())); + assertTrue(metaStoreName.endsWith(STORE_NAME)); + } + + @Test + public void testSkipMetaStoreResolutionForSystemStores() { + // Setup - if storeName itself is already a meta store, we shouldn't resolve it again + String STORE_NAME = "testStore"; + String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + + // Execute + boolean isSystemStore = VeniceSystemStoreType.META_STORE.isSystemStore(systemStoreName); + + // Verify - we should skip resolution for system stores + assertTrue(isSystemStore); + } + @Test(timeOut = 10 * Time.MS_PER_SECOND) public void testMissingZstdDictionary() throws Exception { doAnswer(invocation -> { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index 99b6349e3d1..be222abd50d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -42,6 +42,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; +import java.lang.reflect.Field; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; @@ -816,7 +817,7 @@ public void testLargestHeartbeatLag() { } @Test - public void testTriggerAutoResubscribe() { + public void testTriggerAutoResubscribe() throws Exception { String store = "foo"; int version = 100; int partition = 123; @@ -831,6 +832,11 @@ public void testTriggerAutoResubscribe() { HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + ReadOnlyStoreRepository metadataRepository = mock(ReadOnlyStoreRepository.class); + doReturn(mock(Store.class)).when(metadataRepository).getStore(store); + Field metadataRepositoryField = HeartbeatMonitoringService.class.getDeclaredField("metadataRepository"); + metadataRepositoryField.setAccessible(true); + metadataRepositoryField.set(heartbeatMonitoringService, metadataRepository); VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig(); doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService(); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java index 1fb67041109..be6772faade 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender( StoreInfo storeInfo = heartBeatStoreResponse.getStore(); PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig(); int partitionNum = storeInfo.getPartitionCount(); - String heartbeatKafkaTopicName = Utils.composeRealTimeTopic(heartbeatStoreName); + String heartbeatKafkaTopicName = Utils.getRealTimeTopicName(heartBeatStoreResponse.getStore()); VeniceWriter veniceWriter = getVeniceWriter( heartbeatKafkaTopicName, partitionerConfig, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 84dc177e5e4..583da572e7b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -1,6 +1,5 @@ package com.linkedin.venice.meta; -import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.linkedin.venice.exceptions.StoreDisabledException; @@ -174,12 +173,7 @@ private void addVersion( HybridStoreConfig hybridStoreConfig = getHybridStoreConfig(); if (hybridStoreConfig != null) { HybridStoreConfig clonedHybridStoreConfig = hybridStoreConfig.clone(); - if (currentRTVersionNumber > DEFAULT_RT_VERSION_NUMBER) { - String newRealTimeTopicName = Utils.isRTVersioningApplicable(getName()) - ? Utils.composeRealTimeTopic(getName(), currentRTVersionNumber) - : DEFAULT_REAL_TIME_TOPIC_NAME; - clonedHybridStoreConfig.setRealTimeTopicName(newRealTimeTopicName); - } + clonedHybridStoreConfig.setRealTimeTopicName(Utils.composeRealTimeTopic(getName(), currentRTVersionNumber)); version.setHybridStoreConfig(clonedHybridStoreConfig); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index bbd80440dc5..d0079e8a5ea 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -3,12 +3,12 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; -import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Map; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -25,17 +25,24 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable { private final Map veniceWriters = new VeniceConcurrentHashMap<>(); private final Schema valueSchema; private final Schema updateSchema; + private final Function rtNameResolver; // writerFactory Used for instantiating VeniceWriter - public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schema valueSchema, Schema updateSchema) { + public PushStatusStoreVeniceWriterCache( + VeniceWriterFactory writerFactory, + Schema valueSchema, + Schema updateSchema, + Function rtNameResolver) { this.writerFactory = writerFactory; this.valueSchema = valueSchema; this.updateSchema = updateSchema; + this.rtNameResolver = rtNameResolver; } public VeniceWriter prepareVeniceWriter(String storeName) { return veniceWriters.computeIfAbsent(storeName, s -> { - String rtTopic = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); + String rtTopic = rtNameResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); + VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic) .setKeyPayloadSerializer( new VeniceAvroKafkaSerializer( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index f054b56c096..c778cb8e52a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -17,6 +17,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,12 +55,14 @@ public PushStatusStoreWriter( VeniceWriterFactory writerFactory, String instanceName, SchemaEntry valueSchemaEntry, - DerivedSchemaEntry updateSchemaEntry) { + DerivedSchemaEntry updateSchemaEntry, + Function storeResolver) { this( new PushStatusStoreVeniceWriterCache( writerFactory, valueSchemaEntry.getSchema(), - updateSchemaEntry.getSchema()), + updateSchemaEntry.getSchema(), + storeResolver), instanceName, updateSchemaEntry.getValueSchemaID(), updateSchemaEntry.getId(), diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 1a9972cb831..5bed25ec33e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -20,7 +20,6 @@ import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.systemstore.schemas.StoreValueSchema; import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; -import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceResourceCloseResult; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; @@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.avro.Schema; @@ -77,6 +77,11 @@ public class MetaStoreWriter implements Closeable { private final PubSubTopicRepository pubSubTopicRepository; private final long closeTimeoutMs; private final int numOfConcurrentVwCloseOps; + /* + * Function to resolve store names to Store objects. Used to fetch system store metadata + * for determining the correct RT topic names. + */ + public Function realTimeTopicNameResolver; public MetaStoreWriter( TopicManager topicManager, @@ -84,7 +89,8 @@ public MetaStoreWriter( HelixReadOnlyZKSharedSchemaRepository schemaRepo, PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, - int numOfConcurrentVwCloseOps) { + int numOfConcurrentVwCloseOps, + Function realTimeTopicNameResolver) { /** * TODO: get the write compute schema from the constructor so that this class does not use {@link WriteComputeSchemaConverter} */ @@ -97,7 +103,8 @@ public MetaStoreWriter( AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema()), pubSubTopicRepository, closeTimeoutMs, - numOfConcurrentVwCloseOps); + numOfConcurrentVwCloseOps, + realTimeTopicNameResolver); } MetaStoreWriter( @@ -107,7 +114,8 @@ public MetaStoreWriter( Schema derivedComputeSchema, PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, - int numOfConcurrentVwCloseOps) { + int numOfConcurrentVwCloseOps, + Function realTimeTopicNameResolver) { this.topicManager = topicManager; this.writerFactory = writerFactory; this.derivedComputeSchema = derivedComputeSchema; @@ -115,6 +123,7 @@ public MetaStoreWriter( this.pubSubTopicRepository = pubSubTopicRepository; this.closeTimeoutMs = closeTimeoutMs; this.numOfConcurrentVwCloseOps = numOfConcurrentVwCloseOps; + this.realTimeTopicNameResolver = realTimeTopicNameResolver; } /** @@ -360,7 +369,8 @@ Map getMetaStoreWriterMap() { VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> { - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); + String metaStoreRealTimeTopicName = realTimeTopicNameResolver.apply(metaStoreName); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online"); } @@ -405,7 +415,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter, * to write a Control Message to the RT topic, and it could hang if the topic doesn't exist. * This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter. */ - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(realTimeTopicNameResolver.apply(metaStoreName)); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { LOGGER.info( "RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages", diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index 5dc5af42732..fa4eab77ef5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -1,7 +1,7 @@ package com.linkedin.venice.utils; import static com.linkedin.venice.HttpConstants.LOCALHOST; -import static com.linkedin.venice.meta.Version.REAL_TIME_TOPIC_SUFFIX; +import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -25,6 +25,8 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreVersionInfo; +import com.linkedin.venice.meta.SystemStore; +import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.pubsub.PubSubTopicImpl; @@ -614,29 +616,49 @@ public static File getTempDataDirectory(String prefix) { } } - /** This method should only be used for system stores. + /** This method should only be used to create old style rt topics. * For other stores, use {@link Utils#getRealTimeTopicName(Store)}, {@link Utils#getRealTimeTopicName(StoreInfo)} or * {@link Utils#getRealTimeTopicName(Version)} in source code. * For tests, use {@link Utils#composeRealTimeTopic(String, int)} */ public static String composeRealTimeTopic(String storeName) { - return storeName + REAL_TIME_TOPIC_SUFFIX; + return storeName + Version.REAL_TIME_TOPIC_SUFFIX; } public static String composeRealTimeTopic(String storeName, int versionNumber) { - return String.format(Version.REAL_TIME_TOPIC_TEMPLATE, storeName, versionNumber); + if (versionNumber == DEFAULT_RT_VERSION_NUMBER) { + return composeRealTimeTopic(storeName); + } else { + return String.format(Version.REAL_TIME_TOPIC_TEMPLATE, storeName, versionNumber); + } + } + + public static int largestUsedRTVersionNumber(Store store) { + if (store.isSystemStore() && VeniceSystemStoreUtils.isUserSystemStore(store.getName())) { + // store is a user level system store + Store userStore = store instanceof SystemStore ? ((SystemStore) store).getVeniceStore() : store; + return userStore.getLargestUsedRTVersionNumber(); + } else { + // store is a regular user store or a top level system store + return store.getLargestUsedRTVersionNumber(); + } + } + + public static String getRealTimeTopicName(Store store) { + return getRealTimeTopicName(store, largestUsedRTVersionNumber(store)); } /** * It follows the following order to search for real time topic name, * i) current store-version config, ii) store config, iii) other store-version configs, iv) default name */ - public static String getRealTimeTopicName(Store store) { + public static String getRealTimeTopicName(Store store, int rtVersionNumber) { return getRealTimeTopicName( store.getName(), store.getVersions(), store.getCurrentVersion(), - store.getHybridStoreConfig()); + store.getHybridStoreConfig(), + rtVersionNumber); } public static String getRealTimeTopicName(StoreInfo storeInfo) { @@ -644,26 +666,27 @@ public static String getRealTimeTopicName(StoreInfo storeInfo) { storeInfo.getName(), storeInfo.getVersions(), storeInfo.getCurrentVersion(), - storeInfo.getHybridStoreConfig()); + storeInfo.getHybridStoreConfig(), + DEFAULT_RT_VERSION_NUMBER); } - public static boolean isRTVersioningApplicable(String storeName) { - return !(VeniceSystemStoreUtils.isSystemStore(storeName) || VeniceSystemStoreUtils.isUserSystemStore(storeName) - || VeniceSystemStoreUtils.isParticipantStore(storeName)); + public static String getRealTimeTopicName(SystemStoreAttributes systemStoreAttributes) { + return getRealTimeTopicName( + null, + systemStoreAttributes.getVersions(), + systemStoreAttributes.getCurrentVersion(), + null, + DEFAULT_RT_VERSION_NUMBER); } public static String getRealTimeTopicName(Version version) { - if (!isRTVersioningApplicable(version.getStoreName())) { - return composeRealTimeTopic(version.getStoreName()); - } - if (version.isHybrid()) { String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName(); - return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName()); + return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName(), DEFAULT_RT_VERSION_NUMBER); } else { // if the version is not hybrid, caller should not ask for the real time topic, // but unfortunately that happens, so instead of throwing exception, we just return a default name. - return composeRealTimeTopic(version.getStoreName()); + return composeRealTimeTopic(version.getStoreName(), DEFAULT_RT_VERSION_NUMBER); } } @@ -671,17 +694,13 @@ public static Set getAllRealTimeTopicNames(Store store) { return store.getVersions().stream().map(Utils::getRealTimeTopicName).collect(Collectors.toSet()); } - static String getRealTimeTopicName( + public static String getRealTimeTopicName( String storeName, List versions, int currentVersionNumber, - HybridStoreConfig hybridStoreConfig) { - if (!isRTVersioningApplicable(storeName)) { - return composeRealTimeTopic(storeName); - } - + HybridStoreConfig hybridStoreConfig, + int rtVersionNumber) { Set realTimeTopicNames = new HashSet<>(); - for (Version version: versions) { if (version.isHybrid()) { String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName(); @@ -710,14 +729,16 @@ static String getRealTimeTopicName( if (hybridStoreConfig != null) { String realTimeTopicName = hybridStoreConfig.getRealTimeTopicName(); - return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName); + return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName, rtVersionNumber); } - return composeRealTimeTopic(storeName); + return composeRealTimeTopic(storeName, rtVersionNumber); } - private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) { - return StringUtils.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName; + private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName, int rtVersionNumber) { + return StringUtils.isBlank(realTimeTopicName) + ? composeRealTimeTopic(storeName, rtVersionNumber) + : realTimeTopicName; } public static String getRealTimeTopicNameFromSeparateRealTimeTopic(String separateRealTimeTopicName) { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index f50e8324085..2c4f6eefb55 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -16,11 +16,15 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceResourceCloseResult; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -35,15 +39,152 @@ import org.apache.avro.Schema; import org.mockito.ArgumentCaptor; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class MetaStoreWriterTest { + Store metaStore = mock(Store.class); + String metaStoreName = "testStore"; + + @BeforeClass + public void setUp() { + doReturn(metaStoreName).when(metaStore).getName(); + } + + @Test + public void testGetOrCreateMetaStoreWriterWithSystemStore() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock SystemStore and its components + SystemStore systemStore = mock(SystemStore.class); + Store veniceStore = mock(Store.class); + String systemStoreName = "venice_system_store_davinci_push_status_store_testStore"; + int expectedRTVersion = 5; + + when(systemStore.getName()).thenReturn(systemStoreName); + when(systemStore.isSystemStore()).thenReturn(true); + when(systemStore.getVeniceStore()).thenReturn(veniceStore); + when(veniceStore.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(systemStoreName + "_rt_v" + expectedRTVersion); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName -> Utils.getRealTimeTopicName(systemStore)); + + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName)); + verify(systemStore, times(1)).getVeniceStore(); + } + + @Test + public void testGetOrCreateMetaStoreWriterWithRegularStore() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock regular Store + Store regularStore = mock(Store.class); + String storeName = "regularStore"; + int expectedRTVersion = 3; + + when(regularStore.getName()).thenReturn(storeName); + when(regularStore.isSystemStore()).thenReturn(false); + when(regularStore.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(storeName + "_rt_v" + expectedRTVersion); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName1 -> Utils.getRealTimeTopicName(regularStore)); + + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); + } + + @Test + public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock a store that returns null for VeniceSystemStoreType but is a system store + Store store = mock(Store.class); + String storeName = "someSystemStore"; + int expectedRTVersion = 2; + + when(store.getName()).thenReturn(storeName); + when(store.isSystemStore()).thenReturn(false); // type is null, so condition fails + when(store.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(storeName + "_rt"); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName1 -> Utils.getRealTimeTopicName(store)); + + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); + } + @Test public void testMetaStoreWriterWillRestartUponProduceFailure() { MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); - String metaStoreName = "testStore"; HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class); doReturn(true).when(generatedSchemaID).isValid(); @@ -111,7 +252,8 @@ public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps) derivedComputeSchema, pubSubTopicRepository, closeTimeoutMs, - numOfConcurrentVwCloseOps); + numOfConcurrentVwCloseOps, + storeName -> metaStoreName + "_rt"); Map metaStoreWriters = metaStoreWriter.getMetaStoreWriterMap(); List> completedFutures = new ArrayList<>(20); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java index 2eb2b9cf71a..e7b2bd043ce 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java @@ -333,15 +333,18 @@ void testGetRealTimeTopicNameWithHybridConfig() { HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class); when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic"); - String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 1, mockHybridConfig); + String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 1, mockHybridConfig, 1); assertEquals(result, "RealTimeTopic"); } @Test void testGetRealTimeTopicNameWithoutHybridConfig() { - String result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null); + String result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null, 0); assertEquals(result, STORE_NAME + Version.REAL_TIME_TOPIC_SUFFIX); + + result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null, 1); + assertEquals(result, STORE_NAME + Version.REAL_TIME_TOPIC_SUFFIX + "_v1"); } @Test @@ -358,7 +361,7 @@ void testGetRealTimeTopicNameWithConflictingVersions() { when(mockConfig1.getRealTimeTopicName()).thenReturn("RealTimeTopic1"); when(mockConfig2.getRealTimeTopicName()).thenReturn("RealTimeTopic2"); - String result = Utils.getRealTimeTopicName(STORE_NAME, Lists.newArrayList(mockVersion1, mockVersion2), 1, null); + String result = Utils.getRealTimeTopicName(STORE_NAME, Lists.newArrayList(mockVersion1, mockVersion2), 1, null, 1); assertTrue(result.equals("RealTimeTopic1") || result.equals("RealTimeTopic2")); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java index 8710ba1c8d9..9d5f9134e1e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java @@ -133,8 +133,16 @@ private static void verifyStoreState( String realTimeTopicInVersion = Utils.getRealTimeTopicName(currentVersion); String realTimeTopicInBackupVersion = Utils.getRealTimeTopicName(backupVersion); - Assert.assertEquals(realTimeTopicNameInVersionConfig, expectedRealTimeTopicNameInVersionConfig); - Assert.assertEquals(realTimeTopicNameInBackupVersionConfig, expectedRealTimeTopicNameInBackupVersionConfig); + Assert.assertEquals( + realTimeTopicNameInVersionConfig, + expectedRealTimeTopicNameInVersionConfig.isEmpty() + ? oldStyleRealTimeTopicName + : expectedRealTimeTopicNameInVersionConfig); + Assert.assertEquals( + realTimeTopicNameInBackupVersionConfig, + expectedRealTimeTopicNameInBackupVersionConfig.isEmpty() + ? oldStyleRealTimeTopicName + : expectedRealTimeTopicNameInBackupVersionConfig); Assert.assertEquals( realTimeTopicInVersion, expectedRealTimeTopicNameInVersionConfig.isEmpty() diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java index ded1bbc67a5..dba0f5505aa 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java @@ -653,8 +653,8 @@ public void testDeleteStoreRTDeletion() { StoreInfo storeInfo = parentControllerClient.getStore(storeName).getStore(); String storeRT = Utils.getRealTimeTopicName(storeInfo); String pushStatusSystemStoreRT = - Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); - String metaSystemStoreRT = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getMetaStoreName(storeName)); + Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName), 1); + String metaSystemStoreRT = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getMetaStoreName(storeName), 1); // Ensure all the RT topics are created in all child datacenters TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, false, true, () -> { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java index a26fa049571..33df4cdbf67 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java @@ -648,7 +648,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { Exception notSystemStoreException = Assert.expectThrows( VeniceNoStoreException.class, - () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, metaStoreName)); + () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, metaStoreName, 1)); assertTrue( notSystemStoreException.getMessage().contains("does not exist in"), "Got unexpected error message: " + notSystemStoreException.getMessage()); @@ -663,7 +663,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { Exception exception = Assert.expectThrows( VeniceException.class, - () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( exception.getMessage().contains("not a user system store"), "Got unexpected error message: " + notSystemStoreException.getMessage()); @@ -677,7 +677,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { TimeUnit.SECONDS, () -> !veniceAdmin.getTopicManager().containsTopic(pushStatusRealTimeTopic)); - veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, pushStatusStoreName); + veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, pushStatusStoreName, 0); TestUtils.waitForNonDeterministicCompletion( 30, TimeUnit.SECONDS, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index 24261541474..4ef76d8cd32 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -291,7 +291,8 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory(), "dummyInstance", valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + (storeName) -> Utils.getRealTimeTopicName(controllerClient.getStore(storeName).getStore())); // After deleting the inc push status belonging to just one partition we should expect // SOIP from the controller since other partition has replicas with EOIP status diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java index 33f1ea2af89..a2d8110696f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java @@ -1,6 +1,5 @@ package com.linkedin.venice.integration.utils; -import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_MAX_ATTEMPT; @@ -460,20 +459,6 @@ public static DaVinciClient getGenericAvroDaVinciClient( return client; } - public static DaVinciClient getGenericAvroDaVinciClientWithoutMetaSystemStoreRepo( - String storeName, - String zkAddress, - String dataBasePath) { - Properties extraBackendConfig = new Properties(); - extraBackendConfig.setProperty(DATA_BASE_PATH, dataBasePath); - extraBackendConfig.setProperty(CLIENT_USE_SYSTEM_STORE_REPOSITORY, String.valueOf(false)); - return getGenericAvroDaVinciClient( - storeName, - zkAddress, - new DaVinciConfig(), - new VeniceProperties(extraBackendConfig)); - } - public static DaVinciClient getGenericAvroDaVinciClientWithRetries( String storeName, String zkAddress, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 437672ebd02..49f1bfa3735 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -972,6 +972,8 @@ default boolean isAdminTopicConsumptionEnabled(String clusterName) { int getLargestUsedVersion(String clusterName, String storeName); + int getLargestUsedRTVersion(String clusterName, String storeName); + /** * @return list of stores infos that are considered dead. A store is considered dead if it exists but has no * user traffic in it's read or write path. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java index b2627e57c86..571fb98eb47 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java @@ -9,6 +9,7 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.participant.protocol.ParticipantMessageKey; import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -64,12 +65,13 @@ public AvroSpecificStoreClient g }); } - public VeniceWriter getWriter(String clusterName) { + public VeniceWriter getWriter(String clusterName, Admin admin) { return writeClients.computeIfAbsent(clusterName, k -> { int attempts = 0; boolean verified = false; - PubSubTopic topic = pubSubTopicRepository - .getTopic(Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName); + Store store = admin.getStore(clusterName, participantStoreName); + PubSubTopic topic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); while (attempts < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS) { if (topicManagerRepository.getLocalTopicManager().containsTopicAndAllPartitionsAreOnline(topic)) { verified = true; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index 4519ad88beb..50307978321 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -2,7 +2,6 @@ import static com.linkedin.venice.common.VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE; import static com.linkedin.venice.common.VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE; -import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.authorization.Resource; @@ -86,9 +85,11 @@ public static Version materializeSystemStore( int partitionCount = parentAdmin.calculateNumberOfPartitions(clusterName, systemStoreName); int replicationFactor = parentAdmin.getReplicationFactor(clusterName, systemStoreName); final int systemStoreLargestUsedVersionNumber = parentAdmin.getLargestUsedVersion(clusterName, systemStoreName); + final int systemStoreLargestUsedRTVersionNumber = parentAdmin.getLargestUsedRTVersion(clusterName, systemStoreName); LOGGER.info( - "Get largest used version: {} for system store: {} in cluster: {}", + "Get largest used version: {} and largest used rt version: {} for system store: {} in cluster: {}", systemStoreLargestUsedVersionNumber, + systemStoreLargestUsedRTVersionNumber, systemStoreName, clusterName); if (systemStoreLargestUsedVersionNumber == Store.NON_EXISTING_VERSION) { @@ -112,7 +113,7 @@ public static Version materializeSystemStore( false, null, -1, - DEFAULT_RT_VERSION_NUMBER); + systemStoreLargestUsedRTVersionNumber); } parentAdmin.writeEndOfPush(clusterName, systemStoreName, version.getNumber(), true); return version; @@ -139,6 +140,7 @@ public static void deleteSystemStore( LOGGER.info("Start deleting system store: {}", systemStoreName); admin.deleteAllVersionsInStore(clusterName, systemStoreName); pushMonitor.cleanupStoreStatus(systemStoreName); + Store systemStore = storeRepository.getStore(systemStoreName); if (!isStoreMigrating) { VeniceSystemStoreType storeType = VeniceSystemStoreType.getSystemStoreType(systemStoreName); if (storeType == null) { @@ -163,13 +165,12 @@ public static void deleteSystemStore( throw new VeniceException("Unknown system store type: " + systemStoreName); } // skip truncating system store RT topics if it's parent fabric as it's not created for parent fabric - if (!admin.isParent()) { - admin.truncateKafkaTopic(Utils.composeRealTimeTopic(systemStoreName)); + if (!admin.isParent() && systemStore != null) { + admin.truncateKafkaTopic(Utils.getRealTimeTopicName(systemStore)); } } else { LOGGER.info("The RT topic for: {} will not be deleted since the user store is migrating", systemStoreName); } - Store systemStore = storeRepository.getStore(systemStoreName); if (systemStore != null) { admin.truncateOldTopics(clusterName, systemStore, true); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index fd66b01853b..f0c6a147907 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -665,7 +665,8 @@ public VeniceHelixAdmin( zkSharedSchemaRepository, pubSubTopicRepository, commonConfig.getMetaStoreWriterCloseTimeoutInMS(), - commonConfig.getMetaStoreWriterCloseConcurrency()); + commonConfig.getMetaStoreWriterCloseConcurrency(), + storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName))); metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName()); pushStatusStoreReader = new PushStatusStoreReader( d2Client, @@ -680,7 +681,8 @@ public VeniceHelixAdmin( veniceWriterFactory, CONTROLLER_HEARTBEAT_INSTANCE_NAME, valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName))); }); clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>(); @@ -1058,9 +1060,9 @@ public synchronized void initStorageCluster(String clusterName) { Collections.singletonList(VeniceControllerStateModel.getPartitionNameFromVeniceClusterName(clusterName)); helixAdminClient.enablePartition(true, controllerClusterName, controllerName, clusterName, partitionNames); if (multiClusterConfigs.getControllerConfig(clusterName).isParticipantMessageStoreEnabled()) { - participantMessageStoreRTTMap.put( - clusterName, - Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName); + String realTimeTopicName = Utils.composeRealTimeTopic(participantStoreName); + participantMessageStoreRTTMap.put(clusterName, realTimeTopicName); } waitUntilClusterResourceIsVisibleInEV(clusterName); } @@ -1250,7 +1252,16 @@ private void configureNewStore( /* If this store existed previously, we do not want to use the same RT topic name that was used by the previous store. To ensure this, increase largestUsedRTVersionNumber and new RT name will be different */ if (config.isRealTimeTopicVersioningEnabled()) { - newStore.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber + 1); + int newNumber; + if (!newStore.isSystemStore() || VeniceSystemStoreUtils.isUserSystemStore(newStore.getName())) { + // user level system store OR regular store + newNumber = largestUsedRTVersionNumber + 1; + } else { + // top level system store + newNumber = largestUsedRTVersionNumber; + } + + newStore.setLargestUsedRTVersionNumber(newNumber); } } @@ -1643,9 +1654,10 @@ void sendPushJobDetailsToLocalRT(PushJobStatusRecordKey key, PushJobDetails valu + " is not configured"); } String pushJobDetailsStoreName = VeniceSystemStoreUtils.getPushJobDetailsStoreName(); + Store pushJobDetailsStore = getStore(pushJobStatusStoreClusterName, pushJobDetailsStoreName); if (pushJobDetailsRTTopic == null) { // Verify the RT topic exists and give some time in case it's getting created. - PubSubTopic expectedRTTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(pushJobDetailsStoreName)); + PubSubTopic expectedRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(pushJobDetailsStore)); for (int attempt = 0; attempt < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS; attempt++) { if (attempt > 0) Utils.sleep(INTERNAL_STORE_RTT_RETRY_BACKOFF_MS); @@ -2709,10 +2721,16 @@ public Version addVersionOnly( try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.META_STORE)) { - setUpMetaStoreAndMayProduceSnapshot(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpMetaStoreAndMayProduceSnapshot( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + largestUsedRTVersionNumber); } if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) { - setUpDaVinciPushStatusStore(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpDaVinciPushStatusStore( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + largestUsedRTVersionNumber); } // Update the store object to avoid potential system store flags reversion during repository.updateStore(store). @@ -2785,7 +2803,7 @@ public boolean addSpecificVersion(String clusterName, String storeName, Version if (store.containsVersion(versionNumber)) { throwVersionAlreadyExists(storeName, versionNumber); } else { - store.addVersion(version, false, DEFAULT_RT_VERSION_NUMBER); + store.addVersion(version, false, store.getLargestUsedRTVersionNumber()); } storeRepository.updateStore(store); } @@ -2987,6 +3005,8 @@ private Pair addVersion( Optional emergencySourceRegion, boolean versionSwapDeferred, int repushSourceVersion) { + Store store = getStore(clusterName, storeName); + int largestUsedRTVersionNumber = Utils.largestUsedRTVersionNumber(store); return addVersion( clusterName, storeName, @@ -3008,7 +3028,7 @@ private Pair addVersion( versionSwapDeferred, null, repushSourceVersion, - getStore(clusterName, storeName).getLargestUsedRTVersionNumber()); + largestUsedRTVersionNumber); } private Optional getVersionFromSourceCluster( @@ -3124,10 +3144,16 @@ private Pair addVersion( */ VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.META_STORE)) { - setUpMetaStoreAndMayProduceSnapshot(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpMetaStoreAndMayProduceSnapshot( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + currentRTVersionNumber); } if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) { - setUpDaVinciPushStatusStore(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpDaVinciPushStatusStore( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + currentRTVersionNumber); } Store store = repository.getStore(storeName); @@ -3839,7 +3865,7 @@ private Optional getVersionWithPushId(String clusterName, String storeN * @throws VeniceNoStoreException if the store does not exist in the specified cluster. * @throws VeniceException if the store is not a user system store or if the partition count is invalid. */ - void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String storeName) { + void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String storeName, int currentRTVersionNumber) { checkControllerLeadershipFor(clusterName); Store store = getStore(clusterName, storeName); if (store == null) { @@ -3852,7 +3878,8 @@ void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String sto throw new VeniceException( "Failed to create real time topic for store: " + storeName + " because it is not a user system store."); } - PubSubTopic realTimeTopic = getPubSubTopicRepository().getTopic(Utils.getRealTimeTopicName(store)); + PubSubTopic realTimeTopic = + getPubSubTopicRepository().getTopic(Utils.getRealTimeTopicName(store, currentRTVersionNumber)); TopicManager topicManager = getTopicManager(); if (topicManager.containsTopic(realTimeTopic)) { return; @@ -3875,7 +3902,8 @@ void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String sto } VeniceControllerClusterConfig clusterConfig = getControllerConfig(clusterName); LOGGER.info( - "Creating real time topic for user system store: {} with partition count: {}", + "Creating real time topic {} for user system store: {} with partition count: {}", + realTimeTopic, storeName, partitionCount); getTopicManager().createTopic( @@ -5243,8 +5271,9 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP // However, safeguards are necessary to prevent clients from toggling the hybrid setting with a partition update // in edge cases. if (!store.isHybrid()) { - PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); - if (realTimeTopicForBatchStore != null) { + String realTimeTopic = Utils.getRealTimeTopicName(store); + if (realTimeTopic != null) { + PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(realTimeTopic); TopicManager topicManager = getTopicManagerForCluster(clusterConfig); if (topicManager.containsTopic(realTimeTopicForBatchStore)) { int rtPartitionCount = topicManager.getPartitionCount(realTimeTopicForBatchStore); @@ -5284,9 +5313,8 @@ private TopicManager getTopicManagerForCluster(VeniceControllerClusterConfig clu } private void generateAndUpdateRealTimeTopicName(Store store) { - String newRealTimeTopicName = Utils.isRTVersioningApplicable(store.getName()) - ? Utils.composeRealTimeTopic(store.getName(), store.getLargestUsedRTVersionNumber() + 1) - : DEFAULT_REAL_TIME_TOPIC_NAME; + String newRealTimeTopicName = + Utils.composeRealTimeTopic(store.getName(), store.getLargestUsedRTVersionNumber() + 1); store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName); /* @@ -6358,9 +6386,8 @@ protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig( } String newRealTimeTopicName = oldStore.getLargestUsedRTVersionNumber() > DEFAULT_RT_VERSION_NUMBER - && Utils.isRTVersioningApplicable(oldStore.getName()) - ? Utils.composeRealTimeTopic(oldStore.getName(), oldStore.getLargestUsedRTVersionNumber()) - : DEFAULT_REAL_TIME_TOPIC_NAME; + ? Utils.composeRealTimeTopic(oldStore.getName(), oldStore.getLargestUsedRTVersionNumber()) + : DEFAULT_REAL_TIME_TOPIC_NAME; mergedHybridStoreConfig = new HybridStoreConfigImpl( hybridRewindSeconds.get(), @@ -8160,7 +8187,7 @@ public void deleteParticipantStoreKillMessage(String clusterName, String kafkaTo "Delete KILL ingestion message for topic: {} from participant store in cluster: {}", kafkaTopic, clusterName); - VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName); + VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName, this); ParticipantMessageKey key = new ParticipantMessageKey(); key.resourceName = kafkaTopic; key.messageType = ParticipantMessageType.KILL_PUSH_JOB.getValue(); @@ -8170,7 +8197,7 @@ public void deleteParticipantStoreKillMessage(String clusterName, String kafkaTo public void sendKillMessageToParticipantStore(String clusterName, String kafkaTopic) { LOGGER.info("Send kill message for topic: {} to participant store of cluster: {}", kafkaTopic, clusterName); - VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName); + VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName, this); ParticipantMessageType killPushJobType = ParticipantMessageType.KILL_PUSH_JOB; ParticipantMessageKey key = new ParticipantMessageKey(); key.resourceName = kafkaTopic; @@ -9308,7 +9335,7 @@ Long getInMemoryTopicCreationTime(String topic) { return topicToCreationTime.get(topic); } - private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { + private void setUpDaVinciPushStatusStore(String clusterName, String storeName, int currentRTVersionNumber) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(storeName); @@ -9319,7 +9346,7 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { if (!isParent()) { // We do not materialize PS3 for parent region. Hence, skip RT topic creation. - ensureRealTimeTopicExistsForUserSystemStores(clusterName, daVinciPushStatusStoreName); + ensureRealTimeTopicExistsForUserSystemStores(clusterName, daVinciPushStatusStoreName, currentRTVersionNumber); } if (!store.isDaVinciPushStatusStoreEnabled()) { storeMetadataUpdate(clusterName, storeName, (s, resources) -> { @@ -9334,7 +9361,10 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { * @param clusterName The cluster name. * @param regularStoreName The regular user store name. */ - void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { + void setUpMetaStoreAndMayProduceSnapshot( + String clusterName, + String regularStoreName, + int largestUsedRTVersionNumber) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(regularStoreName); @@ -9347,7 +9377,8 @@ void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStore // We do not materialize meta store for parent region. Hence, skip RT topic creation. ensureRealTimeTopicExistsForUserSystemStores( clusterName, - VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); + VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName), + largestUsedRTVersionNumber); } // Update the store flag to enable meta system store. @@ -9592,6 +9623,17 @@ public int getLargestUsedVersion(String clusterName, String storeName) { return Math.max(store.getLargestUsedVersionNumber(), getStoreGraveyard().getLargestUsedVersionNumber(storeName)); } + @Override + public int getLargestUsedRTVersion(String clusterName, String storeName) { + Store store = getStore(clusterName, storeName); + // If the store does not exist, check the store graveyard. + if (store == null) { + return getStoreGraveyard().getLargestUsedRTVersionNumber(storeName); + } + return Math + .max(store.getLargestUsedRTVersionNumber(), getStoreGraveyard().getLargestUsedRTVersionNumber(storeName)); + } + /** * @see StoragePersonaRepository#addPersona(String, long, Set, Set) */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 459dd7b43ef..0de3528c8dc 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -231,6 +231,7 @@ import com.linkedin.venice.meta.StoreGraveyard; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreVersionInfo; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.VeniceETLStrategy; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; @@ -1872,7 +1873,10 @@ public Version incrementVersionIdempotent( newVersion = getVeniceHelixAdmin().getIncrementalPushVersion(clusterName, storeName, pushJobId); } else { validateTargetedRegions(targetedRegions, clusterName); - + int largestUsedRTVersionNumber = + store.isSystemStore() && VeniceSystemStoreType.getSystemStoreType(store.getName()) != null + ? ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber() + : store.getLargestUsedRTVersionNumber(); newVersion = addVersionAndTopicOnly( clusterName, storeName, @@ -1890,7 +1894,7 @@ public Version incrementVersionIdempotent( versionSwapDeferred, targetedRegions, repushSourceVersion, - store.getLargestUsedRTVersionNumber()); + largestUsedRTVersionNumber); } if (VeniceSystemStoreType.getSystemStoreType(storeName) == null) { if (pushType.isBatch()) { @@ -2082,7 +2086,7 @@ Version getIncrementalPushVersion(Version incrementalPushVersion, ExecutionStatu throw new VeniceException("Cannot start incremental push since batch push is on going." + " store: " + storeName); } - String incrementalPushTopic = Utils.composeRealTimeTopic(storeName); + String incrementalPushTopic = Utils.getRealTimeTopicName(incrementalPushVersion); if (status.isError() || getVeniceHelixAdmin().isTopicTruncated(incrementalPushTopic)) { throw new VeniceException( "Cannot start incremental push since previous batch push has failed. Please run another bash job." @@ -5609,6 +5613,11 @@ public int getLargestUsedVersion(String clusterName, String storeName) { return aggregatedLargestUsedVersionNumber; } + @Override + public int getLargestUsedRTVersion(String clusterName, String storeName) { + return getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName); + } + /** * Unsupported operation in the parent controller. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java index 050386ecdcf..a714dbac86e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java @@ -248,7 +248,7 @@ public void transmitVersionSwapMessage(Store store, int previousVersion, int nex broadcastVersionSwap(previousStoreVersion, nextStoreVersion, rtForNextVersion); } } else { - LOGGER.info("RT doesn't exist for store: {}. Skipping broadcast for Version Swap message."); + LOGGER.info("RT doesn't exist for store: {}. Skipping broadcast for Version Swap message.", storeName); } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java index 3aaec3ee615..a7880c541d3 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controller; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -14,6 +15,7 @@ import com.linkedin.venice.client.store.AvroSpecificStoreClient; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.participant.protocol.ParticipantMessageKey; import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -39,6 +41,7 @@ public class ParticipantStoreClientsManagerTest { private VeniceWriterFactory veniceWriterFactory; private PubSubTopicRepository pubSubTopicRepository; private ParticipantStoreClientsManager participantStoreClientsManager; + private Admin mockAdmin = mock(Admin.class); @BeforeMethod public void setUp() { @@ -52,6 +55,12 @@ public void setUp() { topicManagerRepository, veniceWriterFactory, pubSubTopicRepository); + Store mockStore = mock(Store.class); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(CLUSTER_NAME); + when(mockAdmin.getStore(eq(CLUSTER_NAME), eq(participantStoreName))).thenReturn(mockStore); + when(mockStore.isSystemStore()).thenReturn(true); + when(mockStore.isHybrid()).thenReturn(true); + when(mockStore.getName()).thenReturn(participantStoreName); } @Test @@ -111,12 +120,12 @@ public void testGetWriter() { return null; }).when(veniceWriterFactory).createVeniceWriter(any()); - VeniceWriter participantStoreWriter = participantStoreClientsManager.getWriter(CLUSTER_NAME); + VeniceWriter participantStoreWriter = participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin); assertNotNull(participantStoreWriter); assertSame(mockWriter, participantStoreWriter); // call getWriter again with the same cluster name - VeniceWriter result2 = participantStoreClientsManager.getWriter(CLUSTER_NAME); + VeniceWriter result2 = participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin); assertSame(participantStoreWriter, result2); } @@ -129,7 +138,7 @@ public void testGetWriterTopicNotFound() { when(localTopicManager.containsTopicAndAllPartitionsAreOnline(participantStoreTopic)).thenReturn(false); Exception exception = - expectThrows(VeniceException.class, () -> participantStoreClientsManager.getWriter(CLUSTER_NAME)); + expectThrows(VeniceException.class, () -> participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin)); assertEquals( exception.getMessage(), "Can't find the expected topic " + participantStoreTopic + " for participant message store " diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java index fc98702bc90..772d86a3ea6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java @@ -69,6 +69,7 @@ public void setUp() { public void testDeleteMetaSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(META_STORE_NAME); // Test deleting a META_STORE system store UserSystemStoreLifeCycleHelper.deleteSystemStore( mockAdmin, @@ -92,6 +93,7 @@ public void testDeleteMetaSystemStore() { public void testDeleteDaVinciPushStatusSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(DAVINCI_STORE_NAME); // Test deleting a DAVINCI_PUSH_STATUS_STORE system store UserSystemStoreLifeCycleHelper.deleteSystemStore( mockAdmin, @@ -115,6 +117,7 @@ public void testDeleteDaVinciPushStatusSystemStore() { public void testDeleteBatchJobHeartbeatSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(HEARTBEAT_STORE_NAME); // Test deleting a BATCH_JOB_HEARTBEAT_STORE system store // This should log an error but not throw an exception UserSystemStoreLifeCycleHelper.deleteSystemStore( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 5f168ee8e62..d27ae0c51bd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -162,8 +162,8 @@ public void testDropResources() { @Test public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() { VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); - doNothing().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); - doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + doNothing().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); + doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString(), anyInt()); InOrder inorder = inOrder(veniceHelixAdmin); @@ -176,11 +176,11 @@ public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() { doReturn(store).when(repo).getStore(anyString()); doReturn(Boolean.FALSE).when(store).isDaVinciPushStatusStoreEnabled(); - veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString(), anyInt()); // Enforce that ensureRealTimeTopicExistsForUserSystemStores happens before storeMetadataUpdate. See the above // comments for the reasons. - inorder.verify(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); + inorder.verify(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any()); } @@ -371,10 +371,11 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { // Case 1: Store does not exist doReturn(null).when(veniceHelixAdmin).getStore(clusterName, storeName); doNothing().when(veniceHelixAdmin).checkControllerLeadershipFor(clusterName); - doCallRealMethod().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); + doCallRealMethod().when(veniceHelixAdmin) + .ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); Exception notFoundException = expectThrows( VeniceException.class, - () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( notFoundException.getMessage().contains("does not exist in"), "Actual message: " + notFoundException.getMessage()); @@ -383,7 +384,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(userStore).when(veniceHelixAdmin).getStore(clusterName, storeName); Exception notUserSystemStoreException = expectThrows( VeniceException.class, - () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( notUserSystemStoreException.getMessage().contains("is not a user system store"), "Actual message: " + notUserSystemStoreException.getMessage()); @@ -394,7 +395,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(Collections.emptyList()).when(systemStore).getVersions(); doReturn(systemStore).when(veniceHelixAdmin).getStore(clusterName, systemStoreName); doReturn(true).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(1)).containsTopic(any(PubSubTopic.class)); HelixVeniceClusterResources veniceClusterResources = mock(HelixVeniceClusterResources.class); @@ -407,7 +408,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { topicManager = mock(TopicManager.class); doReturn(topicManager).when(veniceHelixAdmin).getTopicManager(); doReturn(false).doReturn(true).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(2)).containsTopic(any(PubSubTopic.class)); verify(topicManager, never()).createTopic( any(PubSubTopic.class), @@ -424,7 +425,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { when(veniceHelixAdmin.getControllerConfig(clusterName)).thenReturn(clusterConfig); doReturn(0).when(systemStore).getPartitionCount(); doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); // should not throw + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); // should not throw ArgumentCaptor partitionCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class), @@ -444,7 +445,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); doReturn(null).when(systemStore).getVersion(anyInt()); doReturn(5).when(systemStore).getPartitionCount(); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class), partitionCountArgumentCaptor.capture(), @@ -462,7 +463,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); doReturn(version).when(systemStore).getVersion(anyInt()); doReturn(10).when(version).getPartitionCount(); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); partitionCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class),