Skip to content

Commit 7e9d50a

Browse files
authored
MINOR; Fix some Request toString methods (apache#19655) (apache#19690)
Reviewers: Colin P. McCabe <[email protected]> ``` Conflicts: clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java - import statement clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java - import statement core/src/test/scala/unit/kafka/network/RequestChannelTest.scala - difference in unrelated parameter core/src/test/scala/unit/kafka/server/KafkaApisTest.scala - different logging and metadatacache instantiation ``` Cherry-Picked-From: 042be5b Cherry-Picked-By: Alyssa Huang <[email protected]> Cherry-Picked-At: Mon May 12 11:11:19 2025 -0700
1 parent 257f8d4 commit 7e9d50a

File tree

7 files changed

+77
-9
lines changed

7 files changed

+77
-9
lines changed

checkstyle/import-control.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@
191191
<allow pkg="io.opentelemetry.proto"/>
192192
<!-- for testing -->
193193
<allow pkg="org.apache.kafka.common.telemetry" />
194+
<!-- for IncrementalAlterConfigsRequest and AlterUserScramCredentialsRequest -->
195+
<allow pkg="com.fasterxml.jackson.databind" />
194196
</subpackage>
195197

196198
<subpackage name="serialization">

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public String toString(boolean verbose) {
136136
}
137137

138138
@Override
139-
public final String toString() {
139+
public String toString() {
140140
return toString(true);
141141
}
142142

clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
package org.apache.kafka.common.requests;
1818

1919
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
20+
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
2021
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
2122
import org.apache.kafka.common.protocol.ApiKeys;
2223
import org.apache.kafka.common.protocol.ByteBufferAccessor;
2324

25+
import com.fasterxml.jackson.databind.JsonNode;
26+
import com.fasterxml.jackson.databind.node.ObjectNode;
27+
2428
import java.nio.ByteBuffer;
2529
import java.util.List;
2630
import java.util.Set;
@@ -82,4 +86,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
8286
.collect(Collectors.toList());
8387
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
8488
}
89+
90+
// Do not print salt or saltedPassword
91+
@Override
92+
public String toString() {
93+
JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();
94+
95+
for (JsonNode upsertion : json.get("upsertions")) {
96+
((ObjectNode) upsertion).put("salt", "");
97+
((ObjectNode) upsertion).put("saltedPassword", "");
98+
}
99+
return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
100+
}
85101
}

clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import org.apache.kafka.common.config.ConfigResource;
2222
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
2323
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
24+
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
2425
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
2526
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
2627
import org.apache.kafka.common.protocol.ApiKeys;
2728
import org.apache.kafka.common.protocol.ByteBufferAccessor;
2829

30+
import com.fasterxml.jackson.databind.JsonNode;
31+
import com.fasterxml.jackson.databind.node.ObjectNode;
32+
2933
import java.nio.ByteBuffer;
3034
import java.util.Collection;
3135
import java.util.Map;
@@ -107,4 +111,16 @@ public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwab
107111
}
108112
return new IncrementalAlterConfigsResponse(response);
109113
}
114+
115+
// It is not safe to print all config values
116+
@Override
117+
public String toString() {
118+
JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy();
119+
for (JsonNode resource : json.get("resources")) {
120+
for (JsonNode config : resource.get("configs")) {
121+
((ObjectNode) config).put("value", "REDACTED");
122+
}
123+
}
124+
return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString();
125+
}
110126
}

core/src/test/scala/unit/kafka/network/RequestChannelTest.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,21 @@ class RequestChannelTest {
128128
op: OpType,
129129
entries: Map[String, String],
130130
expectedValues: Map[String, String]): Unit = {
131-
val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
132-
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
131+
val alterConfigs = incrementalAlterConfigs(resource, entries, op)
132+
val alterConfigsString = alterConfigs.toString
133+
entries.foreach { entry =>
134+
if (!alterConfigsString.contains(entry._1)) {
135+
fail("Config names should be in the request string")
136+
}
137+
if (entry._2 != null && alterConfigsString.contains(entry._2)) {
138+
fail("Config values should not be in the request string")
139+
}
140+
}
141+
val req = request(alterConfigs)
142+
val loggableAlterConfigs = req.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
133143
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
134144
assertEquals(expectedValues, toMap(loggedConfig))
135-
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString
145+
val alterConfigsDesc = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).toString
136146
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
137147
}
138148

