Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*
* @author Gary Russell
* @author Sagnhyeok An
* @author Choi Wang Gyu
* @since 2.3
*
*/
Expand Down Expand Up @@ -510,29 +511,52 @@ public String toString() {
}

protected final String renderProperties() {
return renderTopics()
+ "\n pollTimeout=" + this.pollTimeout
+ (this.groupId != null ? "\n groupId=" + this.groupId : "")
+ (StringUtils.hasText(this.clientId) ? "\n clientId=" + this.clientId : "")
+ (this.consumerRebalanceListener != null
? "\n consumerRebalanceListener=" + this.consumerRebalanceListener
: "")
+ (this.commitCallback != null ? "\n commitCallback=" + this.commitCallback : "")
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
+ "\n syncCommits=" + this.syncCommits
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
+ (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "")
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
+ "\n commitRetries=" + this.commitRetries
+ "\n fixTxOffsets" + this.fixTxOffsets;
}

private String renderTopics() {
return (this.topics != null ? "\n topics=" + Arrays.toString(this.topics) : "")
+ (this.topicPattern != null ? "\n topicPattern=" + this.topicPattern : "")
+ (this.topicPartitions != null
? "\n topicPartitions=" + Arrays.toString(this.topicPartitions)
: "");
StringBuilder sb = new StringBuilder();
renderTopics(sb);
sb.append("\n pollTimeout=").append(this.pollTimeout);

if (this.groupId != null) {
sb.append("\n groupId=").append(this.groupId);
}
if (StringUtils.hasText(this.clientId)) {
sb.append("\n clientId=").append(this.clientId);
}
if (this.consumerRebalanceListener != null) {
sb.append("\n consumerRebalanceListener=").append(this.consumerRebalanceListener);
}
if (this.commitCallback != null) {
sb.append("\n commitCallback=").append(this.commitCallback);
}
if (this.offsetAndMetadataProvider != null) {
sb.append("\n offsetAndMetadataProvider=").append(this.offsetAndMetadataProvider);
}

sb.append("\n syncCommits=").append(this.syncCommits);

if (this.syncCommitTimeout != null) {
sb.append("\n syncCommitTimeout=").append(this.syncCommitTimeout);
}
if (!this.kafkaConsumerProperties.isEmpty()) {
sb.append("\n properties=").append(this.kafkaConsumerProperties);
}

sb.append("\n authExceptionRetryInterval=").append(this.authExceptionRetryInterval);
sb.append("\n commitRetries=").append(this.commitRetries);
sb.append("\n fixTxOffsets=").append(this.fixTxOffsets);

return sb.toString();
}

