MINOR: new checksum verification in gradlew #45
base: trunk
Conversation
…on (apache#19589) Add the `controller.quorum.auto.join.enable` configuration. When enabled with KIP-853 supported, follower controllers who are observers (their replica id + directory id are not in the voter set) will: - Automatically remove voter set entries which match their replica id but not directory id by sending the `RemoveVoterRPC` to the leader. - Automatically add themselves as a voter when their replica id is not present in the voter set by sending the `AddVoterRPC` to the leader. Reviewers: José Armando García Sancio [[email protected]](mailto:[email protected]), Chia-Ping Tsai [[email protected]](mailto:[email protected])
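The auto-join decision described above can be sketched as follows; `AutoJoinDecision`, `decide`, and the map-based voter set are illustrative simplifications, not the actual KRaft implementation.

```java
import java.util.Map;

public class AutoJoinDecision {
    public enum Action { REMOVE_STALE_VOTER, ADD_SELF_AS_VOTER, NONE }

    // voters maps replica id -> directory id for the current voter set.
    public static Action decide(int replicaId, String directoryId, Map<Integer, String> voters) {
        String registered = voters.get(replicaId);
        if (registered == null) {
            return Action.ADD_SELF_AS_VOTER;   // replica id absent: send AddVoter RPC to the leader
        }
        if (!registered.equals(directoryId)) {
            return Action.REMOVE_STALE_VOTER;  // matching replica id, stale directory id: send RemoveVoter RPC first
        }
        return Action.NONE;                    // already a voter with the right directory id
    }
}
```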
…e#20340) Log a warning for each topic that failed to be created as a result of an automatic creation. This makes the underlying cause more visible so users can take action. Previously, at the default log level, you could only see logs that the broker was attempting to autocreate topics. If the creation failed, then it was logged at debug. Signed-off-by: Robert Young <[email protected]> Reviewers: Luke Chen <[email protected]>, Kuan-Po Tseng <[email protected]>
…pache#20351) Refactor metric gauges instantiation to use lambda expressions instead of ImmutableValue. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
Fix the typo in `AdminBootstrapAddresses`. Reviewers: TengYao Chi <[email protected]>, Ken Huang <[email protected]>
…ntegration-tests module (apache#20339) This PR does the following: - Rewrite to new test infra. - Rewrite to java. - Move to clients-integration-tests. - Add `ensureConsistentMetadata` method to `ClusterInstance`, similar to `ensureConsistentKRaftMetadata` in the old infra, and refactors related code. Reviewers: TengYao Chi <[email protected]>, Ken Huang <[email protected]>
…pache#20354) The broker observer should not read the update-voter-set timer value when polling to determine its backoff, since brokers cannot auto-join the KRaft voter set. If auto-join or kraft.version=1 is not supported, controller observers should not read this timer when polling either. The updateVoterSetPeriodMs timer should not be considered when calculating the backoff returned by polling, since it does not represent the same thing as the fetchTimeMs timer. Reviewers: Chia-Ping Tsai <[email protected]>, José Armando García Sancio <[email protected]>, Alyssa Huang <[email protected]>, Kuan-Po Tseng <[email protected]>
…20355) CoordinatorMetricsShard was split into a separate module in (apache#16883), causing the link in the javadoc to become invalid. So, remove broken link in CoordinatorMetricsShard javadoc. Reviewers: TengYao Chi <[email protected]>, Sanskar Jhajharia <[email protected]>, Chia-Ping Tsai <[email protected]>
Correct parameter name from `logManager` to `raftClient` (leftover from PR apache#10705) Reviewers: Chia-Ping Tsai <[email protected]>
apache#19699) 1. Move TransactionMetadata to the transaction-coordinator module. 2. Rewrite TransactionMetadata in Java. 3. The `topicPartitions` field uses `HashSet` instead of `Set`, because it's a mutable field. 4. In Scala, the `prepare*` methods can pass the current value as the default input to `prepareTransitionTo`; Java, however, does not support default parameter values. To avoid a lot of duplicated code or assigning a value to the wrong field, we add a private class `TransitionData`. It takes the current `TransactionMetadata` values as defaults, so each `prepare*` method only needs to assign the updated values. Reviewers: Justine Olshan <[email protected]>, Artem Livshits <[email protected]>, Chia-Ping Tsai <[email protected]>
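The `TransitionData` idea can be sketched roughly like this; the field names and the `prepareAbort` transition are hypothetical simplifications of the pattern, not the actual coordinator code.

```java
public class TransitionExample {
    static final class TxnMetadata {
        final String state;
        final long timeoutMs;
        TxnMetadata(String state, long timeoutMs) { this.state = state; this.timeoutMs = timeoutMs; }
    }

    // Mutable holder seeded with the current values, standing in for Scala's default arguments.
    static final class TransitionData {
        String state;
        long timeoutMs;
        TransitionData(TxnMetadata current) { this.state = current.state; this.timeoutMs = current.timeoutMs; }
        TxnMetadata build() { return new TxnMetadata(state, timeoutMs); }
    }

    static TxnMetadata prepareAbort(TxnMetadata current) {
        TransitionData d = new TransitionData(current);
        d.state = "PREPARE_ABORT";  // only the changed field is assigned; the rest keep current values
        return d.build();
    }
}
```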
…ositive (apache#20338) Fixes a false positive in `BrokerRegistrationRequestTest` caused by `isMigratingZkBroker`, and migrates the test from Scala to Java. Reviewers: Chia-Ping Tsai <[email protected]>
…Follower/makeLeader (apache#20335) Follow-up to [KAFKA-18486](https://issues.apache.org/jira/browse/KAFKA-18486) * Replace PartitionState with PartitionRegistration in makeFollower/makeLeader * Remove PartitionState.java since it is no longer referenced Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
…20349) This PR fixes a problem related to `TestLinearWriteSpeed`. During my work on KIP-780, I discovered that benchmarks for `TestLinearWriteSpeed` do not account for compression algorithms. It always uses `Compression.NONE` when creating records. The problem was introduced in this PR [1]. [1] - apache#17736 Reviewers: Ken Huang <[email protected]>, Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
jira: [KAFKA-19306](https://issues.apache.org/jira/browse/KAFKA-19306) log ``` Producing 1000000 messages..to topics log-cleaner-test-849894102467800668-0 Logging produce requests to /tmp/kafka-log-cleaner-produced-6049271649847384547.txt Sleeping for 20seconds... Consuming messages... Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7065252868189829937.txt 1000000 rows of data produced, 120176 rows of data consumed (88.0% reduction). De-duplicating and validating output files... Validated 90057 values, 0 mismatches. Data verification is completed ``` result ``` ================================================================================ SESSION REPORT (ALL TESTS) ducktape version: 0.12.0 session_id: 2025-07-10--001 run time: 1 minute 2.051 seconds tests run: 1 passed: 1 flaky: 0 failed: 0 ignored: 0 ================================================================================ test_id: kafkatest.tests.tools.log_compaction_test.LogCompactionTest.test_log_compaction.metadata_quorum=ISOLATED_KRAFT status: PASS run time: 1 minute 1.809 seconds ``` Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
Updated the Kafka Streams configuration documentation to stay current with version 4.0.0. Reviewers: Matthias J. Sax <[email protected]>
Fix typo and docs in following. ``` clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java clients/src/main/resources/common/message/FetchRequest.json raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ``` Reviewers: Kuan-Po Tseng <[email protected]>, Lan Ding <[email protected]>, Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
This PR aims at cleaning up the tools module further by getting rid of some extra code which can be replaced by `record` Reviewers: Chia-Ping Tsai <[email protected]>
… to handle requests gracefully (apache#20342) During shutdown, when the RSM closes first, ongoing requests might throw an error. To handle ongoing requests gracefully, the RSM is now closed after the remote-log reader thread pools. Reviewers: Satish Duggana <[email protected]>
Now that Kafka supports Java 17, this PR makes some changes in the connect module. The changes in this PR are limited to only some files; future PR(s) will follow. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Modules targeted: runtime/src/test Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
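The replacements listed above, side by side. One caveat worth noting: `List.of`/`Map.of`/`Set.of` return unmodifiable collections and reject null elements, which the older factories did not always do.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FactoryMigration {
    public static void main(String[] args) {
        List<String> oldEmpty = Collections.emptyList();
        List<String> newEmpty = List.of();

        List<String> oldSingle = Collections.singletonList("a");
        List<String> newSingle = List.of("a");

        List<String> oldMulti = Arrays.asList("a", "b");
        List<String> newMulti = List.of("a", "b");

        Map<String, Integer> newMap = Map.of("a", 1);  // replaces Collections.singletonMap("a", 1)
        Set<String> newSet = Set.of("a");              // replaces Collections.singleton("a")

        System.out.println(oldEmpty.equals(newEmpty)
                && oldSingle.equals(newSingle)
                && oldMulti.equals(newMulti));  // prints "true": equality semantics are unchanged
    }
}
```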
Remove unused PartitionState. It was unused after apache#7222. Reviewers: Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
…own to clients when RLMM is not ready (apache#20345) During broker restarts, the topic-based RemoteLogMetadataManager (RLMM) constructs its state by reading the internal `__remote_log_metadata` topic. When a partition is not ready to perform remote storage operations, a ReplicaNotAvailableException is thrown back to the consumer, and the client retries the request immediately. This results in a lot of FETCH requests on the broker and ties up the request handler threads. Using a CountDownLatch reduces the frequency of ReplicaNotAvailableException thrown back to clients, improving request handler thread usage on the broker. Previously, for one consumer, when RLMM was not ready for a partition, the broker received ~9K FetchConsumer requests/sec. With this patch, the number of FETCH requests is reduced by 95%, to 600/sec. Reviewers: Lan Ding <[email protected]>, Satish Duggana <[email protected]>
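A minimal sketch of the latch-based gating, using a hypothetical `ReadinessGate` wrapper rather than the actual RLMM code: instead of failing each fetch immediately while metadata is still loading, the handler waits briefly on a latch that opens once the partition becomes ready.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ReadinessGate {
    private final CountDownLatch ready = new CountDownLatch(1);

    // Called once the partition has finished bootstrapping its remote-log metadata.
    void markReady() { ready.countDown(); }

    // Returns true if the partition became ready within the wait budget.
    // Callers throw ReplicaNotAvailableException only on false, so a client
    // sees one delayed error per wait window instead of a tight retry loop.
    boolean awaitReady(long maxWaitMs) throws InterruptedException {
        return ready.await(maxWaitMs, TimeUnit.MILLISECONDS);
    }
}
```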
This PR aims at cleaning up the `jmh-benchmarks` module further by getting rid of some extra code which can be replaced by record Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
… TestRaftServer (apache#20379) The `record-size` and `throughput` arguments don’t work in `TestRaftServer`; the `recordsPerSec` and `recordSize` values are always hard-coded. - Fix the hard-coded `recordsPerSec` and `recordSize` values - Add a "Required" description to command-line options to make it clear to users. Reviewers: Chia-Ping Tsai <[email protected]>
…Partition (apache#20391) As per the current implementation of archiveRecords, when the LSO is updated and there are multiple record batches before the new LSO, only the first one gets archived. This is because of the following lines of code: `isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);` `isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);` The first record/batch makes `isAnyOffsetArchived` / `isAnyBatchArchived` true, after which these expressions short-circuit and the methods `archivePerOffsetBatchRecords` / `archiveCompleteBatch` are not called again. This PR changes the order of the expressions so that the short-circuit does not prevent archiving all the required batches. Reviewers: Apoorv Mittal <[email protected]>
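The short-circuit bug can be distilled to the operand order of `||` (a standalone illustration, not the actual share-partition code):

```java
public class ShortCircuitFix {
    static int calls = 0;

    // Stands in for archivePerOffsetBatchRecords/archiveCompleteBatch.
    static boolean archiveBatch() { calls++; return true; }

    public static void main(String[] args) {
        boolean archived = false;
        for (int i = 0; i < 3; i++) {
            archived = archived || archiveBatch();  // buggy order: once true, the call is skipped
        }
        int buggyCalls = calls;

        calls = 0;
        archived = false;
        for (int i = 0; i < 3; i++) {
            archived = archiveBatch() || archived;  // fixed order: the call runs for every batch
        }
        System.out.println(buggyCalls + " vs " + calls);  // prints "1 vs 3"
    }
}
```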
The test uses a regular consumer to commit offsets, but the new protocol requires a streams consumer since we are using streams groups; otherwise we run into group ID conflicts. Following the addition of the KafkaAdmin interface for setting offsets, a Kafka Admin client is created and used to set the committed offsets instead of instantiating a consumer. Also enables all tests for the streams new protocol. Reviewers: Alieh Saeedi <[email protected]>, Kirk True <[email protected]>, Matthias Sax <[email protected]>, Bill Bejeck <[email protected]>
https://issues.apache.org/jira/browse/KAFKA-18699 This PR aims at cleaning up the `metadata` module further by getting rid of some extra code which can be replaced by record Reviewers: Ken Huang <[email protected]>, Ming-Yen Chung <[email protected]>, Chia-Ping Tsai <[email protected]>
) This implements KIP-1147 for kafka-producer-perf-test.sh. Reviewers: Chia-Ping Tsai <[email protected]>
…d should include the log directories (apache#20319) The ReassignPartitionsCommand shows the topic replicas on each broker. When using the --generate command, it returns the current partition replica assignment. However, the log directory for each current replica is always shown as any. This makes it impossible for users to determine which specific log directory is being used by each replica. Therefore, we should fix this behavior. ``` Current partition replica assignment { "version": 1, "partitions": [ { "topic": "test1", "partition": 0, "replicas": [ 4, 2 ], "log_dirs": [ "any", "any" ] } ] } ``` This PR ``` Current partition replica assignment { "version": 1, "partitions": [ { "topic": "test1", "partition": 0, "replicas": [ 4, 2 ], "log_dirs": [ "/tmp/kraft-broker-logs234", "/tmp/kraft-broker-logs" ] } ] } ``` Reviewers: PoAn Yang <[email protected]>, Jhen-Yung Hsu <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
…ache#20372) issue: apache#19905 (comment) What: Change `String[] topics` to `Set<String> topics` throughout `LogCompactionTester`. Why: `Set<String>` is more modern and reduces the need for array-to-collection conversions. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Jhen-Yung Hsu <[email protected]>, Lan Ding <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
…#20394) - Changes: Remove fetchQuotaMetrics and copyQuotaMetrics in RemoteLogManager on close from: apache#20342 (comment) Reviewers: Kamal Chandraprakash <[email protected]>
… for client telemetry (apache#20144) #### Summary This PR implements dynamic compression type selection and fallback mechanism for client telemetry to handle cases where compression libraries are not available on the client classpath. #### Problem Currently, when a compression library is missing (e.g., NoClassDefFoundError), the client telemetry system catches the generic Throwable but doesn't learn from the failure. This means, the same unsupported compression type will be attempted on every telemetry push #### Solution This PR introduces a comprehensive fallback mechanism: - Specific Exception Handling: Replace generic Throwable catching with specific exceptions (IOException, NoClassDefFoundError) - Unsupported Compression Tracking: Add unsupportedCompressionTypes collection to track compression types that have failed due to missing libraries - Dynamic Selection: Enhance ClientTelemetryUtils.preferredCompressionType() to accept an unsupported types parameter and filter out known problematic compression types - Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe access to the unsupported types collection - Improved Logging: Include exception details in log messages for better debugging #### Key Changes - Modified createPushRequest() to track failed compression types in unsupportedCompressionTypes - Updated ClientTelemetryUtils.preferredCompressionType() to filter out unsupported types - Enhanced exception handling with specific exception types instead of Throwable #### Testing - Added appropriate Unit tests - Testing apache kafka on local logs: ``` ✗ cat ~/Desktop/kafka-client.log | grep " org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter" 2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA 2025-07-17 07:56:52:602 [kafka-producer-network-thread | 
kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Telemetry subscription push interval value from broker was 5000; to stagger requests the first push interval is being adjusted to 4551 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Updating subscription - subscription: ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA, subscriptionId=1650084878, pushIntervalMs=5000, acceptedCompressionTypes=[zstd, lz4, snappy, none], deltaTemporality=true, selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398}; intervalMs: 4551, lastRequestMs: 1752739012639 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Client telemetry registered with client instance id: aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:56:57:224 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library zstd not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:56:57:295 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:02:296 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:02:297 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:02:300 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library lz4 not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:02:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:07:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:07:330 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:07:331 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library snappy not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:07:344 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:12:400 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:17:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:22:508 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:27:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:32:578 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:37:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:42:646 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:47:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:52:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:57:765 [kafka-producer-network-thread | kr-kafka-producer] DEBUG 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED ``` Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
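The track-and-filter idea behind the fallback mechanism can be sketched as follows; the class and method names here are illustrative, not the actual `ClientTelemetryReporter` internals.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CompressionFallback {
    enum CompressionType { ZSTD, LZ4, SNAPPY, NONE }

    // Thread-safe set of types that failed with NoClassDefFoundError/IOException.
    private final Set<CompressionType> unsupported = ConcurrentHashMap.newKeySet();

    // Called from the push path when compressing with `type` threw.
    void recordFailure(CompressionType type) { unsupported.add(type); }

    // Pick the first broker-accepted type that has not failed; NONE always works,
    // so after every library is marked unsupported the client sends uncompressed data.
    CompressionType preferredCompressionType(List<CompressionType> accepted) {
        return accepted.stream()
                .filter(t -> !unsupported.contains(t))
                .findFirst()
                .orElse(CompressionType.NONE);
    }
}
```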
…20671) Stores the existing values for both the fields in a local variable for logging. Reviewers: Omnia Ibrahim <[email protected]>
…he flakiness of the test. (apache#20664) MINOR: changed the condition to only check the test topic to reduce the flakiness of the test. Reviewers: Matthias J. Sax <[email protected]>, Lianet Magrans <[email protected]>
…ling (apache#20661) When a push telemetry request fails, any exception is treated as fatal, increasing the time interval to `Integer.MAX_VALUE` and effectively turning telemetry off. This PR updates the error handling to check whether the exception is a transient one with expected recovery, and keeps the telemetry interval unchanged in those cases since a recovery is expected. Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax <[email protected]>
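The interval decision reduces to a fatal/transient classification; this sketch uses illustrative exception types, not the actual list of transient telemetry errors.

```java
import java.io.UncheckedIOException;
import java.util.concurrent.TimeoutException;

public class TelemetryBackoff {
    // Hypothetical classification: errors with expected recovery are transient.
    static boolean isTransient(Throwable t) {
        return t instanceof TimeoutException || t instanceof UncheckedIOException;
    }

    // Transient failures keep the current push interval so the client retries
    // on the normal schedule; fatal ones disable telemetry for this session.
    static int nextIntervalMs(int currentIntervalMs, Throwable error) {
        return isTransient(error) ? currentIntervalMs : Integer.MAX_VALUE;
    }
}
```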
This reverts commit c0b0428.
This reverts commit 461e8e3.
Summary of Changes: Hello @visz11, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request focuses on improving the build system's robustness and updating project dependencies to their latest versions. It introduces a new checksum verification for the Gradle wrapper to prevent common build issues. A significant portion of the changes involves adding new integration tests for various client components, ensuring the stability and correctness of consumer and producer functionalities, including new transactional features and rack-aware assignment strategies. The Admin Client API is also refined with new configuration listing capabilities and clearer documentation for existing operations.
Code Review
This pull request introduces a substantial number of changes, including dependency upgrades, significant refactoring of the consumer and producer internals, and the addition of new features like two-phase commit support. Key improvements include making the consumer more robust against network stalls, modernizing the build scripts, and enhancing transactional capabilities. The introduction of new test utility classes and a large number of new integration tests significantly improves test coverage. My review focused on the critical path changes in the clients and the build system. The changes appear to be well-implemented and improve the overall quality and feature set of the clients.
```java
// RetriableExceptions from the Sender thread are converted to Abortable errors
// because they indicate that the transaction cannot be completed after all retry attempts.
// This conversion ensures the application layer treats these errors as abortable,
// preventing duplicate message delivery.
if (exception instanceof RetriableException ||
    exception instanceof InvalidTxnStateException) {
    exception = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", exception);
}
```
Converting retriable exceptions from the Sender thread into a TransactionAbortableException is a crucial change for ensuring transactional correctness. When all retries are exhausted for a batch within a transaction, the application must be notified that the transaction cannot be committed and must be aborted. This change correctly enforces that semantic, preventing potential data loss or inconsistent transactional state.
```java
private synchronized boolean isFetchableAndSubscribed(TopicPartition topicPartition, TopicPartitionState topicPartitionState) {
    if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && !subscription.contains(topicPartition.topic())) {
        log.trace("Assigned partition {} is not in the subscription {} so will be considered not fetchable.", topicPartition, subscription);
        return false;
    }
    return topicPartitionState.isFetchable();
}
```
This is an important correctness fix. By adding the check to ensure that an assigned partition is also part of the current subscription for AUTO_TOPICS subscription type, you prevent the consumer from fetching data from partitions it is no longer supposed to be consuming from. This can happen during rebalances, and this change makes the consumer's behavior more robust and correct.
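The effect of the added guard can be sketched with plain collections rather than the real `SubscriptionState` internals (the class and parameter names below are illustrative, not the actual consumer API):

```java
import java.util.Set;

class FetchGuardModel {
    // Sketch of the reviewed check: under an AUTO_TOPICS subscription, an
    // assigned partition is fetchable only if its topic is still in the
    // current subscription; otherwise the partition state decides.
    static boolean isFetchableAndSubscribed(boolean autoTopicsSubscription,
                                            Set<String> subscribedTopics,
                                            String topic,
                                            boolean partitionStateFetchable) {
        if (autoTopicsSubscription && !subscribedTopics.contains(topic)) {
            return false; // assigned but no longer subscribed, e.g. mid-rebalance
        }
        return partitionStateFetchable;
    }
}
```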
```diff
  * Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
  */
-public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
+public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
```
Good catch correcting the generic type for AbstractOptions. The previous type UpdateFeaturesOptions was incorrect and would break method chaining for this class. This change ensures the class conforms to the intended design of the options classes.
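The self-referential generic is what lets option setters return the concrete subclass, keeping the right static type across a chain of calls. A stripped-down sketch of the pattern (modeled on Kafka's `AbstractOptions`, which exposes a `timeoutMs` setter, though the bodies here are simplified):

```java
abstract class AbstractOptions<T extends AbstractOptions<T>> {
    private Integer timeoutMs;

    // Returns the concrete subclass type so calls can be chained.
    @SuppressWarnings("unchecked")
    public T timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return (T) this;
    }

    public Integer timeoutMs() {
        return timeoutMs;
    }
}

// With the corrected type parameter, timeoutMs(...) returns
// UnregisterBrokerOptions rather than an unrelated options class,
// so chaining compiles with the expected type.
class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> { }
```

With the old declaration, `new UnregisterBrokerOptions().timeoutMs(5000)` would have had static type `UpdateFeaturesOptions`, breaking any subsequent chained call that expects the unregister-broker options.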
```java
if (enable2PC && userConfiguredTransactionTimeout) {
    throw new ConfigException(
        "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
        " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
        " is set to true. Transactions will not expire with two-phase commit enabled."
    );
}
```
This validation is a great addition. It prevents a confusing misconfiguration where a user might set a transaction timeout for a two-phase commit transaction, where the timeout is not actually enforced by the broker and the transaction lifecycle is externally managed. This improves user experience and prevents unexpected behavior.
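A self-contained sketch of the same guard (the class and method names are stand-ins; the real check lives in the producer's config validation and throws Kafka's `ConfigException`):

```java
class TwoPhaseCommitConfigCheck {
    // Mirrors the reviewed validation: an explicit transaction timeout is
    // rejected when two-phase commit is enabled, because the broker does not
    // expire 2PC transactions; their lifecycle is managed externally.
    static void validate(boolean enable2PC, boolean userConfiguredTransactionTimeout) {
        if (enable2PC && userConfiguredTransactionTimeout) {
            throw new IllegalArgumentException(
                "Cannot set transaction.timeout.ms when " +
                "transaction.two.phase.commit.enable is set to true.");
        }
    }
}
```

Rejecting the combination at startup is preferable to silently ignoring the timeout, which is the confusing behavior this change prevents.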
```diff
 @Override
 public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
-    if (subscription.hasPatternSubscription() || subscription.hasRe2JPatternSubscription())
+    if (subscription.hasPatternSubscription()) {
         // Consumer subscribed to client-side regex => request all topics to compute regex
         return MetadataRequest.Builder.allTopics();
+    }
+    if (subscription.hasRe2JPatternSubscription() && transientTopics.isEmpty()) {
+        // Consumer subscribed to broker-side regex and no need for transient topic names metadata => request topic IDs
+        return MetadataRequest.Builder.forTopicIds(subscription.assignedTopicIds());
+    }
     // Subscription to explicit topic names or transient topics present.
     // Note that in the case of RE2J broker-side regex subscription, we may end up in this path
     // if there are transient topics. They are just needed temporarily (lifetime of offsets-related API calls),
     // so we'll request them to unblock their APIs, then go back to requesting assigned topic IDs as needed
     List<String> topics = new ArrayList<>();
     topics.addAll(subscription.metadataTopics());
     topics.addAll(transientTopics);
-    return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+    return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
 }
```
The updated logic in newMetadataRequestBuilder to differentiate between client-side regex, broker-side regex (RE2J), and explicit topic subscriptions is a significant improvement. Using topic IDs for RE2J subscriptions when possible is a great optimization that makes the consumer more resilient to topic renames. The fallback to topic names when transient topics are needed for other operations is also a well-thought-out design.
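The selection reduces to a three-way decision whose ordering matters. A stand-in sketch (the enum and boolean flags are illustrative, not the actual consumer API):

```java
class MetadataRequestChoice {
    enum Kind { ALL_TOPICS, TOPIC_IDS, TOPIC_NAMES }

    // Mirrors the reviewed ordering: client-side regex needs the full topic
    // list to evaluate the pattern locally; broker-side (RE2J) regex can use
    // the assigned topic IDs unless transient topic names are temporarily
    // needed; everything else falls back to explicit topic names.
    static Kind choose(boolean clientSideRegex, boolean re2jRegex, boolean hasTransientTopics) {
        if (clientSideRegex) {
            return Kind.ALL_TOPICS;
        }
        if (re2jRegex && !hasTransientTopics) {
            return Kind.TOPIC_IDS;
        }
        return Kind.TOPIC_NAMES;
    }
}
```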
Following the discussion in apache#19513, add a new checksum verification step to gradlew that determines whether a new wrapper JAR needs to be downloaded. This prevents developers from running into incompatibility issues when using an outdated wrapper JAR after a Gradle upgrade.
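One way such a check can work is sketched below using only the JDK; the actual gradlew implementation is a shell script and may differ in detail. The idea is to hash the existing wrapper JAR and re-download only when its SHA-256 does not match the expected digest recorded for the current Gradle version.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

class WrapperChecksum {
    // Hex-encoded SHA-256 of a byte array.
    static String sha256Hex(byte[] data) {
        try {
            return HexFormat.of().formatHex(MessageDigest.getInstance("SHA-256").digest(data));
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError("SHA-256 is required by the JDK spec", e);
        }
    }

    // Returns true when the file exists and its SHA-256 matches the expected
    // digest; a mismatch or missing/unreadable file signals that the wrapper
    // JAR should be downloaded again.
    static boolean matchesExpected(Path jar, String expectedSha256Hex) {
        try {
            return Files.exists(jar)
                    && sha256Hex(Files.readAllBytes(jar)).equalsIgnoreCase(expectedSha256Hex);
        } catch (IOException e) {
            return false; // safer to re-download than trust an unreadable JAR
        }
    }
}
```

Treating any failure as "re-download" keeps the check conservative: a stale or corrupted wrapper JAR is replaced rather than silently used.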