Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*
* @author Gary Russell
* @author Sagnhyeok An
* @author Choi Wang Gyu
* @since 2.3
*
*/
Expand Down Expand Up @@ -510,29 +511,52 @@ public String toString() {
}

protected final String renderProperties() {
return renderTopics()
+ "\n pollTimeout=" + this.pollTimeout
+ (this.groupId != null ? "\n groupId=" + this.groupId : "")
+ (StringUtils.hasText(this.clientId) ? "\n clientId=" + this.clientId : "")
+ (this.consumerRebalanceListener != null
? "\n consumerRebalanceListener=" + this.consumerRebalanceListener
: "")
+ (this.commitCallback != null ? "\n commitCallback=" + this.commitCallback : "")
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
+ "\n syncCommits=" + this.syncCommits
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
+ (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "")
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
+ "\n commitRetries=" + this.commitRetries
+ "\n fixTxOffsets" + this.fixTxOffsets;
}

private String renderTopics() {
return (this.topics != null ? "\n topics=" + Arrays.toString(this.topics) : "")
+ (this.topicPattern != null ? "\n topicPattern=" + this.topicPattern : "")
+ (this.topicPartitions != null
? "\n topicPartitions=" + Arrays.toString(this.topicPartitions)
: "");
StringBuilder sb = new StringBuilder();
renderTopics(sb);
sb.append("\n pollTimeout=").append(this.pollTimeout);

if (this.groupId != null) {
sb.append("\n groupId=").append(this.groupId);
}
if (StringUtils.hasText(this.clientId)) {
sb.append("\n clientId=").append(this.clientId);
}
if (this.consumerRebalanceListener != null) {
sb.append("\n consumerRebalanceListener=").append(this.consumerRebalanceListener);
}
if (this.commitCallback != null) {
sb.append("\n commitCallback=").append(this.commitCallback);
}
if (this.offsetAndMetadataProvider != null) {
sb.append("\n offsetAndMetadataProvider=").append(this.offsetAndMetadataProvider);
}

sb.append("\n syncCommits=").append(this.syncCommits);

if (this.syncCommitTimeout != null) {
sb.append("\n syncCommitTimeout=").append(this.syncCommitTimeout);
}
if (!this.kafkaConsumerProperties.isEmpty()) {
sb.append("\n properties=").append(this.kafkaConsumerProperties);
}

sb.append("\n authExceptionRetryInterval=").append(this.authExceptionRetryInterval);
sb.append("\n commitRetries=").append(this.commitRetries);
sb.append("\n fixTxOffsets=").append(this.fixTxOffsets);

return sb.toString();
}

