Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -311,6 +312,9 @@ private IdealState getIdealState(String idealStatePath) {
ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
znRecord.setVersion(stat.getVersion());
// Intern all the instance id and state to reduce memory footprint
internMapFields(znRecord);
// TODO: Avoid copying ZNRecord. This requires Helix change.
return new IdealState(znRecord);
} else {
return null;
Expand All @@ -323,12 +327,24 @@ private ExternalView getExternalView(String externalViewPath) {
ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
znRecord.setVersion(stat.getVersion());
// Intern all the instance id and state to reduce memory footprint
internMapFields(znRecord);
// TODO: Avoid copying ZNRecord. This requires Helix change.
return new ExternalView(znRecord);
} else {
return null;
}
}

private void internMapFields(ZNRecord znRecord) {
for (Map.Entry<String, Map<String, String>> entry : znRecord.getMapFields().entrySet()) {
Map<String, String> instanceStateMap = entry.getValue();
Map<String, String> internedInstanceStateMap = new TreeMap<>();
instanceStateMap.forEach((k, v) -> internedInstanceStateMap.put(k.intern(), v.intern()));
entry.setValue(internedInstanceStateMap);
}
}

/**
* Returns true if the IS / EV version has changed (irrespective of whether the routing entry was updated), otherwise
* return false
Expand Down Expand Up @@ -384,14 +400,15 @@ private void processInstanceConfigChangeInternal() {
for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) {
// Put instance initialization logics into try-catch block to prevent bad server configs affecting the entire
// cluster
String instanceId = instanceConfigZNRecord.getId();
try {
if (isEnabledServer(instanceConfigZNRecord)) {
// Intern the instance id to reduce memory footprint
String instanceId = instanceConfigZNRecord.getId().intern();
enabledServers.add(instanceId);

// Always refresh the server instance with the latest instance config in case it changes
InstanceConfig instanceConfig = new InstanceConfig(instanceConfigZNRecord);
ServerInstance serverInstance = new ServerInstance(instanceConfig);
ServerInstance serverInstance = new ServerInstance(instanceId, instanceConfig);
if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) {
newEnabledServers.add(instanceId);

Expand All @@ -408,7 +425,7 @@ private void processInstanceConfigChangeInternal() {
}
}
} catch (Exception e) {
LOGGER.error("Caught exception while adding instance: {}, ignoring it", instanceId, e);
LOGGER.error("Caught exception while adding instance: {}, ignoring it", instanceConfigZNRecord.getId(), e);
}
}
List<String> newDisabledServers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ public enum RoutingType {
private final String _adminEndpoint;
private final int _pool;

/**
* By default (auto joined instances), server instance name is of format: {@code Server_<hostname>_<port>}, e.g.
* {@code Server_localhost_12345}, hostname is of format: {@code Server_<hostname>}, e.g. {@code Server_localhost}.
*/
public ServerInstance(InstanceConfig instanceConfig) {
_instanceId = instanceConfig.getInstanceName();
/// By default (auto joined instances), server instance id is of format: `Server_<hostname>_<port>`, e.g.
/// `Server_localhost_12345`, hostname is of format: `Server_<hostname>`, e.g. `Server_localhost`.
/// NOTE: Pass in `instanceId` so that it can be interned at the caller side.
public ServerInstance(String instanceId, InstanceConfig instanceConfig) {
_instanceId = instanceId;
String hostname = instanceConfig.getHostName();
if (hostname != null) {
if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
Expand All @@ -74,11 +73,10 @@ public ServerInstance(InstanceConfig instanceConfig) {
} else {
// Hostname might be null in some tests (InstanceConfig created by calling the constructor instead of fetching
// from ZK), directly parse the instance name
String instanceName = instanceConfig.getInstanceName();
if (instanceName.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
instanceName = instanceName.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
if (instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
instanceId = instanceId.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
}
String[] hostnameAndPort = StringUtils.split(instanceName, HOSTNAME_PORT_DELIMITER);
String[] hostnameAndPort = StringUtils.split(instanceId, HOSTNAME_PORT_DELIMITER);
_hostname = hostnameAndPort[0];
_port = Integer.parseInt(hostnameAndPort[1]);
}
Expand All @@ -92,6 +90,11 @@ public ServerInstance(InstanceConfig instanceConfig) {
_pool = extractPool(instanceConfig);
}

@VisibleForTesting
public ServerInstance(InstanceConfig instanceConfig) {
this(instanceConfig.getInstanceName(), instanceConfig);
}

@VisibleForTesting
ServerInstance(String hostname, int port) {
_instanceId = Helix.PREFIX_OF_SERVER_INSTANCE + hostname + "_" + port;
Expand Down
Loading