private void renderTopics(StringBuilder sb) {
if (this.topics != null) {
sb.append("\n topics=").append(Arrays.toString(this.topics));
}
if (this.topicPattern != null) {
sb.append("\n topicPattern=").append(this.topicPattern);
}
if (this.topicPartitions != null) {
sb.append("\n topicPartitions=").append(Arrays.toString(this.topicPartitions));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Lukasz Kaminski
* @author Kyuhyeok Park
* @author Wang Zhiyang
* @author Choi Wang Gyu
*/
public class ContainerProperties extends ConsumerProperties {

Expand Down Expand Up @@ -1116,56 +1117,95 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {

@Override
public String toString() {
return "ContainerProperties ["
+ renderProperties()
+ "\n ackMode=" + this.ackMode
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n consumerStartTimeout=" + this.consumerStartTimeout
+ "\n messageListener=" + this.messageListener
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
+ (this.idleEventInterval == null ? "not enabled" : this.idleEventInterval)
+ "\n idlePartitionEventInterval="
+ (this.idlePartitionEventInterval == null ? "not enabled" : this.idlePartitionEventInterval)
+ (this.transactionManager != null
? "\n transactionManager=" + this.transactionManager
: "")
+ (this.kafkaAwareTransactionManager != null
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
: "")
+ "\n monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
+ "\n noPollThreshold=" + this.noPollThreshold
+ "\n pauseImmediate=" + this.pauseImmediate
+ "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused
+ "\n subBatchPerPartition=" + this.subBatchPerPartition
+ "\n assignmentCommitOption=" + this.assignmentCommitOption
+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader
+ "\n batchRecoverAfterRollback=" + this.batchRecoverAfterRollback
+ "\n eosMode=" + this.eosMode
+ "\n transactionDefinition=" + this.transactionDefinition
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n logContainerConfig=" + this.logContainerConfig
+ "\n missingTopicsFatal=" + this.missingTopicsFatal
+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+ "\n idleBetweenPolls=" + this.idleBetweenPolls
+ "\n micrometerEnabled=" + this.micrometerEnabled
+ "\n observationEnabled=" + this.observationEnabled
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ (this.observationRegistry != null
? "\n observationRegistry=" + this.observationRegistry
: "")
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
+ "\n]";
StringBuilder sb = new StringBuilder("ContainerProperties [");
sb.append(renderProperties());

// Core acknowledgment properties
appendProperty(sb, "ackMode", this.ackMode);
appendProperty(sb, "ackCount", this.ackCount);
appendProperty(sb, "ackTime", this.ackTime);

// Timeout and startup properties
appendProperty(sb, "consumerStartTimeout", this.consumerStartTimeout);
appendProperty(sb, "shutdownTimeout", this.shutdownTimeout);

// Listener configuration
appendProperty(sb, "messageListener", this.messageListener);
appendProperty(sb, "messageListener", this.messageListener);
appendProperty(sb, "listenerTaskExecutor", this.listenerTaskExecutor);

// Idle event configuration
appendEnabledProperty(sb, "idleEventInterval", this.idleEventInterval);
appendEnabledProperty(sb, "idlePartitionEventInterval", this.idlePartitionEventInterval);

// Transaction management
appendProperty(sb, "transactionManager", this.transactionManager);
appendProperty(sb, "kafkaAwareTransactionManager", this.kafkaAwareTransactionManager);
appendProperty(sb, "transactionDefinition", this.transactionDefinition);

// Monitoring and scheduling
appendProperty(sb, "monitorInterval", this.monitorInterval);
appendProperty(sb, "scheduler", this.scheduler);
appendProperty(sb, "noPollThreshold", this.noPollThreshold);

// Container behavior flags
appendProperty(sb, "pauseImmediate", this.pauseImmediate);
appendProperty(sb, "stopImmediate", this.stopImmediate);
appendProperty(sb, "stopContainerWhenFenced", this.stopContainerWhenFenced);
appendProperty(sb, "asyncAcks", this.asyncAcks);

// Polling and partition configuration
appendProperty(sb, "pollTimeoutWhilePaused", this.pollTimeoutWhilePaused);
appendProperty(sb, "subBatchPerPartition", this.subBatchPerPartition);
appendProperty(sb, "assignmentCommitOption", this.assignmentCommitOption);
appendProperty(sb, "idleBetweenPolls", this.idleBetweenPolls);

// Header and recovery configuration
appendProperty(sb, "deliveryAttemptHeader", this.deliveryAttemptHeader);
appendProperty(sb, "batchRecoverAfterRollback", this.batchRecoverAfterRollback);

// Exactly-once semantics
appendProperty(sb, "eosMode", this.eosMode);

// Logging and error handling
appendProperty(sb, "logContainerConfig", this.logContainerConfig);
appendProperty(sb, "missingTopicsFatal", this.missingTopicsFatal);
appendProperty(sb, "restartAfterAuthExceptions", this.restartAfterAuthExceptions);

// Metrics and observation
appendProperty(sb, "micrometerEnabled", this.micrometerEnabled);
appendProperty(sb, "observationEnabled", this.observationEnabled);
appendProperty(sb, "recordObservationsInBatch", this.recordObservationsInBatch);
appendProperty(sb, "observationConvention", this.observationConvention);
appendProperty(sb, "observationRegistry", this.observationRegistry);

// Data multiplier
appendProperty(sb, "idleBeforeDataMultiplier", this.idleBeforeDataMultiplier);

sb.append("\n]");
return sb.toString();
}

/**
* Append a property to the StringBuilder with consistent formatting.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value
*/
private void appendProperty(StringBuilder sb, String name, @Nullable Object value) {
sb.append("\n ").append(name).append("=").append(value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but why no if(value!=null) any more?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mistake sorry

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not resolved.

}


/**
* Append a property with "enabled/not enabled" formatting for nullable values.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value (nullable)
*/
private void appendEnabledProperty(StringBuilder sb, String name, @Nullable Object value) {
sb.append("\n ").append(name).append("=")
.append(value == null ? "not enabled" : value);
}

}
Loading