private void renderTopics(StringBuilder sb) {
if (this.topics != null) {
sb.append("\n topics=").append(Arrays.toString(this.topics));
}
if (this.topicPattern != null) {
sb.append("\n topicPattern=").append(this.topicPattern);
}
if (this.topicPartitions != null) {
sb.append("\n topicPartitions=").append(Arrays.toString(this.topicPartitions));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Lukasz Kaminski
* @author Kyuhyeok Park
* @author Wang Zhiyang
* @author Choi Wang Gyu
*/
public class ContainerProperties extends ConsumerProperties {

Expand Down Expand Up @@ -1116,56 +1117,95 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {

@Override
public String toString() {
return "ContainerProperties ["
+ renderProperties()
+ "\n ackMode=" + this.ackMode
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n consumerStartTimeout=" + this.consumerStartTimeout
+ "\n messageListener=" + this.messageListener
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
+ (this.idleEventInterval == null ? "not enabled" : this.idleEventInterval)
+ "\n idlePartitionEventInterval="
+ (this.idlePartitionEventInterval == null ? "not enabled" : this.idlePartitionEventInterval)
+ (this.transactionManager != null
? "\n transactionManager=" + this.transactionManager
: "")
+ (this.kafkaAwareTransactionManager != null
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
: "")
+ "\n monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
+ "\n noPollThreshold=" + this.noPollThreshold
+ "\n pauseImmediate=" + this.pauseImmediate
+ "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused
+ "\n subBatchPerPartition=" + this.subBatchPerPartition
+ "\n assignmentCommitOption=" + this.assignmentCommitOption
+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader
+ "\n batchRecoverAfterRollback=" + this.batchRecoverAfterRollback
+ "\n eosMode=" + this.eosMode
+ "\n transactionDefinition=" + this.transactionDefinition
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n logContainerConfig=" + this.logContainerConfig
+ "\n missingTopicsFatal=" + this.missingTopicsFatal
+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+ "\n idleBetweenPolls=" + this.idleBetweenPolls
+ "\n micrometerEnabled=" + this.micrometerEnabled
+ "\n observationEnabled=" + this.observationEnabled
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ (this.observationRegistry != null
? "\n observationRegistry=" + this.observationRegistry
: "")
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
+ "\n]";
StringBuilder sb = new StringBuilder("ContainerProperties [");
sb.append(renderProperties());

// Core acknowledgment properties
appendProperty(sb, "ackMode", this.ackMode);
appendProperty(sb, "ackCount", this.ackCount);
appendProperty(sb, "ackTime", this.ackTime);

// Timeout and startup properties
appendProperty(sb, "consumerStartTimeout", this.consumerStartTimeout);
appendProperty(sb, "shutdownTimeout", this.shutdownTimeout);

// Listener configuration
appendProperty(sb, "messageListener", this.messageListener);
appendProperty(sb, "listenerTaskExecutor", this.listenerTaskExecutor);

// Idle event configuration
appendEnabledProperty(sb, "idleEventInterval", this.idleEventInterval);
appendEnabledProperty(sb, "idlePartitionEventInterval", this.idlePartitionEventInterval);

// Transaction management
appendProperty(sb, "transactionManager", this.transactionManager);
appendProperty(sb, "kafkaAwareTransactionManager", this.kafkaAwareTransactionManager);
appendProperty(sb, "transactionDefinition", this.transactionDefinition);

// Monitoring and scheduling
appendProperty(sb, "monitorInterval", this.monitorInterval);
appendProperty(sb, "scheduler", this.scheduler);
appendProperty(sb, "noPollThreshold", this.noPollThreshold);

// Container behavior flags
appendProperty(sb, "pauseImmediate", this.pauseImmediate);
appendProperty(sb, "stopImmediate", this.stopImmediate);
appendProperty(sb, "stopContainerWhenFenced", this.stopContainerWhenFenced);
appendProperty(sb, "asyncAcks", this.asyncAcks);

// Polling and partition configuration
appendProperty(sb, "pollTimeoutWhilePaused", this.pollTimeoutWhilePaused);
appendProperty(sb, "subBatchPerPartition", this.subBatchPerPartition);
appendProperty(sb, "assignmentCommitOption", this.assignmentCommitOption);
appendProperty(sb, "idleBetweenPolls", this.idleBetweenPolls);

// Header and recovery configuration
appendProperty(sb, "deliveryAttemptHeader", this.deliveryAttemptHeader);
appendProperty(sb, "batchRecoverAfterRollback", this.batchRecoverAfterRollback);

// Exactly-once semantics
appendProperty(sb, "eosMode", this.eosMode);

// Logging and error handling
appendProperty(sb, "logContainerConfig", this.logContainerConfig);
appendProperty(sb, "missingTopicsFatal", this.missingTopicsFatal);
appendProperty(sb, "restartAfterAuthExceptions", this.restartAfterAuthExceptions);

// Metrics and observation
appendProperty(sb, "micrometerEnabled", this.micrometerEnabled);
appendProperty(sb, "observationEnabled", this.observationEnabled);
appendProperty(sb, "recordObservationsInBatch", this.recordObservationsInBatch);
appendProperty(sb, "observationConvention", this.observationConvention);
appendProperty(sb, "observationRegistry", this.observationRegistry);

// Data multiplier
appendProperty(sb, "idleBeforeDataMultiplier", this.idleBeforeDataMultiplier);

sb.append("\n]");
return sb.toString();
}

/**
* Append a property to the StringBuilder with consistent formatting.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value
*/
private void appendProperty(StringBuilder sb, String name, @Nullable Object value) {
if (value != null) {
sb.append("\n ").append(name).append("=").append(value);
}
}

/**
* Append a property with "enabled/not enabled" formatting for nullable values.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value (nullable)
*/
private void appendEnabledProperty(StringBuilder sb, String name, @Nullable Object value) {
sb.append("\n ").append(name).append("=")
.append(value == null ? "not enabled" : value);
}

}