Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*
* @author Gary Russell
* @author Sagnhyeok An
* @author Choi Wang Gyu
* @since 2.3
*
*/
Expand Down Expand Up @@ -510,29 +511,52 @@ public String toString() {
}

protected final String renderProperties() {
return renderTopics()
+ "\n pollTimeout=" + this.pollTimeout
+ (this.groupId != null ? "\n groupId=" + this.groupId : "")
+ (StringUtils.hasText(this.clientId) ? "\n clientId=" + this.clientId : "")
+ (this.consumerRebalanceListener != null
? "\n consumerRebalanceListener=" + this.consumerRebalanceListener
: "")
+ (this.commitCallback != null ? "\n commitCallback=" + this.commitCallback : "")
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
+ "\n syncCommits=" + this.syncCommits
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
+ (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "")
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
+ "\n commitRetries=" + this.commitRetries
+ "\n fixTxOffsets" + this.fixTxOffsets;
}

private String renderTopics() {
return (this.topics != null ? "\n topics=" + Arrays.toString(this.topics) : "")
+ (this.topicPattern != null ? "\n topicPattern=" + this.topicPattern : "")
+ (this.topicPartitions != null
? "\n topicPartitions=" + Arrays.toString(this.topicPartitions)
: "");
StringBuilder sb = new StringBuilder();
renderTopics(sb);
sb.append("\n pollTimeout=").append(this.pollTimeout);

if (this.groupId != null) {
sb.append("\n groupId=").append(this.groupId);
}
if (StringUtils.hasText(this.clientId)) {
sb.append("\n clientId=").append(this.clientId);
}
if (this.consumerRebalanceListener != null) {
sb.append("\n consumerRebalanceListener=").append(this.consumerRebalanceListener);
}
if (this.commitCallback != null) {
sb.append("\n commitCallback=").append(this.commitCallback);
}
if (this.offsetAndMetadataProvider != null) {
sb.append("\n offsetAndMetadataProvider=").append(this.offsetAndMetadataProvider);
}

sb.append("\n syncCommits=").append(this.syncCommits);

if (this.syncCommitTimeout != null) {
sb.append("\n syncCommitTimeout=").append(this.syncCommitTimeout);
}
if (!this.kafkaConsumerProperties.isEmpty()) {
sb.append("\n properties=").append(this.kafkaConsumerProperties);
}

sb.append("\n authExceptionRetryInterval=").append(this.authExceptionRetryInterval);
sb.append("\n commitRetries=").append(this.commitRetries);
sb.append("\n fixTxOffsets=").append(this.fixTxOffsets);

return sb.toString();
}

