Skip to content

Commit 084fcbd

Browse files
authored
KAFKA-18599: Remove Optional wrapping for forwardingManager in ApiVersionManager (apache#18630)
`forwardingManager` is always present now. Reviewers: Ismael Juma <[email protected]>
1 parent 7e86bd8 commit 084fcbd

File tree

3 files changed

+15
-54
lines changed

3 files changed

+15
-54
lines changed

core/src/main/scala/kafka/server/ApiVersionManager.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object ApiVersionManager {
4646
def apply(
4747
listenerType: ListenerType,
4848
config: KafkaConfig,
49-
forwardingManager: Option[ForwardingManager],
49+
forwardingManager: ForwardingManager,
5050
supportedFeatures: BrokerFeatures,
5151
metadataCache: MetadataCache,
5252
clientMetricsManager: Option[ClientMetricsManager]
@@ -129,7 +129,7 @@ class SimpleApiVersionManager(
129129
*/
130130
class DefaultApiVersionManager(
131131
val listenerType: ListenerType,
132-
forwardingManager: Option[ForwardingManager],
132+
forwardingManager: ForwardingManager,
133133
brokerFeatures: BrokerFeatures,
134134
metadataCache: MetadataCache,
135135
val enableUnstableLastVersion: Boolean,
@@ -143,7 +143,7 @@ class DefaultApiVersionManager(
143143
alterFeatureLevel0: Boolean
144144
): ApiVersionsResponse = {
145145
val finalizedFeatures = metadataCache.features()
146-
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
146+
val controllerApiVersions = forwardingManager.controllerApiVersions
147147
val clientTelemetryEnabled = clientMetricsManager match {
148148
case Some(manager) => manager.isTelemetryReceiverConfigured
149149
case None => false

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ class BrokerServer(
253253
val apiVersionManager = ApiVersionManager(
254254
ListenerType.BROKER,
255255
config,
256-
Some(forwardingManager),
256+
forwardingManager,
257257
brokerFeatures,
258258
metadataCache,
259259
Some(clientMetricsManager)

core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
2121
import org.apache.kafka.common.protocol.ApiKeys
2222
import org.apache.kafka.server.BrokerFeatures
2323
import org.apache.kafka.server.common.KRaftVersion
24-
import org.junit.jupiter.api.{Disabled, Test}
24+
import org.junit.jupiter.api.Test
2525
import org.junit.jupiter.api.Assertions._
2626
import org.junit.jupiter.params.ParameterizedTest
2727
import org.junit.jupiter.params.provider.EnumSource
@@ -36,9 +36,10 @@ class ApiVersionManagerTest {
3636
@ParameterizedTest
3737
@EnumSource(classOf[ListenerType])
3838
def testApiScope(apiScope: ListenerType): Unit = {
39+
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
3940
val versionManager = new DefaultApiVersionManager(
4041
listenerType = apiScope,
41-
forwardingManager = None,
42+
forwardingManager = forwardingManager,
4243
brokerFeatures = brokerFeatures,
4344
metadataCache = metadataCache,
4445
enableUnstableLastVersion = true
@@ -54,9 +55,10 @@ class ApiVersionManagerTest {
5455
@ParameterizedTest
5556
@EnumSource(classOf[ListenerType])
5657
def testDisabledApis(apiScope: ListenerType): Unit = {
58+
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
5759
val versionManager = new DefaultApiVersionManager(
5860
listenerType = apiScope,
59-
forwardingManager = None,
61+
forwardingManager = forwardingManager,
6062
brokerFeatures = brokerFeatures,
6163
metadataCache = metadataCache,
6264
enableUnstableLastVersion = false
@@ -85,7 +87,7 @@ class ApiVersionManagerTest {
8587

8688
val versionManager = new DefaultApiVersionManager(
8789
listenerType = ListenerType.ZK_BROKER,
88-
forwardingManager = Some(forwardingManager),
90+
forwardingManager = forwardingManager,
8991
brokerFeatures = brokerFeatures,
9092
metadataCache = metadataCache,
9193
enableUnstableLastVersion = true
@@ -103,59 +105,18 @@ class ApiVersionManagerTest {
103105
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
104106
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
105107

106-
for (forwardingManagerOpt <- Seq(Some(forwardingManager), None)) {
107-
val versionManager = new DefaultApiVersionManager(
108-
listenerType = ListenerType.BROKER,
109-
forwardingManager = forwardingManagerOpt,
110-
brokerFeatures = brokerFeatures,
111-
metadataCache = metadataCache,
112-
enableUnstableLastVersion = true
113-
)
114-
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
115-
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
116-
117-
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
118-
val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
119-
assertNull(envelopeVersion)
120-
}
121-
}
122-
123-
@Disabled("Enable after enable KIP-590 forwarding in KAFKA-12886")
124-
@Test
125-
def testEnvelopeEnabledWhenForwardingManagerPresent(): Unit = {
126-
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
127-
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
128-
129108
val versionManager = new DefaultApiVersionManager(
130-
listenerType = ListenerType.ZK_BROKER,
131-
forwardingManager = Some(forwardingManager),
109+
listenerType = ListenerType.BROKER,
110+
forwardingManager = forwardingManager,
132111
brokerFeatures = brokerFeatures,
133112
metadataCache = metadataCache,
134113
enableUnstableLastVersion = true
135114
)
136-
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
137-
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
115+
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
116+
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
138117

139118
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
140119
val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
141-
assertNotNull(envelopeVersion)
142-
assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion)
143-
assertEquals(ApiKeys.ENVELOPE.latestVersion, envelopeVersion.maxVersion)
144-
}
145-
146-
@Test
147-
def testEnvelopeDisabledWhenForwardingManagerEmpty(): Unit = {
148-
val versionManager = new DefaultApiVersionManager(
149-
listenerType = ListenerType.ZK_BROKER,
150-
forwardingManager = None,
151-
brokerFeatures = brokerFeatures,
152-
metadataCache = metadataCache,
153-
enableUnstableLastVersion = true
154-
)
155-
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
156-
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
157-
158-
val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
159-
assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
120+
assertNull(envelopeVersion)
160121
}
161122
}

0 commit comments

Comments
 (0)