Skip to content

Commit 00956e9

Browse files
committed
feat: cluster events
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 750706b commit 00956e9

22 files changed

+1779
-5
lines changed

.claude/CLAUDE.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Build & Test Commands
6+
7+
```bash
8+
# Full build (skip tests)
9+
./gradlew build -x test
10+
11+
# Run all tests in a module
12+
./gradlew :clients:test
13+
14+
# Run a specific test class
15+
./gradlew :clients:test --tests org.apache.kafka.clients.admin.KafkaAdminClientTest
16+
17+
# Run a specific test method
18+
./gradlew :clients:test --tests org.apache.kafka.clients.admin.KafkaAdminClientTest.testListTopics
19+
20+
# Checkstyle (linting)
21+
./gradlew :clients:checkstyleMain :clients:checkstyleTest
22+
23+
# Auto-fix import ordering and formatting
24+
./gradlew :clients:spotlessApply
25+
26+
# Regenerate Kafka message classes (runs automatically before compileJava)
27+
./gradlew :clients:processMessages
28+
```
29+
30+
Java 17 is required. Gradle wrapper is at `./gradlew`.
31+
32+
## Module Structure
33+
34+
This is AutoMQ, a fork of Apache Kafka that replaces the local storage layer with S3-based shared storage. The key modules:
35+
36+
- `clients` — Kafka client libraries (producer, consumer, AdminClient). All new AutoMQ admin APIs go here.
37+
- `core` — Core broker logic (Scala). Handles request processing, log management, replication.
38+
- `server` / `server-common` — Broker server implementation and shared utilities.
39+
- `s3stream` — AutoMQ's S3 storage engine. Replaces Kafka's local log storage.
40+
- `storage` — Storage abstraction layer; `storage:storage-api` defines the interface.
41+
- `metadata` / `raft` — KRaft metadata and consensus.
42+
- `group-coordinator` / `transaction-coordinator` — Coordinator logic extracted from core.
43+
- `tools` — CLI tools (shell scripts in `bin/` delegate to Java classes here).
44+
- `automq-shell` — AutoMQ-specific CLI commands.
45+
46+
## AutoMQ Inject Pattern
47+
48+
AutoMQ customizations inside upstream Kafka files are wrapped with marker comments:
49+
50+
```java
51+
// AutoMQ inject start
52+
// ... AutoMQ-specific code ...
53+
// AutoMQ inject end
54+
```
55+
56+
When adding new AutoMQ functionality to an existing Kafka class, always use these markers. This makes it easy to track divergence from upstream and resolve merge conflicts.
57+
58+
To find all injection points: `grep -rn "AutoMQ inject" --include="*.java"`.
59+
60+
## Code Generation
61+
62+
Several modules generate Java source from JSON message definitions:
63+
64+
- **Message definitions**: `src/main/resources/common/message/*.json`
65+
- **Generated output**: `src/generated/java/org/apache/kafka/common/message/`
66+
- **Task**: `processMessages` (runs automatically before `compileJava`)
67+
68+
Do not edit files under `src/generated/` — they are overwritten on each build.
69+
70+
## Adding a New AdminClient Method
71+
72+
The pattern used throughout the codebase:
73+
74+
1. Add `default` + abstract method pair to `Admin.java` inside the `// AutoMQ inject` block.
75+
2. Add a stub `@Override` to `ForwardingAdmin.java` and `MockAdminClient.java` (test) inside their inject blocks — the build will fail with a compile error if you miss these.
76+
3. Create `*Options` (extends `AbstractOptions<T>`) and `*Result` (wraps `KafkaFuture`) classes in `clients/src/main/java/org/apache/kafka/clients/admin/`.
77+
4. Implement in `KafkaAdminClient.java` inside the inject block. For RPC-based methods use the `Call` pattern; for client-side reads (like `describeClusterEvents`) use a dedicated helper class.
78+
79+
## Dependency Management
80+
81+
Dependencies are declared in `gradle/dependencies.gradle` (not `build.gradle`):
82+
83+
```groovy
84+
// In versions map:
85+
myLib: "1.0.0"
86+
87+
// In libs map:
88+
myLib: "com.example:my-lib:$versions.myLib"
89+
```
90+
91+
Then referenced in `build.gradle` as `implementation libs.myLib`. The `clients` module uses a shadow jar — dependencies added there are shaded; `com.google.protobuf` is relocated to `org.apache.kafka.shaded.com.google.protobuf`, so avoid passing protobuf `Message` objects across the clients module boundary.
92+
93+
## Checkstyle & Spotless
94+
95+
Checkstyle enforces:
96+
- Max cyclomatic complexity: 16
97+
- Max NPath complexity: 500
98+
- No `toLowerCase()`/`toUpperCase()` without a `Locale` argument
99+
- Import ordering (run `spotlessApply` to fix automatically)
100+
101+
When adding new files, run `./gradlew :module:spotlessApply` before building to avoid import-order failures.
102+
103+
## Integration Testing
104+
105+
If your change or new feature requires integration testing, you can launch a local cluster by following the commands in `devkit/README.md`.

.claude/settings.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(./gradlew *)",
5+
"Bash(just *)"
6+
]
7+
},
8+
"hooks": {
9+
"Stop": [
10+
{
11+
"hooks": [
12+
{
13+
"type": "command",
14+
"command": "cd /Users/robin/code/automq && ./gradlew build -x test 2>&1 | tail -5 | jq -Rs '{systemMessage: (\"Build check: \" + (if test(\"BUILD SUCCESSFUL\") then \"✓ passed\" else \"✗ FAILED — run ./gradlew build -x test for details\" end))}'",
15+
"timeout": 300,
16+
"statusMessage": "Checking build..."
17+
}
18+
]
19+
}
20+
]
21+
}
22+
}

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,7 @@ __pycache__
6767
bin/
6868
!/bin/
6969
release/.venv/
70+
71+
# Claude Code project config
72+
.claude/settings.local.json
73+
.claude/plans/

