From 4296ca1e0ca44cad32c209a3cfa1fdbd95e5366b Mon Sep 17 00:00:00 2001 From: Arjun Date: Fri, 24 Oct 2025 17:42:10 -0700 Subject: [PATCH 1/5] extend rt versioning to system stores --- .../com/linkedin/davinci/DaVinciBackend.java | 21 +- .../helix/HelixParticipationService.java | 3 +- .../consumer/KafkaStoreIngestionService.java | 8 +- .../kafka/consumer/StoreIngestionTask.java | 9 +- .../heartbeat/HeartbeatMonitoringService.java | 2 +- .../linkedin/davinci/DaVinciBackendTest.java | 13 + .../consumer/StoreIngestionTaskTest.java | 243 ++++++++++++++++++ .../DefaultPushJobHeartbeatSenderFactory.java | 2 +- .../linkedin/venice/meta/AbstractStore.java | 8 +- .../com/linkedin/venice/meta/SystemStore.java | 3 + .../PushStatusStoreVeniceWriterCache.java | 23 +- .../PushStatusStoreWriter.java | 7 +- .../venice/system/store/MetaStoreWriter.java | 28 +- .../java/com/linkedin/venice/utils/Utils.java | 68 +++-- .../system/store/MetaStoreWriterTest.java | 161 +++++++++++- .../com/linkedin/venice/utils/UtilsTest.java | 9 +- ...toreRepartitioningWithMultiDataCenter.java | 12 +- ...stParentControllerWithMultiDataCenter.java | 4 +- ...VeniceHelixAdminWithSharedEnvironment.java | 6 +- .../venice/endToEnd/PushStatusStoreTest.java | 3 +- .../integration/utils/ServiceFactory.java | 15 -- .../integrationTest/resources/log4j2-test.xml | 36 +++ .../com/linkedin/venice/controller/Admin.java | 2 + .../ParticipantStoreClientsManager.java | 8 +- .../UserSystemStoreLifeCycleHelper.java | 11 +- .../venice/controller/VeniceHelixAdmin.java | 111 +++++--- .../controller/VeniceParentHelixAdmin.java | 23 +- .../control/RealTimeTopicSwitcher.java | 2 +- .../ParticipantStoreClientsManagerTest.java | 15 +- .../controller/TestVeniceHelixAdmin.java | 25 +- 30 files changed, 742 insertions(+), 139 deletions(-) create mode 100644 internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index d712cfeb020..79c94576e2d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -57,6 +57,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pushmonitor.ExecutionStatus; @@ -308,7 +309,8 @@ public DaVinciBackend( ingestionService.getVeniceWriterFactory(), instanceName, valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + (this::getStore)); } ingestionService.start(); @@ -560,6 +562,23 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() { return storeRepository; } + public final Object getStore(String storeName) { + VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); + if (systemStoreType != null) { + String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName); + Store userStore = storeRepository.getStore(userStoreName); + Map systemStores = userStore.getSystemStores(); + for (Map.Entry systemStoreEntries: systemStores.entrySet()) { + if (storeName.startsWith(systemStoreEntries.getKey())) { + return systemStoreEntries.getValue(); + } + } + return null; + } else { + return storeRepository.getStore(storeName); + } + } + public ObjectCacheBackend getObjectCache() { return cacheBackend.get(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java index 685282b32eb..3ab10312003 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java @@ -361,7 +361,8 @@ private void asyncStart() { ingestionService.getVeniceWriterFactory(), instance.getNodeId(), valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + helixReadOnlyStoreRepository::getStore); // Record replica status in Zookeeper. // Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 3dba837d2af..5ae472d5109 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -393,7 +393,8 @@ public KafkaStoreIngestionService( zkSharedSchemaRepository.get(), pubSubTopicRepository, serverConfig.getMetaStoreWriterCloseTimeoutInMS(), - serverConfig.getMetaStoreWriterCloseConcurrency()); + serverConfig.getMetaStoreWriterCloseConcurrency(), + storeName -> metadataRepo.getStore(storeName)); metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() { @Override public void handleStoreDeleted(Store store) { @@ -1486,7 +1487,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig return storeNameToInternalRecordTransformerConfig.get(storeName); } - public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) { + public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) { + String storeName = store.getName(); try { PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version)); StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName()); @@ -1508,7 +1510,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n"; if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) { - ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName)); + ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); } PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 15a910d3254..07b5489775a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -4357,9 +4357,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep // cluster these metastore writes could be spiky if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) { String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); - PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); - if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { - metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); + Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName); + if (metaStore != null) { + PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore)); + if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { + metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); + } } } return; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index a691c091291..dab5020fabd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -647,7 +647,7 @@ void checkAndMaybeLogHeartbeatDelayMap( heartbeatTs, currentTimestamp); getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor( - storeName.getKey(), + metadataRepository.getStore(storeName.getKey()), version.getKey(), partition.getKey(), region.getKey()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index f80e3722447..8a8e934ff80 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -11,6 +11,7 @@ import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.helix.ZkClientFactory; import com.linkedin.venice.meta.ClusterInfoProvider; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; @@ -43,6 +45,7 @@ import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.avro.SchemaPresenceChecker; import com.linkedin.venice.service.ICProvider; +import com.linkedin.venice.stats.ZkClientStatusStats; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.metrics.MetricsRepository; import java.util.Optional; @@ -50,9 +53,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.helix.zookeeper.impl.client.ZkClient; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -67,6 +72,14 @@ public class DaVinciBackendTest { private MockedConstruction mockMetadataBuilder; private MockedConstruction mockSchemaPresenceChecker; + @BeforeClass + public void init() { + MockedStatic mockZkFactory = mockStatic(ZkClientFactory.class); + ZkClient mockZkClient = mock(ZkClient.class); + mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient); + doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class)); + } + @BeforeMethod public void setUp() throws Exception { ClientConfig clientConfig = new ClientConfig(STORE_NAME).setVeniceURL("http://localhost:7777") diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 09b556c27f8..f5dfa59f80c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -113,6 +113,7 @@ import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestStringRecordTransformer; import com.linkedin.davinci.validation.DataIntegrityValidator; +import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceIngestionTaskKilledException; @@ -203,6 +204,7 @@ import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.unit.matchers.ExceptionClassMatcher; import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher; @@ -464,6 +466,10 @@ public static Object[][] sortedInputAndAAConfigProvider() { private Supplier storeVersionStateSupplier = () -> new StoreVersionState(); private MockStoreVersionConfigs storeAndVersionConfigsUnderTest; + private MetaStoreWriter mockMetaStoreWriter; + private Function mockStoreResolver; + private Store mockMetaStore; + private static byte[] getRandomKey(Integer partition) { String randomString = Utils.getUniqueString("KeyForPartition" + partition); return ByteBuffer.allocate(randomString.length() + 1) @@ -584,6 +590,11 @@ public void methodSetUp() throws Exception { kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); + mockMetaStoreWriter = mock(MetaStoreWriter.class); + mockStoreResolver = mock(Function.class); + mockMetaStore = mock(Store.class); + mockMetaStoreWriter.storeResolver = mockStoreResolver; + mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); doReturn(mockTopicManager).when(mockTopicManagerRepository).getLocalTopicManager(); @@ -1468,6 +1479,238 @@ private MockInMemoryPartitionPosition getTopicPartitionOffsetPair( offset); } + @Test + public void testMetaStoreWriterIntegration() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int versionNumber = 1; + int schemaId = 1; + + Store mockMetaStore = mock(Store.class); + when(mockMetaStore.getName()).thenReturn(metaStoreName); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.storeResolver = mockStoreResolver; + + // Meta store exists + when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore); + + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); + + // Setup StoreIngestionTask with real metaStoreWriter + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was resolved + verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify that writeInUseValueSchema was called + verify(mockMetaStoreWriter, timeout(TEST_TIMEOUT_MS)) + .writeInUseValueSchema(storeNameWithoutVersionInfo, versionNumber, schemaId); + }, AA_OFF); + + // Override to inject our mock metaStoreWriter + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterWhenMetaStoreDoesNotExist() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int schemaId = 1; + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.storeResolver = mockStoreResolver; + + // Meta store does NOT exist + when(mockStoreResolver.apply(metaStoreName)).thenReturn(null); + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was checked + verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify that writeInUseValueSchema was NOT called (meta store doesn't exist) + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterWhenRTTopicDoesNotExist() throws Exception { + // Setup + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + int schemaId = 1; + + Store mockMetaStore = mock(Store.class); + when(mockMetaStore.getName()).thenReturn(metaStoreName); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.storeResolver = mockStoreResolver; + + // Meta store exists + when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore); + + // But RT topic does NOT exist + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(false); + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, schemaId).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that the meta store was resolved + verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + + // Verify RT topic check was made + verify(mockTopicManager, timeout(TEST_TIMEOUT_MS).atLeastOnce()).containsTopicWithRetries(metaStoreRTTopic, 5); + + // Verify that writeInUseValueSchema was NOT called (RT topic doesn't exist) + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreWriterSkippedForSystemStore() throws Exception { + // When the store itself IS a system store, it should skip meta store writing + String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); + + MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.storeResolver = mockStoreResolver; + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID).get(); + + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + // Verify that storeResolver was NEVER called (system store should be skipped) + verify(mockStoreResolver, never()).apply(anyString()); + verify(mockMetaStoreWriter, never()).writeInUseValueSchema(anyString(), anyInt(), anyInt()); + }, AA_OFF); + + config.setBeforeStartingConsumption(() -> { + try { + // Override store name to be a system store + Field storeNameField = StoreIngestionTask.class.getDeclaredField("storeName"); + storeNameField.setAccessible(true); + storeNameField.set(storeIngestionTaskUnderTest, systemStoreName); + + Field metaStoreWriterField = StoreIngestionTask.class.getDeclaredField("metaStoreWriter"); + metaStoreWriterField.setAccessible(true); + metaStoreWriterField.set(storeIngestionTaskUnderTest, mockMetaStoreWriter); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + runTest(config); + } + + @Test + public void testMetaStoreResolutionWhenMetaStoreExists() { + // Setup + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + + when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStore); + when(mockMetaStore.getName()).thenReturn(META_STORE_NAME); + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); + + // Execute: Call the method that uses metaStoreWriter.storeResolver + String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName); + + // Verify + assertNotNull(resolvedStore); + assertEquals(resolvedStore.getName(), META_STORE_NAME); + verify(mockStoreResolver, times(1)).apply(META_STORE_NAME); + } + + @Test + public void testMetaStoreResolutionWhenMetaStoreDoesNotExist() { + // Setup - metaStore is null + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(null); + + // Execute + String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName); + + // Verify - should handle null gracefully + assertNull(resolvedStore); + verify(mockStoreResolver, times(1)).apply(META_STORE_NAME); + } + + @Test + public void testMetaStoreNameDerivation() { + // Test that we're deriving the meta store name correctly + String STORE_NAME = "testStore"; + String META_STORE_NAME = "venice_system_store_meta_store_testStore"; + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + assertEquals(metaStoreName, META_STORE_NAME); + + // Verify the pattern + assertTrue(metaStoreName.startsWith(VeniceSystemStoreType.META_STORE.getPrefix())); + assertTrue(metaStoreName.endsWith(STORE_NAME)); + } + + @Test + public void testSkipMetaStoreResolutionForSystemStores() { + // Setup - if storeName itself is already a meta store, we shouldn't resolve it again + String STORE_NAME = "testStore"; + String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); + + // Execute + boolean isSystemStore = VeniceSystemStoreType.META_STORE.isSystemStore(systemStoreName); + + // Verify - we should skip resolution for system stores + assertTrue(isSystemStore); + } + @Test(timeOut = 10 * Time.MS_PER_SECOND) public void testMissingZstdDictionary() throws Exception { doAnswer(invocation -> { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java index 1fb67041109..be6772faade 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender( StoreInfo storeInfo = heartBeatStoreResponse.getStore(); PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig(); int partitionNum = storeInfo.getPartitionCount(); - String heartbeatKafkaTopicName = Utils.composeRealTimeTopic(heartbeatStoreName); + String heartbeatKafkaTopicName = Utils.getRealTimeTopicName(heartBeatStoreResponse.getStore()); VeniceWriter veniceWriter = getVeniceWriter( heartbeatKafkaTopicName, partitionerConfig, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 84dc177e5e4..583da572e7b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -1,6 +1,5 @@ package com.linkedin.venice.meta; -import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.linkedin.venice.exceptions.StoreDisabledException; @@ -174,12 +173,7 @@ private void addVersion( HybridStoreConfig hybridStoreConfig = getHybridStoreConfig(); if (hybridStoreConfig != null) { HybridStoreConfig clonedHybridStoreConfig = hybridStoreConfig.clone(); - if (currentRTVersionNumber > DEFAULT_RT_VERSION_NUMBER) { - String newRealTimeTopicName = Utils.isRTVersioningApplicable(getName()) - ? Utils.composeRealTimeTopic(getName(), currentRTVersionNumber) - : DEFAULT_REAL_TIME_TOPIC_NAME; - clonedHybridStoreConfig.setRealTimeTopicName(newRealTimeTopicName); - } + clonedHybridStoreConfig.setRealTimeTopicName(Utils.composeRealTimeTopic(getName(), currentRTVersionNumber)); version.setHybridStoreConfig(clonedHybridStoreConfig); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index 2490cddad1c..7b2d790e4f7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -93,6 +93,9 @@ private void throwUnsupportedOperationException(String method) { */ private synchronized SystemStoreAttributes fetchAndBackfillSystemStoreAttributes(boolean readAccess) { SystemStoreAttributes systemStoreAttributes = veniceStore.getSystemStores().get(systemStoreType.getPrefix()); + if (veniceStore.getSystemStores().size() > 0) { + System.out.println(); + } if (systemStoreAttributes == null) { if (readAccess) { return DEFAULT_READ_ONLY_SYSTEM_STORE_ATTRIBUTE; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index bbd80440dc5..8497950aa11 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -1,6 +1,9 @@ package com.linkedin.venice.pushstatushelper; import com.linkedin.venice.common.VeniceSystemStoreUtils; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.utils.Utils; @@ -9,6 +12,7 @@ import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Map; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -25,17 +29,32 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable { private final Map veniceWriters = new VeniceConcurrentHashMap<>(); private final Schema valueSchema; private final Schema updateSchema; + Function storeResolver; // writerFactory Used for instantiating VeniceWriter - public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schema valueSchema, Schema updateSchema) { + public PushStatusStoreVeniceWriterCache( + VeniceWriterFactory writerFactory, + Schema valueSchema, + Schema updateSchema, + Function storeResolver) { this.writerFactory = writerFactory; this.valueSchema = valueSchema; this.updateSchema = updateSchema; + this.storeResolver = storeResolver; } public VeniceWriter prepareVeniceWriter(String storeName) { return veniceWriters.computeIfAbsent(storeName, s -> { - String rtTopic = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); + Object store = storeResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); + String rtTopic; + + if (store instanceof Store) { + rtTopic = Utils.getRealTimeTopicName((Store) store); + } else if (store instanceof StoreInfo) { + rtTopic = Utils.getRealTimeTopicName((StoreInfo) store); + } else { + rtTopic = Utils.getRealTimeTopicName((SystemStoreAttributes) store); + } VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic) .setKeyPayloadSerializer( new VeniceAvroKafkaSerializer( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index f054b56c096..33141aa5d29 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -17,6 +17,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,12 +55,14 @@ public PushStatusStoreWriter( VeniceWriterFactory writerFactory, String instanceName, SchemaEntry valueSchemaEntry, - DerivedSchemaEntry updateSchemaEntry) { + DerivedSchemaEntry updateSchemaEntry, + Function storeResolver) { this( new PushStatusStoreVeniceWriterCache( writerFactory, valueSchemaEntry.getSchema(), - updateSchemaEntry.getSchema()), + updateSchemaEntry.getSchema(), + storeResolver), instanceName, updateSchemaEntry.getValueSchemaID(), updateSchemaEntry.getId(), diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 1a9972cb831..1656b0ce88b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -6,6 +6,7 @@ import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.avro.Schema; @@ -78,13 +80,16 @@ public class MetaStoreWriter implements Closeable { private final long closeTimeoutMs; private final int numOfConcurrentVwCloseOps; + public Function storeResolver; + public MetaStoreWriter( TopicManager topicManager, VeniceWriterFactory writerFactory, HelixReadOnlyZKSharedSchemaRepository schemaRepo, PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, - int numOfConcurrentVwCloseOps) { + int numOfConcurrentVwCloseOps, + Function storeResolver) { /** * TODO: get the write compute schema from the constructor so that this class does not use {@link WriteComputeSchemaConverter} */ @@ -97,7 +102,8 @@ public MetaStoreWriter( AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema()), pubSubTopicRepository, closeTimeoutMs, - numOfConcurrentVwCloseOps); + numOfConcurrentVwCloseOps, + storeResolver); } MetaStoreWriter( @@ -107,7 +113,8 @@ public MetaStoreWriter( Schema derivedComputeSchema, PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, - int numOfConcurrentVwCloseOps) { + int numOfConcurrentVwCloseOps, + Function storeResolver) { this.topicManager = topicManager; this.writerFactory = writerFactory; this.derivedComputeSchema = derivedComputeSchema; @@ -115,6 +122,7 @@ public MetaStoreWriter( this.pubSubTopicRepository = pubSubTopicRepository; this.closeTimeoutMs = closeTimeoutMs; this.numOfConcurrentVwCloseOps = numOfConcurrentVwCloseOps; + this.storeResolver = storeResolver; } /** @@ -360,7 +368,16 @@ Map getMetaStoreWriterMap() { VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> { - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); + Store store = storeResolver.apply(metaStoreName); + int largestUsedRTVersionNumber; + VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName()); + if (type != null && store.isSystemStore()) { + largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber(); + } else { + largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber(); + } + String rt = Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName), largestUsedRTVersionNumber); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online"); } @@ -405,7 +422,8 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter, * to write a Control Message to the RT topic, and it could hang if the topic doesn't exist. * This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter. */ - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); + PubSubTopic rtTopic = + pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName))); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { LOGGER.info( "RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages", diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index 5dc5af42732..05b8adbd400 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -1,12 +1,11 @@ package com.linkedin.venice.utils; import static com.linkedin.venice.HttpConstants.LOCALHOST; -import static com.linkedin.venice.meta.Version.REAL_TIME_TOPIC_SUFFIX; +import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; -import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.exceptions.ErrorType; @@ -25,6 +24,8 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreVersionInfo; +import com.linkedin.venice.meta.SystemStore; +import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.pubsub.PubSubTopicImpl; @@ -614,29 +615,41 @@ public static File getTempDataDirectory(String prefix) { } } - /** This method should only be used for system stores. + /** This method should only be used to create old style rt topics. * For other stores, use {@link Utils#getRealTimeTopicName(Store)}, {@link Utils#getRealTimeTopicName(StoreInfo)} or * {@link Utils#getRealTimeTopicName(Version)} in source code. * For tests, use {@link Utils#composeRealTimeTopic(String, int)} */ public static String composeRealTimeTopic(String storeName) { - return storeName + REAL_TIME_TOPIC_SUFFIX; + return storeName + Version.REAL_TIME_TOPIC_SUFFIX; } public static String composeRealTimeTopic(String storeName, int versionNumber) { - return String.format(Version.REAL_TIME_TOPIC_TEMPLATE, storeName, versionNumber); + if (versionNumber == DEFAULT_RT_VERSION_NUMBER) { + return composeRealTimeTopic(storeName); + } else { + return String.format(Version.REAL_TIME_TOPIC_TEMPLATE, storeName, versionNumber); + } } /** * It follows the following order to search for real time topic name, * i) current store-version config, ii) store config, iii) other store-version configs, iv) default name */ - public static String getRealTimeTopicName(Store store) { + public static String getRealTimeTopicName(Store store, int rtVersionNumber) { return getRealTimeTopicName( store.getName(), store.getVersions(), store.getCurrentVersion(), - store.getHybridStoreConfig()); + store.getHybridStoreConfig(), + rtVersionNumber); + } + + public static String getRealTimeTopicName(Store store) { + if (store instanceof SystemStore) { + return getRealTimeTopicName(store, ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber()); + } + return getRealTimeTopicName(store, DEFAULT_RT_VERSION_NUMBER); } public static String getRealTimeTopicName(StoreInfo storeInfo) { @@ -644,26 +657,27 @@ public static String getRealTimeTopicName(StoreInfo storeInfo) { storeInfo.getName(), storeInfo.getVersions(), storeInfo.getCurrentVersion(), - storeInfo.getHybridStoreConfig()); + storeInfo.getHybridStoreConfig(), + DEFAULT_RT_VERSION_NUMBER); } - public static boolean isRTVersioningApplicable(String storeName) { - return !(VeniceSystemStoreUtils.isSystemStore(storeName) || VeniceSystemStoreUtils.isUserSystemStore(storeName) - || VeniceSystemStoreUtils.isParticipantStore(storeName)); + public static String getRealTimeTopicName(SystemStoreAttributes systemStoreAttributes) { + return getRealTimeTopicName( + null, + systemStoreAttributes.getVersions(), + systemStoreAttributes.getCurrentVersion(), + null, + DEFAULT_RT_VERSION_NUMBER); } public static String getRealTimeTopicName(Version version) { - if (!isRTVersioningApplicable(version.getStoreName())) { - return composeRealTimeTopic(version.getStoreName()); - } - if (version.isHybrid()) { String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName(); - return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName()); + return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName(), DEFAULT_RT_VERSION_NUMBER); } else { // if the version is not hybrid, caller should not ask for the real time topic, // but unfortunately that happens, so instead of throwing exception, we just return a default name. - return composeRealTimeTopic(version.getStoreName()); + return composeRealTimeTopic(version.getStoreName(), DEFAULT_RT_VERSION_NUMBER); } } @@ -671,17 +685,13 @@ public static Set getAllRealTimeTopicNames(Store store) { return store.getVersions().stream().map(Utils::getRealTimeTopicName).collect(Collectors.toSet()); } - static String getRealTimeTopicName( + public static String getRealTimeTopicName( String storeName, List versions, int currentVersionNumber, - HybridStoreConfig hybridStoreConfig) { - if (!isRTVersioningApplicable(storeName)) { - return composeRealTimeTopic(storeName); - } - + HybridStoreConfig hybridStoreConfig, + int rtVersionNumber) { Set realTimeTopicNames = new HashSet<>(); - for (Version version: versions) { if (version.isHybrid()) { String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName(); @@ -710,14 +720,16 @@ static String getRealTimeTopicName( if (hybridStoreConfig != null) { String realTimeTopicName = hybridStoreConfig.getRealTimeTopicName(); - return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName); + return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName, rtVersionNumber); } - return composeRealTimeTopic(storeName); + return composeRealTimeTopic(storeName, rtVersionNumber); } - private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) { - return StringUtils.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName; + private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName, int rtVersionNumber) { + return StringUtils.isBlank(realTimeTopicName) + ? composeRealTimeTopic(storeName, rtVersionNumber) + : realTimeTopicName; } public static String getRealTimeTopicNameFromSeparateRealTimeTopic(String separateRealTimeTopicName) { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index f50e8324085..6b0761d1fcb 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -16,7 +16,10 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; @@ -35,15 +38,168 @@ import org.apache.avro.Schema; import org.mockito.ArgumentCaptor; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class MetaStoreWriterTest { + Store metaStore = mock(Store.class); + String metaStoreName = "testStore"; + + @BeforeClass + public void setUp() { + doReturn(metaStoreName).when(metaStore).getName(); + } + + @Test + public void testGetOrCreateMetaStoreWriterWithSystemStore() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock SystemStore and its components + SystemStore systemStore = mock(SystemStore.class); + Store veniceStore = mock(Store.class); + String systemStoreName = "venice_system_store_davinci_push_status_store_testStore"; + int expectedRTVersion = 5; + + when(systemStore.getName()).thenReturn(systemStoreName); + when(systemStore.isSystemStore()).thenReturn(true); + when(systemStore.getVeniceStore()).thenReturn(veniceStore); + when(veniceStore.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(systemStoreName + "_rt_v" + expectedRTVersion); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName -> systemStore); + + // Execute + VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName); + + // Verify + Assert.assertNotNull(result); + verify(veniceStore, times(1)).getLargestUsedRTVersionNumber(); + verify(systemStore, times(1)).getVeniceStore(); + } + + @Test + public void testGetOrCreateMetaStoreWriterWithRegularStore() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock regular Store + Store regularStore = mock(Store.class); + String storeName = "regularStore"; + int expectedRTVersion = 3; + + when(regularStore.getName()).thenReturn(storeName); + when(regularStore.isSystemStore()).thenReturn(false); + when(regularStore.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(storeName + "_rt_v" + expectedRTVersion); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName1 -> regularStore); + + // Execute + VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName); + + // Verify + Assert.assertNotNull(result); + verify(regularStore, times(1)).getLargestUsedRTVersionNumber(); + // verify(regularStore, never()).getVeniceStore; // Should not call getVeniceStore() for regular stores + } + + @Test + public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() { + // Setup mocks + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Schema derivedComputeSchema = mock(Schema.class); + + // Mock a store that returns null for VeniceSystemStoreType but is a system store + Store store = mock(Store.class); + String storeName = "someSystemStore"; + int expectedRTVersion = 2; + + when(store.getName()).thenReturn(storeName); + when(store.isSystemStore()).thenReturn(false); // type is null, so condition fails + when(store.getLargestUsedRTVersionNumber()).thenReturn(expectedRTVersion); + + // Mock the PubSubTopic and TopicManager + PubSubTopic mockTopic = mock(PubSubTopic.class); + when(mockTopic.getName()).thenReturn(storeName + "_rt"); + when(pubSubTopicRepository.getTopic(anyString())).thenReturn(mockTopic); + when(topicManager.containsTopicAndAllPartitionsAreOnline(any())).thenReturn(true); + + // Mock VeniceWriter + VeniceWriter mockWriter = mock(VeniceWriter.class); + when(writerFactory.createVeniceWriter(any())).thenReturn(mockWriter); + + // Create MetaStoreWriter + MetaStoreWriter metaStoreWriter = new MetaStoreWriter( + topicManager, + writerFactory, + schemaRepo, + derivedComputeSchema, + pubSubTopicRepository, + 5000L, + 2, + storeName1 -> store); + + // Execute + VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName); + + // Verify - should use else branch + Assert.assertNotNull(result); + verify(store, times(1)).getLargestUsedRTVersionNumber(); + } + @Test public void testMetaStoreWriterWillRestartUponProduceFailure() { MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); - String metaStoreName = "testStore"; HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class); doReturn(true).when(generatedSchemaID).isValid(); @@ -111,7 +267,8 @@ public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps) derivedComputeSchema, pubSubTopicRepository, closeTimeoutMs, - numOfConcurrentVwCloseOps); + numOfConcurrentVwCloseOps, + storeName -> metaStore); Map metaStoreWriters = metaStoreWriter.getMetaStoreWriterMap(); List> completedFutures = new ArrayList<>(20); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java index 2eb2b9cf71a..e7b2bd043ce 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java @@ -333,15 +333,18 @@ void testGetRealTimeTopicNameWithHybridConfig() { HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class); when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic"); - String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 1, mockHybridConfig); + String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 1, mockHybridConfig, 1); assertEquals(result, "RealTimeTopic"); } @Test void testGetRealTimeTopicNameWithoutHybridConfig() { - String result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null); + String result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null, 0); assertEquals(result, STORE_NAME + Version.REAL_TIME_TOPIC_SUFFIX); + + result = Utils.getRealTimeTopicName(STORE_NAME, Collections.EMPTY_LIST, 0, null, 1); + assertEquals(result, STORE_NAME + Version.REAL_TIME_TOPIC_SUFFIX + "_v1"); } @Test @@ -358,7 +361,7 @@ void testGetRealTimeTopicNameWithConflictingVersions() { when(mockConfig1.getRealTimeTopicName()).thenReturn("RealTimeTopic1"); when(mockConfig2.getRealTimeTopicName()).thenReturn("RealTimeTopic2"); - String result = Utils.getRealTimeTopicName(STORE_NAME, Lists.newArrayList(mockVersion1, mockVersion2), 1, null); + String result = Utils.getRealTimeTopicName(STORE_NAME, Lists.newArrayList(mockVersion1, mockVersion2), 1, null, 1); assertTrue(result.equals("RealTimeTopic1") || result.equals("RealTimeTopic2")); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java index 8710ba1c8d9..9d5f9134e1e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestHybridStoreRepartitioningWithMultiDataCenter.java @@ -133,8 +133,16 @@ private static void verifyStoreState( String realTimeTopicInVersion = Utils.getRealTimeTopicName(currentVersion); String realTimeTopicInBackupVersion = Utils.getRealTimeTopicName(backupVersion); - Assert.assertEquals(realTimeTopicNameInVersionConfig, expectedRealTimeTopicNameInVersionConfig); - Assert.assertEquals(realTimeTopicNameInBackupVersionConfig, expectedRealTimeTopicNameInBackupVersionConfig); + Assert.assertEquals( + realTimeTopicNameInVersionConfig, + expectedRealTimeTopicNameInVersionConfig.isEmpty() + ? oldStyleRealTimeTopicName + : expectedRealTimeTopicNameInVersionConfig); + Assert.assertEquals( + realTimeTopicNameInBackupVersionConfig, + expectedRealTimeTopicNameInBackupVersionConfig.isEmpty() + ? oldStyleRealTimeTopicName + : expectedRealTimeTopicNameInBackupVersionConfig); Assert.assertEquals( realTimeTopicInVersion, expectedRealTimeTopicNameInVersionConfig.isEmpty() diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java index ded1bbc67a5..dba0f5505aa 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java @@ -653,8 +653,8 @@ public void testDeleteStoreRTDeletion() { StoreInfo storeInfo = parentControllerClient.getStore(storeName).getStore(); String storeRT = Utils.getRealTimeTopicName(storeInfo); String pushStatusSystemStoreRT = - Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); - String metaSystemStoreRT = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getMetaStoreName(storeName)); + Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName), 1); + String metaSystemStoreRT = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getMetaStoreName(storeName), 1); // Ensure all the RT topics are created in all child datacenters TestUtils.waitForNonDeterministicAssertion(300, TimeUnit.SECONDS, false, true, () -> { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java index a26fa049571..33df4cdbf67 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java @@ -648,7 +648,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { Exception notSystemStoreException = Assert.expectThrows( VeniceNoStoreException.class, - () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, metaStoreName)); + () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, metaStoreName, 1)); assertTrue( notSystemStoreException.getMessage().contains("does not exist in"), "Got unexpected error message: " + notSystemStoreException.getMessage()); @@ -663,7 +663,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { Exception exception = Assert.expectThrows( VeniceException.class, - () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( exception.getMessage().contains("not a user system store"), "Got unexpected error message: " + notSystemStoreException.getMessage()); @@ -677,7 +677,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { TimeUnit.SECONDS, () -> !veniceAdmin.getTopicManager().containsTopic(pushStatusRealTimeTopic)); - veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, pushStatusStoreName); + veniceAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, pushStatusStoreName, 0); TestUtils.waitForNonDeterministicCompletion( 30, TimeUnit.SECONDS, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index 24261541474..169bd5aeaed 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -291,7 +291,8 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory(), "dummyInstance", valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + (storeName) -> controllerClient.getStore(storeName).getStore()); // After deleting the inc push status belonging to just one partition we should expect // SOIP from the controller since other partition has replicas with EOIP status diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java index 33f1ea2af89..a2d8110696f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java @@ -1,6 +1,5 @@ package com.linkedin.venice.integration.utils; -import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_MAX_ATTEMPT; @@ -460,20 +459,6 @@ public static DaVinciClient getGenericAvroDaVinciClient( return client; } - public static DaVinciClient getGenericAvroDaVinciClientWithoutMetaSystemStoreRepo( - String storeName, - String zkAddress, - String dataBasePath) { - Properties extraBackendConfig = new Properties(); - extraBackendConfig.setProperty(DATA_BASE_PATH, dataBasePath); - extraBackendConfig.setProperty(CLIENT_USE_SYSTEM_STORE_REPOSITORY, String.valueOf(false)); - return getGenericAvroDaVinciClient( - storeName, - zkAddress, - new DaVinciConfig(), - new VeniceProperties(extraBackendConfig)); - } - public static DaVinciClient getGenericAvroDaVinciClientWithRetries( String storeName, String zkAddress, diff --git a/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml b/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml new file mode 100644 index 00000000000..65090322a30 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 437672ebd02..49f1bfa3735 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -972,6 +972,8 @@ default boolean isAdminTopicConsumptionEnabled(String clusterName) { int getLargestUsedVersion(String clusterName, String storeName); + int getLargestUsedRTVersion(String clusterName, String storeName); + /** * @return list of stores infos that are considered dead. A store is considered dead if it exists but has no * user traffic in it's read or write path. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java index b2627e57c86..571fb98eb47 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java @@ -9,6 +9,7 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.participant.protocol.ParticipantMessageKey; import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -64,12 +65,13 @@ public AvroSpecificStoreClient g }); } - public VeniceWriter getWriter(String clusterName) { + public VeniceWriter getWriter(String clusterName, Admin admin) { return writeClients.computeIfAbsent(clusterName, k -> { int attempts = 0; boolean verified = false; - PubSubTopic topic = pubSubTopicRepository - .getTopic(Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName); + Store store = admin.getStore(clusterName, participantStoreName); + PubSubTopic topic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); while (attempts < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS) { if (topicManagerRepository.getLocalTopicManager().containsTopicAndAllPartitionsAreOnline(topic)) { verified = true; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index 4519ad88beb..a913bdc70df 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -2,7 +2,6 @@ import static com.linkedin.venice.common.VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE; import static com.linkedin.venice.common.VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE; -import static com.linkedin.venice.meta.Version.DEFAULT_RT_VERSION_NUMBER; import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.authorization.Resource; @@ -86,9 +85,11 @@ public static Version materializeSystemStore( int partitionCount = parentAdmin.calculateNumberOfPartitions(clusterName, systemStoreName); int replicationFactor = parentAdmin.getReplicationFactor(clusterName, systemStoreName); final int systemStoreLargestUsedVersionNumber = parentAdmin.getLargestUsedVersion(clusterName, systemStoreName); + final int systemStoreLargestUsedRTVersionNumber = parentAdmin.getLargestUsedRTVersion(clusterName, systemStoreName); LOGGER.info( - "Get largest used version: {} for system store: {} in cluster: {}", + "Get largest used version: {} and largest used rt version: {} for system store: {} in cluster: {}", systemStoreLargestUsedVersionNumber, + systemStoreLargestUsedRTVersionNumber, systemStoreName, clusterName); if (systemStoreLargestUsedVersionNumber == Store.NON_EXISTING_VERSION) { @@ -112,7 +113,7 @@ public static Version materializeSystemStore( false, null, -1, - DEFAULT_RT_VERSION_NUMBER); + systemStoreLargestUsedRTVersionNumber); } parentAdmin.writeEndOfPush(clusterName, systemStoreName, version.getNumber(), true); return version; @@ -139,6 +140,7 @@ public static void deleteSystemStore( LOGGER.info("Start deleting system store: {}", systemStoreName); admin.deleteAllVersionsInStore(clusterName, systemStoreName); pushMonitor.cleanupStoreStatus(systemStoreName); + Store systemStore = storeRepository.getStore(systemStoreName); if (!isStoreMigrating) { VeniceSystemStoreType storeType = VeniceSystemStoreType.getSystemStoreType(systemStoreName); if (storeType == null) { @@ -164,12 +166,11 @@ public static void deleteSystemStore( } // skip truncating system store RT topics if it's parent fabric as it's not created for parent fabric if (!admin.isParent()) { - admin.truncateKafkaTopic(Utils.composeRealTimeTopic(systemStoreName)); + admin.truncateKafkaTopic(Utils.getRealTimeTopicName(systemStore)); } } else { LOGGER.info("The RT topic for: {} will not be deleted since the user store is migrating", systemStoreName); } - Store systemStore = storeRepository.getStore(systemStoreName); if (systemStore != null) { admin.truncateOldTopics(clusterName, systemStore, true); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index fd66b01853b..6b7d50a6b3d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -169,6 +169,7 @@ import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreName; import com.linkedin.venice.meta.StoreVersionInfo; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.VeniceETLStrategy; import com.linkedin.venice.meta.Version; @@ -665,7 +666,8 @@ public VeniceHelixAdmin( zkSharedSchemaRepository, pubSubTopicRepository, commonConfig.getMetaStoreWriterCloseTimeoutInMS(), - commonConfig.getMetaStoreWriterCloseConcurrency()); + commonConfig.getMetaStoreWriterCloseConcurrency(), + storeName -> getStore(discoverCluster(storeName), storeName)); metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName()); pushStatusStoreReader = new PushStatusStoreReader( d2Client, @@ -680,7 +682,8 @@ public VeniceHelixAdmin( veniceWriterFactory, CONTROLLER_HEARTBEAT_INSTANCE_NAME, valueSchemaEntry, - updateSchemaEntry); + updateSchemaEntry, + storeName -> getStore(discoverCluster(storeName), storeName)); }); clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>(); @@ -1058,9 +1061,9 @@ public synchronized void initStorageCluster(String clusterName) { Collections.singletonList(VeniceControllerStateModel.getPartitionNameFromVeniceClusterName(clusterName)); helixAdminClient.enablePartition(true, controllerClusterName, controllerName, clusterName, partitionNames); if (multiClusterConfigs.getControllerConfig(clusterName).isParticipantMessageStoreEnabled()) { - participantMessageStoreRTTMap.put( - clusterName, - Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName); + String realTimeTopicName = Utils.composeRealTimeTopic(participantStoreName); + participantMessageStoreRTTMap.put(clusterName, realTimeTopicName); } waitUntilClusterResourceIsVisibleInEV(clusterName); } @@ -1250,7 +1253,17 @@ private void configureNewStore( /* If this store existed previously, we do not want to use the same RT topic name that was used by the previous store. To ensure this, increase largestUsedRTVersionNumber and new RT name will be different */ if (config.isRealTimeTopicVersioningEnabled()) { - newStore.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber + 1); + VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(newStore.getName()); + int newNumber; + if (type == null && newStore.isSystemStore()) { + // Top-level shared ZK store + newNumber = largestUsedRTVersionNumber; + } else { + // User-level system store OR regular store + newNumber = largestUsedRTVersionNumber + 1; + } + + newStore.setLargestUsedRTVersionNumber(newNumber); } } @@ -1643,9 +1656,10 @@ void sendPushJobDetailsToLocalRT(PushJobStatusRecordKey key, PushJobDetails valu + " is not configured"); } String pushJobDetailsStoreName = VeniceSystemStoreUtils.getPushJobDetailsStoreName(); + Store pushJobDetailsStore = getStore(pushJobStatusStoreClusterName, pushJobDetailsStoreName); if (pushJobDetailsRTTopic == null) { // Verify the RT topic exists and give some time in case it's getting created. - PubSubTopic expectedRTTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(pushJobDetailsStoreName)); + PubSubTopic expectedRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(pushJobDetailsStore)); for (int attempt = 0; attempt < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS; attempt++) { if (attempt > 0) Utils.sleep(INTERNAL_STORE_RTT_RETRY_BACKOFF_MS); @@ -2709,10 +2723,16 @@ public Version addVersionOnly( try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.META_STORE)) { - setUpMetaStoreAndMayProduceSnapshot(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpMetaStoreAndMayProduceSnapshot( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + largestUsedRTVersionNumber); } if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) { - setUpDaVinciPushStatusStore(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpDaVinciPushStatusStore( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + largestUsedRTVersionNumber); } // Update the store object to avoid potential system store flags reversion during repository.updateStore(store). @@ -2785,7 +2805,7 @@ public boolean addSpecificVersion(String clusterName, String storeName, Version if (store.containsVersion(versionNumber)) { throwVersionAlreadyExists(storeName, versionNumber); } else { - store.addVersion(version, false, DEFAULT_RT_VERSION_NUMBER); + store.addVersion(version, false, store.getLargestUsedRTVersionNumber()); } storeRepository.updateStore(store); } @@ -2987,6 +3007,15 @@ private Pair addVersion( Optional emergencySourceRegion, boolean versionSwapDeferred, int repushSourceVersion) { + Store store = getStore(clusterName, storeName); + int largestUsedRTVersionNumber; + VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName()); + if (type != null && store.isSystemStore()) { + largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber(); + } else { + largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber(); + } + return addVersion( clusterName, storeName, @@ -3008,7 +3037,7 @@ private Pair addVersion( versionSwapDeferred, null, repushSourceVersion, - getStore(clusterName, storeName).getLargestUsedRTVersionNumber()); + largestUsedRTVersionNumber); } private Optional getVersionFromSourceCluster( @@ -3124,10 +3153,16 @@ private Pair addVersion( */ VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.META_STORE)) { - setUpMetaStoreAndMayProduceSnapshot(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpMetaStoreAndMayProduceSnapshot( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + currentRTVersionNumber); } if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) { - setUpDaVinciPushStatusStore(clusterName, systemStoreType.extractRegularStoreName(storeName)); + setUpDaVinciPushStatusStore( + clusterName, + systemStoreType.extractRegularStoreName(storeName), + currentRTVersionNumber); } Store store = repository.getStore(storeName); @@ -3839,7 +3874,7 @@ private Optional getVersionWithPushId(String clusterName, String storeN * @throws VeniceNoStoreException if the store does not exist in the specified cluster. * @throws VeniceException if the store is not a user system store or if the partition count is invalid. */ - void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String storeName) { + void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String storeName, int currentRTVersionNumber) { checkControllerLeadershipFor(clusterName); Store store = getStore(clusterName, storeName); if (store == null) { @@ -3852,7 +3887,8 @@ void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String sto throw new VeniceException( "Failed to create real time topic for store: " + storeName + " because it is not a user system store."); } - PubSubTopic realTimeTopic = getPubSubTopicRepository().getTopic(Utils.getRealTimeTopicName(store)); + PubSubTopic realTimeTopic = + getPubSubTopicRepository().getTopic(Utils.getRealTimeTopicName(store, currentRTVersionNumber)); TopicManager topicManager = getTopicManager(); if (topicManager.containsTopic(realTimeTopic)) { return; @@ -3875,7 +3911,8 @@ void ensureRealTimeTopicExistsForUserSystemStores(String clusterName, String sto } VeniceControllerClusterConfig clusterConfig = getControllerConfig(clusterName); LOGGER.info( - "Creating real time topic for user system store: {} with partition count: {}", + "Creating real time topic {} for user system store: {} with partition count: {}", + realTimeTopic, storeName, partitionCount); getTopicManager().createTopic( @@ -5243,8 +5280,9 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP // However, safeguards are necessary to prevent clients from toggling the hybrid setting with a partition update // in edge cases. if (!store.isHybrid()) { - PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); - if (realTimeTopicForBatchStore != null) { + String realTimeTopic = Utils.getRealTimeTopicName(store); + if (realTimeTopic != null) { + PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); TopicManager topicManager = getTopicManagerForCluster(clusterConfig); if (topicManager.containsTopic(realTimeTopicForBatchStore)) { int rtPartitionCount = topicManager.getPartitionCount(realTimeTopicForBatchStore); @@ -5284,9 +5322,8 @@ private TopicManager getTopicManagerForCluster(VeniceControllerClusterConfig clu } private void generateAndUpdateRealTimeTopicName(Store store) { - String newRealTimeTopicName = Utils.isRTVersioningApplicable(store.getName()) - ? Utils.composeRealTimeTopic(store.getName(), store.getLargestUsedRTVersionNumber() + 1) - : DEFAULT_REAL_TIME_TOPIC_NAME; + String newRealTimeTopicName = + Utils.composeRealTimeTopic(store.getName(), store.getLargestUsedRTVersionNumber() + 1); store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName); /* @@ -6358,9 +6395,8 @@ protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig( } String newRealTimeTopicName = oldStore.getLargestUsedRTVersionNumber() > DEFAULT_RT_VERSION_NUMBER - && Utils.isRTVersioningApplicable(oldStore.getName()) - ? Utils.composeRealTimeTopic(oldStore.getName(), oldStore.getLargestUsedRTVersionNumber()) - : DEFAULT_REAL_TIME_TOPIC_NAME; + ? Utils.composeRealTimeTopic(oldStore.getName(), oldStore.getLargestUsedRTVersionNumber()) + : DEFAULT_REAL_TIME_TOPIC_NAME; mergedHybridStoreConfig = new HybridStoreConfigImpl( hybridRewindSeconds.get(), @@ -8160,7 +8196,7 @@ public void deleteParticipantStoreKillMessage(String clusterName, String kafkaTo "Delete KILL ingestion message for topic: {} from participant store in cluster: {}", kafkaTopic, clusterName); - VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName); + VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName, this); ParticipantMessageKey key = new ParticipantMessageKey(); key.resourceName = kafkaTopic; key.messageType = ParticipantMessageType.KILL_PUSH_JOB.getValue(); @@ -8170,7 +8206,7 @@ public void deleteParticipantStoreKillMessage(String clusterName, String kafkaTo public void sendKillMessageToParticipantStore(String clusterName, String kafkaTopic) { LOGGER.info("Send kill message for topic: {} to participant store of cluster: {}", kafkaTopic, clusterName); - VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName); + VeniceWriter writer = participantStoreClientsManager.getWriter(clusterName, this); ParticipantMessageType killPushJobType = ParticipantMessageType.KILL_PUSH_JOB; ParticipantMessageKey key = new ParticipantMessageKey(); key.resourceName = kafkaTopic; @@ -9308,7 +9344,7 @@ Long getInMemoryTopicCreationTime(String topic) { return topicToCreationTime.get(topic); } - private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { + private void setUpDaVinciPushStatusStore(String clusterName, String storeName, int currentRTVersionNumber) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(storeName); @@ -9319,7 +9355,7 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { if (!isParent()) { // We do not materialize PS3 for parent region. Hence, skip RT topic creation. - ensureRealTimeTopicExistsForUserSystemStores(clusterName, daVinciPushStatusStoreName); + ensureRealTimeTopicExistsForUserSystemStores(clusterName, daVinciPushStatusStoreName, currentRTVersionNumber); } if (!store.isDaVinciPushStatusStoreEnabled()) { storeMetadataUpdate(clusterName, storeName, (s, resources) -> { @@ -9334,7 +9370,10 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { * @param clusterName The cluster name. * @param regularStoreName The regular user store name. */ - void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { + void setUpMetaStoreAndMayProduceSnapshot( + String clusterName, + String regularStoreName, + int largestUsedRTVersionNumber) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(regularStoreName); @@ -9347,7 +9386,8 @@ void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStore // We do not materialize meta store for parent region. Hence, skip RT topic creation. ensureRealTimeTopicExistsForUserSystemStores( clusterName, - VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); + VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName), + largestUsedRTVersionNumber); } // Update the store flag to enable meta system store. @@ -9592,6 +9632,17 @@ public int getLargestUsedVersion(String clusterName, String storeName) { return Math.max(store.getLargestUsedVersionNumber(), getStoreGraveyard().getLargestUsedVersionNumber(storeName)); } + @Override + public int getLargestUsedRTVersion(String clusterName, String storeName) { + Store store = getStore(clusterName, storeName); + // If the store does not exist, check the store graveyard. + if (store == null) { + return getStoreGraveyard().getLargestUsedRTVersionNumber(storeName); + } + return Math + .max(store.getLargestUsedRTVersionNumber(), getStoreGraveyard().getLargestUsedRTVersionNumber(storeName)); + } + /** * @see StoragePersonaRepository#addPersona(String, long, Set, Set) */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 459dd7b43ef..ef9863ce50d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -80,6 +80,7 @@ import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS; import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; +import com.linkedin.venice.meta.SystemStore; import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR; import static com.linkedin.venice.meta.VersionStatus.CREATED; import static com.linkedin.venice.meta.VersionStatus.ERROR; @@ -1872,7 +1873,10 @@ public Version incrementVersionIdempotent( newVersion = getVeniceHelixAdmin().getIncrementalPushVersion(clusterName, storeName, pushJobId); } else { validateTargetedRegions(targetedRegions, clusterName); - + int largestUsedRTVersionNumber = + store.isSystemStore() && VeniceSystemStoreType.getSystemStoreType(store.getName()) != null + ? ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber() + : store.getLargestUsedRTVersionNumber(); newVersion = addVersionAndTopicOnly( clusterName, storeName, @@ -1890,7 +1894,7 @@ public Version incrementVersionIdempotent( versionSwapDeferred, targetedRegions, repushSourceVersion, - store.getLargestUsedRTVersionNumber()); + largestUsedRTVersionNumber); } if (VeniceSystemStoreType.getSystemStoreType(storeName) == null) { if (pushType.isBatch()) { @@ -2082,7 +2086,7 @@ Version getIncrementalPushVersion(Version incrementalPushVersion, ExecutionStatu throw new VeniceException("Cannot start incremental push since batch push is on going." + " store: " + storeName); } - String incrementalPushTopic = Utils.composeRealTimeTopic(storeName); + String incrementalPushTopic = Utils.getRealTimeTopicName(incrementalPushVersion); if (status.isError() || getVeniceHelixAdmin().isTopicTruncated(incrementalPushTopic)) { throw new VeniceException( "Cannot start incremental push since previous batch push has failed. Please run another bash job." @@ -5609,6 +5613,19 @@ public int getLargestUsedVersion(String clusterName, String storeName) { return aggregatedLargestUsedVersionNumber; } + @Override + public int getLargestUsedRTVersion(String clusterName, String storeName) { + Map childControllers = getVeniceHelixAdmin().getControllerClientMap(clusterName); + int aggregatedLargestUsedRTVersionNumber = getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName); + for (Map.Entry controller: childControllers.entrySet()) { + VersionResponse response = controller.getValue().getStoreLargestUsedVersion(clusterName, storeName); + if (response.getVersion() > aggregatedLargestUsedRTVersionNumber) { + aggregatedLargestUsedRTVersionNumber = response.getVersion(); + } + } + return aggregatedLargestUsedRTVersionNumber; + } + /** * Unsupported operation in the parent controller. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java index 050386ecdcf..a714dbac86e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java @@ -248,7 +248,7 @@ public void transmitVersionSwapMessage(Store store, int previousVersion, int nex broadcastVersionSwap(previousStoreVersion, nextStoreVersion, rtForNextVersion); } } else { - LOGGER.info("RT doesn't exist for store: {}. Skipping broadcast for Version Swap message."); + LOGGER.info("RT doesn't exist for store: {}. Skipping broadcast for Version Swap message.", storeName); } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java index 3aaec3ee615..a7880c541d3 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/ParticipantStoreClientsManagerTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controller; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -14,6 +15,7 @@ import com.linkedin.venice.client.store.AvroSpecificStoreClient; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.participant.protocol.ParticipantMessageKey; import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -39,6 +41,7 @@ public class ParticipantStoreClientsManagerTest { private VeniceWriterFactory veniceWriterFactory; private PubSubTopicRepository pubSubTopicRepository; private ParticipantStoreClientsManager participantStoreClientsManager; + private Admin mockAdmin = mock(Admin.class); @BeforeMethod public void setUp() { @@ -52,6 +55,12 @@ public void setUp() { topicManagerRepository, veniceWriterFactory, pubSubTopicRepository); + Store mockStore = mock(Store.class); + String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(CLUSTER_NAME); + when(mockAdmin.getStore(eq(CLUSTER_NAME), eq(participantStoreName))).thenReturn(mockStore); + when(mockStore.isSystemStore()).thenReturn(true); + when(mockStore.isHybrid()).thenReturn(true); + when(mockStore.getName()).thenReturn(participantStoreName); } @Test @@ -111,12 +120,12 @@ public void testGetWriter() { return null; }).when(veniceWriterFactory).createVeniceWriter(any()); - VeniceWriter participantStoreWriter = participantStoreClientsManager.getWriter(CLUSTER_NAME); + VeniceWriter participantStoreWriter = participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin); assertNotNull(participantStoreWriter); assertSame(mockWriter, participantStoreWriter); // call getWriter again with the same cluster name - VeniceWriter result2 = participantStoreClientsManager.getWriter(CLUSTER_NAME); + VeniceWriter result2 = participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin); assertSame(participantStoreWriter, result2); } @@ -129,7 +138,7 @@ public void testGetWriterTopicNotFound() { when(localTopicManager.containsTopicAndAllPartitionsAreOnline(participantStoreTopic)).thenReturn(false); Exception exception = - expectThrows(VeniceException.class, () -> participantStoreClientsManager.getWriter(CLUSTER_NAME)); + expectThrows(VeniceException.class, () -> participantStoreClientsManager.getWriter(CLUSTER_NAME, mockAdmin)); assertEquals( exception.getMessage(), "Can't find the expected topic " + participantStoreTopic + " for participant message store " diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 5f168ee8e62..d27ae0c51bd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -162,8 +162,8 @@ public void testDropResources() { @Test public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() { VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); - doNothing().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); - doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + doNothing().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); + doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString(), anyInt()); InOrder inorder = inOrder(veniceHelixAdmin); @@ -176,11 +176,11 @@ public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() { doReturn(store).when(repo).getStore(anyString()); doReturn(Boolean.FALSE).when(store).isDaVinciPushStatusStoreEnabled(); - veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString(), anyInt()); // Enforce that ensureRealTimeTopicExistsForUserSystemStores happens before storeMetadataUpdate. See the above // comments for the reasons. - inorder.verify(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); + inorder.verify(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any()); } @@ -371,10 +371,11 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { // Case 1: Store does not exist doReturn(null).when(veniceHelixAdmin).getStore(clusterName, storeName); doNothing().when(veniceHelixAdmin).checkControllerLeadershipFor(clusterName); - doCallRealMethod().when(veniceHelixAdmin).ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString()); + doCallRealMethod().when(veniceHelixAdmin) + .ensureRealTimeTopicExistsForUserSystemStores(anyString(), anyString(), anyInt()); Exception notFoundException = expectThrows( VeniceException.class, - () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( notFoundException.getMessage().contains("does not exist in"), "Actual message: " + notFoundException.getMessage()); @@ -383,7 +384,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(userStore).when(veniceHelixAdmin).getStore(clusterName, storeName); Exception notUserSystemStoreException = expectThrows( VeniceException.class, - () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName)); + () -> veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, storeName, 1)); assertTrue( notUserSystemStoreException.getMessage().contains("is not a user system store"), "Actual message: " + notUserSystemStoreException.getMessage()); @@ -394,7 +395,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(Collections.emptyList()).when(systemStore).getVersions(); doReturn(systemStore).when(veniceHelixAdmin).getStore(clusterName, systemStoreName); doReturn(true).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(1)).containsTopic(any(PubSubTopic.class)); HelixVeniceClusterResources veniceClusterResources = mock(HelixVeniceClusterResources.class); @@ -407,7 +408,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { topicManager = mock(TopicManager.class); doReturn(topicManager).when(veniceHelixAdmin).getTopicManager(); doReturn(false).doReturn(true).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(2)).containsTopic(any(PubSubTopic.class)); verify(topicManager, never()).createTopic( any(PubSubTopic.class), @@ -424,7 +425,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { when(veniceHelixAdmin.getControllerConfig(clusterName)).thenReturn(clusterConfig); doReturn(0).when(systemStore).getPartitionCount(); doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); // should not throw + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); // should not throw ArgumentCaptor partitionCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class), @@ -444,7 +445,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); doReturn(null).when(systemStore).getVersion(anyInt()); doReturn(5).when(systemStore).getPartitionCount(); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class), partitionCountArgumentCaptor.capture(), @@ -462,7 +463,7 @@ public void testEnsureRealTimeTopicExistsForUserSystemStores() { doReturn(false).when(topicManager).containsTopic(any(PubSubTopic.class)); doReturn(version).when(systemStore).getVersion(anyInt()); doReturn(10).when(version).getPartitionCount(); - veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName); + veniceHelixAdmin.ensureRealTimeTopicExistsForUserSystemStores(clusterName, systemStoreName, 1); partitionCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(topicManager, times(1)).createTopic( any(PubSubTopic.class), From ae75269af1ecefb8068fd1beb5624fe45260470f Mon Sep 17 00:00:00 2001 From: Arjun Date: Thu, 13 Nov 2025 13:00:04 -0800 Subject: [PATCH 2/5] . --- .../com/linkedin/davinci/DaVinciBackend.java | 6 ++++ .../linkedin/davinci/DaVinciBackendTest.java | 3 +- .../HeartbeatMonitoringServiceTest.java | 8 ++++- .../com/linkedin/venice/meta/SystemStore.java | 3 -- .../PushStatusStoreVeniceWriterCache.java | 2 +- .../venice/system/store/MetaStoreWriter.java | 9 +++-- .../integrationTest/resources/log4j2-test.xml | 36 ------------------- .../venice/controller/VeniceHelixAdmin.java | 2 +- .../controller/VeniceParentHelixAdmin.java | 12 ++----- 9 files changed, 26 insertions(+), 55 deletions(-) delete mode 100644 internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 79c94576e2d..83586510bf1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -562,9 +562,15 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() { return storeRepository; } + /** + * Resolves and returns the store object for the given store name. + * For user system stores, this returns the SystemStoreAttributes from the parent user store. + * For regular stores and shared system stores, this returns the Store object directly. + */ public final Object getStore(String storeName) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null) { + // it is a user system store String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName); Store userStore = storeRepository.getStore(userStoreName); Map systemStores = userStore.getSystemStores(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 8a8e934ff80..30d5f968897 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -71,10 +71,11 @@ public class DaVinciBackendTest { private MockedStatic mockClientFactory; private MockedConstruction mockMetadataBuilder; private MockedConstruction mockSchemaPresenceChecker; + private MockedStatic mockZkFactory; @BeforeClass public void init() { - MockedStatic mockZkFactory = mockStatic(ZkClientFactory.class); + mockZkFactory = mockStatic(ZkClientFactory.class); ZkClient mockZkClient = mock(ZkClient.class); mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient); doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class)); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index 99b6349e3d1..be222abd50d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -42,6 +42,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; +import java.lang.reflect.Field; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; @@ -816,7 +817,7 @@ public void testLargestHeartbeatLag() { } @Test - public void testTriggerAutoResubscribe() { + public void testTriggerAutoResubscribe() throws Exception { String store = "foo"; int version = 100; int partition = 123; @@ -831,6 +832,11 @@ public void testTriggerAutoResubscribe() { HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + ReadOnlyStoreRepository metadataRepository = mock(ReadOnlyStoreRepository.class); + doReturn(mock(Store.class)).when(metadataRepository).getStore(store); + Field metadataRepositoryField = HeartbeatMonitoringService.class.getDeclaredField("metadataRepository"); + metadataRepositoryField.setAccessible(true); + metadataRepositoryField.set(heartbeatMonitoringService, metadataRepository); VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig(); doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index 7b2d790e4f7..2490cddad1c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -93,9 +93,6 @@ private void throwUnsupportedOperationException(String method) { */ private synchronized SystemStoreAttributes fetchAndBackfillSystemStoreAttributes(boolean readAccess) { SystemStoreAttributes systemStoreAttributes = veniceStore.getSystemStores().get(systemStoreType.getPrefix()); - if (veniceStore.getSystemStores().size() > 0) { - System.out.println(); - } if (systemStoreAttributes == null) { if (readAccess) { return DEFAULT_READ_ONLY_SYSTEM_STORE_ATTRIBUTE; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index 8497950aa11..0a79c5c57b0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -29,7 +29,7 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable { private final Map veniceWriters = new VeniceConcurrentHashMap<>(); private final Schema valueSchema; private final Schema updateSchema; - Function storeResolver; + private final Function storeResolver; // writerFactory Used for instantiating VeniceWriter public PushStatusStoreVeniceWriterCache( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 1656b0ce88b..4ae6cfac426 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -79,7 +79,10 @@ public class MetaStoreWriter implements Closeable { private final PubSubTopicRepository pubSubTopicRepository; private final long closeTimeoutMs; private final int numOfConcurrentVwCloseOps; - + /* + * Function to resolve store names to Store objects. Used to fetch system store metadata + * for determining the correct RT topic names. + */ public Function storeResolver; public MetaStoreWriter( @@ -372,11 +375,13 @@ VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { int largestUsedRTVersionNumber; VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName()); if (type != null && store.isSystemStore()) { + // metaStoreName is user system store largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber(); } else { + // metaStoreName is zkShared system store largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber(); } - String rt = Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName), largestUsedRTVersionNumber); + String rt = Utils.getRealTimeTopicName(store, largestUsedRTVersionNumber); PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online"); diff --git a/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml b/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml deleted file mode 100644 index 65090322a30..00000000000 --- a/internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 6b7d50a6b3d..9d964f59b93 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5282,7 +5282,7 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP if (!store.isHybrid()) { String realTimeTopic = Utils.getRealTimeTopicName(store); if (realTimeTopic != null) { - PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); + PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(realTimeTopic); TopicManager topicManager = getTopicManagerForCluster(clusterConfig); if (topicManager.containsTopic(realTimeTopicForBatchStore)) { int rtPartitionCount = topicManager.getPartitionCount(realTimeTopicForBatchStore); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index ef9863ce50d..0de3528c8dc 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -80,7 +80,6 @@ import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS; import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; -import com.linkedin.venice.meta.SystemStore; import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR; import static com.linkedin.venice.meta.VersionStatus.CREATED; import static com.linkedin.venice.meta.VersionStatus.ERROR; @@ -232,6 +231,7 @@ import com.linkedin.venice.meta.StoreGraveyard; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreVersionInfo; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.VeniceETLStrategy; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; @@ -5615,15 +5615,7 @@ public int getLargestUsedVersion(String clusterName, String storeName) { @Override public int getLargestUsedRTVersion(String clusterName, String storeName) { - Map childControllers = getVeniceHelixAdmin().getControllerClientMap(clusterName); - int aggregatedLargestUsedRTVersionNumber = getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName); - for (Map.Entry controller: childControllers.entrySet()) { - VersionResponse response = controller.getValue().getStoreLargestUsedVersion(clusterName, storeName); - if (response.getVersion() > aggregatedLargestUsedRTVersionNumber) { - aggregatedLargestUsedRTVersionNumber = response.getVersion(); - } - } - return aggregatedLargestUsedRTVersionNumber; + return getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName); } /** From 57333c5b693ab27486a474a8dfad792736fb52eb Mon Sep 17 00:00:00 2001 From: Arjun Date: Tue, 6 Jan 2026 11:23:12 -0800 Subject: [PATCH 3/5] address review comments --- .../com/linkedin/davinci/DaVinciBackend.java | 8 ++++---- .../helix/HelixParticipationService.java | 2 +- .../PushStatusStoreVeniceWriterCache.java | 20 ++++--------------- .../PushStatusStoreWriter.java | 2 +- .../venice/endToEnd/PushStatusStoreTest.java | 2 +- .../UserSystemStoreLifeCycleHelper.java | 2 +- .../venice/controller/VeniceHelixAdmin.java | 2 +- 7 files changed, 13 insertions(+), 25 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 83586510bf1..f6528df3a25 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -310,7 +310,7 @@ public DaVinciBackend( instanceName, valueSchemaEntry, updateSchemaEntry, - (this::getStore)); + (this::getRealTimeTopicName)); } ingestionService.start(); @@ -567,7 +567,7 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() { * For user system stores, this returns the SystemStoreAttributes from the parent user store. * For regular stores and shared system stores, this returns the Store object directly. */ - public final Object getStore(String storeName) { + private String getRealTimeTopicName(String storeName) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (systemStoreType != null) { // it is a user system store @@ -576,12 +576,12 @@ public final Object getStore(String storeName) { Map systemStores = userStore.getSystemStores(); for (Map.Entry systemStoreEntries: systemStores.entrySet()) { if (storeName.startsWith(systemStoreEntries.getKey())) { - return systemStoreEntries.getValue(); + return Utils.getRealTimeTopicName(systemStoreEntries.getValue()); } } return null; } else { - return storeRepository.getStore(storeName); + return Utils.getRealTimeTopicName(storeRepository.getStore(storeName)); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java index 3ab10312003..b70762c2de6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java @@ -362,7 +362,7 @@ private void asyncStart() { instance.getNodeId(), valueSchemaEntry, updateSchemaEntry, - helixReadOnlyStoreRepository::getStore); + storeName -> Utils.getRealTimeTopicName(helixReadOnlyStoreRepository.getStore(storeName))); // Record replica status in Zookeeper. // Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier. diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index 0a79c5c57b0..d0079e8a5ea 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -1,12 +1,8 @@ package com.linkedin.venice.pushstatushelper; import com.linkedin.venice.common.VeniceSystemStoreUtils; -import com.linkedin.venice.meta.Store; -import com.linkedin.venice.meta.StoreInfo; -import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; -import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -29,32 +25,24 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable { private final Map veniceWriters = new VeniceConcurrentHashMap<>(); private final Schema valueSchema; private final Schema updateSchema; - private final Function storeResolver; + private final Function rtNameResolver; // writerFactory Used for instantiating VeniceWriter public PushStatusStoreVeniceWriterCache( VeniceWriterFactory writerFactory, Schema valueSchema, Schema updateSchema, - Function storeResolver) { + Function rtNameResolver) { this.writerFactory = writerFactory; this.valueSchema = valueSchema; this.updateSchema = updateSchema; - this.storeResolver = storeResolver; + this.rtNameResolver = rtNameResolver; } public VeniceWriter prepareVeniceWriter(String storeName) { return veniceWriters.computeIfAbsent(storeName, s -> { - Object store = storeResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); - String rtTopic; + String rtTopic = rtNameResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); - if (store instanceof Store) { - rtTopic = Utils.getRealTimeTopicName((Store) store); - } else if (store instanceof StoreInfo) { - rtTopic = Utils.getRealTimeTopicName((StoreInfo) store); - } else { - rtTopic = Utils.getRealTimeTopicName((SystemStoreAttributes) store); - } VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic) .setKeyPayloadSerializer( new VeniceAvroKafkaSerializer( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 33141aa5d29..c778cb8e52a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -56,7 +56,7 @@ public PushStatusStoreWriter( String instanceName, SchemaEntry valueSchemaEntry, DerivedSchemaEntry updateSchemaEntry, - Function storeResolver) { + Function storeResolver) { this( new PushStatusStoreVeniceWriterCache( writerFactory, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index 169bd5aeaed..4ef76d8cd32 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -292,7 +292,7 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th "dummyInstance", valueSchemaEntry, updateSchemaEntry, - (storeName) -> controllerClient.getStore(storeName).getStore()); + (storeName) -> Utils.getRealTimeTopicName(controllerClient.getStore(storeName).getStore())); // After deleting the inc push status belonging to just one partition we should expect // SOIP from the controller since other partition has replicas with EOIP status diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index a913bdc70df..50307978321 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -165,7 +165,7 @@ public static void deleteSystemStore( throw new VeniceException("Unknown system store type: " + systemStoreName); } // skip truncating system store RT topics if it's parent fabric as it's not created for parent fabric - if (!admin.isParent()) { + if (!admin.isParent() && systemStore != null) { admin.truncateKafkaTopic(Utils.getRealTimeTopicName(systemStore)); } } else { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 9d964f59b93..18dbb292d7d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -683,7 +683,7 @@ public VeniceHelixAdmin( CONTROLLER_HEARTBEAT_INSTANCE_NAME, valueSchemaEntry, updateSchemaEntry, - storeName -> getStore(discoverCluster(storeName), storeName)); + storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName))); }); clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>(); From d27a207fea9e372aeb41e5dbf4507235ed9fb924 Mon Sep 17 00:00:00 2001 From: Arjun Date: Tue, 6 Jan 2026 13:53:52 -0800 Subject: [PATCH 4/5] address review comments --- .../consumer/KafkaStoreIngestionService.java | 2 +- .../kafka/consumer/StoreIngestionTask.java | 9 +-- .../consumer/StoreIngestionTaskTest.java | 60 ++++++++++--------- .../venice/system/store/MetaStoreWriter.java | 29 +++------ .../java/com/linkedin/venice/utils/Utils.java | 14 +++++ .../system/store/MetaStoreWriterTest.java | 31 +++------- .../venice/controller/VeniceHelixAdmin.java | 2 +- .../TestUserSystemStoreLifeCycleHelper.java | 3 + 8 files changed, 73 insertions(+), 77 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 5ae472d5109..81c2196a367 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -394,7 +394,7 @@ public KafkaStoreIngestionService( pubSubTopicRepository, serverConfig.getMetaStoreWriterCloseTimeoutInMS(), serverConfig.getMetaStoreWriterCloseConcurrency(), - storeName -> metadataRepo.getStore(storeName)); + storeName -> Utils.getRealTimeTopicNameForSystemStore(metadataRepo.getStore(storeName))); metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() { @Override public void handleStoreDeleted(Store store) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 07b5489775a..7e7baa6dc5d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -4355,11 +4355,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep availableSchemaIds.set(schemaId, new Object()); // TODO: Query metastore for existence of value schema id before doing an update. During bounce of large // cluster these metastore writes could be spiky - if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) { + if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName) + && !VeniceSystemStoreUtils.isSystemStore(storeName)) { String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); - Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName); - if (metaStore != null) { - PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore)); + String metaStoreRealTimeTopicName = metaStoreWriter.realTimeTopicNameResolver.apply(metaStoreName); + if (metaStoreRealTimeTopicName != null) { + PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName); if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index f5dfa59f80c..7e0459380f0 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -467,8 +467,9 @@ public static Object[][] sortedInputAndAAConfigProvider() { private MockStoreVersionConfigs storeAndVersionConfigsUnderTest; private MetaStoreWriter mockMetaStoreWriter; - private Function mockStoreResolver; + private Function mockStoreRealTimeTopicNameResolver; private Store mockMetaStore; + private String mockMetaStoreRealTimeTopicName; private static byte[] getRandomKey(Integer partition) { String randomString = Utils.getUniqueString("KeyForPartition" + partition); @@ -591,9 +592,12 @@ public void methodSetUp() throws Exception { kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); mockMetaStoreWriter = mock(MetaStoreWriter.class); - mockStoreResolver = mock(Function.class); + mockStoreRealTimeTopicNameResolver = mock(Function.class); mockMetaStore = mock(Store.class); - mockMetaStoreWriter.storeResolver = mockStoreResolver; + when(mockMetaStore.getName()).thenReturn("metaStoreName"); + when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + mockMetaStoreRealTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); @@ -1489,15 +1493,16 @@ public void testMetaStoreWriterIntegration() throws Exception { Store mockMetaStore = mock(Store.class); when(mockMetaStore.getName()).thenReturn(metaStoreName); when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); - Function mockStoreResolver = mock(Function.class); - mockMetaStoreWriter.storeResolver = mockStoreResolver; + Function mockStoreRealTimeTopicNameResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; // Meta store exists - when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore); + when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName); - PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName); when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); // Setup StoreIngestionTask with real metaStoreWriter @@ -1506,7 +1511,7 @@ public void testMetaStoreWriterIntegration() throws Exception { StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify that the meta store was resolved - verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); // Verify that writeInUseValueSchema was called verify(mockMetaStoreWriter, timeout(TEST_TIMEOUT_MS)) @@ -1534,8 +1539,8 @@ public void testMetaStoreWriterWhenMetaStoreDoesNotExist() throws Exception { int schemaId = 1; MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); - Function mockStoreResolver = mock(Function.class); - mockMetaStoreWriter.storeResolver = mockStoreResolver; + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver; // Meta store does NOT exist when(mockStoreResolver.apply(metaStoreName)).thenReturn(null); @@ -1573,16 +1578,17 @@ public void testMetaStoreWriterWhenRTTopicDoesNotExist() throws Exception { Store mockMetaStore = mock(Store.class); when(mockMetaStore.getName()).thenReturn(metaStoreName); when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1); + String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore); MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); - Function mockStoreResolver = mock(Function.class); - mockMetaStoreWriter.storeResolver = mockStoreResolver; + Function mockStoreRealTimeTopicNameResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver; // Meta store exists - when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore); + when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName); // But RT topic does NOT exist - PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName); when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(false); localVeniceWriter.broadcastStartOfPush(new HashMap<>()); @@ -1590,7 +1596,7 @@ public void testMetaStoreWriterWhenRTTopicDoesNotExist() throws Exception { StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { // Verify that the meta store was resolved - verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); + verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName); // Verify RT topic check was made verify(mockTopicManager, timeout(TEST_TIMEOUT_MS).atLeastOnce()).containsTopicWithRetries(metaStoreRTTopic, 5); @@ -1618,8 +1624,8 @@ public void testMetaStoreWriterSkippedForSystemStore() throws Exception { String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo); MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class); - Function mockStoreResolver = mock(Function.class); - mockMetaStoreWriter.storeResolver = mockStoreResolver; + Function mockStoreResolver = mock(Function.class); + mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver; localVeniceWriter.broadcastStartOfPush(new HashMap<>()); localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID).get(); @@ -1654,19 +1660,19 @@ public void testMetaStoreResolutionWhenMetaStoreExists() { String STORE_NAME = "testStore"; String META_STORE_NAME = "venice_system_store_meta_store_testStore"; - when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStore); + when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStoreRealTimeTopicName); when(mockMetaStore.getName()).thenReturn(META_STORE_NAME); - PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore)); + PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(mockMetaStoreRealTimeTopicName); when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true); // Execute: Call the method that uses metaStoreWriter.storeResolver String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); - Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName); + String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName); // Verify - assertNotNull(resolvedStore); - assertEquals(resolvedStore.getName(), META_STORE_NAME); - verify(mockStoreResolver, times(1)).apply(META_STORE_NAME); + assertNotNull(realTimeTopicName); + assertEquals(realTimeTopicName, mockMetaStoreRealTimeTopicName); + verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME); } @Test @@ -1674,15 +1680,15 @@ public void testMetaStoreResolutionWhenMetaStoreDoesNotExist() { // Setup - metaStore is null String STORE_NAME = "testStore"; String META_STORE_NAME = "venice_system_store_meta_store_testStore"; - when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(null); + when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(null); // Execute String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME); - Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName); + String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName); // Verify - should handle null gracefully - assertNull(resolvedStore); - verify(mockStoreResolver, times(1)).apply(META_STORE_NAME); + assertNull(realTimeTopicName); + verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME); } @Test diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 4ae6cfac426..5bed25ec33e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -6,7 +6,6 @@ import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; -import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -21,7 +20,6 @@ import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.systemstore.schemas.StoreValueSchema; import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; -import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceResourceCloseResult; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; @@ -83,7 +81,7 @@ public class MetaStoreWriter implements Closeable { * Function to resolve store names to Store objects. Used to fetch system store metadata * for determining the correct RT topic names. */ - public Function storeResolver; + public Function realTimeTopicNameResolver; public MetaStoreWriter( TopicManager topicManager, @@ -92,7 +90,7 @@ public MetaStoreWriter( PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, int numOfConcurrentVwCloseOps, - Function storeResolver) { + Function realTimeTopicNameResolver) { /** * TODO: get the write compute schema from the constructor so that this class does not use {@link WriteComputeSchemaConverter} */ @@ -106,7 +104,7 @@ public MetaStoreWriter( pubSubTopicRepository, closeTimeoutMs, numOfConcurrentVwCloseOps, - storeResolver); + realTimeTopicNameResolver); } MetaStoreWriter( @@ -117,7 +115,7 @@ public MetaStoreWriter( PubSubTopicRepository pubSubTopicRepository, long closeTimeoutMs, int numOfConcurrentVwCloseOps, - Function storeResolver) { + Function realTimeTopicNameResolver) { this.topicManager = topicManager; this.writerFactory = writerFactory; this.derivedComputeSchema = derivedComputeSchema; @@ -125,7 +123,7 @@ public MetaStoreWriter( this.pubSubTopicRepository = pubSubTopicRepository; this.closeTimeoutMs = closeTimeoutMs; this.numOfConcurrentVwCloseOps = numOfConcurrentVwCloseOps; - this.storeResolver = storeResolver; + this.realTimeTopicNameResolver = realTimeTopicNameResolver; } /** @@ -371,18 +369,8 @@ Map getMetaStoreWriterMap() { VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> { - Store store = storeResolver.apply(metaStoreName); - int largestUsedRTVersionNumber; - VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName()); - if (type != null && store.isSystemStore()) { - // metaStoreName is user system store - largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber(); - } else { - // metaStoreName is zkShared system store - largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber(); - } - String rt = Utils.getRealTimeTopicName(store, largestUsedRTVersionNumber); - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt); + String metaStoreRealTimeTopicName = realTimeTopicNameResolver.apply(metaStoreName); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online"); } @@ -427,8 +415,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter, * to write a Control Message to the RT topic, and it could hang if the topic doesn't exist. * This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter. */ - PubSubTopic rtTopic = - pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName))); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(realTimeTopicNameResolver.apply(metaStoreName)); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { LOGGER.info( "RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages", diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index 05b8adbd400..93095c44c19 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.exceptions.ErrorType; @@ -748,6 +749,19 @@ public static String getSeparateRealTimeTopicName(StoreInfo storeInfo) { return getSeparateRealTimeTopicName(Utils.getRealTimeTopicName(storeInfo)); } + public static String getRealTimeTopicNameForSystemStore(Store systemStore) { + int largestUsedRTVersionNumber; + VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(systemStore.getName()); + if (type != null && systemStore.isSystemStore()) { + // systemStore is a user system store + largestUsedRTVersionNumber = ((SystemStore) systemStore).getVeniceStore().getLargestUsedRTVersionNumber(); + } else { + // systemStore is a zkShared system store + largestUsedRTVersionNumber = systemStore.getLargestUsedRTVersionNumber(); + } + return Utils.getRealTimeTopicName(systemStore, largestUsedRTVersionNumber); + } + public static int calculateTopicHashCode(PubSubTopic topic) { if (topic.isSeparateRealTimeTopic()) { String realTimeTopicName = Utils.getRealTimeTopicNameFromSeparateRealTimeTopic(topic.getName()); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index 6b0761d1fcb..9b36c72a3c8 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -24,6 +24,7 @@ import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceResourceCloseResult; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -91,14 +92,9 @@ public void testGetOrCreateMetaStoreWriterWithSystemStore() { pubSubTopicRepository, 5000L, 2, - storeName -> systemStore); + storeName -> Utils.getRealTimeTopicNameForSystemStore(systemStore)); - // Execute - VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName); - - // Verify - Assert.assertNotNull(result); - verify(veniceStore, times(1)).getLargestUsedRTVersionNumber(); + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName)); verify(systemStore, times(1)).getVeniceStore(); } @@ -139,15 +135,9 @@ public void testGetOrCreateMetaStoreWriterWithRegularStore() { pubSubTopicRepository, 5000L, 2, - storeName1 -> regularStore); - - // Execute - VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName); + storeName1 -> Utils.getRealTimeTopicNameForSystemStore(regularStore)); - // Verify - Assert.assertNotNull(result); - verify(regularStore, times(1)).getLargestUsedRTVersionNumber(); - // verify(regularStore, never()).getVeniceStore; // Should not call getVeniceStore() for regular stores + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); } @Test @@ -187,14 +177,9 @@ public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() { pubSubTopicRepository, 5000L, 2, - storeName1 -> store); - - // Execute - VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName); + storeName1 -> Utils.getRealTimeTopicNameForSystemStore(store)); - // Verify - should use else branch - Assert.assertNotNull(result); - verify(store, times(1)).getLargestUsedRTVersionNumber(); + Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); } @Test @@ -268,7 +253,7 @@ public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps) pubSubTopicRepository, closeTimeoutMs, numOfConcurrentVwCloseOps, - storeName -> metaStore); + storeName -> metaStoreName + "_rt"); Map metaStoreWriters = metaStoreWriter.getMetaStoreWriterMap(); List> completedFutures = new ArrayList<>(20); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 18dbb292d7d..bdf62d2542d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -667,7 +667,7 @@ public VeniceHelixAdmin( pubSubTopicRepository, commonConfig.getMetaStoreWriterCloseTimeoutInMS(), commonConfig.getMetaStoreWriterCloseConcurrency(), - storeName -> getStore(discoverCluster(storeName), storeName)); + storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName))); metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName()); pushStatusStoreReader = new PushStatusStoreReader( d2Client, diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java index fc98702bc90..772d86a3ea6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUserSystemStoreLifeCycleHelper.java @@ -69,6 +69,7 @@ public void setUp() { public void testDeleteMetaSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(META_STORE_NAME); // Test deleting a META_STORE system store UserSystemStoreLifeCycleHelper.deleteSystemStore( mockAdmin, @@ -92,6 +93,7 @@ public void testDeleteMetaSystemStore() { public void testDeleteDaVinciPushStatusSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(DAVINCI_STORE_NAME); // Test deleting a DAVINCI_PUSH_STATUS_STORE system store UserSystemStoreLifeCycleHelper.deleteSystemStore( mockAdmin, @@ -115,6 +117,7 @@ public void testDeleteDaVinciPushStatusSystemStore() { public void testDeleteBatchJobHeartbeatSystemStore() { // Test when running in child fabric - should truncate RT topic when(mockAdmin.isParent()).thenReturn(false); + when(mockStore.getName()).thenReturn(HEARTBEAT_STORE_NAME); // Test deleting a BATCH_JOB_HEARTBEAT_STORE system store // This should log an error but not throw an exception UserSystemStoreLifeCycleHelper.deleteSystemStore( From ad90f31a3fdb72b682299c25e24579bb12a342a8 Mon Sep 17 00:00:00 2001 From: Arjun Date: Wed, 7 Jan 2026 14:35:11 -0800 Subject: [PATCH 5/5] address review comments --- .../com/linkedin/davinci/DaVinciBackend.java | 8 ++-- .../consumer/KafkaStoreIngestionService.java | 2 +- .../java/com/linkedin/venice/utils/Utils.java | 37 ++++++++----------- .../system/store/MetaStoreWriterTest.java | 6 +-- .../venice/controller/VeniceHelixAdmin.java | 21 +++-------- 5 files changed, 30 insertions(+), 44 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index f6528df3a25..219baf2a2bc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -45,6 +45,7 @@ import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -564,12 +565,11 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() { /** * Resolves and returns the store object for the given store name. - * For user system stores, this returns the SystemStoreAttributes from the parent user store. - * For regular stores and shared system stores, this returns the Store object directly. + * For user system stores, it finds the RT name from the SystemStoreAttributes of the parent user store. + * For regular user stores and top level system stores, it finds the RT name from the Store object directly. */ private String getRealTimeTopicName(String storeName) { - VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); - if (systemStoreType != null) { + if (VeniceSystemStoreUtils.isUserSystemStore(storeName)) { // it is a user system store String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName); Store userStore = storeRepository.getStore(userStoreName); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 81c2196a367..79b387d2385 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -394,7 +394,7 @@ public KafkaStoreIngestionService( pubSubTopicRepository, serverConfig.getMetaStoreWriterCloseTimeoutInMS(), serverConfig.getMetaStoreWriterCloseConcurrency(), - storeName -> Utils.getRealTimeTopicNameForSystemStore(metadataRepo.getStore(storeName))); + storeName -> Utils.getRealTimeTopicName(metadataRepo.getStore(storeName))); metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() { @Override public void handleStoreDeleted(Store store) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index 93095c44c19..fa4eab77ef5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; -import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.exceptions.ErrorType; @@ -633,6 +633,21 @@ public static String composeRealTimeTopic(String storeName, int versionNumber) { } } + public static int largestUsedRTVersionNumber(Store store) { + if (store.isSystemStore() && VeniceSystemStoreUtils.isUserSystemStore(store.getName())) { + // store is a user level system store + Store userStore = store instanceof SystemStore ? ((SystemStore) store).getVeniceStore() : store; + return userStore.getLargestUsedRTVersionNumber(); + } else { + // store is a regular user store or a top level system store + return store.getLargestUsedRTVersionNumber(); + } + } + + public static String getRealTimeTopicName(Store store) { + return getRealTimeTopicName(store, largestUsedRTVersionNumber(store)); + } + /** * It follows the following order to search for real time topic name, * i) current store-version config, ii) store config, iii) other store-version configs, iv) default name @@ -646,13 +661,6 @@ public static String getRealTimeTopicName(Store store, int rtVersionNumber) { rtVersionNumber); } - public static String getRealTimeTopicName(Store store) { - if (store instanceof SystemStore) { - return getRealTimeTopicName(store, ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber()); - } - return getRealTimeTopicName(store, DEFAULT_RT_VERSION_NUMBER); - } - public static String getRealTimeTopicName(StoreInfo storeInfo) { return getRealTimeTopicName( storeInfo.getName(), @@ -749,19 +757,6 @@ public static String getSeparateRealTimeTopicName(StoreInfo storeInfo) { return getSeparateRealTimeTopicName(Utils.getRealTimeTopicName(storeInfo)); } - public static String getRealTimeTopicNameForSystemStore(Store systemStore) { - int largestUsedRTVersionNumber; - VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(systemStore.getName()); - if (type != null && systemStore.isSystemStore()) { - // systemStore is a user system store - largestUsedRTVersionNumber = ((SystemStore) systemStore).getVeniceStore().getLargestUsedRTVersionNumber(); - } else { - // systemStore is a zkShared system store - largestUsedRTVersionNumber = systemStore.getLargestUsedRTVersionNumber(); - } - return Utils.getRealTimeTopicName(systemStore, largestUsedRTVersionNumber); - } - public static int calculateTopicHashCode(PubSubTopic topic) { if (topic.isSeparateRealTimeTopic()) { String realTimeTopicName = Utils.getRealTimeTopicNameFromSeparateRealTimeTopic(topic.getName()); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index 9b36c72a3c8..2c4f6eefb55 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -92,7 +92,7 @@ public void testGetOrCreateMetaStoreWriterWithSystemStore() { pubSubTopicRepository, 5000L, 2, - storeName -> Utils.getRealTimeTopicNameForSystemStore(systemStore)); + storeName -> Utils.getRealTimeTopicName(systemStore)); Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName)); verify(systemStore, times(1)).getVeniceStore(); @@ -135,7 +135,7 @@ public void testGetOrCreateMetaStoreWriterWithRegularStore() { pubSubTopicRepository, 5000L, 2, - storeName1 -> Utils.getRealTimeTopicNameForSystemStore(regularStore)); + storeName1 -> Utils.getRealTimeTopicName(regularStore)); Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); } @@ -177,7 +177,7 @@ public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() { pubSubTopicRepository, 5000L, 2, - storeName1 -> Utils.getRealTimeTopicNameForSystemStore(store)); + storeName1 -> Utils.getRealTimeTopicName(store)); Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName)); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index bdf62d2542d..f0c6a147907 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -169,7 +169,6 @@ import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.StoreName; import com.linkedin.venice.meta.StoreVersionInfo; -import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.SystemStoreAttributes; import com.linkedin.venice.meta.VeniceETLStrategy; import com.linkedin.venice.meta.Version; @@ -1253,14 +1252,13 @@ private void configureNewStore( /* If this store existed previously, we do not want to use the same RT topic name that was used by the previous store. To ensure this, increase largestUsedRTVersionNumber and new RT name will be different */ if (config.isRealTimeTopicVersioningEnabled()) { - VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(newStore.getName()); int newNumber; - if (type == null && newStore.isSystemStore()) { - // Top-level shared ZK store - newNumber = largestUsedRTVersionNumber; - } else { - // User-level system store OR regular store + if (!newStore.isSystemStore() || VeniceSystemStoreUtils.isUserSystemStore(newStore.getName())) { + // user level system store OR regular store newNumber = largestUsedRTVersionNumber + 1; + } else { + // top level system store + newNumber = largestUsedRTVersionNumber; } newStore.setLargestUsedRTVersionNumber(newNumber); @@ -3008,14 +3006,7 @@ private Pair addVersion( boolean versionSwapDeferred, int repushSourceVersion) { Store store = getStore(clusterName, storeName); - int largestUsedRTVersionNumber; - VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName()); - if (type != null && store.isSystemStore()) { - largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber(); - } else { - largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber(); - } - + int largestUsedRTVersionNumber = Utils.largestUsedRTVersionNumber(store); return addVersion( clusterName, storeName,