diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java index 320fc2eebbba..aae7c5d50902 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -311,6 +312,9 @@ private IdealState getIdealState(String idealStatePath) { ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, AccessOption.PERSISTENT); if (znRecord != null) { znRecord.setVersion(stat.getVersion()); + // Intern all the instance id and state to reduce memory footprint + internMapFields(znRecord); + // TODO: Avoid copying ZNRecord. This requires Helix change. return new IdealState(znRecord); } else { return null; @@ -323,12 +327,24 @@ private ExternalView getExternalView(String externalViewPath) { ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, AccessOption.PERSISTENT); if (znRecord != null) { znRecord.setVersion(stat.getVersion()); + // Intern all the instance id and state to reduce memory footprint + internMapFields(znRecord); + // TODO: Avoid copying ZNRecord. This requires Helix change. return new ExternalView(znRecord); } else { return null; } } + private void internMapFields(ZNRecord znRecord) { + for (Map.Entry> entry : znRecord.getMapFields().entrySet()) { + Map instanceStateMap = entry.getValue(); + Map internedInstanceStateMap = new TreeMap<>(); + instanceStateMap.forEach((k, v) -> internedInstanceStateMap.put(k.intern(), v.intern())); + entry.setValue(internedInstanceStateMap); + } + } + /** * Returns true if the IS / EV version has changed (irrespective of whether the routing entry was updated), otherwise * return false @@ -384,14 +400,15 @@ private void processInstanceConfigChangeInternal() { for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) { // Put instance initialization logics into try-catch block to prevent bad server configs affecting the entire // cluster - String instanceId = instanceConfigZNRecord.getId(); try { if (isEnabledServer(instanceConfigZNRecord)) { + // Intern the instance id to reduce memory footprint + String instanceId = instanceConfigZNRecord.getId().intern(); enabledServers.add(instanceId); // Always refresh the server instance with the latest instance config in case it changes InstanceConfig instanceConfig = new InstanceConfig(instanceConfigZNRecord); - ServerInstance serverInstance = new ServerInstance(instanceConfig); + ServerInstance serverInstance = new ServerInstance(instanceId, instanceConfig); if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) { newEnabledServers.add(instanceId); @@ -408,7 +425,7 @@ private void processInstanceConfigChangeInternal() { } } } catch (Exception e) { - LOGGER.error("Caught exception while adding instance: {}, ignoring it", instanceId, e); + LOGGER.error("Caught exception while adding instance: {}, ignoring it", instanceConfigZNRecord.getId(), e); } } List newDisabledServers = new ArrayList<>(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index 2df2b3a934ff..7471e50748d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -57,12 +57,11 @@ public enum RoutingType { private final String _adminEndpoint; private final int _pool; - /** - * By default (auto joined instances), server instance name is of format: {@code Server__}, e.g. - * {@code Server_localhost_12345}, hostname is of format: {@code Server_}, e.g. {@code Server_localhost}. - */ - public ServerInstance(InstanceConfig instanceConfig) { - _instanceId = instanceConfig.getInstanceName(); + /// By default (auto joined instances), server instance id is of format: `Server__`, e.g. + /// `Server_localhost_12345`, hostname is of format: `Server_`, e.g. `Server_localhost`. + /// NOTE: Pass in `instanceId` so that it can be interned at the caller side. + public ServerInstance(String instanceId, InstanceConfig instanceConfig) { + _instanceId = instanceId; String hostname = instanceConfig.getHostName(); if (hostname != null) { if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { @@ -74,11 +73,10 @@ public ServerInstance(InstanceConfig instanceConfig) { } else { // Hostname might be null in some tests (InstanceConfig created by calling the constructor instead of fetching // from ZK), directly parse the instance name - String instanceName = instanceConfig.getInstanceName(); - if (instanceName.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { - instanceName = instanceName.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH); + if (instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { + instanceId = instanceId.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH); } - String[] hostnameAndPort = StringUtils.split(instanceName, HOSTNAME_PORT_DELIMITER); + String[] hostnameAndPort = StringUtils.split(instanceId, HOSTNAME_PORT_DELIMITER); _hostname = hostnameAndPort[0]; _port = Integer.parseInt(hostnameAndPort[1]); } @@ -92,6 +90,11 @@ public ServerInstance(InstanceConfig instanceConfig) { _pool = extractPool(instanceConfig); } + @VisibleForTesting + public ServerInstance(InstanceConfig instanceConfig) { + this(instanceConfig.getInstanceName(), instanceConfig); + } + @VisibleForTesting ServerInstance(String hostname, int port) { _instanceId = Helix.PREFIX_OF_SERVER_INSTANCE + hostname + "_" + port;