private void renderTopics(StringBuilder sb) {
if (this.topics != null) {
sb.append("\n topics=").append(Arrays.toString(this.topics));
}
if (this.topicPattern != null) {
sb.append("\n topicPattern=").append(this.topicPattern);
}
if (this.topicPartitions != null) {
sb.append("\n topicPartitions=").append(Arrays.toString(this.topicPartitions));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Lukasz Kaminski
* @author Kyuhyeok Park
* @author Wang Zhiyang
* @author Choi Wang Gyu
*/
public class ContainerProperties extends ConsumerProperties {

Expand Down Expand Up @@ -1116,56 +1117,96 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {

@Override
public String toString() {
return "ContainerProperties ["
+ renderProperties()
+ "\n ackMode=" + this.ackMode
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n consumerStartTimeout=" + this.consumerStartTimeout
+ "\n messageListener=" + this.messageListener
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
+ (this.idleEventInterval == null ? "not enabled" : this.idleEventInterval)
+ "\n idlePartitionEventInterval="
+ (this.idlePartitionEventInterval == null ? "not enabled" : this.idlePartitionEventInterval)
+ (this.transactionManager != null
? "\n transactionManager=" + this.transactionManager
: "")
+ (this.kafkaAwareTransactionManager != null
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
: "")
+ "\n monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
+ "\n noPollThreshold=" + this.noPollThreshold
+ "\n pauseImmediate=" + this.pauseImmediate
+ "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused
+ "\n subBatchPerPartition=" + this.subBatchPerPartition
+ "\n assignmentCommitOption=" + this.assignmentCommitOption
+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader
+ "\n batchRecoverAfterRollback=" + this.batchRecoverAfterRollback
+ "\n eosMode=" + this.eosMode
+ "\n transactionDefinition=" + this.transactionDefinition
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n logContainerConfig=" + this.logContainerConfig
+ "\n missingTopicsFatal=" + this.missingTopicsFatal
+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+ "\n idleBetweenPolls=" + this.idleBetweenPolls
+ "\n micrometerEnabled=" + this.micrometerEnabled
+ "\n observationEnabled=" + this.observationEnabled
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ (this.observationRegistry != null
? "\n observationRegistry=" + this.observationRegistry
: "")
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
+ "\n]";
StringBuilder sb = new StringBuilder("ContainerProperties [");
sb.append(renderProperties());

// Core acknowledgment properties
appendProperty(sb, "ackMode", this.ackMode);
appendProperty(sb, "ackCount", this.ackCount);
appendProperty(sb, "ackTime", this.ackTime);

// Timeout and startup properties
appendProperty(sb, "consumerStartTimeout", this.consumerStartTimeout);
appendProperty(sb, "shutdownTimeout", this.shutdownTimeout);

// Listener configuration
appendProperty(sb, "messageListener", this.messageListener);
appendProperty(sb, "listenerTaskExecutor", this.listenerTaskExecutor);

// Idle event configuration
appendEnabledProperty(sb, "idleEventInterval", this.idleEventInterval);
appendEnabledProperty(sb, "idlePartitionEventInterval", this.idlePartitionEventInterval);

// Transaction management
appendProperty(sb, "transactionManager", this.transactionManager);
appendProperty(sb, "kafkaAwareTransactionManager", this.kafkaAwareTransactionManager);
appendProperty(sb, "transactionDefinition", this.transactionDefinition);

// Monitoring and scheduling
appendProperty(sb, "monitorInterval", this.monitorInterval);
appendProperty(sb, "scheduler", this.scheduler);
appendProperty(sb, "noPollThreshold", this.noPollThreshold);

// Container behavior flags
appendProperty(sb, "pauseImmediate", this.pauseImmediate);
appendProperty(sb, "stopImmediate", this.stopImmediate);
appendProperty(sb, "stopContainerWhenFenced", this.stopContainerWhenFenced);
appendProperty(sb, "asyncAcks", this.asyncAcks);

// Polling and partition configuration
appendProperty(sb, "pollTimeoutWhilePaused", this.pollTimeoutWhilePaused);
appendProperty(sb, "subBatchPerPartition", this.subBatchPerPartition);
appendProperty(sb, "assignmentCommitOption", this.assignmentCommitOption);
appendProperty(sb, "idleBetweenPolls", this.idleBetweenPolls);

// Header and recovery configuration
appendProperty(sb, "deliveryAttemptHeader", this.deliveryAttemptHeader);
appendProperty(sb, "batchRecoverAfterRollback", this.batchRecoverAfterRollback);

// Exactly-once semantics
appendProperty(sb, "eosMode", this.eosMode);

// Logging and error handling
appendProperty(sb, "logContainerConfig", this.logContainerConfig);
appendProperty(sb, "missingTopicsFatal", this.missingTopicsFatal);
appendProperty(sb, "restartAfterAuthExceptions", this.restartAfterAuthExceptions);

// Metrics and observation
appendProperty(sb, "micrometerEnabled", this.micrometerEnabled);
appendProperty(sb, "observationEnabled", this.observationEnabled);
appendProperty(sb, "recordObservationsInBatch", this.recordObservationsInBatch);
appendProperty(sb, "observationConvention", this.observationConvention);
appendProperty(sb, "observationRegistry", this.observationRegistry);

// Data multiplier
appendProperty(sb, "idleBeforeDataMultiplier", this.idleBeforeDataMultiplier);

sb.append("\n]");
return sb.toString();
}

/**
* Append a property to the StringBuilder with consistent formatting.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value
*/
private void appendProperty(StringBuilder sb, String name, @Nullable Object value) {
if(value != null){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you have not run check locally.
This is not a code style we follow.
See if you can apply src/idea/spring-framework.xml into your IntelliJ IDEA for our Spring-specific code style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I saw your comment after opening the PR.
I'll fix it locally and push the update soon.
Please wait a moment. Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan
I'm sorry for keeping you waiting.
This is my first time contributing to open source, so I was a bit clumsy.
Thanks to your help, I was able to resolve the issue and learned a lot.
If there's anything else I should fix, please feel free to let me know.
I really appreciate your time and guidance. Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! All good.
We do have this guidance, though: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc.

Please, let us know if that is good enough to make your contribution routine smooth.
I do see that we need to mention that src/idea/spring-framework.xml code style config over there.
Plus, we don't need to update Copyright anymore 😄 .

But who knows what else causes headache for you but we can help to avoid it with some simple improvement to the process.
Thank you!

sb.append("\n ").append(name).append("=").append(value);
}
}


/**
* Append a property with "enabled/not enabled" formatting for nullable values.
* @param sb the StringBuilder
* @param name the property name
* @param value the property value (nullable)
*/
private void appendEnabledProperty(StringBuilder sb, String name, @Nullable Object value) {
sb.append("\n ").append(name).append("=")
.append(value == null ? "not enabled" : value);
}

}