bin/kafka-cluster-events.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ClusterEventsCommand "$@"

build.gradle

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ plugins {
4444
// Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed
4545
// artifacts - see https://github.com/johnrengelman/shadow/issues/901
4646
id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
47+
id 'com.google.protobuf' version '0.9.4' apply false
4748
// Spotless 6.13.0 has issue with Java 21 (see https://github.com/diffplug/spotless/pull/1920), and Spotless 6.14.0+ requires JRE 11
4849
// We are going to drop JDK8 support. Hence, the spotless is upgrade to newest version and be applied only if the build env is compatible with JDK 11.
4950
// spotless 6.15.0+ has issue in runtime with JDK8 even through we define it with `apply:false`. see https://github.com/diffplug/spotless/issues/2156 for more details
@@ -768,7 +769,7 @@ subprojects {
768769
apply plugin: 'com.diffplug.spotless'
769770
spotless {
770771
java {
771-
targetExclude('src/generated/**/*.java','src/generated-test/**/*.java')
772+
targetExclude('src/generated/**/*.java', 'src/generated-test/**/*.java', 'build/generated/**/*.java')
772773
importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax', '', '\\#')
773774
removeUnusedImports()
774775
}
@@ -1683,6 +1684,8 @@ project(':generator') {
16831684
}
16841685

16851686
project(':clients') {
1687+
apply plugin: 'com.google.protobuf'
1688+
16861689
base {
16871690
archivesName = "kafka-clients"
16881691
}
@@ -1700,6 +1703,10 @@ project(':clients') {
17001703
implementation libs.opentelemetryProto
17011704
implementation libs.protobuf
17021705

1706+
// AutoMQ inject start
1707+
implementation libs.cloudeventsKafka
1708+
// AutoMQ inject end
1709+
17031710
// libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
17041711
shadowed libs.zstd
17051712
shadowed libs.lz4
@@ -1818,7 +1825,7 @@ project(':clients') {
18181825
sourceSets {
18191826
main {
18201827
java {
1821-
srcDirs = ["src/generated/java", "src/main/java"]
1828+
srcDirs = ["src/generated/java", "src/main/java", "$buildDir/generated/source/proto/main/java"] // AutoMQ: add proto source
18221829
}
18231830
}
18241831
test {
@@ -1828,7 +1835,28 @@ project(':clients') {
18281835
}
18291836
}
18301837

1838+
protobuf {
1839+
protoc {
1840+
artifact = "com.google.protobuf:protoc:$versions.protobuf"
1841+
}
1842+
}
1843+
1844+
// Exclude protobuf-generated sources from checkstyle and spotless
1845+
afterEvaluate {
1846+
checkstyleMain.source = checkstyleMain.source.filter { !it.path.contains('/generated/source/proto/') }
1847+
if (tasks.findByName('spotlessJava')) {
1848+
spotless {
1849+
java {
1850+
targetExclude('build/generated/source/proto/**/*.java')
1851+
}
1852+
}
1853+
}
1854+
}
1855+
18311856
compileJava.dependsOn 'processMessages'
1857+
// AutoMQ inject start: ensure proto sources are generated before compilation
1858+
compileJava.dependsOn 'generateProto'
1859+
// AutoMQ inject end
18321860
srcJar.dependsOn 'processMessages'
18331861

18341862
compileTestJava.dependsOn 'processTestMessages'
@@ -2475,7 +2503,11 @@ project(':tools') {
24752503
}
24762504
implementation libs.bucket4j
24772505
implementation libs.oshi
2478-
// AutoMQ inject end
2506+
implementation libs.cloudeventsKafka
2507+
implementation libs.protobuf
2508+
// // Proto-generated event classes (separate artifact, excluded from kafka-clients shadow jar)
2509+
// implementation project(path: ':clients', configuration: 'eventsProto')
2510+
// // AutoMQ inject end
24792511

24802512
// for SASL/OAUTHBEARER JWT validation
24812513
implementation (libs.jose4j){
@@ -2538,6 +2570,9 @@ project(':tools') {
25382570
from (configurations.runtimeClasspath) {
25392571
exclude('kafka-clients*')
25402572
}
2573+
// // AutoMQ inject start
2574+
// from (project(':clients').configurations.eventsProto.artifacts.files)
2575+
// // AutoMQ inject end
25412576
into "$buildDir/dependant-libs-${versions.scala}"
25422577
duplicatesStrategy 'exclude'
25432578
}

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,21 @@ default ExportClusterManifestResult exportClusterManifest() {
17571757
}
17581758

17591759
ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options);
1760+
1761+
/**
1762+
* Read events from the {@code __cluster_events} internal topic, using the default options.
1763+
*/
1764+
default DescribeClusterEventsResult describeClusterEvents() {
1765+
return describeClusterEvents(new DescribeClusterEventsOptions());
1766+
}
1767+
1768+
/**
1769+
* Read events from the {@code __cluster_events} internal topic.
1770+
*
1771+
* @param options Filters and limits for the query.
1772+
* @return The DescribeClusterEventsResult.
1773+
*/
1774+
DescribeClusterEventsResult describeClusterEvents(DescribeClusterEventsOptions options);
17601775
// AutoMQ inject end
17611776

17621777
/**

0 commit comments

Comments
 (0)