core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,17 +263,25 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
263263
// create a bunch of credentials
264264
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
265265
new AlterUserScramCredentialsRequestData()
266+
.setDeletions(util.Arrays.asList(
267+
new AlterUserScramCredentialsRequestData.ScramCredentialDeletion()
268+
.setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
266269
.setUpsertions(util.Arrays.asList(
267270
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
268271
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
269272
.setIterations(4096)
270273
.setSalt(saltBytes)
271274
.setSaltedPassword(saltedPasswordBytes),
272275
))).build()
276+
assertEquals("AlterUserScramCredentialsRequestData(" +
277+
"deletions=[ScramCredentialDeletion(name='" + user2 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + ")], " +
278+
"upsertions=[ScramCredentialUpsertion(name='" + user1 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` +
279+
", iterations=4096, salt=[], saltedPassword=[])])", request1_0.toString)
273280
val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
274-
assertEquals(1, results1_0.size)
275-
checkNoErrorsAlteringCredentials(results1_0)
281+
assertEquals(2, results1_0.size)
282+
assertEquals(1, results1_0.asScala.count(_.errorCode == Errors.RESOURCE_NOT_FOUND.code()))
276283
checkUserAppearsInAlterResults(results1_0, user1)
284+
checkUserAppearsInAlterResults(results1_0, user2)
277285

278286
// When creating credentials, do not update the same user more than once per request
279287
val request1_1 = new AlterUserScramCredentialsRequest.Builder(
@@ -295,6 +303,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
295303
.setSalt(saltBytes)
296304
.setSaltedPassword(saltedPasswordBytes),
297305
))).build()
306+
assertFalse(request1_1.toString.contains(saltBytes))
307+
assertFalse(request1_1.toString.contains(saltedPasswordBytes))
298308
val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
299309
assertEquals(3, results1_1.size)
300310
checkNoErrorsAlteringCredentials(results1_1)

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7137,7 +7137,12 @@ class KafkaApisTest extends Logging {
71377137

71387138
@Test
71397139
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
7140-
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
7140+
val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)
7141+
assertEquals(
7142+
"IncrementalAlterConfigsRequestData(resources=[], validateOnly=false)",
7143+
alterConfigsRequest.toString
7144+
)
7145+
val request = buildRequest(alterConfigsRequest)
71417146
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
71427147
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
71437148
any[Long])).thenReturn(0)
@@ -7149,15 +7154,24 @@ class KafkaApisTest extends Logging {
71497154

71507155
@Test
71517156
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
7152-
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
7157+
val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
71537158
setValidateOnly(true).
71547159
setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
71557160
setResourceName(brokerId.toString).
71567161
setResourceType(BROKER_LOGGER.id()).
71577162
setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
71587163
setName(Log4jController.ROOT_LOGGER).
71597164
setValue("TRACE")).iterator()))).iterator())),
7160-
1.toShort))
7165+
1.toShort)
7166+
assertEquals(
7167+
"IncrementalAlterConfigsRequestData(resources=[" +
7168+
"AlterConfigsResource(resourceType=" + BROKER_LOGGER.id() + ", " +
7169+
"resourceName='"+ brokerId + "', " +
7170+
"configs=[AlterableConfig(name='" + Log4jController.ROOT_LOGGER + "', configOperation=0, value='REDACTED')])], " +
7171+
"validateOnly=true)",
7172+
alterConfigsRequest.toString
7173+
)
7174+
val request = buildRequest(alterConfigsRequest)
71617175
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
71627176
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
71637177
any[Long])).thenReturn(0)

0 commit comments

Comments
 (0)