Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@
/**
* The algorithm:
* 1. Create a map from the supplied desired String
* 2. Fill placeholders (e.g. ${BROKER_ID}) in desired map as the broker's {@code kafka_config_generator.sh} would
* 3a. Loop over all entries. If the entry is in IGNORABLE_PROPERTIES or entry.value from desired is equal to entry.value from current, do nothing
* 2a. Loop over all entries. If the entry is in IGNORABLE_PROPERTIES or entry.value from desired is equal to entry.value from current, do nothing
* else add it to the diff
* 3b. If entry was removed from desired, add it to the diff with null value.
* 3c. If custom entry was removed, delete property
* 2b. If entry was removed from desired, add it to the diff with null value.
* 2c. If custom entry was removed, delete property
*/
public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerConfigurationDiff.class);
Expand All @@ -44,20 +43,10 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
private final Map<String, ConfigModel> configModel;

/**
* These options are skipped because they contain placeholders
* 909[1-4] is for skipping all (internal, plain, secured, external) listeners properties
* These options are skipped from getting updated because they contain placeholders
*/
public static final Pattern IGNORABLE_PROPERTIES = Pattern.compile(
"^(broker\\.id"
+ "|.*-[0-9]{2,5}\\.ssl\\.keystore\\.location"
+ "|.*-[0-9]{2,5}\\.ssl\\.keystore\\.password"
+ "|.*-[0-9]{2,5}\\.ssl\\.keystore\\.type"
+ "|.*-[0-9]{2,5}\\.ssl\\.truststore\\.location"
+ "|.*-[0-9]{2,5}\\.ssl\\.truststore\\.password"
+ "|.*-[0-9]{2,5}\\.ssl\\.truststore\\.type"
+ "|.*-[0-9]{2,5}\\.ssl\\.client\\.auth"
+ "|.*-[0-9]{2,5}\\.scram-sha-512\\.sasl\\.jaas\\.config"
+ "|.*-[0-9]{2,5}\\.sasl\\.enabled\\.mechanisms"
+ "|advertised\\.listeners"
+ "|broker\\.rack)$");

Expand Down Expand Up @@ -158,51 +147,57 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
.filter(configEntry -> configEntry.name().equals(pathValueWithoutSlash))
.findFirst();

boolean logConfigDiff = false;
String op = d.get("op").asText();
if (optEntry.isPresent()) {
ConfigEntry entry = optEntry.get();
if ("remove".equals(op)) {
removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry, brokerNodeRef.controller());
logConfigDiff = removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry, brokerNodeRef.controller());
} else if ("replace".equals(op)) {
// entry is in the current, desired is updated value
updateOrAdd(entry.name(), configModel, desiredMap, updatedCE, brokerNodeRef.controller());
logConfigDiff = updateOrAdd(entry.name(), configModel, desiredMap, updatedCE, brokerNodeRef.controller());
}
} else {
if ("add".equals(op)) {
// entry is not in the current, it is added
updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE, brokerNodeRef.controller());
logConfigDiff = updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE, brokerNodeRef.controller());
}
}

if ("remove".equals(op)) {
// there is a lot of properties set by default - not having them in desired causes very noisy log output
LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
LOGGER.traceCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.traceCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
} else {
LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
LOGGER.debugCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.debugCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
// Log config difference only if they are not ignorable or custom as they always contain different values and are not dynamically updated, logging them causes very noisy log output.
if (logConfigDiff) {
if ("remove".equals(op)) {
// there is a lot of properties set by default - not having them in desired causes very noisy log output
LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
LOGGER.traceCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.traceCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, with this improvement, can we now move these back to debug? They are on trace because there were too many default values. But if they should be filtered out now, maybe we can make the actual removes more visible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I have made the suggested change along with another improvement.

} else {
LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative is to move these logs into removeProperty and updateOrAdd methods providing all the necessary parameters.

LOGGER.debugCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.debugCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
}
}
}

return updatedCE;
}

private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE, boolean nodeIsController) {
private boolean updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE, boolean nodeIsController) {
if (!isIgnorableProperty(propertyName, nodeIsController)) {
if (KafkaConfiguration.isCustomConfigurationOption(propertyName, configModel)) {
LOGGER.traceCr(reconciliation, "custom property {} has been updated/added {}", propertyName, desiredMap.get(propertyName));
} else {
LOGGER.traceCr(reconciliation, "property {} has been updated/added {}", propertyName, desiredMap.get(propertyName));
updatedCE.add(new AlterConfigOp(new ConfigEntry(propertyName, desiredMap.get(propertyName)), AlterConfigOp.OpType.SET));
return true;
}
} else {
LOGGER.traceCr(reconciliation, "{} is ignorable, not considering");
LOGGER.traceCr(reconciliation, "{} is ignorable, not considering", propertyName);
}
return false;
}

private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry, boolean nodeIsController) {
private boolean removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry, boolean nodeIsController) {
if (KafkaConfiguration.isCustomConfigurationOption(entry.name(), configModel)) {
// we are deleting custom option
LOGGER.traceCr(reconciliation, "removing custom property {}", entry.name());
Expand All @@ -218,10 +213,12 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
updatedCE.add(new AlterConfigOp(new ConfigEntry(pathValueWithoutSlash, null), AlterConfigOp.OpType.DELETE));
LOGGER.infoCr(reconciliation, "{} not set in desired, unsetting back to default {}", entry.name(), "deleted entry");
return true;
} else {
LOGGER.traceCr(reconciliation, "{} is ignorable, not considering as removed");
LOGGER.traceCr(reconciliation, "{} is ignorable, not considering as removed", entry.name());
}
}
return false;
}

/**
Expand Down