2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -252,7 +252,7 @@ scala-library-2.13.10
scala-logging_2.13-3.9.4
scala-reflect-2.13.10
scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.0
snappy-java-1.1.10.1
swagger-annotations-2.2.8
zookeeper-3.6.4
zookeeper-jute-3.6.4
@@ -94,7 +94,7 @@ public OffsetAndMetadata offsetAndMetadata() {
@Override
public String toString() {
return String.format("Checkpoint{consumerGroupId=%s, topicPartition=%s, "
+ "upstreamOffset=%d, downstreamOffset=%d, metatadata=%s}",
+ "upstreamOffset=%d, downstreamOffset=%d, metadata=%s}",
consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
}

@@ -105,7 +105,7 @@ public void testClientConfigProperties() {
assertEquals("b__topic1", aClientConfig.replicationPolicy().formatRemoteTopic("b", "topic1"),
"replication.policy.separator is honored");
assertEquals(clusterABootstrap, aClientConfig.adminConfig().get("bootstrap.servers"),
"client configs include boostrap.servers");
"client configs include bootstrap.servers");
assertEquals(ForwardingAdmin.class.getName(), aClientConfig.forwardingAdmin(aClientConfig.adminConfig()).getClass().getName(),
"Cluster a uses the default ForwardingAdmin");
assertEquals("PLAINTEXT", aClientConfig.adminConfig().get("security.protocol"),
@@ -115,7 +115,7 @@ public void testClientConfigProperties() {
assertFalse(aClientConfig.adminConfig().containsKey("xxx"),
"unknown properties aren't included in client configs");
assertFalse(aClientConfig.adminConfig().containsKey("metric.reporters"),
"top-leve metrics reporters aren't included in client configs");
"top-level metrics reporters aren't included in client configs");
assertEquals("secret2", aClientConfig.getPassword("ssl.key.password").value(),
"security properties are translated from external sources");
assertEquals("secret2", ((Password) aClientConfig.adminConfig().get("ssl.key.password")).value(),
@@ -98,7 +98,7 @@ private static void validateReplicationFactor(String configName, short factor) {
}
}

public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) {
public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultPartitionCount) {
int orderInGroup = 0;
ConfigDef configDef = new ConfigDef();
configDef
@@ -115,7 +115,7 @@ public static ConfigDef configDef(String group, short defaultReplicationFactor,
ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup,
ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
.define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
defaultParitionCount, PARTITIONS_VALIDATOR,
defaultPartitionCount, PARTITIONS_VALIDATOR,
ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup,
ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
return configDef;
@@ -34,7 +34,7 @@ public class WorkerTransactionContext implements TransactionContext {

private static final Logger log = LoggerFactory.getLogger(WorkerTransactionContext.class);

private final Set<SourceRecord> commitableRecords = new HashSet<>();
private final Set<SourceRecord> committableRecords = new HashSet<>();
private final Set<SourceRecord> abortableRecords = new HashSet<>();
private boolean batchCommitRequested = false;
private boolean batchAbortRequested = false;
@@ -47,7 +47,7 @@ public synchronized void commitTransaction() {
@Override
public synchronized void commitTransaction(SourceRecord record) {
Objects.requireNonNull(record, "Source record used to define transaction boundaries may not be null");
commitableRecords.add(record);
committableRecords.add(record);
}

@Override
@@ -82,7 +82,7 @@ public synchronized boolean shouldCommitOn(SourceRecord record) {
// Essentially, instead of telling the task that it screwed up and trusting it to do the right thing, we rat on it to the
// worker and let it get punished accordingly.
checkRecordRequestConsistency(record);
return commitableRecords.remove(record);
return committableRecords.remove(record);
}

public synchronized boolean shouldAbortOn(SourceRecord record) {
@@ -97,7 +97,7 @@ private void checkBatchRequestsConsistency() {
}

private void checkRecordRequestConsistency(SourceRecord record) {
if (commitableRecords.contains(record) && abortableRecords.contains(record)) {
if (committableRecords.contains(record) && abortableRecords.contains(record)) {
log.trace("Connector will fail as it has requested both commit and abort of transaction for same record: {}", record);
throw new IllegalStateException(String.format(
"Connector requested both commit and abort of same record against topic/partition %s/%s",
@@ -128,7 +128,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception {
props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");

// tolerate all erros
// tolerate all errors
props.put(ERRORS_TOLERANCE_CONFIG, "all");

// retry for up to one second
@@ -205,7 +205,7 @@ public void testErrantRecordReporter() throws Exception {
props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");

// tolerate all erros
// tolerate all errors
props.put(ERRORS_TOLERANCE_CONFIG, "all");

// retry for up to one second
@@ -399,7 +399,7 @@ private ConnectorConfig connConfig(Map<String, String> connProps) {
}

@Test
public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
public void testErrorHandlingInSourceTasksWithBadConverter() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
@@ -36,7 +36,7 @@ public class PluginDescTest {
private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
private final String regularVersion = "1.0.0";
private final String newerVersion = "1.0.1";
private final String snaphotVersion = "1.0.0-SNAPSHOT";
private final String snapshotVersion = "1.0.0-SNAPSHOT";
private final String noVersion = "undefined";
private PluginClassLoader pluginLoader;

@@ -61,11 +61,11 @@ public void testRegularPluginDesc() {

PluginDesc<Converter> converterDesc = new PluginDesc<>(
Converter.class,
snaphotVersion,
snapshotVersion,
pluginLoader
);

assertPluginDesc(converterDesc, Converter.class, snaphotVersion, pluginLoader.location());
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location());

PluginDesc<Transformation> transformDesc = new PluginDesc<>(
Transformation.class,
@@ -90,11 +90,11 @@ public void testPluginDescWithSystemClassLoader() {

PluginDesc<Converter> converterDesc = new PluginDesc<>(
Converter.class,
snaphotVersion,
snapshotVersion,
systemLoader
);

assertPluginDesc(converterDesc, Converter.class, snaphotVersion, location);
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location);

PluginDesc<Transformation> transformDesc = new PluginDesc<>(
Transformation.class,
@@ -136,13 +136,13 @@ public void testPluginDescWithNullVersion() {
public void testPluginDescEquality() {
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
Connector.class,
snaphotVersion,
snapshotVersion,
pluginLoader
);

PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
Connector.class,
snaphotVersion,
snapshotVersion,
systemLoader
);

@@ -204,7 +204,7 @@ public void testPluginDescComparison() {

PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
Converter.class,
snaphotVersion,
snapshotVersion,
systemLoader
);

@@ -511,7 +511,7 @@ private List<Path> createBasicExpectedUrls() throws IOException {

private void assertUrls(List<Path> expected, List<Path> actual) {
Collections.sort(expected);
// not sorting 'actual' because it should be returned sorted from withing the PluginUtils.
// not sorting 'actual' because it should be returned sorted from within the PluginUtils.
assertEquals(expected, actual);
}
}
@@ -135,7 +135,7 @@ public void testAdvertisedUri() {
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop();

// Listener is overriden by advertised values
// Listener is overridden by advertised values
configMap = new HashMap<>(baseServerProps());
configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
@@ -163,11 +163,11 @@ public class KafkaConfigBackingStoreTest {
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);

private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false);
private static final Struct INLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
private static final List<Struct> RESTART_REQUEST_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true).put(INCLUDE_TASKS_FIELD_NAME, false),
ONLY_FAILED_MISSING_STRUCT,
INLUDE_TASKS_MISSING_STRUCT);
INCLUDE_TASKS_MISSING_STRUCT);

// The exact format doesn't matter here since both conversions are mocked
private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
@@ -1413,7 +1413,7 @@ public void testRecordToRestartRequestOnlyFailedInconsistent() {
public void testRecordToRestartRequestIncludeTasksInconsistent() {
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
Struct struct = INLUDE_TASKS_MISSING_STRUCT;
Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct));
RestartRequest restartRequest = configStorage.recordToRestartRequest(record, schemaAndValue);
assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
@@ -343,7 +343,7 @@ public Map<String, Optional<TopicDescription>> describeTopics(Set<String> topicNames) {
Throwable cause = e.getCause();
if (cause instanceof UnknownTopicOrPartitionException) {
results.put(topicName, Optional.empty());
log.info("Found non-existant topic {}", topicName);
log.info("Found non-existent topic {}", topicName);
continue;
}
throw new AssertionError("Could not describe topic(s)" + topicNames, e);
@@ -454,7 +454,7 @@ private void maybeUpdateReadOffset() throws RemoteStorageException {
// This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
// of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the earliest epoch entrys offset.
// epoch cache then it starts copying the segments from the earliest epoch entry's offset.
copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition));
}
}
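
The comment in this hunk describes how the copied offset is recovered: walk the leader epoch history from the latest epoch backwards, take the highest offset already copied to remote storage for the first epoch that has an entry, and otherwise fall back to the earliest epoch entry's offset. A minimal standalone Java sketch of that traversal, with hypothetical names rather than the actual RemoteLogManager API:

import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class HighestCopiedOffsetSketch {
    // epochsNewestFirst: leader epochs from the leader epoch cache, newest first.
    // highestRemoteOffsetForEpoch: highest offset copied to remote storage for an epoch, if any.
    static long findCopiedOffset(List<Integer> epochsNewestFirst,
                                 Function<Integer, Optional<Long>> highestRemoteOffsetForEpoch,
                                 long earliestEpochStartOffset) {
        for (int epoch : epochsNewestFirst) {
            Optional<Long> copied = highestRemoteOffsetForEpoch.apply(epoch);
            if (copied.isPresent()) {
                return copied.get();              // first (newest) epoch with a remote entry wins
            }
        }
        return earliestEpochStartOffset;          // nothing copied yet: start from the earliest epoch's offset
    }
}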
@@ -363,13 +363,13 @@ abstract class AbstractFetcherThread(name: String,
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag

// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch.asScala)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
if (validBytes > 0) fetcherStats.byteRate.mark(validBytes)
}
}
}
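
The two changed lines above relax the guard so that a fetch returning no data still refreshes the partition's fetch state when no lag has been recorded yet, while the byte-rate meter is still only marked for non-empty fetches. A small standalone Java sketch of just that decision logic (assumed shape, not the AbstractFetcherThread API):

import java.util.OptionalLong;

class FetcherStateUpdateSketch {
    // Old condition: validBytes > 0 && stillTracked.
    // New condition: also update when the current lag is unknown, so the fetch state
    // (and its lag) gets recorded even for an empty fetch response.
    static boolean shouldUpdateFetchState(long validBytes, OptionalLong currentLag, boolean stillTracked) {
        return (validBytes > 0 || !currentLag.isPresent()) && stillTracked;
    }

    // Unchanged intent: only count bytes that were actually fetched.
    static boolean shouldMarkByteRate(long validBytes) {
        return validBytes > 0;
    }
}

With the old guard, a follower that is already caught up (every fetch returns zero bytes) would never populate the lag metric; with the new one, the first empty response records a lag of zero.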
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2292,8 +2292,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
} else {
// ZK-based
if (migrationEnabled) {
require(!originals.containsKey(KafkaConfig.AuthorizerClassNameProp),
s"ZooKeeper migration does not yet support authorizers. Remove ${KafkaConfig.AuthorizerClassNameProp} before performing a migration.")
validateNonEmptyQuorumVotersForMigration()
require(controllerListenerNames.nonEmpty,
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
@@ -202,7 +202,7 @@ void testStartup() {
}

// This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.
// The leader epochs are [0->0, 1->100, 2->200]. We are verifying
// The leader epochs are [0->0, 1->100, 2->200]. We are verifying:
// 1. There's only 1 segment copied to remote storage
// 2. The segment got copied to remote storage is the old segment, not the active one
// 3. The log segment metadata stored into remoteLogMetadataManager is what we expected, both before and after copying the log segments
@@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest {
}
}

@ClusterTest(
brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(
new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"),
new ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"),
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
)
)
def testStartZkBrokerWithAuthorizer(zkCluster: ClusterInstance): Unit = {
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)

// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
zkCluster.waitForReadyBrokers()
readyFuture.get(30, TimeUnit.SECONDS)

val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")

def inDualWrite(): Boolean = {
val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS)
migrationState.allowDualWrite()
}
TestUtils.waitUntilTrue(() => inDualWrite(), "Timed out waiting for dual-write mode")
} finally {
shutdownInSequence(zkCluster, kraftCluster)
}
}

/**
* Test ZkMigrationClient against a real ZooKeeper-backed Kafka cluster. This test creates a ZK cluster
* and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
7 changes: 2 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1669,12 +1669,9 @@ class KafkaConfigTest {
// All needed configs are now set
KafkaConfig.fromProps(props)

// Don't allow migration startup with an authorizer set
// Check that we allow authorizer to be set
props.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getCanonicalName)
assertEquals(
"requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration.",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
props.remove(KafkaConfig.AuthorizerClassNameProp)
KafkaConfig.fromProps(props)

// Don't allow migration startup with an older IBP
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())